use std::cmp::Ordering;
use std::time::Duration;
use crate::adapter::net::behavior::placement::NodeId;
/// Result of one leader election, expressed relative to the node that ran it.
///
/// Produced by `elect`; the variants distinguish "the local node won" from
/// "a peer won" from "nobody could be elected".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElectionOutcome {
/// The local node (the `self_id` passed to `elect`) ranked first.
SelfWins,
/// A node other than the local one ranked first; carries that node's id.
PeerWins(NodeId),
/// The replica set was empty or none of its members was reported healthy.
NoEligibleReplica,
}
impl ElectionOutcome {
    /// Returns the id of the elected peer, if a peer was elected.
    ///
    /// Only [`Self::PeerWins`] yields `Some`. Both [`Self::SelfWins`] and
    /// [`Self::NoEligibleReplica`] yield `None`, so a `None` result alone does not
    /// tell "the local node won" apart from "nobody won"; use [`Self::is_self`]
    /// to detect a local win.
    pub fn winner(self) -> Option<NodeId> {
        match self {
            Self::PeerWins(peer) => Some(peer),
            Self::SelfWins | Self::NoEligibleReplica => None,
        }
    }

    /// Returns `true` exactly when the outcome is [`Self::SelfWins`].
    pub fn is_self(self) -> bool {
        // Spelled out per variant so that adding a variant forces a decision here.
        match self {
            Self::SelfWins => true,
            Self::PeerWins(_) | Self::NoEligibleReplica => false,
        }
    }
}
/// Picks the leader of `replica_set` from the local node's point of view.
///
/// Every entry of `replica_set` for which `health_of` returns `true` is a
/// candidate. Candidates are ranked by round-trip time, lowest first:
///
/// * `self_id` is ranked at [`Duration::ZERO`]; `rtt_to` is not consulted for it.
/// * A peer for which `rtt_to` returns `None` is ranked at [`Duration::MAX`], so it
///   remains eligible but loses to any peer with a smaller measured RTT.
/// * Equal RTTs are broken by the smaller `NodeId`, which makes the result
///   independent of the order of `replica_set`.
///
/// # Parameters
///
/// * `replica_set` - nodes that may be elected.
/// * `self_id` - id of the node running the election. If it is not listed in
///   `replica_set` it can never be elected.
/// * `rtt_to` - round-trip time to a peer, or `None` when no measurement exists.
/// * `health_of` - whether a node is currently eligible; also applied to `self_id`.
///
/// # Returns
///
/// * [`ElectionOutcome::SelfWins`] when `self_id` ranks first,
/// * [`ElectionOutcome::PeerWins`] with the first-ranked node otherwise,
/// * [`ElectionOutcome::NoEligibleReplica`] when `replica_set` is empty or no
///   entry is healthy.
pub fn elect<R, H>(
    replica_set: &[NodeId],
    self_id: NodeId,
    rtt_to: R,
    health_of: H,
) -> ElectionOutcome
where
    R: Fn(NodeId) -> Option<Duration>,
    H: Fn(NodeId) -> bool,
{
    // Only the best-ranked candidate is needed, so a single lazy pass is enough;
    // there is no reason to collect and sort the whole candidate list.
    let best = replica_set
        .iter()
        .copied()
        .filter(|&node| health_of(node))
        .map(|node| {
            let rtt = if node == self_id {
                Duration::ZERO
            } else {
                // Unmeasured peers stay eligible but rank behind every measured one.
                rtt_to(node).unwrap_or(Duration::MAX)
            };
            (node, rtt)
        })
        // Total order: RTT first, node id as the deterministic tie-breaker.
        .min_by(|a, b| match a.1.cmp(&b.1) {
            Ordering::Equal => a.0.cmp(&b.0),
            unequal => unequal,
        });

    match best {
        None => ElectionOutcome::NoEligibleReplica,
        Some((winner, _)) if winner == self_id => ElectionOutcome::SelfWins,
        Some((winner, _)) => ElectionOutcome::PeerWins(winner),
    }
}
// Unit tests for `elect` and the `ElectionOutcome` helpers. Node ids are small
// integer literals; `0xFF` is used as a `self_id` that is not a member of the
// replica set whenever only peers should compete.
#[cfg(test)]
mod tests {
use super::*;
// Builds an RTT lookup from `(node, milliseconds)` pairs. Nodes that are not
// listed yield `None`, i.e. "no measurement".
fn rtt_table(entries: &[(NodeId, u64)]) -> impl Fn(NodeId) -> Option<Duration> + '_ {
move |node: NodeId| {
entries
.iter()
.find_map(|(n, ms)| (*n == node).then_some(Duration::from_millis(*ms)))
}
}
// Builds a health predicate that is `true` only for the listed nodes.
fn alive_set(alive: &[NodeId]) -> impl Fn(NodeId) -> bool + '_ {
move |node: NodeId| alive.contains(&node)
}
// A lone healthy replica that is the local node wins without any RTT data.
#[test]
fn single_eligible_replica_wins() {
let set = [0x1];
let outcome = elect(&set, 0x1, rtt_table(&[]), alive_set(&[0x1]));
assert_eq!(outcome, ElectionOutcome::SelfWins);
}
// An empty replica set has no candidates, even though the health predicate
// would accept 0x1.
#[test]
fn empty_replica_set_yields_no_eligible() {
let set: [NodeId; 0] = [];
let outcome = elect(&set, 0x1, rtt_table(&[]), alive_set(&[0x1]));
assert_eq!(outcome, ElectionOutcome::NoEligibleReplica);
}
// The local node is subject to the health check too: 0x1 is not alive, so the
// only healthy peer wins.
#[test]
fn self_unhealthy_falls_to_peer() {
let set = [0x1, 0x2];
let outcome = elect(&set, 0x1, rtt_table(&[(0x2, 50)]), alive_set(&[0x2]));
assert_eq!(outcome, ElectionOutcome::PeerWins(0x2));
}
// Peers with RTT measurements are still excluded when unhealthy, leaving only
// the local node.
#[test]
fn all_peers_unhealthy_falls_to_self() {
let set = [0x1, 0x2, 0x3];
let outcome = elect(
&set,
0x1,
rtt_table(&[(0x2, 50), (0x3, 75)]),
alive_set(&[0x1]),
);
assert_eq!(outcome, ElectionOutcome::SelfWins);
}
// A non-empty replica set with nobody alive produces no winner.
#[test]
fn all_unhealthy_yields_no_eligible() {
let set = [0x1, 0x2, 0x3];
let outcome = elect(&set, 0x1, rtt_table(&[]), alive_set(&[]));
assert_eq!(outcome, ElectionOutcome::NoEligibleReplica);
}
// The local node is ranked at zero RTT, so it beats peers at 10 ms and 5 ms.
#[test]
fn self_wins_against_distant_peers() {
let set = [0x1, 0x2, 0x3];
let outcome = elect(
&set,
0x1,
rtt_table(&[(0x2, 10), (0x3, 5)]),
alive_set(&[0x1, 0x2, 0x3]),
);
assert_eq!(outcome, ElectionOutcome::SelfWins);
}
// `self_id` 0xFF is not in the replica set, so only peers compete; 0x20 has
// the lowest RTT (10 ms).
#[test]
fn lowest_rtt_peer_wins_when_self_excluded() {
let set = [0x10, 0x20, 0x30];
let outcome = elect(
&set,
0xFF, rtt_table(&[(0x10, 30), (0x20, 10), (0x30, 20)]),
alive_set(&[0x10, 0x20, 0x30]),
);
assert_eq!(outcome, ElectionOutcome::PeerWins(0x20));
}
// All peers are at 25 ms; the smallest node id wins even though it is not
// first in the input slice.
#[test]
fn tied_rtt_breaks_by_lex_node_id() {
let set = [0xCC, 0xAA, 0xBB];
let outcome = elect(
&set,
0xFF,
rtt_table(&[(0xAA, 25), (0xBB, 25), (0xCC, 25)]),
alive_set(&[0xAA, 0xBB, 0xCC]),
);
assert_eq!(outcome, ElectionOutcome::PeerWins(0xAA));
}
// Node id is only a tie-breaker: 0xBB has the larger id but the lower RTT.
#[test]
fn rtt_takes_priority_over_node_id() {
let set = [0xAA, 0xBB];
let outcome = elect(
&set,
0xFF,
rtt_table(&[(0xAA, 50), (0xBB, 10)]),
alive_set(&[0xAA, 0xBB]),
);
assert_eq!(outcome, ElectionOutcome::PeerWins(0xBB));
}
// 0xCC is healthy but has no RTT entry, so it is ranked at the maximum
// duration and loses to the measured peers; 0xAA (30 ms) wins.
#[test]
fn peer_without_rtt_measurement_ranks_at_worst_rtt() {
let set = [0xAA, 0xBB, 0xCC];
let outcome = elect(
&set,
0xFF,
rtt_table(&[(0xAA, 30), (0xBB, 50)]), alive_set(&[0xAA, 0xBB, 0xCC]), );
assert_eq!(outcome, ElectionOutcome::PeerWins(0xAA));
}
// With no measurements at all, every peer ties at the maximum duration and
// the node-id tie-breaker still selects a winner.
#[test]
fn only_unmeasured_peers_still_elect_a_leader() {
let set = [0xAA, 0xBB];
let outcome = elect(
&set,
0xFF, rtt_table(&[]), alive_set(&[0xAA, 0xBB]),
);
assert_eq!(outcome, ElectionOutcome::PeerWins(0xAA));
}
// All six orderings of the same three equally-distant peers must elect the
// same node, i.e. the result does not depend on slice order.
#[test]
fn determinism_across_call_orders() {
let perms = [
[0x1, 0x2, 0x3],
[0x3, 0x2, 0x1],
[0x2, 0x1, 0x3],
[0x1, 0x3, 0x2],
[0x3, 0x1, 0x2],
[0x2, 0x3, 0x1],
];
for set in perms {
let outcome = elect(
&set,
0xFF,
rtt_table(&[(0x1, 20), (0x2, 20), (0x3, 20)]),
alive_set(&[0x1, 0x2, 0x3]),
);
assert_eq!(
outcome,
ElectionOutcome::PeerWins(0x1),
"perm {set:?} must converge to same winner",
);
}
}
// A peer measured at 0 ms ties with the local node's implicit zero RTT. The
// smaller id wins from both perspectives: 0x5 elects itself, 0x10 elects 0x5.
#[test]
fn self_at_zero_rtt_beats_peer_at_zero_rtt_via_lex_node_id() {
let set = [0x5, 0x10];
let outcome_smaller = elect(&set, 0x5, rtt_table(&[(0x10, 0)]), alive_set(&[0x5, 0x10]));
assert_eq!(outcome_smaller, ElectionOutcome::SelfWins);
let outcome_larger = elect(&set, 0x10, rtt_table(&[(0x5, 0)]), alive_set(&[0x5, 0x10]));
assert_eq!(outcome_larger, ElectionOutcome::PeerWins(0x5));
}
// `winner()` reports only peers: a local win and "no winner" both map to `None`.
#[test]
fn winner_helper_strips_self_id() {
assert_eq!(ElectionOutcome::SelfWins.winner(), None);
assert_eq!(ElectionOutcome::PeerWins(0xAA).winner(), Some(0xAA));
assert_eq!(ElectionOutcome::NoEligibleReplica.winner(), None);
}
// `is_self()` is true for `SelfWins` only.
#[test]
fn is_self_helper() {
assert!(ElectionOutcome::SelfWins.is_self());
assert!(!ElectionOutcome::PeerWins(0xAA).is_self());
assert!(!ElectionOutcome::NoEligibleReplica.is_self());
}
// Two nodes evaluate the same replica set with disjoint health views
// ({0xA, 0xB} vs {0xC, 0xD}). Each call uses only its own inputs, so each
// node elects itself.
#[test]
fn cross_partition_independent_evaluation() {
let global_set = [0xA, 0xB, 0xC, 0xD];
let outcome_a = elect(
&global_set,
0xA,
rtt_table(&[(0xB, 5)]),
alive_set(&[0xA, 0xB]),
);
assert_eq!(outcome_a, ElectionOutcome::SelfWins);
let outcome_d = elect(
&global_set,
0xD,
rtt_table(&[(0xC, 5)]),
alive_set(&[0xC, 0xD]),
);
assert_eq!(outcome_d, ElectionOutcome::SelfWins);
}
// Every voter measures 0 ms to 0x10 (the RTT closure ignores the caller).
// 0x10 elects itself; 0x20 and 0x30 tie with it at zero and lose on node id,
// so all three voters converge on 0x10.
#[test]
fn central_peer_at_zero_rtt_wins_across_all_voters() {
let set = [0x10, 0x20, 0x30];
let rtt_for = |_: NodeId, to: NodeId| -> Option<Duration> {
match to {
0x10 => Some(Duration::ZERO),
0x20 => Some(Duration::from_millis(10)),
0x30 => Some(Duration::from_millis(20)),
_ => None,
}
};
let healthy: std::collections::HashSet<NodeId> = set.iter().copied().collect();
let mut winners: Vec<NodeId> = Vec::new();
for &self_id in &set {
let outcome = elect(
&set,
self_id,
|peer| rtt_for(self_id, peer),
|peer| healthy.contains(&peer),
);
// Normalize the outcome to a concrete node id so voters can be compared.
let winner = match outcome {
ElectionOutcome::SelfWins => self_id,
ElectionOutcome::PeerWins(w) => w,
ElectionOutcome::NoEligibleReplica => {
panic!("expected a winner; healthy partition has eligible replicas");
}
};
winners.push(winner);
}
assert!(
winners.windows(2).all(|w| w[0] == w[1]),
"every node converges on the central-zero-RTT winner; got {winners:?}"
);
assert_eq!(winners[0], 0x10);
}
// The former leader 0x05 is unhealthy and every survivor sees all other peers
// at 5 ms while ranking itself at zero, so each survivor elects itself. The
// test pins this as the expected result of `elect` on its own.
#[test]
fn symmetric_failover_yields_dual_self_winners_as_expected() {
let dead_leader = 0x05;
let survivors = [0x10, 0x20, 0x30];
let set = [dead_leader, 0x10, 0x20, 0x30];
let rtt_for = |from: NodeId, to: NodeId| -> Option<Duration> {
if from == to {
return Some(Duration::ZERO);
}
Some(Duration::from_millis(5))
};
let healthy_for_survivor = |peer: NodeId| peer != dead_leader;
let mut self_winners = Vec::new();
for &self_id in &survivors {
let outcome = elect(
&set,
self_id,
|peer| rtt_for(self_id, peer),
healthy_for_survivor,
);
if outcome == ElectionOutcome::SelfWins {
self_winners.push(self_id);
}
}
assert_eq!(
self_winners, survivors,
"symmetric-RTT failover produces N self-winners; convergence is broader-system"
);
}
}