use std::cmp::Ordering;
use crate::{
sequence_paxos::{Phase, Role},
util::{defaults::*, ConfigurationId, FlexibleQuorum, Quorum},
};
#[cfg(feature = "logging")]
use crate::utils::logger::create_logger;
use crate::{
messages::ballot_leader_election::{
BLEMessage, HeartbeatMsg, HeartbeatReply, HeartbeatRequest,
},
util::NodeId,
OmniPaxosConfig,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
#[cfg(feature = "logging")]
use slog::{info, trace, Logger};
#[derive(Clone, Copy, Eq, Debug, Default, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Ballot {
pub config_id: ConfigurationId,
pub n: u32,
pub priority: u32,
pub pid: NodeId,
}
impl Ballot {
pub fn with(config_id: ConfigurationId, n: u32, priority: u32, pid: NodeId) -> Ballot {
Ballot {
config_id,
n,
priority,
pid,
}
}
}
impl Ord for Ballot {
fn cmp(&self, other: &Self) -> Ordering {
(self.n, self.priority, self.pid).cmp(&(other.n, other.priority, other.pid))
}
}
impl PartialOrd for Ballot {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
const INITIAL_ROUND: u32 = 1;
const RECOVERY_ROUND: u32 = 0;
pub(crate) struct BallotLeaderElection {
configuration_id: ConfigurationId,
pid: NodeId,
peers: Vec<NodeId>,
hb_round: u32,
heartbeat_replies: Vec<HeartbeatReply>,
prev_replies: Vec<HeartbeatReply>,
current_ballot: Ballot,
leader: Ballot,
happy: bool,
quorum: Quorum,
outgoing: Vec<BLEMessage>,
#[cfg(feature = "logging")]
logger: Logger,
}
impl BallotLeaderElection {
pub(crate) fn with(config: BLEConfig, recovered_leader: Option<Ballot>) -> Self {
let config_id = config.configuration_id;
let pid = config.pid;
let peers = config.peers;
let num_nodes = &peers.len() + 1;
let quorum = Quorum::with(config.flexible_quorum, num_nodes);
let mut initial_ballot = Ballot::with(config_id, INITIAL_ROUND, config.priority, pid);
let initial_leader = match recovered_leader {
Some(b) if b != Ballot::default() => {
initial_ballot.n = RECOVERY_ROUND;
b
}
_ => initial_ballot,
};
let mut ble = BallotLeaderElection {
configuration_id: config_id,
pid,
peers,
hb_round: 0,
heartbeat_replies: Vec::with_capacity(num_nodes),
prev_replies: Vec::with_capacity(num_nodes),
current_ballot: initial_ballot,
leader: initial_leader,
happy: true,
quorum,
outgoing: Vec::with_capacity(config.buffer_size),
#[cfg(feature = "logging")]
logger: {
if let Some(logger) = config.custom_logger {
logger
} else {
let s = config
.logger_file_path
.unwrap_or_else(|| format!("logs/paxos_{}.log", pid));
create_logger(s.as_str())
}
},
};
#[cfg(feature = "logging")]
{
info!(
ble.logger,
"Ballot Leader Election component pid: {} created!", pid
);
}
ble.new_hb_round();
ble
}
pub(crate) fn set_priority(&mut self, p: u32) {
self.current_ballot.priority = p;
}
pub(crate) fn get_outgoing_msgs(&mut self) -> Vec<BLEMessage> {
std::mem::take(&mut self.outgoing)
}
pub(crate) fn handle(&mut self, m: BLEMessage) {
match m.msg {
HeartbeatMsg::Request(req) => self.handle_request(m.from, req),
HeartbeatMsg::Reply(rep) => self.handle_reply(rep),
}
}
pub(crate) fn new_hb_round(&mut self) {
self.prev_replies = std::mem::take(&mut self.heartbeat_replies);
self.hb_round += 1;
#[cfg(feature = "logging")]
trace!(
self.logger,
"Initiate new heartbeat round: {}",
self.hb_round
);
for peer in &self.peers {
let hb_request = HeartbeatRequest {
round: self.hb_round,
};
self.outgoing.push(BLEMessage {
from: self.pid,
to: *peer,
msg: HeartbeatMsg::Request(hb_request),
});
}
}
pub(crate) fn hb_timeout(
&mut self,
seq_paxos_state: &(Role, Phase),
seq_paxos_promise: Ballot,
) -> Option<Ballot> {
self.update_leader();
self.update_happiness(seq_paxos_state);
self.check_takeover();
self.new_hb_round();
if seq_paxos_promise > self.leader {
self.leader = seq_paxos_promise;
self.happy = true;
}
if self.leader == self.current_ballot {
Some(self.current_ballot)
} else {
None
}
}
fn update_leader(&mut self) {
let max_reply_ballot = self.heartbeat_replies.iter().map(|r| r.ballot).max();
if let Some(max) = max_reply_ballot {
if max > self.leader {
self.leader = max;
}
}
}
fn update_happiness(&mut self, seq_paxos_state: &(Role, Phase)) {
self.happy = if self.leader == self.current_ballot {
let potential_followers = self
.heartbeat_replies
.iter()
.filter(|hb_reply| hb_reply.leader <= self.current_ballot)
.count();
let can_form_quorum = match seq_paxos_state {
(Role::Leader, Phase::Accept) => {
self.quorum.is_accept_quorum(potential_followers + 1)
}
_ => self.quorum.is_prepare_quorum(potential_followers + 1),
};
if can_form_quorum {
true
} else {
let see_larger_happy_leader = self
.heartbeat_replies
.iter()
.any(|r| r.leader > self.current_ballot && r.happy);
see_larger_happy_leader
}
} else {
self.heartbeat_replies
.iter()
.any(|r| r.ballot == self.leader && r.happy)
};
}
fn check_takeover(&mut self) {
if !self.happy {
let all_neighbors_unhappy = self.heartbeat_replies.iter().all(|r| !r.happy);
let im_quorum_connected = self
.quorum
.is_prepare_quorum(self.heartbeat_replies.len() + 1);
if all_neighbors_unhappy && im_quorum_connected {
self.current_ballot.n = self.leader.n + 1;
self.leader = self.current_ballot;
self.happy = true;
}
}
}
fn handle_request(&mut self, from: NodeId, req: HeartbeatRequest) {
let hb_reply = HeartbeatReply {
round: req.round,
ballot: self.current_ballot,
leader: self.leader,
happy: self.happy,
};
self.outgoing.push(BLEMessage {
from: self.pid,
to: from,
msg: HeartbeatMsg::Reply(hb_reply),
});
}
fn handle_reply(&mut self, rep: HeartbeatReply) {
if rep.round == self.hb_round && rep.ballot.config_id == self.configuration_id {
self.heartbeat_replies.push(rep);
}
}
pub(crate) fn get_current_ballot(&self) -> Ballot {
self.current_ballot
}
pub(crate) fn get_ballots(&self) -> Vec<HeartbeatReply> {
self.prev_replies.clone()
}
}
#[derive(Clone, Debug)]
pub(crate) struct BLEConfig {
configuration_id: ConfigurationId,
pid: NodeId,
peers: Vec<NodeId>,
priority: u32,
flexible_quorum: Option<FlexibleQuorum>,
buffer_size: usize,
#[cfg(feature = "logging")]
logger_file_path: Option<String>,
#[cfg(feature = "logging")]
custom_logger: Option<Logger>,
}
impl From<OmniPaxosConfig> for BLEConfig {
fn from(config: OmniPaxosConfig) -> Self {
let pid = config.server_config.pid;
let peers = config
.cluster_config
.nodes
.into_iter()
.filter(|x| *x != pid)
.collect();
Self {
configuration_id: config.cluster_config.configuration_id,
pid,
peers,
priority: config.server_config.leader_priority,
flexible_quorum: config.cluster_config.flexible_quorum,
buffer_size: BLE_BUFFER_SIZE,
#[cfg(feature = "logging")]
logger_file_path: config.server_config.logger_file_path,
#[cfg(feature = "logging")]
custom_logger: config.server_config.custom_logger,
}
}
}