use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::time::sleep;
use tracing::{info, warn};
use crate::distributed::{DistributedError, DistributedLock, LockOptions};
/// Abstraction over a leader-election mechanism.
///
/// Implementors must be `Send + Sync` so a single instance can be shared
/// across tasks.
#[async_trait]
pub trait LeaderElection: Send + Sync {
    /// Makes one attempt to obtain leadership.
    ///
    /// The in-file implementation, `LockBasedElection`, yields `Ok(true)`
    /// when the underlying lock was acquired, `Ok(false)` when acquisition
    /// failed with a lock error, and `Err(_)` for any other failure.
    async fn try_become_leader(&self) -> Result<bool, DistributedError>;

    /// Reports whether this node currently considers itself the leader.
    fn is_leader(&self) -> bool;

    /// Gives up leadership.
    async fn step_down(&self) -> Result<(), DistributedError>;
}
/// Leader election driven by a [`DistributedLock`].
///
/// The node is treated as leader for as long as attempts to acquire the lock
/// identified by `resource_key` succeed.
pub struct LockBasedElection<L: DistributedLock> {
    /// Shared handle to the lock backend used for acquisition attempts.
    lock: Arc<L>,
    /// Identifier of this node; appears in the election log messages.
    node_id: String,
    /// Key handed to the lock backend on every acquisition attempt.
    resource_key: String,
    /// Local view of whether this node is currently the leader.
    is_leader: Arc<AtomicBool>,
    /// TTL, in milliseconds, forwarded as `LockOptions::ttl_ms`.
    ttl_ms: u64,
}
impl<L: DistributedLock> LockBasedElection<L> {
    /// Builds an election handle that starts out as a non-leader.
    ///
    /// * `lock` - shared lock backend used for acquisition attempts.
    /// * `node_id` - identifier of this node, used in log messages.
    /// * `resource_key` - key of the lock that represents leadership.
    /// * `ttl_ms` - lock TTL in milliseconds, passed through to `LockOptions`.
    pub fn new(lock: Arc<L>, node_id: String, resource_key: String, ttl_ms: u64) -> Self {
        // Every fresh instance begins without leadership.
        let initial_flag = Arc::new(AtomicBool::new(false));
        Self {
            lock,
            node_id,
            resource_key,
            is_leader: initial_flag,
            ttl_ms,
        }
    }
}
/// Lock-backed implementation of [`LeaderElection`].
///
/// Leadership transitions are recorded with atomic `swap`s so that each
/// transition is observed, and logged, by exactly one caller even when several
/// tasks drive the same instance concurrently.
#[async_trait]
impl<L: DistributedLock> LeaderElection for LockBasedElection<L> {
    /// Makes a single, non-retrying attempt to acquire the leadership lock.
    ///
    /// # Returns
    /// * `Ok(true)` - the lock was acquired; the local flag is set.
    /// * `Ok(false)` - acquisition failed with `DistributedError::LockError`;
    ///   the local flag is cleared.
    ///
    /// # Errors
    /// Any error other than `DistributedError::LockError` is propagated
    /// unchanged and leaves the local flag untouched.
    async fn try_become_leader(&self) -> Result<bool, DistributedError> {
        // No retries: the caller is expected to poll periodically instead.
        let opts = LockOptions {
            ttl_ms: self.ttl_ms,
            retry_count: 0,
            retry_delay_ms: 0,
        };
        match self.lock.acquire(&self.resource_key, opts).await {
            Ok(_guard) => {
                // NOTE(review): `_guard` goes out of scope at the end of this
                // arm. If the guard releases the lock on drop, leadership is
                // surrendered right away while the flag stays `true` —
                // confirm against the `DistributedLock` guard semantics.
                //
                // `swap` returns the previous value, so only the caller that
                // actually flips false -> true emits the log line.
                if !self.is_leader.swap(true, Ordering::SeqCst) {
                    info!("Node {} became the cluster leader", self.node_id);
                }
                Ok(true)
            }
            Err(DistributedError::LockError(_)) => {
                // Only the caller that flips true -> false reports the loss.
                if self.is_leader.swap(false, Ordering::SeqCst) {
                    warn!("Node {} lost leadership!", self.node_id);
                }
                Ok(false)
            }
            Err(e) => Err(e),
        }
    }

    /// Returns the locally cached leadership flag.
    fn is_leader(&self) -> bool {
        // Same ordering as the writers, so readers never pair a weaker load
        // with the `SeqCst` swaps above.
        self.is_leader.load(Ordering::SeqCst)
    }

    /// Clears the local leadership flag, logging if this node was the leader.
    ///
    /// Always returns `Ok(())`.
    async fn step_down(&self) -> Result<(), DistributedError> {
        // NOTE(review): only the local flag is cleared; nothing here releases
        // the lock itself — confirm whether an explicit release is required.
        if self.is_leader.swap(false, Ordering::SeqCst) {
            info!("Node {} stepping down from leadership", self.node_id);
        }
        Ok(())
    }
}
/// Stateless holder for cluster-level helper functions.
pub struct ClusterManager {}
impl ClusterManager {
    /// Spawns a background Tokio task that repeatedly campaigns for leadership.
    ///
    /// Each iteration calls `try_become_leader` once, logs a warning if the
    /// attempt returned an error, and then sleeps for `interval` before the
    /// next attempt. The loop has no exit condition of its own.
    ///
    /// * `election` - shared election handle driven by the task.
    /// * `interval` - pause between consecutive attempts.
    ///
    /// Returns the `JoinHandle` of the spawned task.
    pub fn start_election_loop<E: LeaderElection + 'static>(
        election: Arc<E>,
        interval: Duration,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            loop {
                // Both `Ok` outcomes need no action here; only failures are reported.
                if let Err(err) = election.try_become_leader().await {
                    warn!("Error in leader election: {}", err);
                }
                sleep(interval).await;
            }
        })
    }
}