Skip to main content

camel_api/
platform.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
/// Identity of this node within the platform environment.
///
/// In Kubernetes: pod name, namespace, and labels from the Downward API.
/// In local/test setups: a hostname or a user-supplied string.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlatformIdentity {
    /// Name identifying this node (in Kubernetes, the pod name).
    pub node_id: String,
    /// Namespace the node runs in, when the platform provides one.
    pub namespace: Option<String>,
    /// Key/value labels attached to the node (empty when none are known).
    pub labels: HashMap<String, String>,
}

impl PlatformIdentity {
    /// Creates an identity for local/test use.
    ///
    /// Only `node_id` is set; `namespace` is `None` and `labels` is empty.
    #[must_use]
    pub fn local(node_id: impl Into<String>) -> Self {
        Self {
            node_id: node_id.into(),
            namespace: None,
            labels: HashMap::new(),
        }
    }
}
29
/// Leadership state change events delivered asynchronously.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeadershipEvent {
    /// This node has become the leader.
    StartedLeading,
    /// This node is no longer the leader.
    StoppedLeading,
}
36
37/// Platform errors.
38#[derive(Debug, Error)]
39pub enum PlatformError {
40    #[error("leader elector already started")]
41    AlreadyStarted,
42    #[error("step_down failed: elector loop terminated unexpectedly")]
43    StepDownFailed,
44    #[error("platform not available: {0}")]
45    NotAvailable(String),
46    #[error("configuration error: {0}")]
47    Config(String),
48}
49
50/// Handle returned by `LeaderElector::start()`.
51/// One-shot per `LeaderElector` instance — create a new instance to restart.
52pub struct LeadershipHandle {
53    /// Subscribe to leadership state changes.
54    pub events: watch::Receiver<Option<LeadershipEvent>>,
55    /// Atomic readable shortcut for current leadership state.
56    is_leader: Arc<AtomicBool>,
57    /// Internal — used by `step_down()` to cancel the elector loop.
58    cancel: CancellationToken,
59    /// Await full loop termination after `step_down()`.
60    terminated: tokio::sync::oneshot::Receiver<()>,
61}
62
63impl LeadershipHandle {
64    /// Public constructor — required by `camel-platform-kubernetes` which lives in a separate crate
65    /// and cannot use struct literal syntax for fields that are private.
66    pub fn new(
67        events: watch::Receiver<Option<LeadershipEvent>>,
68        is_leader: Arc<AtomicBool>,
69        cancel: CancellationToken,
70        terminated: tokio::sync::oneshot::Receiver<()>,
71    ) -> Self {
72        Self {
73            events,
74            is_leader,
75            cancel,
76            terminated,
77        }
78    }
79
80    pub fn is_leader(&self) -> bool {
81        self.is_leader.load(Ordering::Acquire)
82    }
83
84    /// Signal step-down AND await full teardown:
85    /// lease release + loop termination + `StoppedLeading` delivered.
86    pub async fn step_down(self) -> Result<(), PlatformError> {
87        self.cancel.cancel();
88        self.terminated
89            .await
90            .map_err(|_| PlatformError::StepDownFailed)
91    }
92}
93
94/// Leader election abstraction.
95/// The elector owns the renew loop — callers do not manage timers.
96/// One-shot: `start()` may only be called once per instance.
97#[async_trait]
98pub trait LeaderElector: Send + Sync {
99    async fn start(&self, identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError>;
100}
101
102/// Readiness gate — local override that forces readiness state regardless of `HealthSource`.
103/// Name reflects actual role: a gate on local health state, not a push to external system.
104///
105/// Precedence (highest to lowest):
106///   1. `notify_starting()` → NotReady, always
107///   2. `notify_not_ready(reason)` → NotReady
108///   3. `HealthSource::readiness()` — fallback when no override active
109#[async_trait]
110pub trait ReadinessGate: Send + Sync {
111    async fn notify_ready(&self);
112    async fn notify_not_ready(&self, reason: &str);
113    async fn notify_starting(&self);
114}
115
116/// No-op leader elector — always wins leadership immediately.
117/// Correct for single-node deployments and tests that do not need real K8s.
118pub struct NoopLeaderElector;
119
120#[async_trait]
121impl LeaderElector for NoopLeaderElector {
122    async fn start(&self, _identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError> {
123        let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
124        let (_term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
125        // Drop tx immediately — channel stays at StartedLeading, never changes.
126        drop(tx);
127        Ok(LeadershipHandle::new(
128            rx,
129            Arc::new(AtomicBool::new(true)),
130            CancellationToken::new(),
131            term_rx,
132        ))
133    }
134}
135
136/// No-op readiness gate — all calls are no-ops, never blocks.
137pub struct NoopReadinessGate;
138
139#[async_trait]
140impl ReadinessGate for NoopReadinessGate {
141    async fn notify_ready(&self) {}
142    async fn notify_not_ready(&self, _reason: &str) {}
143    async fn notify_starting(&self) {}
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149
150    #[test]
151    fn test_platform_identity_local() {
152        let id = PlatformIdentity::local("my-node");
153        assert_eq!(id.node_id, "my-node");
154        assert!(id.namespace.is_none());
155        assert!(id.labels.is_empty());
156    }
157
158    #[tokio::test]
159    async fn test_noop_leader_elector_is_leader() {
160        let elector = NoopLeaderElector;
161        let handle = elector
162            .start(PlatformIdentity::local("test"))
163            .await
164            .unwrap();
165        assert!(handle.is_leader());
166    }
167
168    #[tokio::test]
169    async fn test_noop_leader_elector_event_started_leading() {
170        let elector = NoopLeaderElector;
171        let handle = elector
172            .start(PlatformIdentity::local("test"))
173            .await
174            .unwrap();
175        let event = handle.events.borrow().clone();
176        assert_eq!(event, Some(LeadershipEvent::StartedLeading));
177    }
178
179    #[tokio::test]
180    async fn test_noop_leader_elector_step_down() {
181        let elector = NoopLeaderElector;
182        let handle = elector
183            .start(PlatformIdentity::local("test"))
184            .await
185            .unwrap();
186        // step_down() should not hang — term_rx resolves when _term_tx is dropped
187        let result = handle.step_down().await;
188        // _term_tx was dropped in start(), so Receiver returns Err(RecvError) → StepDownFailed
189        assert!(
190            result.is_err(),
191            "NoopLeaderElector step_down should return StepDownFailed because _term_tx is dropped"
192        );
193    }
194
195    #[tokio::test]
196    async fn test_noop_readiness_gate_all_methods() {
197        let gate = NoopReadinessGate;
198        gate.notify_starting().await;
199        gate.notify_not_ready("test").await;
200        gate.notify_ready().await;
201        // All methods must complete without panicking
202    }
203
204    #[test]
205    fn test_leadership_event_equality() {
206        assert_eq!(
207            LeadershipEvent::StartedLeading,
208            LeadershipEvent::StartedLeading
209        );
210        assert_ne!(
211            LeadershipEvent::StartedLeading,
212            LeadershipEvent::StoppedLeading
213        );
214    }
215
216    #[test]
217    fn test_platform_error_display() {
218        let e = PlatformError::AlreadyStarted;
219        assert!(e.to_string().contains("already started"));
220        let e2 = PlatformError::NotAvailable("no k8s".into());
221        assert!(e2.to_string().contains("no k8s"));
222    }
223}