Skip to main content

camel_api/
platform.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
10use crate::CamelError;
11
/// Identity of this node within the hosting platform.
///
/// Under Kubernetes this is meant to be populated from the Downward API (pod name,
/// namespace, labels). For local runs and tests it is typically a hostname or any
/// caller-chosen string.
#[derive(Debug, Clone)]
pub struct PlatformIdentity {
    /// Name identifying this node (e.g. the pod name).
    pub node_id: String,
    /// Platform namespace, when the platform has the concept.
    pub namespace: Option<String>,
    /// Free-form key/value labels attached to the node.
    pub labels: HashMap<String, String>,
}

impl PlatformIdentity {
    /// Builds an identity for a non-orchestrated environment.
    ///
    /// Only `node_id` is set; there is no namespace and the label map is empty.
    pub fn local(node_id: impl Into<String>) -> Self {
        let node_id: String = node_id.into();
        Self {
            labels: HashMap::default(),
            namespace: None,
            node_id,
        }
    }
}
31
/// Leadership state transitions, published through the `events` watch channel of a
/// `LeadershipHandle`.
///
/// This is a fieldless enum, so it is `Copy`: observers can read the current value
/// straight out of a `watch` borrow (`*rx.borrow()`) without cloning. It is also
/// `Eq`/`Hash` so it can be used as a set or map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeadershipEvent {
    /// This node has become the leader for the lock.
    StartedLeading,
    /// This node is no longer the leader (for example after a step-down).
    StoppedLeading,
}
38
39/// Platform errors.
40#[derive(Debug, Error)]
41pub enum PlatformError {
42    #[error("leadership lock already active: {lock_name}")]
43    LockAlreadyActive { lock_name: String },
44    #[error("step_down failed: elector loop terminated unexpectedly")]
45    StepDownFailed,
46    #[error("platform not available: {0}")]
47    NotAvailable(String),
48    #[error("configuration error: {0}")]
49    Config(String),
50}
51
52/// Handle returned by `LeadershipService::start()`.
53pub struct LeadershipHandle {
54    /// Subscribe to leadership state changes.
55    pub events: watch::Receiver<Option<LeadershipEvent>>,
56    /// Atomic readable shortcut for current leadership state.
57    is_leader: Arc<AtomicBool>,
58    /// Internal — used by `step_down()` to cancel the elector loop.
59    cancel: CancellationToken,
60    /// Await full loop termination after `step_down()`.
61    terminated: Option<tokio::sync::oneshot::Receiver<()>>,
62}
63
64impl LeadershipHandle {
65    /// Public constructor — required by `camel-platform-kubernetes` which lives in a separate crate
66    /// and cannot use struct literal syntax for fields that are private.
67    pub fn new(
68        events: watch::Receiver<Option<LeadershipEvent>>,
69        is_leader: Arc<AtomicBool>,
70        cancel: CancellationToken,
71        terminated: tokio::sync::oneshot::Receiver<()>,
72    ) -> Self {
73        Self {
74            events,
75            is_leader,
76            cancel,
77            terminated: Some(terminated),
78        }
79    }
80
81    pub fn is_leader(&self) -> bool {
82        self.is_leader.load(Ordering::Acquire)
83    }
84
85    /// Signal step-down AND await full teardown:
86    /// lease release + loop termination + `StoppedLeading` delivered.
87    pub async fn step_down(mut self) -> Result<(), PlatformError> {
88        self.cancel.cancel();
89        self.terminated
90            .take()
91            .ok_or(PlatformError::StepDownFailed)?
92            .await
93            .map_err(|_| PlatformError::StepDownFailed)
94    }
95}
96
97impl Drop for LeadershipHandle {
98    fn drop(&mut self) {
99        self.cancel.cancel();
100    }
101}
102
103/// Leadership abstraction.
104#[async_trait]
105pub trait LeadershipService: Send + Sync {
106    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
107}
108
109/// Platform service abstraction.
110pub trait PlatformService: Send + Sync {
111    fn identity(&self) -> PlatformIdentity;
112    fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
113    fn leadership(&self) -> Arc<dyn LeadershipService>;
114}
115
116/// Readiness gate — local override that forces readiness state regardless of `HealthSource`.
117/// Name reflects actual role: a gate on local health state, not a push to external system.
118///
119/// Precedence (highest to lowest):
120///   1. `notify_starting()` → NotReady, always
121///   2. `notify_not_ready(reason)` → NotReady
122///   3. `HealthSource::readiness()` — fallback when no override active
123#[async_trait]
124pub trait ReadinessGate: Send + Sync {
125    async fn notify_ready(&self) -> Result<(), CamelError>;
126    async fn notify_not_ready(&self, reason: &str) -> Result<(), CamelError>;
127    async fn notify_starting(&self) -> Result<(), CamelError>;
128}
129
/// Leadership service that never contends: every `start()` succeeds at once, and the
/// returned handle reports leadership until it is stepped down or dropped.
///
/// Suitable for single-node deployments and for tests that do not need real Kubernetes.
///
/// Repeated `start()` calls for the same lock name are allowed; each yields its own
/// independent `LeadershipHandle`. This matches the `master:` semantics, where several
/// routes may compete for one lock.
#[derive(Default)]
pub struct NoopLeadershipService;

impl NoopLeadershipService {
    /// Creates the service; identical to `NoopLeadershipService::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
149
150#[async_trait]
151impl LeadershipService for NoopLeadershipService {
152    async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
153        let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
154        let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
155        let cancel = CancellationToken::new();
156        let cancel_for_task = cancel.clone();
157        let is_leader = Arc::new(AtomicBool::new(true));
158        let is_leader_for_task = Arc::clone(&is_leader);
159        let lock_name = lock_name.to_string();
160
161        tokio::spawn(async move {
162            cancel_for_task.cancelled().await;
163            is_leader_for_task.store(false, Ordering::Release);
164            let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
165            drop(lock_name);
166            let _ = term_tx.send(());
167        });
168
169        Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
170    }
171}
172
173/// No-op platform service.
174pub struct NoopPlatformService {
175    identity: PlatformIdentity,
176    readiness_gate: Arc<dyn ReadinessGate>,
177    leadership: Arc<dyn LeadershipService>,
178}
179
180impl NoopPlatformService {
181    pub fn new(identity: PlatformIdentity) -> Self {
182        Self {
183            identity,
184            readiness_gate: Arc::new(NoopReadinessGate),
185            leadership: Arc::new(NoopLeadershipService::new()),
186        }
187    }
188}
189
190impl Default for NoopPlatformService {
191    fn default() -> Self {
192        Self::new(PlatformIdentity::local("noop"))
193    }
194}
195
196impl PlatformService for NoopPlatformService {
197    fn identity(&self) -> PlatformIdentity {
198        self.identity.clone()
199    }
200
201    fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
202        Arc::clone(&self.readiness_gate)
203    }
204
205    fn leadership(&self) -> Arc<dyn LeadershipService> {
206        Arc::clone(&self.leadership)
207    }
208}
209
210/// No-op readiness gate — all calls are no-ops, never blocks.
211pub struct NoopReadinessGate;
212
213#[async_trait]
214impl ReadinessGate for NoopReadinessGate {
215    async fn notify_ready(&self) -> Result<(), CamelError> {
216        Ok(())
217    }
218    async fn notify_not_ready(&self, _reason: &str) -> Result<(), CamelError> {
219        Ok(())
220    }
221    async fn notify_starting(&self) -> Result<(), CamelError> {
222        Ok(())
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_platform_identity_local() {
        let identity = PlatformIdentity::local("my-node");

        assert_eq!(identity.node_id, "my-node");
        assert_eq!(identity.namespace, None);
        assert!(identity.labels.is_empty());
    }

    #[tokio::test]
    async fn test_noop_leadership_service_is_leader() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.expect("noop start never fails");

        assert!(handle.is_leader());
    }

    #[tokio::test]
    async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
        let service = NoopLeadershipService::new();

        let mut handles = Vec::with_capacity(2);
        for lock in ["lock-a", "lock-b"] {
            handles.push(service.start(lock).await.unwrap());
        }

        assert!(handles.iter().all(LeadershipHandle::is_leader));

        for handle in handles {
            handle.step_down().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_noop_leadership_service_same_lock_allows_multiple() {
        let service = NoopLeadershipService::new();

        let mut handles = Vec::with_capacity(2);
        for _ in 0..2 {
            handles.push(service.start("lock-a").await.unwrap());
        }

        for handle in &handles {
            assert!(handle.is_leader());
        }

        for handle in handles {
            handle.step_down().await.unwrap();
        }
    }

    #[tokio::test]
    async fn test_noop_leadership_handle_semantics_and_reacquire() {
        let service = NoopLeadershipService::new();
        let handle = service.start("lock-a").await.unwrap();

        // Keep our own observers: `step_down` consumes the handle.
        let mut observer = handle.events.clone();
        let leader_flag = Arc::clone(&handle.is_leader);

        assert_eq!(
            *handle.events.borrow(),
            Some(LeadershipEvent::StartedLeading)
        );

        handle.step_down().await.unwrap();

        observer.changed().await.unwrap();
        assert_eq!(*observer.borrow(), Some(LeadershipEvent::StoppedLeading));
        assert!(!leader_flag.load(Ordering::Acquire));

        assert!(service.start("lock-a").await.is_ok());
    }

    #[tokio::test]
    async fn test_noop_leadership_drop_cleans_up() {
        let service = NoopLeadershipService::new();

        let first = service.start("lock-drop").await.unwrap();
        assert!(first.is_leader());
        drop(first);

        let second = service.start("lock-drop").await.unwrap();
        assert!(second.is_leader());
        second.step_down().await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_readiness_gate_all_methods() {
        let gate = NoopReadinessGate;

        gate.notify_starting().await.unwrap();
        gate.notify_not_ready("test").await.unwrap();
        gate.notify_ready().await.unwrap();
    }

    #[test]
    fn test_leadership_event_equality() {
        let started = LeadershipEvent::StartedLeading;

        assert_eq!(started, LeadershipEvent::StartedLeading);
        assert_ne!(started, LeadershipEvent::StoppedLeading);
    }

    #[test]
    fn test_platform_error_display() {
        let locked = PlatformError::LockAlreadyActive {
            lock_name: "alpha".into(),
        };
        assert!(locked.to_string().contains("alpha"));

        let missing = PlatformError::NotAvailable("no k8s".into());
        assert!(missing.to_string().contains("no k8s"));
    }
}