// camel_api/platform.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
10use crate::CamelError;
11
/// Node identity in the platform environment.
///
/// In Kubernetes: pod name, namespace, labels from Downward API.
/// In local/test: hostname or user-supplied string.
///
/// Two identities compare equal when their node id, namespace and full label
/// map are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlatformIdentity {
    /// Identifier of this node.
    pub node_id: String,
    /// Namespace the node lives in; `None` when no namespace applies
    /// (this is what [`PlatformIdentity::local`] produces).
    pub namespace: Option<String>,
    /// Key/value labels attached to the node; empty for local identities.
    pub labels: HashMap<String, String>,
}

impl PlatformIdentity {
    /// Builds an identity for local or test use.
    ///
    /// The result carries `node_id` only: `namespace` is `None` and `labels`
    /// is empty.
    pub fn local(node_id: impl Into<String>) -> Self {
        Self {
            node_id: node_id.into(),
            namespace: None,
            labels: HashMap::new(),
        }
    }
}
31
/// Leadership state change events delivered asynchronously.
///
/// Equality between events is total, so the type implements `Eq` in addition
/// to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeadershipEvent {
    /// Leadership has been acquired.
    StartedLeading,
    /// Leadership has been given up or lost.
    StoppedLeading,
}
38
/// Errors reported by the platform abstractions in this module.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
#[derive(Debug, Error)]
pub enum PlatformError {
    /// A leadership lock with the given name is already active.
    #[error("leadership lock already active: {lock_name}")]
    LockAlreadyActive { lock_name: String },
    /// `LeadershipHandle::step_down()` could not confirm teardown: the wait
    /// timed out, or the termination channel was missing or closed without a
    /// signal.
    #[error("step_down failed: elector loop terminated unexpectedly")]
    StepDownFailed,
    /// The platform is not available; the payload is a free-form description.
    #[error("platform not available: {0}")]
    NotAvailable(String),
    /// Configuration problem; the payload is a free-form description.
    #[error("configuration error: {0}")]
    Config(String),
}
51
/// Handle returned by `LeadershipService::start()`.
///
/// Gives access to leadership events and the current leadership flag, and
/// owns the means to stop the elector loop (explicitly via `step_down()`, or
/// implicitly when the handle is dropped).
pub struct LeadershipHandle {
    /// Subscribe to leadership state changes.
    /// The watch channel holds the most recent event, or `None`.
    pub events: watch::Receiver<Option<LeadershipEvent>>,
    /// Atomic readable shortcut for current leadership state.
    /// Shared with the elector loop, which is the writer.
    is_leader: Arc<AtomicBool>,
    /// Internal — used by `step_down()` (and `Drop`) to cancel the elector loop.
    cancel: CancellationToken,
    /// Await full loop termination after `step_down()`.
    /// Wrapped in `Option` so `step_down()` can `take()` the receiver out of
    /// the handle before awaiting it.
    terminated: Option<tokio::sync::oneshot::Receiver<()>>,
}
63
64impl LeadershipHandle {
65    /// Public constructor — required by `camel-platform-kubernetes` which lives in a separate crate
66    /// and cannot use struct literal syntax for fields that are private.
67    pub fn new(
68        events: watch::Receiver<Option<LeadershipEvent>>,
69        is_leader: Arc<AtomicBool>,
70        cancel: CancellationToken,
71        terminated: tokio::sync::oneshot::Receiver<()>,
72    ) -> Self {
73        Self {
74            events,
75            is_leader,
76            cancel,
77            terminated: Some(terminated),
78        }
79    }
80
81    pub fn is_leader(&self) -> bool {
82        self.is_leader.load(Ordering::Acquire)
83    }
84
85    /// Signal step-down AND await full teardown with a 10-second timeout:
86    /// lease release + loop termination + `StoppedLeading` delivered.
87    /// Returns `StepDownFailed` if teardown does not complete in time.
88    pub async fn step_down(mut self) -> Result<(), PlatformError> {
89        self.cancel.cancel();
90        let rx = self
91            .terminated
92            .take()
93            .ok_or(PlatformError::StepDownFailed)?;
94        tokio::time::timeout(std::time::Duration::from_secs(10), rx)
95            .await
96            .map_err(|_| PlatformError::StepDownFailed)?
97            .map_err(|_| PlatformError::StepDownFailed)
98    }
99}
100
101impl Drop for LeadershipHandle {
102    fn drop(&mut self) {
103        self.cancel.cancel();
104    }
105}
106
/// Leadership abstraction.
///
/// Implementations hand out a [`LeadershipHandle`] per `start()` call; the
/// handle exposes leadership events and lets the caller step down.
#[async_trait]
pub trait LeadershipService: Send + Sync {
    /// Starts leadership handling for the lock named `lock_name`.
    ///
    /// Returns a handle on success, or a [`PlatformError`] on failure.
    /// NOTE(review): `PlatformError::LockAlreadyActive` suggests some
    /// implementations reject a second `start()` for the same name — confirm
    /// per implementation.
    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
}
112
/// Platform service abstraction.
///
/// Single entry point bundling node identity, the readiness gate and the
/// leadership service.
pub trait PlatformService: Send + Sync {
    /// Returns the identity of this node (by value).
    fn identity(&self) -> PlatformIdentity;
    /// Returns a shared handle to the readiness gate.
    fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
    /// Returns a shared handle to the leadership service.
    fn leadership(&self) -> Arc<dyn LeadershipService>;
}
119
/// Readiness gate — local override that forces readiness state regardless of `HealthSource`.
/// Name reflects actual role: a gate on local health state, not a push to external system.
///
/// Precedence (highest to lowest):
///   1. `notify_starting()` → NotReady, always
///   2. `notify_not_ready(reason)` → NotReady
///   3. `HealthSource::readiness()` — fallback when no override active
#[async_trait]
pub trait ReadinessGate: Send + Sync {
    /// Signals readiness.
    /// NOTE(review): presumably clears an active not-ready override so the
    /// `HealthSource` fallback applies again — confirm against implementors.
    async fn notify_ready(&self) -> Result<(), CamelError>;
    /// Forces NotReady (precedence 2 above); `reason` describes why.
    async fn notify_not_ready(&self, reason: &str) -> Result<(), CamelError>;
    /// Forces NotReady while starting (precedence 1 above).
    async fn notify_starting(&self) -> Result<(), CamelError>;
}
133
/// No-op leadership service.
/// Correct for single-node deployments and tests that do not need real K8s.
///
/// Allows multiple `start()` calls for the same lock name — each returns an
/// independent `LeadershipHandle`. This matches the `master:` semantics where
/// multiple routes can compete for the same lock.
///
/// The type is a stateless unit struct, so `Default`, `Clone`, `Copy` and
/// `Debug` are derived rather than hand-written.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopLeadershipService;

impl NoopLeadershipService {
    /// Creates the service; equivalent to `NoopLeadershipService::default()`.
    pub fn new() -> Self {
        Self
    }
}
153
154#[async_trait]
155impl LeadershipService for NoopLeadershipService {
156    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
157        let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
158        let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
159        let cancel = CancellationToken::new();
160        let cancel_for_task = cancel.clone();
161        let is_leader = Arc::new(AtomicBool::new(true));
162        let is_leader_for_task = Arc::clone(&is_leader);
163        let lock_name = lock_name.to_string();
164
165        tokio::spawn(async move {
166            cancel_for_task.cancelled().await;
167            is_leader_for_task.store(false, Ordering::Release);
168            let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
169            drop(lock_name);
170            let _ = term_tx.send(());
171        });
172
173        Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
174    }
175}
176
/// No-op platform service.
///
/// Holds a fixed identity plus shared readiness-gate and leadership
/// services, and hands them out through the `PlatformService` trait.
pub struct NoopPlatformService {
    /// Identity returned (cloned) by `identity()`.
    identity: PlatformIdentity,
    /// Gate returned by `readiness_gate()`.
    readiness_gate: Arc<dyn ReadinessGate>,
    /// Service returned by `leadership()`.
    leadership: Arc<dyn LeadershipService>,
}
183
184impl NoopPlatformService {
185    pub fn new(identity: PlatformIdentity) -> Self {
186        Self {
187            identity,
188            readiness_gate: Arc::new(NoopReadinessGate),
189            leadership: Arc::new(NoopLeadershipService::new()),
190        }
191    }
192}
193
194impl Default for NoopPlatformService {
195    fn default() -> Self {
196        Self::new(PlatformIdentity::local("noop"))
197    }
198}
199
200impl PlatformService for NoopPlatformService {
201    fn identity(&self) -> PlatformIdentity {
202        self.identity.clone()
203    }
204
205    fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
206        Arc::clone(&self.readiness_gate)
207    }
208
209    fn leadership(&self) -> Arc<dyn LeadershipService> {
210        Arc::clone(&self.leadership)
211    }
212}
213
/// No-op readiness gate — all calls are no-ops, never blocks.
///
/// Stateless unit struct; derives `Debug`, `Default`, `Clone` and `Copy` so
/// it can be logged, default-constructed and freely duplicated.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopReadinessGate;
216
217#[async_trait]
218impl ReadinessGate for NoopReadinessGate {
219    async fn notify_ready(&self) -> Result<(), CamelError> {
220        Ok(())
221    }
222    async fn notify_not_ready(&self, _reason: &str) -> Result<(), CamelError> {
223        Ok(())
224    }
225    async fn notify_starting(&self) -> Result<(), CamelError> {
226        Ok(())
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    /// `local` keeps the node id and leaves namespace and labels empty.
    #[test]
    fn test_platform_identity_local() {
        let identity = PlatformIdentity::local("my-node");
        assert_eq!(identity.node_id, "my-node");
        assert!(identity.namespace.is_none());
        assert!(identity.labels.is_empty());
    }

    /// A freshly started no-op handle reports leadership.
    #[tokio::test]
    async fn test_noop_leadership_service_is_leader() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.unwrap();
        assert!(handle.is_leader());
    }

    /// Handles for different lock names coexist and step down independently.
    #[tokio::test]
    async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
        let service = NoopLeadershipService::new();
        let handle_a = service.start("lock-a").await.unwrap();
        let handle_b = service.start("lock-b").await.unwrap();

        assert!(handle_a.is_leader());
        assert!(handle_b.is_leader());

        handle_a.step_down().await.unwrap();
        handle_b.step_down().await.unwrap();
    }

    /// Two handles for the same lock name coexist as well.
    #[tokio::test]
    async fn test_noop_leadership_service_same_lock_allows_multiple() {
        let service = NoopLeadershipService::new();
        let earlier = service.start("lock-a").await.unwrap();
        let later = service.start("lock-a").await.unwrap();

        assert!(earlier.is_leader());
        assert!(later.is_leader());

        earlier.step_down().await.unwrap();
        later.step_down().await.unwrap();
    }

    /// Start → `StartedLeading`; step down → `StoppedLeading` and flag
    /// cleared; the same lock can then be started again.
    #[tokio::test]
    async fn test_noop_leadership_handle_semantics_and_reacquire() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.unwrap();

        // Keep observers alive past `step_down`, which consumes the handle.
        let mut watcher = handle.events.clone();
        let leader_flag = Arc::clone(&handle.is_leader);

        assert_eq!(
            *handle.events.borrow(),
            Some(LeadershipEvent::StartedLeading)
        );

        handle.step_down().await.unwrap();
        watcher.changed().await.unwrap();
        assert_eq!(*watcher.borrow(), Some(LeadershipEvent::StoppedLeading));
        assert!(!leader_flag.load(Ordering::Acquire));

        let again = service.start("lock-a").await;
        assert!(again.is_ok());
    }

    /// Dropping a handle without `step_down` does not block a later start.
    #[tokio::test]
    async fn test_noop_leadership_drop_cleans_up() {
        let service = NoopLeadershipService::new();
        {
            let dropped = service.start("lock-drop").await.unwrap();
            assert!(dropped.is_leader());
            // `dropped` goes out of scope here, which runs its `Drop` impl.
        }

        let fresh = service.start("lock-drop").await.unwrap();
        assert!(fresh.is_leader());
        fresh.step_down().await.unwrap();
    }

    /// Every no-op gate method succeeds.
    #[tokio::test]
    async fn test_noop_readiness_gate_all_methods() {
        let noop_gate = NoopReadinessGate;
        noop_gate.notify_starting().await.unwrap();
        noop_gate.notify_not_ready("test").await.unwrap();
        noop_gate.notify_ready().await.unwrap();
    }

    /// Same variants compare equal, different variants do not.
    #[test]
    fn test_leadership_event_equality() {
        let started = LeadershipEvent::StartedLeading;
        assert_eq!(started, LeadershipEvent::StartedLeading);
        assert_ne!(started, LeadershipEvent::StoppedLeading);
    }

    /// `Display` output includes the variant payload.
    #[test]
    fn test_platform_error_display() {
        let lock_err = PlatformError::LockAlreadyActive {
            lock_name: "alpha".into(),
        };
        assert!(lock_err.to_string().contains("alpha"));

        let unavailable = PlatformError::NotAvailable("no k8s".into());
        assert!(unavailable.to_string().contains("no k8s"));
    }
}