1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
10#[derive(Debug, Clone)]
14pub struct PlatformIdentity {
15 pub node_id: String,
16 pub namespace: Option<String>,
17 pub labels: HashMap<String, String>,
18}
19
20impl PlatformIdentity {
21 pub fn local(node_id: impl Into<String>) -> Self {
22 Self {
23 node_id: node_id.into(),
24 namespace: None,
25 labels: HashMap::new(),
26 }
27 }
28}
29
30#[derive(Debug, Clone, PartialEq)]
32pub enum LeadershipEvent {
33 StartedLeading,
34 StoppedLeading,
35}
36
37#[derive(Debug, Error)]
39pub enum PlatformError {
40 #[error("leadership lock already active: {lock_name}")]
41 LockAlreadyActive { lock_name: String },
42 #[error("step_down failed: elector loop terminated unexpectedly")]
43 StepDownFailed,
44 #[error("platform not available: {0}")]
45 NotAvailable(String),
46 #[error("configuration error: {0}")]
47 Config(String),
48}
49
50pub struct LeadershipHandle {
52 pub events: watch::Receiver<Option<LeadershipEvent>>,
54 is_leader: Arc<AtomicBool>,
56 cancel: CancellationToken,
58 terminated: Option<tokio::sync::oneshot::Receiver<()>>,
60}
61
62impl LeadershipHandle {
63 pub fn new(
66 events: watch::Receiver<Option<LeadershipEvent>>,
67 is_leader: Arc<AtomicBool>,
68 cancel: CancellationToken,
69 terminated: tokio::sync::oneshot::Receiver<()>,
70 ) -> Self {
71 Self {
72 events,
73 is_leader,
74 cancel,
75 terminated: Some(terminated),
76 }
77 }
78
79 pub fn is_leader(&self) -> bool {
80 self.is_leader.load(Ordering::Acquire)
81 }
82
83 pub async fn step_down(mut self) -> Result<(), PlatformError> {
86 self.cancel.cancel();
87 self.terminated
88 .take()
89 .ok_or(PlatformError::StepDownFailed)?
90 .await
91 .map_err(|_| PlatformError::StepDownFailed)
92 }
93}
94
95impl Drop for LeadershipHandle {
96 fn drop(&mut self) {
97 self.cancel.cancel();
98 }
99}
100
101#[async_trait]
103pub trait LeadershipService: Send + Sync {
104 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
105}
106
107pub trait PlatformService: Send + Sync {
109 fn identity(&self) -> PlatformIdentity;
110 fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
111 fn leadership(&self) -> Arc<dyn LeadershipService>;
112}
113
114#[async_trait]
122pub trait ReadinessGate: Send + Sync {
123 async fn notify_ready(&self);
124 async fn notify_not_ready(&self, reason: &str);
125 async fn notify_starting(&self);
126}
127
128pub struct NoopLeadershipService;
135
136impl NoopLeadershipService {
137 pub fn new() -> Self {
138 Self
139 }
140}
141
142impl Default for NoopLeadershipService {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148#[async_trait]
149impl LeadershipService for NoopLeadershipService {
150 async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
151 let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
152 let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
153 let cancel = CancellationToken::new();
154 let cancel_for_task = cancel.clone();
155 let is_leader = Arc::new(AtomicBool::new(true));
156 let is_leader_for_task = Arc::clone(&is_leader);
157 let lock_name = lock_name.to_string();
158
159 tokio::spawn(async move {
160 cancel_for_task.cancelled().await;
161 is_leader_for_task.store(false, Ordering::Release);
162 let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
163 drop(lock_name);
164 let _ = term_tx.send(());
165 });
166
167 Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
168 }
169}
170
171pub struct NoopPlatformService {
173 identity: PlatformIdentity,
174 readiness_gate: Arc<dyn ReadinessGate>,
175 leadership: Arc<dyn LeadershipService>,
176}
177
178impl NoopPlatformService {
179 pub fn new(identity: PlatformIdentity) -> Self {
180 Self {
181 identity,
182 readiness_gate: Arc::new(NoopReadinessGate),
183 leadership: Arc::new(NoopLeadershipService::new()),
184 }
185 }
186}
187
188impl Default for NoopPlatformService {
189 fn default() -> Self {
190 Self::new(PlatformIdentity::local("noop"))
191 }
192}
193
194impl PlatformService for NoopPlatformService {
195 fn identity(&self) -> PlatformIdentity {
196 self.identity.clone()
197 }
198
199 fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
200 Arc::clone(&self.readiness_gate)
201 }
202
203 fn leadership(&self) -> Arc<dyn LeadershipService> {
204 Arc::clone(&self.leadership)
205 }
206}
207
208pub struct NoopReadinessGate;
210
211#[async_trait]
212impl ReadinessGate for NoopReadinessGate {
213 async fn notify_ready(&self) {}
214 async fn notify_not_ready(&self, _reason: &str) {}
215 async fn notify_starting(&self) {}
216}
217
218#[cfg(test)]
219mod tests {
220 use super::*;
221
222 #[test]
223 fn test_platform_identity_local() {
224 let id = PlatformIdentity::local("my-node");
225 assert_eq!(id.node_id, "my-node");
226 assert!(id.namespace.is_none());
227 assert!(id.labels.is_empty());
228 }
229
230 #[tokio::test]
231 async fn test_noop_leadership_service_is_leader() {
232 let leadership = NoopLeadershipService::new();
233 let handle = leadership.start("lock-a").await.unwrap();
234 assert!(handle.is_leader());
235 }
236
237 #[tokio::test]
238 async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
239 let leadership = NoopLeadershipService::new();
240 let lock_a = leadership.start("lock-a").await.unwrap();
241 let lock_b = leadership.start("lock-b").await.unwrap();
242
243 assert!(lock_a.is_leader());
244 assert!(lock_b.is_leader());
245
246 lock_a.step_down().await.unwrap();
247 lock_b.step_down().await.unwrap();
248 }
249
250 #[tokio::test]
251 async fn test_noop_leadership_service_same_lock_allows_multiple() {
252 let leadership = NoopLeadershipService::new();
253 let first = leadership.start("lock-a").await.unwrap();
254 let second = leadership.start("lock-a").await.unwrap();
255
256 assert!(first.is_leader());
257 assert!(second.is_leader());
258
259 first.step_down().await.unwrap();
260 second.step_down().await.unwrap();
261 }
262
263 #[tokio::test]
264 async fn test_noop_leadership_handle_semantics_and_reacquire() {
265 let leadership = NoopLeadershipService::new();
266 let handle = leadership.start("lock-a").await.unwrap();
267 let mut events = handle.events.clone();
268 let is_leader = Arc::clone(&handle.is_leader);
269
270 let event = handle.events.borrow().clone();
271 assert_eq!(event, Some(LeadershipEvent::StartedLeading));
272
273 handle.step_down().await.unwrap();
274 events.changed().await.unwrap();
275 assert_eq!(*events.borrow(), Some(LeadershipEvent::StoppedLeading));
276 assert!(!is_leader.load(Ordering::Acquire));
277
278 let reacquired = leadership.start("lock-a").await;
279 assert!(reacquired.is_ok());
280 }
281
282 #[tokio::test]
283 async fn test_noop_leadership_drop_cleans_up() {
284 let leadership = NoopLeadershipService::new();
285 let handle = leadership.start("lock-drop").await.unwrap();
286 assert!(handle.is_leader());
287 drop(handle);
288
289 let handle2 = leadership.start("lock-drop").await.unwrap();
290 assert!(handle2.is_leader());
291 handle2.step_down().await.unwrap();
292 }
293
294 #[tokio::test]
295 async fn test_noop_readiness_gate_all_methods() {
296 let gate = NoopReadinessGate;
297 gate.notify_starting().await;
298 gate.notify_not_ready("test").await;
299 gate.notify_ready().await;
300 }
301
302 #[test]
303 fn test_leadership_event_equality() {
304 assert_eq!(
305 LeadershipEvent::StartedLeading,
306 LeadershipEvent::StartedLeading
307 );
308 assert_ne!(
309 LeadershipEvent::StartedLeading,
310 LeadershipEvent::StoppedLeading
311 );
312 }
313
314 #[test]
315 fn test_platform_error_display() {
316 let e = PlatformError::LockAlreadyActive {
317 lock_name: "alpha".into(),
318 };
319 assert!(e.to_string().contains("alpha"));
320 let e2 = PlatformError::NotAvailable("no k8s".into());
321 assert!(e2.to_string().contains("no k8s"));
322 }
323}