1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4
5use async_trait::async_trait;
6use thiserror::Error;
7use tokio::sync::watch;
8use tokio_util::sync::CancellationToken;
9
/// Identity handed to [`LeaderElector::start`].
///
/// Bundles a node identifier with an optional namespace and free-form labels.
#[derive(Debug, Clone)]
pub struct PlatformIdentity {
    /// Identifier of the node.
    pub node_id: String,
    /// Optional namespace; `None` when the identity carries no namespace.
    pub namespace: Option<String>,
    /// Arbitrary key/value labels attached to the identity.
    pub labels: HashMap<String, String>,
}

impl PlatformIdentity {
    /// Builds an identity that only carries `node_id`.
    ///
    /// The namespace is left as `None` and the label map starts empty.
    pub fn local(node_id: impl Into<String>) -> Self {
        Self {
            node_id: node_id.into(),
            namespace: None,
            labels: HashMap::new(),
        }
    }
}
29
/// Leadership transition published through [`LeadershipHandle::events`].
///
/// The enum is field-less, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LeadershipEvent {
    /// Leadership has started; this is the value the no-op elector publishes.
    StartedLeading,
    /// Leadership has ended.
    StoppedLeading,
}
36
37#[derive(Debug, Error)]
39pub enum PlatformError {
40 #[error("leader elector already started")]
41 AlreadyStarted,
42 #[error("step_down failed: elector loop terminated unexpectedly")]
43 StepDownFailed,
44 #[error("platform not available: {0}")]
45 NotAvailable(String),
46 #[error("configuration error: {0}")]
47 Config(String),
48}
49
50pub struct LeadershipHandle {
53 pub events: watch::Receiver<Option<LeadershipEvent>>,
55 is_leader: Arc<AtomicBool>,
57 cancel: CancellationToken,
59 terminated: tokio::sync::oneshot::Receiver<()>,
61}
62
63impl LeadershipHandle {
64 pub fn new(
67 events: watch::Receiver<Option<LeadershipEvent>>,
68 is_leader: Arc<AtomicBool>,
69 cancel: CancellationToken,
70 terminated: tokio::sync::oneshot::Receiver<()>,
71 ) -> Self {
72 Self {
73 events,
74 is_leader,
75 cancel,
76 terminated,
77 }
78 }
79
80 pub fn is_leader(&self) -> bool {
81 self.is_leader.load(Ordering::Acquire)
82 }
83
84 pub async fn step_down(self) -> Result<(), PlatformError> {
87 self.cancel.cancel();
88 self.terminated
89 .await
90 .map_err(|_| PlatformError::StepDownFailed)
91 }
92}
93
94#[async_trait]
98pub trait LeaderElector: Send + Sync {
99 async fn start(&self, identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError>;
100}
101
102#[async_trait]
110pub trait ReadinessGate: Send + Sync {
111 async fn notify_ready(&self);
112 async fn notify_not_ready(&self, reason: &str);
113 async fn notify_starting(&self);
114}
115
116pub struct NoopLeaderElector;
119
120#[async_trait]
121impl LeaderElector for NoopLeaderElector {
122 async fn start(&self, _identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError> {
123 let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
124 let (_term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
125 drop(tx);
127 Ok(LeadershipHandle::new(
128 rx,
129 Arc::new(AtomicBool::new(true)),
130 CancellationToken::new(),
131 term_rx,
132 ))
133 }
134}
135
136pub struct NoopReadinessGate;
138
139#[async_trait]
140impl ReadinessGate for NoopReadinessGate {
141 async fn notify_ready(&self) {}
142 async fn notify_not_ready(&self, _reason: &str) {}
143 async fn notify_starting(&self) {}
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    /// `local` fills in only the node id.
    #[test]
    fn test_platform_identity_local() {
        let id = PlatformIdentity::local("my-node");
        assert_eq!(id.node_id, "my-node");
        assert!(id.namespace.is_none());
        assert!(id.labels.is_empty());
    }

    /// The no-op elector's handle reports leadership immediately.
    #[tokio::test]
    async fn test_noop_leader_elector_is_leader() {
        let elector = NoopLeaderElector;
        let handle = elector
            .start(PlatformIdentity::local("test"))
            .await
            .unwrap();
        assert!(handle.is_leader());
    }

    /// The initial watch value is still readable after the sender is dropped.
    #[tokio::test]
    async fn test_noop_leader_elector_event_started_leading() {
        let elector = NoopLeaderElector;
        let handle = elector
            .start(PlatformIdentity::local("test"))
            .await
            .unwrap();
        let event = handle.events.borrow().clone();
        assert_eq!(event, Some(LeadershipEvent::StartedLeading));
    }

    /// Stepping down a no-op handle fails because nothing signals termination.
    #[tokio::test]
    async fn test_noop_leader_elector_step_down() {
        let elector = NoopLeaderElector;
        let handle = elector
            .start(PlatformIdentity::local("test"))
            .await
            .unwrap();
        let result = handle.step_down().await;
        // Pin the exact variant rather than accepting any error.
        assert!(
            matches!(result, Err(PlatformError::StepDownFailed)),
            "NoopLeaderElector step_down should return StepDownFailed because _term_tx is dropped"
        );
    }

    /// Every no-op gate notification completes without panicking.
    #[tokio::test]
    async fn test_noop_readiness_gate_all_methods() {
        let gate = NoopReadinessGate;
        gate.notify_starting().await;
        gate.notify_not_ready("test").await;
        gate.notify_ready().await;
    }

    #[test]
    fn test_leadership_event_equality() {
        assert_eq!(
            LeadershipEvent::StartedLeading,
            LeadershipEvent::StartedLeading
        );
        assert_ne!(
            LeadershipEvent::StartedLeading,
            LeadershipEvent::StoppedLeading
        );
    }

    #[test]
    fn test_platform_error_display() {
        let e = PlatformError::AlreadyStarted;
        assert!(e.to_string().contains("already started"));
        let e2 = PlatformError::NotAvailable("no k8s".into());
        assert!(e2.to_string().contains("no k8s"));
    }
}