mod election;
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use anyhow::{Context, Result};
use chitchat::transport::UdpTransport;
use chitchat::{ChitchatConfig, ChitchatHandle, ChitchatId, FailureDetectorConfig};
use election::elect_leader;
pub use election::EpochFencer;
/// Gossip key under which each node publishes the epoch it last led with.
///
/// The value is the decimal string form of a `u64`; every node starts at `"0"`.
const EPOCH_KEY: &str = "epoch";

/// Gossip key under which each node publishes its current role
/// (`"leader"` or `"standby"`).
const ROLE_KEY: &str = "role";

/// Period of the background election loop: the live-node set is re-evaluated
/// once per tick.
const ELECTION_TICK: Duration = Duration::from_secs(1);
/// Settings consumed by [`ClusterCoordinator::start`] to join the gossip cluster.
///
/// Derives `Debug` so the configuration can be logged at startup and `Clone`
/// so callers can keep a copy after handing one to `start`, which takes the
/// config by value.
#[derive(Debug, Clone)]
pub struct ClusterConfig {
    /// Identifier of this node; used as the chitchat node id and compared
    /// against the election result to decide whether this node leads.
    pub node_id: String,
    /// Socket address handed to chitchat as its `listen_addr`.
    pub listen_addr: SocketAddr,
    /// Socket address embedded in this node's `ChitchatId`
    /// (presumably the address peers should use to reach it — confirm
    /// against the chitchat documentation).
    pub advertise_addr: SocketAddr,
    /// Seed node list handed to chitchat as `seed_nodes`.
    pub seeds: Vec<String>,
    /// Cluster identifier handed to chitchat as `cluster_id`.
    pub cluster_id: String,
}
/// Handle to this node's cluster membership and leader-election state.
///
/// Created by `ClusterCoordinator::start`, which also spawns the background
/// election loop that updates `leader_epoch` and `is_leader`.
///
/// Field order is deliberate and must not be changed casually: Rust drops
/// fields in declaration order.
pub struct ClusterCoordinator {
    /// Identifier of this node, copied from the configuration.
    node_id: String,
    /// Handle returned by `chitchat::spawn_chitchat`. It is stored for the
    /// lifetime of the coordinator and is the source of the shared chitchat
    /// state used by the election loop.
    _handle: ChitchatHandle,
    /// Epoch written by the election loop when this node is promoted;
    /// starts at 0. Shared with callers through `leader_epoch_handle`.
    leader_epoch: Arc<AtomicU64>,
    /// Epoch fencer exposed through `fencer()`; this file only constructs
    /// and hands it out.
    fencer: Arc<EpochFencer>,
    /// Whether the election loop currently considers this node the leader.
    is_leader: Arc<AtomicBool>,
}
impl ClusterCoordinator {
    /// Joins the gossip cluster described by `cfg` and starts the background
    /// election loop.
    ///
    /// The node publishes an initial gossip state of `epoch = "0"` and
    /// `role = "standby"`; the election loop later overwrites these if the
    /// node is promoted. Must be awaited inside a Tokio runtime because the
    /// election loop is started with `tokio::spawn`.
    ///
    /// # Errors
    ///
    /// Returns an error (with the context `"starting chitchat gossip"`) if
    /// `chitchat::spawn_chitchat` fails.
    pub async fn start(cfg: ClusterConfig) -> Result<Arc<Self>> {
        // The generation is the wall-clock time in whole seconds since the
        // Unix epoch, falling back to 0 if the clock reads before the epoch.
        let generation = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_secs())
            .unwrap_or(0);
        let chitchat_id = ChitchatId::new(cfg.node_id.clone(), generation, cfg.advertise_addr);
        let config = ChitchatConfig {
            chitchat_id,
            cluster_id: cfg.cluster_id,
            gossip_interval: Duration::from_millis(1000),
            listen_addr: cfg.listen_addr,
            seed_nodes: cfg.seeds,
            failure_detector_config: FailureDetectorConfig::default(),
            marked_for_deletion_grace_period: Duration::from_secs(3600),
            catchup_callback: None,
            extra_liveness_predicate: None,
        };
        let handle = chitchat::spawn_chitchat(
            config,
            vec![
                (EPOCH_KEY.to_string(), "0".to_string()),
                (ROLE_KEY.to_string(), "standby".to_string()),
            ],
            &UdpTransport,
        )
        .await
        .context("starting chitchat gossip")?;
        let coord = Arc::new(Self {
            node_id: cfg.node_id,
            leader_epoch: Arc::new(AtomicU64::new(0)),
            fencer: Arc::new(EpochFencer::new()),
            is_leader: Arc::new(AtomicBool::new(false)),
            _handle: handle,
        });
        coord.spawn_election_loop();
        tracing::info!(node = %coord.node_id, "joined cluster (chitchat); election loop running");
        Ok(coord)
    }

    /// Returns this node's identifier.
    pub fn node_id(&self) -> &str {
        &self.node_id
    }

    /// Returns whether the election loop currently considers this node the
    /// leader.
    ///
    /// When this returns `true`, a subsequent [`Self::current_epoch`] call
    /// observes at least the epoch assigned at that promotion, because the
    /// loop publishes the epoch before it sets the leader flag.
    pub fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Acquire)
    }

    /// Returns the value of the shared epoch counter: 0 until this node is
    /// first promoted (unless written through [`Self::leader_epoch_handle`]).
    pub fn current_epoch(&self) -> u64 {
        self.leader_epoch.load(Ordering::Acquire)
    }

    /// Returns a shared handle to the epoch counter that the election loop
    /// writes on promotion.
    pub fn leader_epoch_handle(&self) -> Arc<AtomicU64> {
        Arc::clone(&self.leader_epoch)
    }

    /// Returns a shared handle to the epoch fencer owned by this coordinator.
    pub fn fencer(&self) -> Arc<EpochFencer> {
        Arc::clone(&self.fencer)
    }

    /// Spawns the detached background task that re-runs leader election once
    /// per `ELECTION_TICK`.
    ///
    /// On every tick the task:
    /// 1. collects the ids of all live nodes and the highest epoch they
    ///    advertise under `EPOCH_KEY`;
    /// 2. asks `elect_leader` which live node should lead;
    /// 3. on promotion, publishes a fresh epoch and `role = "leader"`; on
    ///    demotion, publishes `role = "standby"`.
    ///
    /// The `JoinHandle` returned by `tokio::spawn` is discarded, so this type
    /// never cancels the task itself.
    fn spawn_election_loop(&self) {
        let chitchat = self._handle.chitchat();
        let node_id = self.node_id.clone();
        let leader_epoch = Arc::clone(&self.leader_epoch);
        let is_leader = Arc::clone(&self.is_leader);
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(ELECTION_TICK);
            // High-water mark of every epoch this task has observed on any
            // tick. Looking only at the nodes that are live *right now* is not
            // enough: once the previous leader drops out of the live set its
            // epoch disappears from the scan, and the successor would reuse
            // (or go below) it, defeating epoch fencing.
            // NOTE(review): epochs gossiped by a leader that was never seen
            // live by this task are still missed — confirm whether dead-node
            // state should be scanned as well.
            let mut highest_seen_epoch = 0u64;
            loop {
                tick.tick().await;
                // The guard lives until the end of this iteration, so the
                // live-set scan and the gossip-state writes below happen under
                // one lock acquisition.
                let mut cc = chitchat.lock().await;

                // Single pass: gather candidate ids and fold advertised epochs
                // into the high-water mark. Unparseable or missing epoch
                // values are ignored.
                let mut live_ids: Vec<String> = Vec::new();
                for id in cc.live_nodes() {
                    live_ids.push(id.node_id.clone());
                    if let Some(epoch) = cc
                        .node_state(id)
                        .and_then(|ns| ns.get(EPOCH_KEY))
                        .and_then(|v| v.parse::<u64>().ok())
                    {
                        highest_seen_epoch = highest_seen_epoch.max(epoch);
                    }
                }
                // The local counter may also have been advanced through
                // `leader_epoch_handle`; never hand out an epoch below it.
                highest_seen_epoch = highest_seen_epoch.max(leader_epoch.load(Ordering::Acquire));

                let am_leader = elect_leader(&live_ids).map(|l| *l == node_id).unwrap_or(false);
                // This task is the only writer of `is_leader`, so a plain
                // load followed by a store is equivalent to a swap here and
                // lets the epoch be published first.
                let was_leader = is_leader.load(Ordering::Acquire);
                if am_leader && !was_leader {
                    let new_epoch = highest_seen_epoch.saturating_add(1);
                    highest_seen_epoch = new_epoch;
                    // Publish the epoch BEFORE raising the leader flag: a
                    // reader that sees `is_leader() == true` must never pair
                    // it with the previous term's epoch.
                    leader_epoch.store(new_epoch, Ordering::Release);
                    is_leader.store(true, Ordering::Release);
                    cc.self_node_state().set(EPOCH_KEY, new_epoch.to_string());
                    cc.self_node_state().set(ROLE_KEY, "leader");
                    tracing::info!(node = %node_id, epoch = new_epoch, "promoted to group leader");
                } else if !am_leader && was_leader {
                    is_leader.store(false, Ordering::Release);
                    cc.self_node_state().set(ROLE_KEY, "standby");
                    tracing::warn!(node = %node_id, "stepped down as group leader");
                }
            }
        });
    }
}