use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
pub struct PlatformIdentity {
pub node_id: String,
pub namespace: Option<String>,
pub labels: HashMap<String, String>,
}
impl PlatformIdentity {
pub fn local(node_id: impl Into<String>) -> Self {
Self {
node_id: node_id.into(),
namespace: None,
labels: HashMap::new(),
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum LeadershipEvent {
StartedLeading,
StoppedLeading,
}
#[derive(Debug, Error)]
pub enum PlatformError {
#[error("leadership lock already active: {lock_name}")]
LockAlreadyActive { lock_name: String },
#[error("step_down failed: elector loop terminated unexpectedly")]
StepDownFailed,
#[error("platform not available: {0}")]
NotAvailable(String),
#[error("configuration error: {0}")]
Config(String),
}
pub struct LeadershipHandle {
pub events: watch::Receiver<Option<LeadershipEvent>>,
is_leader: Arc<AtomicBool>,
cancel: CancellationToken,
terminated: Option<tokio::sync::oneshot::Receiver<()>>,
}
impl LeadershipHandle {
pub fn new(
events: watch::Receiver<Option<LeadershipEvent>>,
is_leader: Arc<AtomicBool>,
cancel: CancellationToken,
terminated: tokio::sync::oneshot::Receiver<()>,
) -> Self {
Self {
events,
is_leader,
cancel,
terminated: Some(terminated),
}
}
pub fn is_leader(&self) -> bool {
self.is_leader.load(Ordering::Acquire)
}
pub async fn step_down(mut self) -> Result<(), PlatformError> {
self.cancel.cancel();
self.terminated
.take()
.ok_or(PlatformError::StepDownFailed)?
.await
.map_err(|_| PlatformError::StepDownFailed)
}
}
impl Drop for LeadershipHandle {
fn drop(&mut self) {
self.cancel.cancel();
}
}
#[async_trait]
pub trait LeadershipService: Send + Sync {
async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError>;
}
pub trait PlatformService: Send + Sync {
fn identity(&self) -> PlatformIdentity;
fn readiness_gate(&self) -> Arc<dyn ReadinessGate>;
fn leadership(&self) -> Arc<dyn LeadershipService>;
}
#[async_trait]
pub trait ReadinessGate: Send + Sync {
async fn notify_ready(&self);
async fn notify_not_ready(&self, reason: &str);
async fn notify_starting(&self);
}
pub struct NoopLeadershipService;
impl NoopLeadershipService {
pub fn new() -> Self {
Self
}
}
impl Default for NoopLeadershipService {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl LeadershipService for NoopLeadershipService {
async fn start(&self, lock_name: &str) -> Result<LeadershipHandle, PlatformError> {
let (tx, rx) = watch::channel(Some(LeadershipEvent::StartedLeading));
let (term_tx, term_rx) = tokio::sync::oneshot::channel::<()>();
let cancel = CancellationToken::new();
let cancel_for_task = cancel.clone();
let is_leader = Arc::new(AtomicBool::new(true));
let is_leader_for_task = Arc::clone(&is_leader);
let lock_name = lock_name.to_string();
tokio::spawn(async move {
cancel_for_task.cancelled().await;
is_leader_for_task.store(false, Ordering::Release);
let _ = tx.send(Some(LeadershipEvent::StoppedLeading));
drop(lock_name);
let _ = term_tx.send(());
});
Ok(LeadershipHandle::new(rx, is_leader, cancel, term_rx))
}
}
pub struct NoopPlatformService {
identity: PlatformIdentity,
readiness_gate: Arc<dyn ReadinessGate>,
leadership: Arc<dyn LeadershipService>,
}
impl NoopPlatformService {
pub fn new(identity: PlatformIdentity) -> Self {
Self {
identity,
readiness_gate: Arc::new(NoopReadinessGate),
leadership: Arc::new(NoopLeadershipService::new()),
}
}
}
impl Default for NoopPlatformService {
fn default() -> Self {
Self::new(PlatformIdentity::local("noop"))
}
}
impl PlatformService for NoopPlatformService {
fn identity(&self) -> PlatformIdentity {
self.identity.clone()
}
fn readiness_gate(&self) -> Arc<dyn ReadinessGate> {
Arc::clone(&self.readiness_gate)
}
fn leadership(&self) -> Arc<dyn LeadershipService> {
Arc::clone(&self.leadership)
}
}
pub struct NoopReadinessGate;
#[async_trait]
impl ReadinessGate for NoopReadinessGate {
async fn notify_ready(&self) {}
async fn notify_not_ready(&self, _reason: &str) {}
async fn notify_starting(&self) {}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_platform_identity_local() {
let id = PlatformIdentity::local("my-node");
assert_eq!(id.node_id, "my-node");
assert!(id.namespace.is_none());
assert!(id.labels.is_empty());
}
#[tokio::test]
async fn test_noop_leadership_service_is_leader() {
let leadership = NoopLeadershipService::new();
let handle = leadership.start("lock-a").await.unwrap();
assert!(handle.is_leader());
}
#[tokio::test]
async fn test_noop_leadership_service_allows_multiple_distinct_locks() {
let leadership = NoopLeadershipService::new();
let lock_a = leadership.start("lock-a").await.unwrap();
let lock_b = leadership.start("lock-b").await.unwrap();
assert!(lock_a.is_leader());
assert!(lock_b.is_leader());
lock_a.step_down().await.unwrap();
lock_b.step_down().await.unwrap();
}
#[tokio::test]
async fn test_noop_leadership_service_same_lock_allows_multiple() {
let leadership = NoopLeadershipService::new();
let first = leadership.start("lock-a").await.unwrap();
let second = leadership.start("lock-a").await.unwrap();
assert!(first.is_leader());
assert!(second.is_leader());
first.step_down().await.unwrap();
second.step_down().await.unwrap();
}
#[tokio::test]
async fn test_noop_leadership_handle_semantics_and_reacquire() {
let leadership = NoopLeadershipService::new();
let handle = leadership.start("lock-a").await.unwrap();
let mut events = handle.events.clone();
let is_leader = Arc::clone(&handle.is_leader);
let event = handle.events.borrow().clone();
assert_eq!(event, Some(LeadershipEvent::StartedLeading));
handle.step_down().await.unwrap();
events.changed().await.unwrap();
assert_eq!(*events.borrow(), Some(LeadershipEvent::StoppedLeading));
assert!(!is_leader.load(Ordering::Acquire));
let reacquired = leadership.start("lock-a").await;
assert!(reacquired.is_ok());
}
#[tokio::test]
async fn test_noop_leadership_drop_cleans_up() {
let leadership = NoopLeadershipService::new();
let handle = leadership.start("lock-drop").await.unwrap();
assert!(handle.is_leader());
drop(handle);
let handle2 = leadership.start("lock-drop").await.unwrap();
assert!(handle2.is_leader());
handle2.step_down().await.unwrap();
}
#[tokio::test]
async fn test_noop_readiness_gate_all_methods() {
let gate = NoopReadinessGate;
gate.notify_starting().await;
gate.notify_not_ready("test").await;
gate.notify_ready().await;
}
#[test]
fn test_leadership_event_equality() {
assert_eq!(
LeadershipEvent::StartedLeading,
LeadershipEvent::StartedLeading
);
assert_ne!(
LeadershipEvent::StartedLeading,
LeadershipEvent::StoppedLeading
);
}
#[test]
fn test_platform_error_display() {
let e = PlatformError::LockAlreadyActive {
lock_name: "alpha".into(),
};
assert!(e.to_string().contains("alpha"));
let e2 = PlatformError::NotAvailable("no k8s".into());
assert!(e2.to_string().contains("no k8s"));
}
}