1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
10use crate::CamelError;
11
12#[derive(Debug, Clone)]
16pub struct PlatformIdentity {
17 pub node_id: String,
18 pub namespace: Option<String>,
19 pub labels: HashMap<String, String>,
20}
21
22impl PlatformIdentity {
23 pub fn local(node_id: impl Into<String>) -> Self {
24 Self {
25 node_id: node_id.into(),
26 namespace: None,
27 labels: HashMap::new(),
28 }
29 }
30}
31
32#[derive(Debug, Clone, PartialEq)]
34pub enum LeadershipEvent {
35 StartedLeading,
36 StoppedLeading,
37}
38
39#[derive(Debug, Error)]
41pub enum PlatformError {
42 #[error("leadership lock already active: {lock_name}")]
43 LockAlreadyActive { lock_name: String },
44 #[error("step_down failed: elector loop terminated unexpectedly")]
45 StepDownFailed,
46 #[error("platform not available: {0}")]
47 NotAvailable(String),
48 #[error("configuration error: {0}")]
49 Config(String),
50}
51
52pub struct LeadershipHandle {
54 pub events: watch::Receiver<Option<LeadershipEvent>>,
56 is_leader: Arc<AtomicBool>,
58 cancel: CancellationToken,
60 terminated: Option<tokio::sync::oneshot::Receiver<()>>,
62}
63
64impl LeadershipHandle {
65 pub fn new(
68 events: watch::Receiver<Option<LeadershipEvent>>,
69 is_leader: Arc<AtomicBool>,
70 cancel: CancellationToken,
71 terminated: tokio::sync::oneshot::Receiver<()>,
72 ) -> Self {
73 Self {
74 events,
75 is_leader,
76 cancel,
77 terminated: Some(terminated),
78 }
79 }
80
81 pub fn is_leader(&self) -> bool {
82 self.is_leader.load(Ordering::Acquire)
83 }
84
85 pub async fn step_down(mut self) -> Result<(), PlatformError> {
88 self.cancel.cancel();
89 self.terminated
90 .take()
91 .ok_or(PlatformError::StepDownFailed)?
92 .await
93 .map_err(|_| PlatformError::StepDownFailed)
94 }
95}
96
97impl Drop for LeadershipHandle {
98 fn drop(&mut self) {
99 self.cancel.cancel();
100 }
101}
102
103#[async_trait]
105pub trait LeadershipService: Send + Sync {
106 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
107}
108
109pub trait PlatformService: Send + Sync {
111 fn identity(&self) -> PlatformIdentity;
112 fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
113 fn leadership(&self) -> Arc<dyn LeadershipService>;
114}
115
116#[async_trait]
124pub trait ReadinessGate: Send + Sync {
125 async fn notify_ready(&self) -> Result<(), CamelError>;
126 async fn notify_not_ready(&self, reason: &str) -> Result<(), CamelError>;
127 async fn notify_starting(&self) -> Result<(), CamelError>;
128}
129
130pub struct NoopLeadershipService;
137
138impl NoopLeadershipService {
139 pub fn new() -> Self {
140 Self
141 }
142}
143
144impl Default for NoopLeadershipService {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150#[async_trait]
151impl LeadershipService for NoopLeadershipService {
152 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
153 let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
154 let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
155 let cancel = CancellationToken::new();
156 let cancel_for_task = cancel.clone();
157 let is_leader = Arc::new(AtomicBool::new(true));
158 let is_leader_for_task = Arc::clone(&is_leader);
159 let lock_name = lock_name.to_string();
160
161 tokio::spawn(async move {
162 cancel_for_task.cancelled().await;
163 is_leader_for_task.store(false, Ordering::Release);
164 let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
165 drop(lock_name);
166 let _ = term_tx.send(());
167 });
168
169 Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
170 }
171}
172
173pub struct NoopPlatformService {
175 identity: PlatformIdentity,
176 readiness_gate: Arc<dyn ReadinessGate>,
177 leadership: Arc<dyn LeadershipService>,
178}
179
180impl NoopPlatformService {
181 pub fn new(identity: PlatformIdentity) -> Self {
182 Self {
183 identity,
184 readiness_gate: Arc::new(NoopReadinessGate),
185 leadership: Arc::new(NoopLeadershipService::new()),
186 }
187 }
188}
189
190impl Default for NoopPlatformService {
191 fn default() -> Self {
192 Self::new(PlatformIdentity::local("noop"))
193 }
194}
195
196impl PlatformService for NoopPlatformService {
197 fn identity(&self) -> PlatformIdentity {
198 self.identity.clone()
199 }
200
201 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
202 Arc::clone(&self.readiness_gate)
203 }
204
205 fn leadership(&self) -> Arc<dyn LeadershipService> {
206 Arc::clone(&self.leadership)
207 }
208}
209
210pub struct NoopReadinessGate;
212
213#[async_trait]
214impl ReadinessGate for NoopReadinessGate {
215 async fn notify_ready(&self) -> Result<(), CamelError> {
216 Ok(())
217 }
218 async fn notify_not_ready(&self, _reason: &str) -> Result<(), CamelError> {
219 Ok(())
220 }
221 async fn notify_starting(&self) -> Result<(), CamelError> {
222 Ok(())
223 }
224}
225
226#[cfg(test)]
227mod tests {
228 use super::*;
229
230 #[test]
231 fn test_platform_identity_local() {
232 let id = PlatformIdentity::local("my-node");
233 assert_eq!(id.node_id, "my-node");
234 assert!(id.namespace.is_none());
235 assert!(id.labels.is_empty());
236 }
237
238 #[tokio::test]
239 async fn test_noop_leadership_service_is_leader() {
240 let leadership = NoopLeadershipService::new();
241 let handle = leadership.start("lock-a").await.unwrap();
242 assert!(handle.is_leader());
243 }
244
245 #[tokio::test]
246 async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
247 let leadership = NoopLeadershipService::new();
248 let lock_a = leadership.start("lock-a").await.unwrap();
249 let lock_b = leadership.start("lock-b").await.unwrap();
250
251 assert!(lock_a.is_leader());
252 assert!(lock_b.is_leader());
253
254 lock_a.step_down().await.unwrap();
255 lock_b.step_down().await.unwrap();
256 }
257
258 #[tokio::test]
259 async fn test_noop_leadership_service_same_lock_allows_multiple() {
260 let leadership = NoopLeadershipService::new();
261 let first = leadership.start("lock-a").await.unwrap();
262 let second = leadership.start("lock-a").await.unwrap();
263
264 assert!(first.is_leader());
265 assert!(second.is_leader());
266
267 first.step_down().await.unwrap();
268 second.step_down().await.unwrap();
269 }
270
271 #[tokio::test]
272 async fn test_noop_leadership_handle_semantics_and_reacquire() {
273 let leadership = NoopLeadershipService::new();
274 let handle = leadership.start("lock-a").await.unwrap();
275 let mut events = handle.events.clone();
276 let is_leader = Arc::clone(&handle.is_leader);
277
278 let event = handle.events.borrow().clone();
279 assert_eq!(event, Some(LeadershipEvent::StartedLeading));
280
281 handle.step_down().await.unwrap();
282 events.changed().await.unwrap();
283 assert_eq!(*events.borrow(), Some(LeadershipEvent::StoppedLeading));
284 assert!(!is_leader.load(Ordering::Acquire));
285
286 let reacquired = leadership.start("lock-a").await;
287 assert!(reacquired.is_ok());
288 }
289
290 #[tokio::test]
291 async fn test_noop_leadership_drop_cleans_up() {
292 let leadership = NoopLeadershipService::new();
293 let handle = leadership.start("lock-drop").await.unwrap();
294 assert!(handle.is_leader());
295 drop(handle);
296
297 let handle2 = leadership.start("lock-drop").await.unwrap();
298 assert!(handle2.is_leader());
299 handle2.step_down().await.unwrap();
300 }
301
302 #[tokio::test]
303 async fn test_noop_readiness_gate_all_methods() {
304 let gate = NoopReadinessGate;
305 gate.notify_starting().await.unwrap();
306 gate.notify_not_ready("test").await.unwrap();
307 gate.notify_ready().await.unwrap();
308 }
309
310 #[test]
311 fn test_leadership_event_equality() {
312 assert_eq!(
313 LeadershipEvent::StartedLeading,
314 LeadershipEvent::StartedLeading
315 );
316 assert_ne!(
317 LeadershipEvent::StartedLeading,
318 LeadershipEvent::StoppedLeading
319 );
320 }
321
322 #[test]
323 fn test_platform_error_display() {
324 let e = PlatformError::LockAlreadyActive {
325 lock_name: "alpha".into(),
326 };
327 assert!(e.to_string().contains("alpha"));
328 let e2 = PlatformError::NotAvailable("no k8s".into());
329 assert!(e2.to_string().contains("no k8s"));
330 }
331}