#[cfg(feature = "kubernetes")]
pub mod kubernetes;
pub mod noop;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::watch;
/// Leadership role an instance can hold.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LeadershipState {
    /// This instance is the leader.
    Leader,
    /// This instance is not the leader.
    // NOTE(review): the lint is silenced presumably because some build
    // configurations never construct this variant — confirm.
    #[allow(dead_code)]
    Standby,
}

impl LeadershipState {
    /// Returns `true` for [`LeadershipState::Leader`] and `false` otherwise.
    pub fn is_leader(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match *self {
            Self::Leader => true,
            Self::Standby => false,
        }
    }
}
/// Cloneable read handle onto the current leadership state.
///
/// Holds two views of the same information: an atomic flag for cheap
/// `is_leader` checks and a watch receiver for reading or awaiting the full
/// [`LeadershipState`].
#[derive(Clone)]
pub struct LeadershipStatus {
    /// Flag shared with the [`LeadershipStateUpdater`] returned by [`LeadershipStatus::new`].
    is_leader: Arc<AtomicBool>,
    /// Receiving end of the watch channel fed by the updater.
    #[allow(dead_code)]
    state_rx: watch::Receiver<LeadershipState>,
}

impl LeadershipStatus {
    /// Builds a status handle seeded with `initial_state`, together with the
    /// updater that can later change it.
    ///
    /// Both halves share the same atomic flag and the same watch channel.
    pub fn new(initial_state: LeadershipState) -> (Self, LeadershipStateUpdater) {
        let (state_tx, state_rx) = watch::channel(initial_state);
        let flag = Arc::new(AtomicBool::new(initial_state.is_leader()));

        (
            Self {
                is_leader: Arc::clone(&flag),
                state_rx,
            },
            LeadershipStateUpdater {
                is_leader: flag,
                state_tx,
            },
        )
    }

    /// Reads the shared leader flag (relaxed atomic load, never blocks).
    pub fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }

    /// Returns a copy of the value currently held by the watch channel.
    #[allow(dead_code)]
    pub fn state(&self) -> LeadershipState {
        let current = self.state_rx.borrow();
        *current
    }

    /// Awaits the next notification on the watch channel and returns the
    /// state held by the channel afterwards.
    ///
    /// The result of `changed()` is deliberately discarded. Per the tokio
    /// documentation it is an error only when the sender has been dropped, in
    /// which case this returns the last known state without waiting.
    #[allow(dead_code)]
    pub async fn wait_for_change(&mut self) -> LeadershipState {
        let _ = self.state_rx.changed().await;
        self.state()
    }
}
/// Write half paired with a [`LeadershipStatus`]; obtained from
/// [`LeadershipStatus::new`].
#[allow(dead_code)]
pub struct LeadershipStateUpdater {
    /// Flag shared with every clone of the paired status handle.
    is_leader: Arc<AtomicBool>,
    /// Sending end of the watch channel the status handles read from.
    state_tx: watch::Sender<LeadershipState>,
}

impl LeadershipStateUpdater {
    /// Publishes `state`: first stores the derived boolean into the shared
    /// flag, then sends the full state on the watch channel.
    ///
    /// The send result is deliberately ignored; per the tokio documentation
    /// `send` fails only when no receiver is left.
    #[allow(dead_code)]
    pub fn set_state(&self, state: LeadershipState) {
        let leading = state.is_leader();
        self.is_leader.store(leading, Ordering::Relaxed);
        let _ = self.state_tx.send(state);
    }
}
/// Asynchronous interface for a leadership provider.
///
/// Implementors must be `Send + Sync`.
// NOTE(review): implementations presumably live in the `kubernetes` and
// `noop` modules declared at the top of this file — confirm.
#[async_trait::async_trait]
pub trait LeadershipProvider: Send + Sync {
/// Starts the provider and, on success, returns the [`LeadershipStatus`]
/// handle for observing leadership; failures are reported through
/// `crate::error::Result`.
async fn start(&self) -> crate::error::Result<LeadershipStatus>;
/// Stops the provider. Returns nothing, so no failure can be reported to the caller.
// NOTE(review): whether this may be called before `start` or more than once
// is not visible here — confirm against the implementations.
async fn stop(&self);
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Only the `Leader` variant reports leadership.
    #[test]
    fn test_leadership_state_is_leader() {
        assert!(LeadershipState::Leader.is_leader());
        assert!(!LeadershipState::Standby.is_leader());
    }

    /// A freshly created handle reflects the state it was seeded with.
    #[tokio::test]
    async fn test_leadership_status_initial_state() {
        // Keep the updater bound so the channel's sender stays alive.
        let (handle, _writer) = LeadershipStatus::new(LeadershipState::Leader);

        assert!(handle.is_leader());
        assert_eq!(handle.state(), LeadershipState::Leader);
    }

    /// `set_state` is visible through both the flag and the channel value.
    #[tokio::test]
    async fn test_leadership_status_update() {
        let (handle, writer) = LeadershipStatus::new(LeadershipState::Standby);
        assert!(!handle.is_leader());

        writer.set_state(LeadershipState::Leader);

        assert!(handle.is_leader());
        assert_eq!(handle.state(), LeadershipState::Leader);
    }

    /// `wait_for_change` resolves with the state published by another task.
    #[tokio::test]
    async fn test_leadership_status_wait_for_change() {
        let (mut handle, writer) = LeadershipStatus::new(LeadershipState::Standby);
        let delay = std::time::Duration::from_millis(10);

        // Publish from a separate task after a short delay.
        tokio::spawn(async move {
            tokio::time::sleep(delay).await;
            writer.set_state(LeadershipState::Leader);
        });

        assert_eq!(handle.wait_for_change().await, LeadershipState::Leader);
    }
}