#![allow(dead_code)]
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
/// Liveness of a single broker as tracked by `ControllerLivenessState`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum BrokerLivenessState {
/// State assigned when a broker is seeded or records a heartbeat.
Alive,
/// State assigned by `tick` once the time since the last heartbeat exceeds the timeout.
Dead,
}
/// A change in a broker's liveness state; the payload is the broker id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum LivenessTransition {
/// Returned by `record_heartbeat` when the broker was `Dead` before the heartbeat.
DeadToAlive(u64),
/// Returned by `tick` when an `Alive` broker exceeded the heartbeat timeout.
AliveToDead(u64),
}
/// Per-broker bookkeeping stored in the liveness map.
struct BrokerEntry {
// Instant at which the broker last recorded a heartbeat (or was seeded).
last_heartbeat: Instant,
// Current liveness state of the broker.
state: BrokerLivenessState,
}
/// Tracks broker heartbeats and derives each broker's liveness from them,
/// together with a per-broker "wants shutdown" flag.
pub(crate) struct ControllerLivenessState {
// An alive broker is marked dead by `tick` once the time since its last
// heartbeat is strictly greater than this duration.
timeout: Duration,
// Liveness bookkeeping keyed by broker id.
brokers: Mutex<HashMap<u64, BrokerEntry>>,
// Ids of brokers currently flagged via `set_wants_shutdown(id, true)`.
wants_shutdown: Mutex<HashSet<u64>>,
}
impl ControllerLivenessState {
    /// Creates a tracker with no known brokers and no shutdown flags.
    ///
    /// `timeout` is the heartbeat age beyond which [`tick`](Self::tick)
    /// declares an alive broker dead.
    pub(crate) fn new(timeout: Duration) -> Self {
        let brokers = Mutex::new(HashMap::new());
        let wants_shutdown = Mutex::new(HashSet::new());
        Self {
            timeout,
            brokers,
            wants_shutdown,
        }
    }

    /// Sets (`want == true`) or clears (`want == false`) the shutdown flag
    /// for `broker_id`.
    pub(crate) async fn set_wants_shutdown(&self, broker_id: u64, want: bool) {
        let mut flagged = self.wants_shutdown.lock().await;
        if !want {
            flagged.remove(&broker_id);
            return;
        }
        flagged.insert(broker_id);
    }

    /// Returns `true` if the shutdown flag is currently set for `broker_id`.
    pub(crate) async fn wants_shutdown(&self, broker_id: u64) -> bool {
        let flagged = self.wants_shutdown.lock().await;
        flagged.contains(&broker_id)
    }

    /// Records a heartbeat for `broker_id`, leaving it `Alive` with a fresh
    /// heartbeat timestamp.
    ///
    /// Returns `Some(LivenessTransition::DeadToAlive(broker_id))` when the
    /// broker was `Dead` before this call; otherwise (including the first
    /// heartbeat of an unknown broker) returns `None`.
    pub(crate) async fn record_heartbeat(&self, broker_id: u64) -> Option<LivenessTransition> {
        let mut brokers = self.brokers.lock().await;
        // The timestamp is taken only after the lock has been acquired.
        let now = Instant::now();
        match brokers.get_mut(&broker_id) {
            Some(known) => {
                let revived = known.state == BrokerLivenessState::Dead;
                known.last_heartbeat = now;
                known.state = BrokerLivenessState::Alive;
                revived.then_some(LivenessTransition::DeadToAlive(broker_id))
            }
            None => {
                // First contact: the broker starts out alive, so there is
                // no transition to report.
                brokers.insert(
                    broker_id,
                    BrokerEntry {
                        last_heartbeat: now,
                        state: BrokerLivenessState::Alive,
                    },
                );
                None
            }
        }
    }

    /// Marks every `Alive` broker whose last heartbeat is older than the
    /// timeout (strictly greater) as `Dead`.
    ///
    /// Returns one `AliveToDead` transition per broker that expired during
    /// this call, in the map's iteration order.
    pub(crate) async fn tick(&self) -> Vec<LivenessTransition> {
        let mut brokers = self.brokers.lock().await;
        let now = Instant::now();
        let timeout = self.timeout;
        brokers
            .iter_mut()
            .filter_map(|(&id, entry)| {
                let expired = entry.state == BrokerLivenessState::Alive
                    && now.duration_since(entry.last_heartbeat) > timeout;
                if !expired {
                    return None;
                }
                entry.state = BrokerLivenessState::Dead;
                Some(LivenessTransition::AliveToDead(id))
            })
            .collect()
    }

    /// Returns the current state of `broker_id`, or `None` if the broker
    /// has never been seeded or heartbeated.
    pub(crate) async fn state(&self, broker_id: u64) -> Option<BrokerLivenessState> {
        let brokers = self.brokers.lock().await;
        let entry = brokers.get(&broker_id)?;
        Some(entry.state)
    }

    /// Returns `true` only if `broker_id` is known and currently `Alive`.
    pub(crate) async fn is_alive(&self, broker_id: u64) -> bool {
        self.state(broker_id).await == Some(BrokerLivenessState::Alive)
    }

    /// Returns the ids of all brokers that are `Alive` at the time of the call.
    pub(crate) async fn alive_snapshot(&self) -> HashSet<u64> {
        let brokers = self.brokers.lock().await;
        brokers
            .iter()
            .filter_map(|(&id, entry)| (entry.state == BrokerLivenessState::Alive).then_some(id))
            .collect()
    }

    /// Registers each id in `broker_ids` as `Alive` with a heartbeat
    /// timestamp of "now".
    ///
    /// Brokers that are already tracked keep their existing entry untouched.
    pub(crate) async fn seed_brokers(&self, broker_ids: impl IntoIterator<Item = u64>) {
        let mut brokers = self.brokers.lock().await;
        let now = Instant::now();
        for id in broker_ids {
            brokers.entry(id).or_insert_with(|| BrokerEntry {
                last_heartbeat: now,
                state: BrokerLivenessState::Alive,
            });
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use std::time::Duration;

    /// The first heartbeat of an unknown broker registers it as alive and
    /// reports no transition.
    #[tokio::test]
    async fn new_broker_starts_alive_after_first_heartbeat() {
        let tracker = ControllerLivenessState::new(Duration::from_secs(10));

        let transition = tracker.record_heartbeat(1).await;

        assert!(transition.is_none());
        assert!(tracker.state(1).await == Some(BrokerLivenessState::Alive));
    }

    /// A broker whose heartbeat is older than the timeout is marked dead by `tick`.
    #[tokio::test]
    async fn tick_marks_expired_broker_dead() {
        let tracker = ControllerLivenessState::new(Duration::from_nanos(1));
        tracker.record_heartbeat(2).await;
        // Let the 1ns timeout elapse before ticking.
        std::thread::sleep(Duration::from_millis(1));

        let transitions = tracker.tick().await;

        assert!(transitions == [LivenessTransition::AliveToDead(2)]);
        assert!(tracker.state(2).await == Some(BrokerLivenessState::Dead));
    }

    /// A heartbeat from a dead broker revives it and reports `DeadToAlive`.
    #[tokio::test]
    async fn heartbeat_after_dead_emits_revival() {
        let tracker = ControllerLivenessState::new(Duration::from_nanos(1));
        tracker.record_heartbeat(3).await;
        // Let the 1ns timeout elapse so the tick below expires the broker.
        std::thread::sleep(Duration::from_millis(1));
        let _ = tracker.tick().await;

        let transition = tracker.record_heartbeat(3).await;

        assert!(transition == Some(LivenessTransition::DeadToAlive(3)));
        assert!(tracker.state(3).await == Some(BrokerLivenessState::Alive));
    }

    /// The shutdown flag can be set and cleared again for a broker.
    #[tokio::test]
    async fn wants_shutdown_set_and_unset() {
        let tracker = ControllerLivenessState::new(Duration::from_secs(10));
        assert!(!tracker.wants_shutdown(5).await);

        tracker.set_wants_shutdown(5, true).await;
        assert!(tracker.wants_shutdown(5).await);

        tracker.set_wants_shutdown(5, false).await;
        assert!(!tracker.wants_shutdown(5).await);
    }

    /// Shutdown flags of different brokers do not affect each other.
    #[tokio::test]
    async fn wants_shutdown_is_per_broker() {
        let tracker = ControllerLivenessState::new(Duration::from_secs(10));
        tracker.set_wants_shutdown(1, true).await;
        tracker.set_wants_shutdown(2, true).await;

        assert!(tracker.wants_shutdown(1).await);
        assert!(tracker.wants_shutdown(2).await);
        assert!(!tracker.wants_shutdown(3).await);

        tracker.set_wants_shutdown(1, false).await;

        assert!(!tracker.wants_shutdown(1).await);
        assert!(tracker.wants_shutdown(2).await);
    }

    /// A broker that has just heartbeated stays alive across a tick.
    #[tokio::test]
    async fn tick_does_not_expire_recently_heartbeated_broker() {
        let tracker = ControllerLivenessState::new(Duration::from_mins(1));
        tracker.record_heartbeat(4).await;

        let transitions = tracker.tick().await;

        assert!(
            transitions.is_empty(),
            "broker 4 should not expire with 60s timeout"
        );
        assert!(tracker.state(4).await == Some(BrokerLivenessState::Alive));
    }
}