Skip to main content

camel_api/
platform.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
/// Identity of the node this process runs as within its platform.
///
/// On Kubernetes the fields carry the pod name, namespace and labels taken
/// from the Downward API. For local runs and tests, a hostname or any
/// caller-chosen string serves as the node id.
#[derive(Debug, Clone)]
pub struct PlatformIdentity {
    /// Name identifying this node (pod name, hostname, or a user-supplied string).
    pub node_id: String,
    /// Namespace of the node, if any; `None` for identities built by `local`.
    pub namespace: Option<String>,
    /// Key/value labels attached to the node; empty for identities built by `local`.
    pub labels: HashMap<String, String>,
}

impl PlatformIdentity {
    /// Builds an identity for local or test use.
    ///
    /// Only `node_id` is populated; the namespace is `None` and the label map
    /// is empty.
    pub fn local(node_id: impl Into<String>) -> Self {
        let node_id = node_id.into();
        let labels = HashMap::new();
        Self {
            node_id,
            namespace: None,
            labels,
        }
    }
}
29
/// Leadership state change events delivered asynchronously.
///
/// The enum carries no data, so equality is total: `Eq` is derived alongside
/// `PartialEq` so events can be used wherever full equality is required.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeadershipEvent {
    /// This node has acquired leadership.
    StartedLeading,
    /// This node no longer holds leadership.
    StoppedLeading,
}
36
37/// Platform errors.
38#[derive(Debug, Error)]
39pub enum PlatformError {
40    #[error("leadership lock already active: {lock_name}")]
41    LockAlreadyActive { lock_name: String },
42    #[error("step_down failed: elector loop terminated unexpectedly")]
43    StepDownFailed,
44    #[error("platform not available: {0}")]
45    NotAvailable(String),
46    #[error("configuration error: {0}")]
47    Config(String),
48}
49
50/// Handle returned by `LeadershipService::start()`.
51pub struct LeadershipHandle {
52    /// Subscribe to leadership state changes.
53    pub events: watch::Receiver<Option<LeadershipEvent>>,
54    /// Atomic readable shortcut for current leadership state.
55    is_leader: Arc<AtomicBool>,
56    /// Internal — used by `step_down()` to cancel the elector loop.
57    cancel: CancellationToken,
58    /// Await full loop termination after `step_down()`.
59    terminated: Option<tokio::sync::oneshot::Receiver<()>>,
60}
61
62impl LeadershipHandle {
63    /// Public constructor — required by `camel-platform-kubernetes` which lives in a separate crate
64    /// and cannot use struct literal syntax for fields that are private.
65    pub fn new(
66        events: watch::Receiver<Option<LeadershipEvent>>,
67        is_leader: Arc<AtomicBool>,
68        cancel: CancellationToken,
69        terminated: tokio::sync::oneshot::Receiver<()>,
70    ) -> Self {
71        Self {
72            events,
73            is_leader,
74            cancel,
75            terminated: Some(terminated),
76        }
77    }
78
79    pub fn is_leader(&self) -> bool {
80        self.is_leader.load(Ordering::Acquire)
81    }
82
83    /// Signal step-down AND await full teardown:
84    /// lease release + loop termination + `StoppedLeading` delivered.
85    pub async fn step_down(mut self) -> Result<(), PlatformError> {
86        self.cancel.cancel();
87        self.terminated
88            .take()
89            .ok_or(PlatformError::StepDownFailed)?
90            .await
91            .map_err(|_| PlatformError::StepDownFailed)
92    }
93}
94
95impl Drop for LeadershipHandle {
96    fn drop(&mut self) {
97        self.cancel.cancel();
98    }
99}
100
101/// Leadership abstraction.
102#[async_trait]
103pub trait LeadershipService: Send + Sync {
104    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
105}
106
107/// Platform service abstraction.
108pub trait PlatformService: Send + Sync {
109    fn identity(&self) -> PlatformIdentity;
110    fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
111    fn leadership(&self) -> Arc<dyn LeadershipService>;
112}
113
114/// Readiness gate — local override that forces readiness state regardless of `HealthSource`.
115/// Name reflects actual role: a gate on local health state, not a push to external system.
116///
117/// Precedence (highest to lowest):
118///   1. `notify_starting()` → NotReady, always
119///   2. `notify_not_ready(reason)` → NotReady
120///   3. `HealthSource::readiness()` — fallback when no override active
121#[async_trait]
122pub trait ReadinessGate: Send + Sync {
123    async fn notify_ready(&self);
124    async fn notify_not_ready(&self, reason: &str);
125    async fn notify_starting(&self);
126}
127
/// Leadership service that performs no real election.
/// Suitable for single-node deployments and for tests that do not need real K8s.
///
/// `start()` may be called any number of times for the same lock name, and
/// every call hands back its own independent `LeadershipHandle`. This mirrors
/// the `master:` semantics, where several routes can compete for one lock.
#[derive(Default)]
pub struct NoopLeadershipService;

impl NoopLeadershipService {
    /// Creates the service; it holds no state.
    pub fn new() -> Self {
        NoopLeadershipService
    }
}
147
148#[async_trait]
149impl LeadershipService for NoopLeadershipService {
150    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
151        let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
152        let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
153        let cancel = CancellationToken::new();
154        let cancel_for_task = cancel.clone();
155        let is_leader = Arc::new(AtomicBool::new(true));
156        let is_leader_for_task = Arc::clone(&is_leader);
157        let lock_name = lock_name.to_string();
158
159        tokio::spawn(async move {
160            cancel_for_task.cancelled().await;
161            is_leader_for_task.store(false, Ordering::Release);
162            let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
163            drop(lock_name);
164            let _ = term_tx.send(());
165        });
166
167        Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
168    }
169}
170
171/// No-op platform service.
172pub struct NoopPlatformService {
173    identity: PlatformIdentity,
174    readiness_gate: Arc<dyn ReadinessGate>,
175    leadership: Arc<dyn LeadershipService>,
176}
177
178impl NoopPlatformService {
179    pub fn new(identity: PlatformIdentity) -> Self {
180        Self {
181            identity,
182            readiness_gate: Arc::new(NoopReadinessGate),
183            leadership: Arc::new(NoopLeadershipService::new()),
184        }
185    }
186}
187
188impl Default for NoopPlatformService {
189    fn default() -> Self {
190        Self::new(PlatformIdentity::local("noop"))
191    }
192}
193
194impl PlatformService for NoopPlatformService {
195    fn identity(&self) -> PlatformIdentity {
196        self.identity.clone()
197    }
198
199    fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
200        Arc::clone(&self.readiness_gate)
201    }
202
203    fn leadership(&self) -> Arc<dyn LeadershipService> {
204        Arc::clone(&self.leadership)
205    }
206}
207
208/// No-op readiness gate — all calls are no-ops, never blocks.
209pub struct NoopReadinessGate;
210
211#[async_trait]
212impl ReadinessGate for NoopReadinessGate {
213    async fn notify_ready(&self) {}
214    async fn notify_not_ready(&self, _reason: &str) {}
215    async fn notify_starting(&self) {}
216}
217
#[cfg(test)]
mod tests {
    use super::*;

    /// `PlatformIdentity::local` fills in the node id and nothing else.
    #[test]
    fn test_platform_identity_local() {
        let identity = PlatformIdentity::local("my-node");
        assert_eq!(identity.node_id, "my-node");
        assert!(identity.namespace.is_none());
        assert!(identity.labels.is_empty());
    }

    /// A freshly started no-op handle reports leadership.
    #[tokio::test]
    async fn test_noop_leadership_service_is_leader() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.unwrap();
        assert!(handle.is_leader());
    }

    /// Two different lock names can be held at the same time.
    #[tokio::test]
    async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
        let service = NoopLeadershipService::new();
        let handle_a = service.start("lock-a").await.unwrap();
        let handle_b = service.start("lock-b").await.unwrap();

        assert!(handle_a.is_leader());
        assert!(handle_b.is_leader());

        handle_a.step_down().await.unwrap();
        handle_b.step_down().await.unwrap();
    }

    /// The same lock name can be started twice, yielding independent handles.
    #[tokio::test]
    async fn test_noop_leadership_service_same_lock_allows_multiple() {
        let service = NoopLeadershipService::new();
        let earlier = service.start("lock-a").await.unwrap();
        let later = service.start("lock-a").await.unwrap();

        assert!(earlier.is_leader());
        assert!(later.is_leader());

        earlier.step_down().await.unwrap();
        later.step_down().await.unwrap();
    }

    /// Covers the whole handle lifecycle: initial event, step-down, the
    /// resulting event and flag, and starting the same lock again afterwards.
    #[tokio::test]
    async fn test_noop_leadership_handle_semantics_and_reacquire() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.unwrap();
        // Keep independent views of the channel and the flag, because
        // `step_down()` consumes the handle.
        let mut subscriber = handle.events.clone();
        let leader_flag = Arc::clone(&handle.is_leader);

        let initial = handle.events.borrow().clone();
        assert_eq!(initial, Some(LeadershipEvent::StartedLeading));

        handle.step_down().await.unwrap();
        subscriber.changed().await.unwrap();
        assert_eq!(*subscriber.borrow(), Some(LeadershipEvent::StoppedLeading));
        assert!(!leader_flag.load(Ordering::Acquire));

        let reacquired = service.start("lock-a").await;
        assert!(reacquired.is_ok());
    }

    /// Dropping a handle does not prevent the same lock from being started again.
    #[tokio::test]
    async fn test_noop_leadership_drop_cleans_up() {
        let service = NoopLeadershipService::new();
        let dropped = service.start("lock-drop").await.unwrap();
        assert!(dropped.is_leader());
        drop(dropped);

        let replacement = service.start("lock-drop").await.unwrap();
        assert!(replacement.is_leader());
        replacement.step_down().await.unwrap();
    }

    /// Every method of the no-op gate can be awaited to completion.
    #[tokio::test]
    async fn test_noop_readiness_gate_all_methods() {
        let noop_gate = NoopReadinessGate;
        noop_gate.notify_starting().await;
        noop_gate.notify_not_ready("test").await;
        noop_gate.notify_ready().await;
    }

    /// Identical variants compare equal; different variants do not.
    #[test]
    fn test_leadership_event_equality() {
        assert_eq!(
            LeadershipEvent::StartedLeading,
            LeadershipEvent::StartedLeading
        );
        assert_ne!(
            LeadershipEvent::StartedLeading,
            LeadershipEvent::StoppedLeading
        );
    }

    /// Error messages include the payload carried by the variant.
    #[test]
    fn test_platform_error_display() {
        let lock_error = PlatformError::LockAlreadyActive {
            lock_name: "alpha".into(),
        };
        assert!(lock_error.to_string().contains("alpha"));

        let unavailable = PlatformError::NotAvailable("no k8s".into());
        assert!(unavailable.to_string().contains("no k8s"));
    }
}