use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use super::replication::ReplicaRole;
use crate::adapter::net::behavior::placement::NodeId;
/// Default number of heartbeat intervals a peer may miss before it is
/// considered silent.
///
/// Applied by [`HeartbeatTracker::new`]; use
/// [`HeartbeatTracker::with_miss_threshold`] to choose a different value.
pub const DEFAULT_MISS_THRESHOLD: u8 = 3;
/// Snapshot of the most recent heartbeat recorded for one peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PeerState {
    /// The `now` instant supplied when the latest heartbeat was recorded.
    pub last_seen: Instant,
    /// Role the peer advertised in its latest heartbeat.
    pub role: ReplicaRole,
    /// Tail sequence number the peer advertised in its latest heartbeat.
    pub tail_seq: u64,
}
/// Tracks the latest heartbeat from each peer and which peer is currently
/// believed to be the leader.
///
/// Time is never read from the clock here: every method that needs "now"
/// takes it as an argument, so behavior is fully determined by the caller.
#[derive(Debug, Clone)]
pub struct HeartbeatTracker {
    /// Expected interval between heartbeats, in milliseconds.
    heartbeat_ms: u64,
    /// Missed intervals before a peer counts as silent; clamped to >= 1.
    miss_threshold: u8,
    /// Latest heartbeat per peer, keyed (and therefore iterated) by id.
    peers: BTreeMap<NodeId, PeerState>,
    /// Peer currently believed to be the leader, if any.
    believed_leader: Option<NodeId>,
}
impl HeartbeatTracker {
    /// Creates a tracker expecting a heartbeat every `heartbeat_ms`
    /// milliseconds, using [`DEFAULT_MISS_THRESHOLD`] missed intervals.
    pub fn new(heartbeat_ms: u64) -> Self {
        Self::with_miss_threshold(heartbeat_ms, DEFAULT_MISS_THRESHOLD)
    }

    /// Creates a tracker with an explicit miss threshold.
    ///
    /// A `miss_threshold` of `0` is clamped to `1`, so the silence window is
    /// never shorter than one heartbeat interval.
    pub fn with_miss_threshold(heartbeat_ms: u64, miss_threshold: u8) -> Self {
        Self {
            heartbeat_ms,
            miss_threshold: miss_threshold.max(1),
            peers: BTreeMap::new(),
            believed_leader: None,
        }
    }

    /// Expected interval between heartbeats, in milliseconds.
    pub fn heartbeat_ms(&self) -> u64 {
        self.heartbeat_ms
    }

    /// Number of missed heartbeat intervals after which a peer is silent
    /// (always at least 1).
    pub fn miss_threshold(&self) -> u8 {
        self.miss_threshold
    }

    /// Length of the silence window: `heartbeat_ms * miss_threshold`
    /// milliseconds, saturating at `u64::MAX`.
    ///
    /// Shared by [`Self::is_leader_silent`] (`>=` window) and
    /// [`Self::healthy_peers`] (`<` window) so the two predicates remain exact
    /// complements of each other.
    fn silence_threshold(&self) -> Duration {
        Duration::from_millis(
            self.heartbeat_ms
                .saturating_mul(u64::from(self.miss_threshold)),
        )
    }

    /// Records a heartbeat from `peer`, overwriting any previous state for it.
    ///
    /// When `role` is [`ReplicaRole::Leader`] the believed leader may change:
    /// - with no believed leader, `peer` becomes it;
    /// - otherwise `peer` replaces the current believed leader only if its
    ///   `tail_seq` is strictly higher, or the tails are equal and its id is
    ///   lower.
    ///
    /// Heartbeats carrying any other role never change the believed leader.
    ///
    /// NOTE(review): a believed leader that later heartbeats with a
    /// non-Leader role keeps the belief (and keeps its silence timer fresh);
    /// confirm callers handle step-down via [`Self::clear_believed_leader`]
    /// or [`Self::drop_peer`].
    pub fn record_heartbeat(
        &mut self,
        peer: NodeId,
        role: ReplicaRole,
        tail_seq: u64,
        now: Instant,
    ) {
        self.peers.insert(
            peer,
            PeerState {
                last_seen: now,
                role,
                tail_seq,
            },
        );
        if role == ReplicaRole::Leader {
            match self.believed_leader {
                None => self.believed_leader = Some(peer),
                // The believed leader refreshing itself: its state was just
                // updated above, so there is nothing to re-decide.
                Some(existing) if existing == peer => {}
                Some(existing) => {
                    // `existing` is always present in `peers` (drop_peer
                    // clears the belief); 0 is only a defensive fallback.
                    let existing_tail = self.peers.get(&existing).map_or(0, |p| p.tail_seq);
                    let peer_beats =
                        tail_seq > existing_tail || (tail_seq == existing_tail && peer < existing);
                    if peer_beats {
                        self.believed_leader = Some(peer);
                    }
                }
            }
        }
    }

    /// Returns `true` when the believed leader's last heartbeat is at least
    /// one silence window (`heartbeat_ms * miss_threshold`) old as of `now`.
    ///
    /// Returns `false` when there is no believed leader. Returns `true` if the
    /// believed leader has no recorded state — a defensive case that the
    /// public methods of this type never produce, because `drop_peer` also
    /// clears the belief.
    pub fn is_leader_silent(&self, now: Instant) -> bool {
        let Some(leader_id) = self.believed_leader else {
            return false;
        };
        let Some(leader) = self.peers.get(&leader_id) else {
            return true;
        };
        now.saturating_duration_since(leader.last_seen) >= self.silence_threshold()
    }

    /// Peer currently believed to be the leader, if any.
    pub fn believed_leader(&self) -> Option<NodeId> {
        self.believed_leader
    }

    /// Forgets the believed leader while keeping every peer's recorded state.
    pub fn clear_believed_leader(&mut self) {
        self.believed_leader = None;
    }

    /// Removes `peer`'s state; also clears the belief if `peer` was the
    /// believed leader.
    pub fn drop_peer(&mut self, peer: NodeId) {
        self.peers.remove(&peer);
        if self.believed_leader == Some(peer) {
            self.believed_leader = None;
        }
    }

    /// Copy of the latest recorded state for `peer`, if known.
    pub fn peer_state(&self, peer: NodeId) -> Option<PeerState> {
        self.peers.get(&peer).copied()
    }

    /// Time elapsed between `peer`'s last heartbeat and `now` (zero if `now`
    /// precedes it), or `None` for an unknown peer.
    pub fn peer_lag(&self, peer: NodeId, now: Instant) -> Option<Duration> {
        self.peers
            .get(&peer)
            .map(|p| now.saturating_duration_since(p.last_seen))
    }

    /// Ids of peers whose last heartbeat is younger than the silence window
    /// as of `now`, in ascending id order.
    pub fn healthy_peers(&self, now: Instant) -> Vec<NodeId> {
        let threshold = self.silence_threshold();
        self.peers
            .iter()
            .filter(|(_, state)| now.saturating_duration_since(state.last_seen) < threshold)
            .map(|(id, _)| *id)
            .collect()
    }

    /// `(id, tail_seq)` for every tracked peer, in ascending id order.
    pub fn peer_tail_seqs(&self) -> Vec<(NodeId, u64)> {
        self.peers
            .iter()
            .map(|(id, state)| (*id, state.tail_seq))
            .collect()
    }

    /// Number of peers with recorded state.
    pub fn peer_count(&self) -> usize {
        self.peers.len()
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for [`HeartbeatTracker`]. All times are derived from a single
    //! base `Instant` plus explicit offsets, so results do not depend on the
    //! wall clock.
    use super::*;

    /// Base instant for a test; offsets are added with [`at`].
    fn t0() -> Instant {
        Instant::now()
    }

    /// `base` shifted forward by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    #[test]
    fn new_tracker_has_no_peers_or_leader() {
        let t = HeartbeatTracker::new(500);
        assert_eq!(t.peer_count(), 0);
        assert!(t.believed_leader().is_none());
        assert!(!t.is_leader_silent(t0()));
        assert_eq!(t.heartbeat_ms(), 500);
        assert_eq!(t.miss_threshold(), DEFAULT_MISS_THRESHOLD);
    }

    #[test]
    fn miss_threshold_zero_clamped_to_one() {
        let t = HeartbeatTracker::with_miss_threshold(100, 0);
        assert_eq!(t.miss_threshold(), 1);
    }

    #[test]
    fn record_heartbeat_tracks_peer_state() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert_eq!(t.peer_count(), 1);
        assert_eq!(t.believed_leader(), Some(0x42));
        let p = t.peer_state(0x42).unwrap();
        assert_eq!(p.role, ReplicaRole::Leader);
        assert_eq!(p.tail_seq, 100);
        assert_eq!(p.last_seen, base);
    }

    #[test]
    fn leader_tiebreak_prefers_higher_tail_then_lower_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x43, ReplicaRole::Leader, 200, base);
        assert_eq!(t.believed_leader(), Some(0x43));
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, at(base, 100));
        assert_eq!(
            t.believed_leader(),
            Some(0x43),
            "higher-tail Leader should keep believed-leader against a lower-id claimant with lower tail",
        );
        t.record_heartbeat(0x42, ReplicaRole::Leader, 300, at(base, 200));
        assert_eq!(
            t.believed_leader(),
            Some(0x42),
            "strictly higher tail wins the tiebreak",
        );
        t.record_heartbeat(0x41, ReplicaRole::Leader, 300, at(base, 300));
        assert_eq!(
            t.believed_leader(),
            Some(0x41),
            "on a tail tie the lex-smaller id wins",
        );
    }

    #[test]
    fn heartbeat_tiebreak_aligns_with_runtime_convergence_rule() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0xAA, ReplicaRole::Leader, 500, base);
        assert_eq!(t.believed_leader(), Some(0xAA));
        t.record_heartbeat(0x11, ReplicaRole::Leader, 100, at(base, 50));
        assert_eq!(
            t.believed_leader(),
            Some(0xAA),
            "lower-tail Leader claimant must NOT win the heartbeat tiebreak; \
            pre-fix the lex-only rule made L2 win here and the local node \
            ended up treating L2's SyncResponses as authoritative while \
            still emitting Leader heartbeats itself — split brain",
        );
    }

    #[test]
    fn replica_role_heartbeat_does_not_change_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.record_heartbeat(0x99, ReplicaRole::Replica, 95, at(base, 50));
        assert_eq!(t.believed_leader(), Some(0x42));
        assert_eq!(t.peer_state(0x99).unwrap().role, ReplicaRole::Replica);
    }

    #[test]
    fn leader_not_silent_within_window() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(!t.is_leader_silent(at(base, 500)));
        assert!(!t.is_leader_silent(at(base, 1499)));
    }

    #[test]
    fn leader_silent_at_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(t.is_leader_silent(at(base, 1500)));
    }

    #[test]
    fn leader_silent_past_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(t.is_leader_silent(at(base, 5000)));
    }

    #[test]
    fn fresh_leader_heartbeat_resets_silence() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(t.is_leader_silent(at(base, 1500)));
        t.record_heartbeat(0x42, ReplicaRole::Leader, 105, at(base, 1500));
        assert!(!t.is_leader_silent(at(base, 1600)));
    }

    /// Dropping the believed leader clears the belief, and with no believed
    /// leader `is_leader_silent` reports `false` (it does NOT report silence).
    #[test]
    fn dropped_believed_leader_clears_belief_and_is_not_silent() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.drop_peer(0x42);
        assert!(!t.is_leader_silent(at(base, 100)));
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn clear_believed_leader_does_not_drop_peer_entry() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.clear_believed_leader();
        assert!(t.believed_leader().is_none());
        assert!(t.peer_state(0x42).is_some());
    }

    #[test]
    fn peer_lag_returns_elapsed() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Replica, 100, base);
        let lag = t.peer_lag(0x42, at(base, 750)).unwrap();
        assert_eq!(lag, Duration::from_millis(750));
    }

    #[test]
    fn peer_lag_unknown_returns_none() {
        let t = HeartbeatTracker::new(500);
        assert!(t.peer_lag(0x42, t0()).is_none());
    }

    #[test]
    fn healthy_peers_filters_stale_entries() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 100, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 100, at(base, 200));
        t.record_heartbeat(0x3, ReplicaRole::Replica, 100, at(base, 400));
        let healthy = t.healthy_peers(at(base, 1500));
        assert_eq!(healthy, vec![0x2, 0x3]);
    }

    #[test]
    fn healthy_peers_sorted_by_node_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x30, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x10, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
        let healthy = t.healthy_peers(at(base, 100));
        assert_eq!(healthy, vec![0x10, 0x20, 0x30]);
    }

    /// The leader is reported silent exactly when it drops out of
    /// `healthy_peers`: both use the same silence window.
    #[test]
    fn healthy_peers_and_leader_silence_agree_at_boundary() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(!t.is_leader_silent(at(base, 1499)));
        assert_eq!(t.healthy_peers(at(base, 1499)), vec![0x42]);
        assert!(t.is_leader_silent(at(base, 1500)));
        assert!(t.healthy_peers(at(base, 1500)).is_empty());
    }

    #[test]
    fn peer_tail_seqs_snapshot() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x10, ReplicaRole::Leader, 1000, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 950, base);
        t.record_heartbeat(0x30, ReplicaRole::Replica, 980, base);
        let mut tails = t.peer_tail_seqs();
        tails.sort_by_key(|(id, _)| *id);
        assert_eq!(tails, vec![(0x10, 1000), (0x20, 950), (0x30, 980)]);
    }

    #[test]
    fn drop_peer_removes_and_decrements_count() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        assert_eq!(t.peer_count(), 2);
        t.drop_peer(0x1);
        assert_eq!(t.peer_count(), 1);
        assert!(t.peer_state(0x1).is_none());
        assert!(t.peer_state(0x2).is_some());
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn drop_non_leader_peer_preserves_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        t.drop_peer(0x2);
        assert_eq!(t.believed_leader(), Some(0x1));
    }

    #[test]
    fn miss_threshold_one_triggers_after_one_window() {
        let base = t0();
        let mut t = HeartbeatTracker::with_miss_threshold(500, 1);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 0, base);
        assert!(!t.is_leader_silent(at(base, 499)));
        assert!(t.is_leader_silent(at(base, 500)));
    }

    #[test]
    fn no_believed_leader_never_silent_regardless_of_time() {
        let base = t0();
        let t = HeartbeatTracker::new(500);
        assert!(!t.is_leader_silent(at(base, 60_000)));
    }
}