use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
pub struct PlatformIdentity {
pub node_id: String,
pub namespace: Option<String>,
pub labels: HashMap<String, String>,
}
impl PlatformIdentity {
pub fn local(node_id: impl Into<String>) -> Self {
Self {
node_id: node_id.into(),
namespace: None,
labels: HashMap::new(),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum LeadershipEvent {
StartedLeading,
StoppedLeading,
}
#[derive(Debug, Error)]
pub enum PlatformError {
#[error("leader elector already started")]
AlreadyStarted,
#[error("step_down failed: elector loop terminated unexpectedly")]
StepDownFailed,
#[error("platform not available: {0}")]
NotAvailable(String),
#[error("configuration error: {0}")]
Config(String),
}
pub struct LeadershipHandle {
pub events: watch::Receiver<Option<LeadershipEvent>>,
is_leader: Arc<AtomicBool>,
cancel: CancellationToken,
terminated: tokio::sync::oneshot::Receiver<()>,
}
impl LeadershipHandle {
pub fn new(
events: watch::Receiver<Option<LeadershipEvent>>,
is_leader: Arc<AtomicBool>,
cancel: CancellationToken,
terminated: tokio::sync::oneshot::Receiver<()>,
) -> Self {
Self {
events,
is_leader,
cancel,
terminated,
}
}
pub fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Acquire)
}
pub async fn step_down(self) -> Result<(), PlatformError> {
self.cancel.cancel();
self.terminated
.await
.map_err(|_| PlatformError::StepDownFailed)
}
}
#[async_trait]
pub trait LeaderElector: Send + Sync {
async fn start(&self, identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError>;
}
#[async_trait]
pub trait ReadinessGate: Send + Sync {
async fn notify_ready(&self);
async fn notify_not_ready(&self, reason: &str);
async fn notify_starting(&self);
}
pub struct NoopLeaderElector;
#[async_trait]
impl LeaderElector for NoopLeaderElector {
async fn start(&self, _identity: PlatformIdentity) -> Result<LeadershipHandle, PlatformError> {
let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
let (_term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
drop(tx);
Ok(LeadershipHandle::new(
rx,
Arc::new(AtomicBool::new(true)),
CancellationToken::new(),
term_rx,
))
}
}
pub struct NoopReadinessGate;
#[async_trait]
impl ReadinessGate for NoopReadinessGate {
async fn notify_ready(&self) {}
async fn notify_not_ready(&self, _reason: &str) {}
async fn notify_starting(&self) {}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_platform_identity_local() {
let id = PlatformIdentity::local("my-node");
assert_eq!(id.node_id, "my-node");
assert!(id.namespace.is_none());
assert!(id.labels.is_empty());
}
#[tokio::test]
async fn test_noop_leader_elector_is_leader() {
let elector = NoopLeaderElector;
let handle = elector
.start(PlatformIdentity::local("test"))
.await
.unwrap();
assert!(handle.is_leader());
}
#[tokio::test]
async fn test_noop_leader_elector_event_started_leading() {
let elector = NoopLeaderElector;
let handle = elector
.start(PlatformIdentity::local("test"))
.await
.unwrap();
let event = handle.events.borrow().clone();
assert_eq!(event, Some(LeadershipEvent::StartedLeading));
}
#[tokio::test]
async fn test_noop_leader_elector_step_down() {
let elector = NoopLeaderElector;
let handle = elector
.start(PlatformIdentity::local("test"))
.await
.unwrap();
let result = handle.step_down().await;
assert!(
result.is_err(),
"NoopLeaderElector step_down should return StepDownFailed because _term_tx is dropped"
);
}
#[tokio::test]
async fn test_noop_readiness_gate_all_methods() {
let gate = NoopReadinessGate;
gate.notify_starting().await;
gate.notify_not_ready("test").await;
gate.notify_ready().await;
}
#[test]
fn test_leadership_event_equality() {
assert_eq!(
LeadershipEvent::StartedLeading,
LeadershipEvent::StartedLeading
);
assert_ne!(
LeadershipEvent::StartedLeading,
LeadershipEvent::StoppedLeading
);
}
#[test]
fn test_platform_error_display() {
let e = PlatformError::AlreadyStarted;
assert!(e.to_string().contains("already started"));
let e2 = PlatformError::NotAvailable("no k8s".into());
assert!(e2.to_string().contains("no k8s"));
}
}