use std::time::Instant;
use crate::Elector;
use crate::elector::{Outbound, PeerView};
use crate::message::{Message, Role};
impl Elector {
pub(crate) fn on_hb(
&mut self,
from: &str,
epoch: u64,
role: Role,
repl_offset: u64,
now: Instant,
) {
self.peer_views.insert(
from.to_string(),
PeerView {
last_seen: now,
last_epoch: epoch,
last_role: role,
last_repl_offset: repl_offset,
},
);
if role == Role::Primary
&& epoch >= self.epoch
&& (epoch > self.epoch
|| (self.current_primary.is_none() && self.role == Role::Replica))
{
self.epoch = epoch;
self.current_primary = Some(from.to_string());
if self.role == Role::Primary && from != self.node_id {
self.role = Role::Replica;
} else if self.role == Role::Candidate {
self.role = Role::Replica;
self.offer_at = None;
self.accept_votes.clear();
}
}
}
pub(crate) fn on_offer(
&mut self,
new_epoch: u64,
candidate_id: String,
repl_offset: u64,
out: &mut Vec<Outbound>,
) {
if new_epoch <= self.epoch {
return;
}
if self.my_repl_offset > repl_offset {
return;
}
if self.my_repl_offset == repl_offset && self.node_id.as_bytes() < candidate_id.as_bytes() {
return;
}
if self.last_accept_epoch == Some(new_epoch) {
return;
}
self.last_accept_epoch = Some(new_epoch);
self.epoch = new_epoch;
out.push(Outbound {
to: candidate_id,
msg: Message::Accept {
epoch: new_epoch,
accepter_id: self.node_id.clone(),
},
});
}
pub(crate) fn on_accept(&mut self, epoch: u64, accepter_id: String, now: Instant, out: &mut Vec<Outbound>) {
if self.role != Role::Candidate || self.epoch != epoch {
return;
}
self.accept_votes.insert(accepter_id, ());
let _ = now;
let _ = out;
}
pub(crate) fn on_announce(
&mut self,
epoch: u64,
new_primary_id: String,
_new_primary_addr: String,
_out: &mut Vec<Outbound>,
) {
if epoch <= self.epoch && self.current_primary.as_deref() == Some(new_primary_id.as_str()) {
return;
}
if epoch < self.epoch {
return;
}
self.epoch = epoch;
self.current_primary = Some(new_primary_id.clone());
self.last_accept_epoch = Some(epoch);
self.accept_votes.clear();
self.offer_at = None;
if new_primary_id == self.node_id {
self.role = Role::Primary;
} else {
self.role = Role::Replica;
}
}
pub(crate) fn quorum_size(&self) -> usize {
self.peer_ids.len() / 2 + 1
}
pub(crate) fn is_peer_down(&self, peer: &str, now: Instant) -> bool {
match self.peer_views.get(peer) {
Some(v) => now.duration_since(v.last_seen) >= self.config.down_after,
None => true,
}
}
#[cfg(test)]
pub(crate) fn force_known_primary(&mut self, primary: &str) {
self.current_primary = Some(primary.to_string());
}
pub(crate) fn am_best_candidate(&self, now: Instant) -> bool {
for other in &self.peer_ids {
if other == &self.node_id {
continue;
}
if self.current_primary.as_deref() == Some(other) {
continue;
}
let Some(v) = self.peer_views.get(other) else {
continue;
};
if now.duration_since(v.last_seen) >= self.config.down_after {
continue; }
if v.last_repl_offset > self.my_repl_offset {
return false;
}
if v.last_repl_offset == self.my_repl_offset
&& other.as_bytes() < self.node_id.as_bytes()
{
return false;
}
}
true
}
}