use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::message::{Message, Role};
/// Timing knobs for the election state machine.
#[derive(Debug, Clone, Copy)]
pub struct ElectConfig {
    /// Minimum gap between two heartbeats sent to the same peer.
    pub hb_interval: Duration,
    /// Silence threshold for treating a peer as down.
    /// NOTE(review): not read in this section; presumably consulted by `is_peer_down`.
    pub down_after: Duration,
    /// How long a candidate waits for a quorum of accepts before giving up.
    pub election_timeout: Duration,
    /// Fixed part of the pause after a failed candidacy before another
    /// election may be started.
    pub election_backoff: Duration,
    /// Upper bound handed to the jitter sampler for the extra, node-dependent
    /// part of that pause.
    pub election_backoff_jitter: Duration,
}

impl Default for ElectConfig {
    /// 200 ms heartbeats, a 5 s down threshold, a 3 s election timeout, and a
    /// post-failure backoff of 1 s plus at most 4 s of jitter.
    fn default() -> Self {
        let heartbeat_every = Duration::from_millis(200);
        Self {
            hb_interval: heartbeat_every,
            down_after: Duration::from_secs(5),
            election_timeout: Duration::from_secs(3),
            election_backoff: Duration::from_secs(1),
            election_backoff_jitter: Duration::from_secs(4),
        }
    }
}
/// Snapshot of what this node last learned about one peer.
///
/// Stored in `Elector::peer_views`. NOTE(review): the map is presumably keyed by
/// peer id and filled from incoming heartbeats; neither is visible in this section.
#[derive(Debug, Clone)]
// NOTE(review): silences dead-code warnings, presumably because some of these
// fields are recorded but not yet read anywhere; confirm which ones and narrow it.
#[allow(dead_code)]
pub(crate) struct PeerView {
    /// When the peer was last heard from.
    pub(crate) last_seen: Instant,
    /// Epoch the peer last reported.
    pub(crate) last_epoch: u64,
    /// Role the peer last reported.
    pub(crate) last_role: Role,
    /// Replication offset the peer last reported.
    pub(crate) last_repl_offset: u64,
}
/// Election state machine for one node.
///
/// It is driven by `Elector::tick` (timers) and `Elector::on_message` (incoming
/// messages). Both return the `Outbound` messages the caller should deliver.
pub struct Elector {
    /// This node's id; carried in heartbeats, offers and announcements.
    pub(crate) node_id: String,
    /// Nodes to send heartbeats to. An entry equal to `node_id` is skipped.
    pub(crate) peer_ids: Vec<String>,
    /// Timing configuration.
    pub(crate) config: ElectConfig,
    /// Current role; starts as the role passed to `Elector::new`.
    pub(crate) role: Role,
    /// Election epoch; starts at 1 and is incremented when this node starts an election.
    pub(crate) epoch: u64,
    /// Id of the node believed to be primary, if known. An election is only
    /// started when this is `Some` and that node appears down.
    pub(crate) current_primary: Option<String>,
    /// Replication offset advertised in heartbeats and offers.
    pub(crate) my_repl_offset: u64,
    /// Per-peer time of the last heartbeat sent, used to rate-limit heartbeats.
    pub(crate) last_hb_sent: HashMap<String, Instant>,
    /// Latest observed state of each peer (maintained outside this section).
    pub(crate) peer_views: HashMap<String, PeerView>,
    /// Ids that accepted the current candidacy, including this node's own vote.
    /// Used as a set (values are `()`). NOTE(review): `HashSet<String>` would say this directly.
    pub(crate) accept_votes: HashMap<String, ()>,
    /// When the current offer was broadcast; `Some` only while a candidacy is pending.
    pub(crate) offer_at: Option<Instant>,
    /// No new election is started before this instant (set after a failed candidacy).
    pub(crate) backoff_until: Option<Instant>,
    /// NOTE(review): not touched in this section; presumably the last epoch for
    /// which this node accepted another candidate's offer. Confirm in `on_offer`.
    pub(crate) last_accept_epoch: Option<u64>,
    /// Address broadcast in `Announce` when this node wins an election.
    pub(crate) my_advertised_addr: String,
    /// Source of the jittered part of the post-failure backoff.
    pub(crate) jitter: ElectJitter,
}
/// How the jittered part of the post-failure election backoff is chosen.
#[derive(Debug, Clone)]
pub enum ElectJitter {
    /// Always use this delay, clamped to the configured maximum.
    Fixed(Duration),
    /// Derive the delay from a hash of the node id. Different nodes usually get
    /// different delays, but a given node always gets the same one for a given
    /// maximum.
    /// NOTE(review): despite the name, no system randomness is involved.
    System,
}

impl ElectJitter {
    /// Returns a jitter delay that never exceeds `max`.
    ///
    /// * `Fixed(d)` yields `min(d, max)`.
    /// * `System` yields a value in `[0, max)` (zero when `max` is zero). It is
    ///   computed from an FNV-1-style hash of `node_id`. `_now` is not used, so
    ///   the result depends only on `node_id` and `max`.
    fn sample(&self, max: Duration, _now: Instant, node_id: &str) -> Duration {
        match self {
            Self::Fixed(d) => (*d).min(max),
            Self::System => {
                // NOTE(review): the canonical 64-bit FNV offset basis is
                // 14695981039346656037; this value looks like it lost its last
                // digit. It is kept so existing per-node delays do not shift.
                let hash = node_id
                    .as_bytes()
                    .iter()
                    .fold(1469598103934665603_u64, |h, &b| {
                        h.wrapping_mul(1099511628211) ^ u64::from(b)
                    });
                // Saturate instead of `as u64`. Truncating the u128 would wrap
                // large spans, and a span with zero low 64 bits would make the
                // `%` below divide by zero.
                let span_ns = u64::try_from(max.as_nanos())
                    .unwrap_or(u64::MAX)
                    .max(1);
                Duration::from_nanos(hash % span_ns)
            }
        }
    }
}
/// A message produced by the elector, addressed to a single peer id or to
/// every peer when `to` equals `Outbound::BROADCAST`.
/// Delivery is the caller's job (the transport is outside this section).
#[derive(Debug, Clone)]
pub struct Outbound {
    /// Destination peer id, or `Outbound::BROADCAST`.
    pub to: String,
    /// Payload to deliver.
    pub msg: Message,
}
impl Outbound {
    /// Sentinel destination meaning "send to all peers".
    pub const BROADCAST: &'static str = "*";
}
impl Elector {
    /// Creates an elector for `node_id` in `start_role` at epoch 1.
    ///
    /// * `peer_ids` — ids to send heartbeats to; an entry equal to `node_id` is skipped.
    /// * `my_advertised_addr` — address broadcast in `Announce` if this node wins.
    /// * `start_role` — initial role. A node starting as `Role::Primary` records
    ///   itself as the current primary, the same state it reaches after winning
    ///   an election.
    /// * `config` — timing configuration.
    /// * `jitter` — how the jittered part of the post-failure backoff is chosen.
    pub fn new(
        node_id: impl Into<String>,
        peer_ids: Vec<String>,
        my_advertised_addr: impl Into<String>,
        start_role: Role,
        config: ElectConfig,
        jitter: ElectJitter,
    ) -> Self {
        let node_id: String = node_id.into();
        // Keep `current_primary` consistent with `role`: `maybe_finish_candidacy`
        // also records the node itself when it becomes primary.
        let current_primary = (start_role == Role::Primary).then(|| node_id.clone());
        Self {
            node_id,
            peer_ids,
            config,
            role: start_role,
            epoch: 1,
            current_primary,
            my_repl_offset: 0,
            last_hb_sent: HashMap::new(),
            peer_views: HashMap::new(),
            accept_votes: HashMap::new(),
            offer_at: None,
            backoff_until: None,
            last_accept_epoch: None,
            my_advertised_addr: my_advertised_addr.into(),
            jitter,
        }
    }

    /// Sets the replication offset advertised in heartbeats and offers.
    pub fn set_repl_offset(&mut self, offset: u64) {
        self.my_repl_offset = offset;
    }

    /// Current role of this node.
    pub fn role(&self) -> Role {
        self.role
    }

    /// Current election epoch.
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Id of the node currently believed to be primary, if any.
    pub fn current_primary(&self) -> Option<&str> {
        self.current_primary.as_deref()
    }

    /// Advances timers and returns the messages to send.
    ///
    /// Steps, in order:
    /// 1. Queue any heartbeats that are due.
    /// 2. Possibly start an election.
    /// 3. Possibly resolve a pending candidacy.
    ///
    /// Step 3 runs in the same call as step 2. So if the candidate's own vote
    /// already meets `quorum_size()`, it wins in the same tick it starts.
    pub fn tick(&mut self, now: Instant) -> Vec<Outbound> {
        let mut out = Vec::new();
        self.emit_heartbeats(now, &mut out);
        self.maybe_start_election(now, &mut out);
        self.maybe_finish_candidacy(now, &mut out);
        out
    }

    /// Dispatches an incoming message to its handler and returns whatever
    /// messages the handler queued.
    ///
    /// For heartbeats the sender is identified by `from_node_id`; the `node_id`
    /// field inside the message is ignored. The heartbeat handler is not given
    /// the output buffer, so a heartbeat never produces outbound messages here.
    pub fn on_message(
        &mut self,
        from_node_id: &str,
        msg: Message,
        now: Instant,
    ) -> Vec<Outbound> {
        let mut out = Vec::new();
        match msg {
            Message::Hb {
                epoch,
                node_id: _,
                role,
                repl_offset,
            } => self.on_hb(from_node_id, epoch, role, repl_offset, now),
            Message::Offer {
                new_epoch,
                candidate_id,
                repl_offset,
            } => self.on_offer(new_epoch, candidate_id, repl_offset, &mut out),
            Message::Accept {
                epoch,
                accepter_id,
            } => self.on_accept(epoch, accepter_id, now, &mut out),
            Message::Announce {
                epoch,
                new_primary_id,
                new_primary_addr,
            } => self.on_announce(epoch, new_primary_id, new_primary_addr, &mut out),
        }
        out
    }

    /// Queues a heartbeat for every peer (other than this node) that has not
    /// been sent one within the last `hb_interval`, and records the send time.
    fn emit_heartbeats(&mut self, now: Instant, out: &mut Vec<Outbound>) {
        let interval = self.config.hb_interval;
        // Iterate by reference. Only `last_hb_sent` is mutated, and it is a
        // field disjoint from `peer_ids`, so the peer list need not be cloned
        // on every tick.
        for peer in &self.peer_ids {
            if *peer == self.node_id {
                continue;
            }
            let due = self
                .last_hb_sent
                .get(peer)
                .is_none_or(|prev| now.duration_since(*prev) >= interval);
            if !due {
                continue;
            }
            self.last_hb_sent.insert(peer.clone(), now);
            out.push(Outbound {
                to: peer.clone(),
                msg: Message::Hb {
                    epoch: self.epoch,
                    node_id: self.node_id.clone(),
                    role: self.role,
                    repl_offset: self.my_repl_offset,
                },
            });
        }
    }

    /// Starts an election when all of these hold:
    /// - this node is a replica;
    /// - it is outside any backoff window;
    /// - it knows who the primary is, and that primary appears down;
    /// - it considers itself the best candidate.
    ///
    /// Starting bumps the epoch, switches to `Candidate`, counts this node's own
    /// vote, and broadcasts an `Offer` for the new epoch.
    fn maybe_start_election(&mut self, now: Instant, out: &mut Vec<Outbound>) {
        if self.role != Role::Replica {
            return;
        }
        // Still inside the pause that follows a failed candidacy.
        if let Some(b) = self.backoff_until
            && now < b
        {
            return;
        }
        // With no known primary there is nothing to fail over from.
        let Some(primary) = self.current_primary.clone() else {
            return;
        };
        if !self.is_peer_down(&primary, now) {
            return;
        }
        if !self.am_best_candidate(now) {
            return;
        }
        self.epoch = self.epoch.saturating_add(1);
        self.role = Role::Candidate;
        self.accept_votes.clear();
        // Count our own vote.
        // NOTE(review): the self-vote is not recorded in `last_accept_epoch`;
        // confirm `on_offer` cannot also accept another candidate's offer for
        // this same epoch.
        self.accept_votes.insert(self.node_id.clone(), ());
        self.offer_at = Some(now);
        out.push(Outbound {
            to: Outbound::BROADCAST.to_string(),
            msg: Message::Offer {
                new_epoch: self.epoch,
                candidate_id: self.node_id.clone(),
                repl_offset: self.my_repl_offset,
            },
        });
    }

    /// Resolves a pending candidacy.
    ///
    /// - **Win:** with at least `quorum_size()` accepts (own vote included),
    ///   the node becomes primary, records itself as `current_primary`, and
    ///   broadcasts `Announce` with its advertised address.
    /// - **Give up:** otherwise, once `election_timeout` has elapsed since the
    ///   offer, it reverts to replica. It then backs off for `election_backoff`
    ///   plus a jitter of at most `election_backoff_jitter`.
    ///
    /// The epoch is not rolled back when the candidacy fails.
    fn maybe_finish_candidacy(&mut self, now: Instant, out: &mut Vec<Outbound>) {
        if self.role != Role::Candidate {
            return;
        }
        let Some(offer_at) = self.offer_at else {
            return;
        };
        let quorum = self.quorum_size();
        if self.accept_votes.len() >= quorum {
            self.role = Role::Primary;
            self.current_primary = Some(self.node_id.clone());
            self.offer_at = None;
            self.accept_votes.clear();
            out.push(Outbound {
                to: Outbound::BROADCAST.to_string(),
                msg: Message::Announce {
                    epoch: self.epoch,
                    new_primary_id: self.node_id.clone(),
                    new_primary_addr: self.my_advertised_addr.clone(),
                },
            });
            return;
        }
        if now.duration_since(offer_at) >= self.config.election_timeout {
            self.role = Role::Replica;
            self.offer_at = None;
            self.accept_votes.clear();
            let jitter = self
                .jitter
                .sample(self.config.election_backoff_jitter, now, &self.node_id);
            self.backoff_until = Some(now + self.config.election_backoff + jitter);
        }
    }
}