1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
10use crate::CamelError;
11
12#[derive(Debug, Clone)]
16pub struct PlatformIdentity {
17 pub node_id: String,
18 pub namespace: Option<String>,
19 pub labels: HashMap<String, String>,
20}
21
22impl PlatformIdentity {
23 pub fn local(node_id: impl Into<String>) -> Self {
24 Self {
25 node_id: node_id.into(),
26 namespace: None,
27 labels: HashMap::new(),
28 }
29 }
30}
31
32#[derive(Debug, Clone, PartialEq)]
34pub enum LeadershipEvent {
35 StartedLeading,
36 StoppedLeading,
37}
38
39#[derive(Debug, Error)]
41pub enum PlatformError {
42 #[error("leadership lock already active: {lock_name}")]
43 LockAlreadyActive { lock_name: String },
44 #[error("step_down failed: elector loop terminated unexpectedly")]
45 StepDownFailed,
46 #[error("platform not available: {0}")]
47 NotAvailable(String),
48 #[error("configuration error: {0}")]
49 Config(String),
50}
51
52pub struct LeadershipHandle {
54 pub events: watch::Receiver<Option<LeadershipEvent>>,
56 is_leader: Arc<AtomicBool>,
58 cancel: CancellationToken,
60 terminated: Option<tokio::sync::oneshot::Receiver<()>>,
62}
63
64impl LeadershipHandle {
65 pub fn new(
68 events: watch::Receiver<Option<LeadershipEvent>>,
69 is_leader: Arc<AtomicBool>,
70 cancel: CancellationToken,
71 terminated: tokio::sync::oneshot::Receiver<()>,
72 ) -> Self {
73 Self {
74 events,
75 is_leader,
76 cancel,
77 terminated: Some(terminated),
78 }
79 }
80
81 pub fn is_leader(&self) -> bool {
82 self.is_leader.load(Ordering::Acquire)
83 }
84
85 pub async fn step_down(mut self) -> Result<(), PlatformError> {
89 self.cancel.cancel();
90 let rx = self
91 .terminated
92 .take()
93 .ok_or(PlatformError::StepDownFailed)?;
94 tokio::time::timeout(std::time::Duration::from_secs(10), rx)
95 .await
96 .map_err(|_| PlatformError::StepDownFailed)?
97 .map_err(|_| PlatformError::StepDownFailed)
98 }
99}
100
101impl Drop for LeadershipHandle {
102 fn drop(&mut self) {
103 self.cancel.cancel();
104 }
105}
106
107#[async_trait]
109pub trait LeadershipService: Send + Sync {
110 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
111}
112
113pub trait PlatformService: Send + Sync {
115 fn identity(&self) -> PlatformIdentity;
116 fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
117 fn leadership(&self) -> Arc<dyn LeadershipService>;
118}
119
120#[async_trait]
128pub trait ReadinessGate: Send + Sync {
129 async fn notify_ready(&self) -> Result<(), CamelError>;
130 async fn notify_not_ready(&self, reason: &str) -> Result<(), CamelError>;
131 async fn notify_starting(&self) -> Result<(), CamelError>;
132}
133
134pub struct NoopLeadershipService;
141
142impl NoopLeadershipService {
143 pub fn new() -> Self {
144 Self
145 }
146}
147
148impl Default for NoopLeadershipService {
149 fn default() -> Self {
150 Self::new()
151 }
152}
153
154#[async_trait]
155impl LeadershipService for NoopLeadershipService {
156 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
157 let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
158 let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
159 let cancel = CancellationToken::new();
160 let cancel_for_task = cancel.clone();
161 let is_leader = Arc::new(AtomicBool::new(true));
162 let is_leader_for_task = Arc::clone(&is_leader);
163 let lock_name = lock_name.to_string();
164
165 tokio::spawn(async move {
166 cancel_for_task.cancelled().await;
167 is_leader_for_task.store(false, Ordering::Release);
168 let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
169 drop(lock_name);
170 let _ = term_tx.send(());
171 });
172
173 Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
174 }
175}
176
177pub struct NoopPlatformService {
179 identity: PlatformIdentity,
180 readiness_gate: Arc<dyn ReadinessGate>,
181 leadership: Arc<dyn LeadershipService>,
182}
183
184impl NoopPlatformService {
185 pub fn new(identity: PlatformIdentity) -> Self {
186 Self {
187 identity,
188 readiness_gate: Arc::new(NoopReadinessGate),
189 leadership: Arc::new(NoopLeadershipService::new()),
190 }
191 }
192}
193
194impl Default for NoopPlatformService {
195 fn default() -> Self {
196 Self::new(PlatformIdentity::local("noop"))
197 }
198}
199
200impl PlatformService for NoopPlatformService {
201 fn identity(&self) -> PlatformIdentity {
202 self.identity.clone()
203 }
204
205 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
206 Arc::clone(&self.readiness_gate)
207 }
208
209 fn leadership(&self) -> Arc<dyn LeadershipService> {
210 Arc::clone(&self.leadership)
211 }
212}
213
214pub struct NoopReadinessGate;
216
217#[async_trait]
218impl ReadinessGate for NoopReadinessGate {
219 async fn notify_ready(&self) -> Result<(), CamelError> {
220 Ok(())
221 }
222 async fn notify_not_ready(&self, _reason: &str) -> Result<(), CamelError> {
223 Ok(())
224 }
225 async fn notify_starting(&self) -> Result<(), CamelError> {
226 Ok(())
227 }
228}
229
230#[cfg(test)]
231mod tests {
232 use super::*;
233
234 #[test]
235 fn test_platform_identity_local() {
236 let id = PlatformIdentity::local("my-node");
237 assert_eq!(id.node_id, "my-node");
238 assert!(id.namespace.is_none());
239 assert!(id.labels.is_empty());
240 }
241
242 #[tokio::test]
243 async fn test_noop_leadership_service_is_leader() {
244 let leadership = NoopLeadershipService::new();
245 let handle = leadership.start("lock-a").await.unwrap();
246 assert!(handle.is_leader());
247 }
248
249 #[tokio::test]
250 async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
251 let leadership = NoopLeadershipService::new();
252 let lock_a = leadership.start("lock-a").await.unwrap();
253 let lock_b = leadership.start("lock-b").await.unwrap();
254
255 assert!(lock_a.is_leader());
256 assert!(lock_b.is_leader());
257
258 lock_a.step_down().await.unwrap();
259 lock_b.step_down().await.unwrap();
260 }
261
262 #[tokio::test]
263 async fn test_noop_leadership_service_same_lock_allows_multiple() {
264 let leadership = NoopLeadershipService::new();
265 let first = leadership.start("lock-a").await.unwrap();
266 let second = leadership.start("lock-a").await.unwrap();
267
268 assert!(first.is_leader());
269 assert!(second.is_leader());
270
271 first.step_down().await.unwrap();
272 second.step_down().await.unwrap();
273 }
274
275 #[tokio::test]
276 async fn test_noop_leadership_handle_semantics_and_reacquire() {
277 let leadership = NoopLeadershipService::new();
278 let handle = leadership.start("lock-a").await.unwrap();
279 let mut events = handle.events.clone();
280 let is_leader = Arc::clone(&handle.is_leader);
281
282 let event = handle.events.borrow().clone();
283 assert_eq!(event, Some(LeadershipEvent::StartedLeading));
284
285 handle.step_down().await.unwrap();
286 events.changed().await.unwrap();
287 assert_eq!(*events.borrow(), Some(LeadershipEvent::StoppedLeading));
288 assert!(!is_leader.load(Ordering::Acquire));
289
290 let reacquired = leadership.start("lock-a").await;
291 assert!(reacquired.is_ok());
292 }
293
294 #[tokio::test]
295 async fn test_noop_leadership_drop_cleans_up() {
296 let leadership = NoopLeadershipService::new();
297 let handle = leadership.start("lock-drop").await.unwrap();
298 assert!(handle.is_leader());
299 drop(handle);
300
301 let handle2 = leadership.start("lock-drop").await.unwrap();
302 assert!(handle2.is_leader());
303 handle2.step_down().await.unwrap();
304 }
305
306 #[tokio::test]
307 async fn test_noop_readiness_gate_all_methods() {
308 let gate = NoopReadinessGate;
309 gate.notify_starting().await.unwrap();
310 gate.notify_not_ready("test").await.unwrap();
311 gate.notify_ready().await.unwrap();
312 }
313
314 #[test]
315 fn test_leadership_event_equality() {
316 assert_eq!(
317 LeadershipEvent::StartedLeading,
318 LeadershipEvent::StartedLeading
319 );
320 assert_ne!(
321 LeadershipEvent::StartedLeading,
322 LeadershipEvent::StoppedLeading
323 );
324 }
325
326 #[test]
327 fn test_platform_error_display() {
328 let e = PlatformError::LockAlreadyActive {
329 lock_name: "alpha".into(),
330 };
331 assert!(e.to_string().contains("alpha"));
332 let e2 = PlatformError::NotAvailable("no k8s".into());
333 assert!(e2.to_string().contains("no k8s"));
334 }
335}