use std::sync::Arc;
use std::time::Duration;
use crate::elections::phi_detector::PhiAccrualDetector;
use crate::elections::proposal::Proposal;
use crate::error::{RepError, Result};
use crate::net::channel::Channel;
use crate::protocol::ProtocolMessage;
use crate::rep_group::RepGroup;
/// Numeric identifier of a node; the election functions in this file report
/// the winning node as a `NodeId`.
pub type NodeId = u32;
pub fn run_election(
node_id: NodeId,
node_name: &str,
group: &RepGroup,
channels: &[Arc<dyn Channel>],
proposed_vlsn: u64,
priority: u32,
term: u64,
) -> Option<NodeId> {
run_election_with_phi(
node_id,
node_name,
group,
channels,
proposed_vlsn,
priority,
term,
None,
Duration::from_millis(500),
)
}
/// Runs one two-phase election round as the proposer.
///
/// # Flow
///
/// 1. Refuses to propose (returns `None`, with a warning logged) when
///    `node_name` is not found in `group` or its node reports
///    `can_be_master() == false`.
/// 2. Returns `None` when either `group.phase1_quorum()` or
///    `group.phase2_quorum()` is zero.
/// 3. Phase 1: sends an `ElectionProposal` on each channel in order and waits
///    for one reply. A granted `ElectionVote` or a counter `ElectionProposal`
///    counts as a promise. A counter-proposal from a group member that can be
///    master replaces the current best proposal when
///    `Proposal::is_better_than` says so. Send failures, receive errors,
///    timeouts and any other reply are skipped.
/// 4. Continues only if the promises plus this node's own vote reach the
///    phase-1 quorum.
/// 5. Phase 2: sends an `ElectionResult` naming the best proposal's node to
///    every channel that promised and counts granted `ElectionVote` replies,
///    plus this node's own vote.
///
/// # Parameters
///
/// * `node_id` / `node_name` - identity of the proposing node.
/// * `group` - membership used for eligibility, quorum sizes and id lookup.
/// * `channels` - one channel per peer; they are contacted one after another,
///   so each channel may block for up to the phase timeout in each phase.
/// * `proposed_vlsn`, `priority`, `term` - contents of this node's proposal.
/// * `phi_detector` - when present, supplies the phase timeout through
///   `suggested_phase_timeout(3.0, fallback_timeout)`.
///   NOTE(review): `3.0` is presumably a phi threshold or multiplier; confirm
///   against `PhiAccrualDetector`.
/// * `fallback_timeout` - phase timeout used when no detector is given.
///
/// # Returns
///
/// `Some(winner_id)` when the phase-2 quorum is reached, otherwise `None`.
pub fn run_election_with_phi(
    node_id: NodeId,
    node_name: &str,
    group: &RepGroup,
    channels: &[Arc<dyn Channel>],
    proposed_vlsn: u64,
    priority: u32,
    term: u64,
    phi_detector: Option<&PhiAccrualDetector>,
    fallback_timeout: Duration,
) -> Option<NodeId> {
    // This node always counts itself once in each phase.
    const SELF_VOTE: usize = 1;

    // A name that is missing from the group is treated as not electable.
    let is_master_capable =
        |name: &str| group.get_node(name).is_some_and(|n| n.can_be_master());

    if !is_master_capable(node_name) {
        log::warn!(
            "election: node {} (non-electable-as-master) refusing to \
             propose; arbiter / monitor / secondary cannot be master",
            node_name
        );
        return None;
    }

    let phase1_quorum = group.phase1_quorum();
    let phase2_quorum = group.phase2_quorum();
    if phase1_quorum == 0 || phase2_quorum == 0 {
        return None;
    }

    // Both phases share the same per-channel timeout.
    let phase_timeout = match phi_detector {
        Some(detector) => {
            detector.suggested_phase_timeout(3.0, fallback_timeout)
        }
        None => fallback_timeout,
    };

    // ---- Phase 1: collect promises and learn about better candidates ----
    let mut leading =
        Proposal::new(node_name.to_string(), proposed_vlsn, priority, term);
    let prepare = ProtocolMessage::ElectionProposal {
        node_name: node_name.to_string(),
        vlsn: proposed_vlsn,
        priority,
        term,
    };
    let mut promised: Vec<Arc<dyn Channel>> = Vec::new();

    for peer in channels {
        if send_message(peer.as_ref(), &prepare).is_err() {
            continue;
        }
        let reply = match receive_message(peer.as_ref(), phase_timeout) {
            Ok(Some(msg)) => msg,
            // Timeout or transport/decode error: this peer gives no promise.
            _ => continue,
        };
        match reply {
            ProtocolMessage::ElectionVote { granted: true, .. } => {}
            ProtocolMessage::ElectionProposal {
                node_name: peer_name,
                vlsn: peer_vlsn,
                priority: peer_priority,
                term: peer_term,
            } => {
                // Only peers that may become master can displace the
                // current best proposal; the reply still counts as a promise.
                if is_master_capable(peer_name.as_str()) {
                    let candidate = Proposal::new(
                        peer_name,
                        peer_vlsn,
                        peer_priority,
                        peer_term,
                    );
                    if candidate.is_better_than(&leading) {
                        leading = candidate;
                    }
                }
            }
            _ => continue,
        }
        promised.push(Arc::clone(peer));
    }

    if promised.len() + SELF_VOTE < phase1_quorum {
        return None;
    }

    // ---- Phase 2: announce the winner and collect accepts ----
    let winner_name = leading.node_name;
    let decision = ProtocolMessage::ElectionResult {
        master: winner_name.clone(),
        term,
    };

    let mut accepted = SELF_VOTE;
    for peer in &promised {
        if send_message(peer.as_ref(), &decision).is_err() {
            continue;
        }
        let granted = matches!(
            receive_message(peer.as_ref(), phase_timeout),
            Ok(Some(ProtocolMessage::ElectionVote { granted: true, .. }))
        );
        if granted {
            accepted += 1;
        }
    }

    if accepted < phase2_quorum {
        return None;
    }
    if winner_name == node_name {
        return Some(node_id);
    }
    // Falls back to our own id if the winner cannot be looked up.
    Some(
        group
            .get_node(&winner_name)
            .map_or(node_id, |n| n.node_id()),
    )
}
/// Serves one election round as a stateless acceptor on `channel`.
///
/// The acceptor keeps no promise from earlier rounds: each call starts with
/// no promised term, so the first `ElectionProposal` it receives is always
/// promised. (The previous implementation carried a phase-1 rejection branch
/// that could never execute for that reason; it has been removed without
/// changing behaviour.) Use [`run_acceptor_with_state`] when promises must be
/// checked against a `PersistentAcceptorState`.
///
/// # Flow
///
/// 1. Waits up to 500 ms for the proposer's `ElectionProposal`, records its
///    term, and answers with this node's own `ElectionProposal`
///    (`node_name`, `own_vlsn`, `own_priority`, `own_term`).
/// 2. Waits up to 500 ms for an `ElectionResult`. The result is granted when
///    its term is at least the term promised in step 1; an `ElectionVote`
///    carrying the decision is sent back either way.
///
/// # Returns
///
/// * `Ok(Some(master))` - the result was granted; `master` is the announced
///   winner's name.
/// * `Ok(None)` - a receive timed out, or the result was not granted.
///
/// # Errors
///
/// * `RepError::ProtocolError` when a message of an unexpected kind arrives
///   in either phase.
/// * Any error propagated from sending, receiving or decoding a message.
pub fn run_acceptor(
    channel: &dyn Channel,
    node_name: &str,
    own_vlsn: u64,
    own_priority: u32,
    own_term: u64,
) -> Result<Option<String>> {
    let timeout = Duration::from_millis(500);

    // ---- Phase 1: promise ----
    let Some(phase1) = receive_message(channel, timeout)? else {
        return Ok(None);
    };
    let promised_term = match phase1 {
        ProtocolMessage::ElectionProposal { term, .. } => term,
        _ => {
            return Err(RepError::ProtocolError(
                "acceptor: expected ElectionProposal in phase 1".into(),
            ));
        }
    };
    // The promise is expressed by replying with our own proposal, which lets
    // the proposer compare candidates.
    send_message(
        channel,
        &ProtocolMessage::ElectionProposal {
            node_name: node_name.to_string(),
            vlsn: own_vlsn,
            priority: own_priority,
            term: own_term,
        },
    )?;

    // ---- Phase 2: accept ----
    let Some(phase2) = receive_message(channel, timeout)? else {
        return Ok(None);
    };
    let ProtocolMessage::ElectionResult { master, term } = phase2 else {
        return Err(RepError::ProtocolError(
            "acceptor: expected ElectionResult in phase 2".into(),
        ));
    };

    // A result from a term older than the one we promised is refused.
    let granted = term >= promised_term;
    send_message(
        channel,
        &ProtocolMessage::ElectionVote {
            voter: node_name.to_string(),
            granted,
            term,
        },
    )?;

    if granted {
        Ok(Some(master))
    } else {
        Ok(None)
    }
}
/// Serves one election round as an acceptor whose promise/accept decisions
/// are delegated to `state`.
///
/// # Flow
///
/// 1. Waits up to 500 ms for an `ElectionProposal`. If
///    `state.try_promise(term)` returns `true`, replies with this node's own
///    `ElectionProposal`; otherwise replies with a denied `ElectionVote`
///    carrying `state.promised_term()` and returns `Ok(None)`.
/// 2. Waits up to 500 ms for an `ElectionResult`. It is granted only when its
///    term is at least the phase-1 term and `state.try_accept(term, &master)`
///    returns `true`; `try_accept` is not called when the term check fails.
///    An `ElectionVote` with the decision is sent back either way.
///
/// NOTE(review): the exact rules applied by `try_promise` / `try_accept`
/// live in `PersistentAcceptorState` and are not visible here.
///
/// # Returns
///
/// * `Ok(Some(master))` - the result was granted.
/// * `Ok(None)` - a receive timed out, the promise was refused, or the result
///   was not granted.
///
/// # Errors
///
/// * `RepError::ProtocolError` when a message of an unexpected kind arrives
///   in either phase.
/// * Any error propagated from sending, receiving or decoding a message.
pub fn run_acceptor_with_state(
    channel: &dyn Channel,
    node_name: &str,
    own_vlsn: u64,
    own_priority: u32,
    own_term: u64,
    state: &crate::elections::acceptor_state::PersistentAcceptorState,
) -> Result<Option<String>> {
    let timeout = Duration::from_millis(500);

    // Builds this node's vote reply for the given decision and term.
    let vote = |granted: bool, term: u64| ProtocolMessage::ElectionVote {
        voter: node_name.to_string(),
        granted,
        term,
    };

    // ---- Phase 1: promise ----
    let Some(first) = receive_message(channel, timeout)? else {
        return Ok(None);
    };
    let ProtocolMessage::ElectionProposal {
        term: proposed_term,
        ..
    } = first
    else {
        return Err(RepError::ProtocolError(
            "acceptor: expected ElectionProposal in phase 1".into(),
        ));
    };

    if !state.try_promise(proposed_term) {
        // Report the term the state currently holds so the proposer can see
        // why it was refused.
        send_message(channel, &vote(false, state.promised_term()))?;
        return Ok(None);
    }
    send_message(
        channel,
        &ProtocolMessage::ElectionProposal {
            node_name: node_name.to_string(),
            vlsn: own_vlsn,
            priority: own_priority,
            term: own_term,
        },
    )?;

    // ---- Phase 2: accept ----
    let Some(second) = receive_message(channel, timeout)? else {
        return Ok(None);
    };
    let ProtocolMessage::ElectionResult { master, term } = second else {
        return Err(RepError::ProtocolError(
            "acceptor: expected ElectionResult in phase 2".into(),
        ));
    };

    // Short-circuit: the state is consulted only for a sufficiently new term.
    let granted = term >= proposed_term && state.try_accept(term, &master);
    send_message(channel, &vote(granted, term))?;

    Ok(if granted { Some(master) } else { None })
}
/// Encodes `msg` and sends the encoded form over `ch`, returning the
/// channel's send result.
fn send_message(ch: &dyn Channel, msg: &ProtocolMessage) -> Result<()> {
    let encoded = msg.encode();
    ch.send(&encoded)
}
/// Receives at most one message from `ch`, passing `timeout` to the channel.
///
/// Returns `Ok(None)` when the channel yields no data, `Ok(Some(msg))` when
/// data arrived and decoded successfully, and an error when either the
/// receive or the decode fails.
fn receive_message(
    ch: &dyn Channel,
    timeout: Duration,
) -> Result<Option<ProtocolMessage>> {
    let Some(raw) = ch.receive(timeout)? else {
        return Ok(None);
    };
    let message = ProtocolMessage::decode(&raw)?;
    Ok(Some(message))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::net::channel::LocalChannelPair;
    use crate::node_type::NodeType;
    use crate::rep_group::RepGroup;
    use crate::rep_node::RepNode;

    /// Join handles of acceptor threads; each yields the master name the
    /// acceptor accepted, if any.
    type AcceptorHandles = Vec<std::thread::JoinHandle<Option<String>>>;

    /// Adds electable nodes `node1..=node<count>` to `group`, with ports
    /// `5001..` and ids `1..`.
    fn add_electable_nodes(group: &mut RepGroup, count: u32) {
        for id in 1..=count {
            group.add_node(RepNode::new(
                format!("node{}", id),
                NodeType::Electable,
                "127.0.0.1".into(),
                5000 + id as u16,
                id,
            ));
        }
    }

    /// Group named "testgroup" holding three electable nodes.
    fn make_group_3() -> RepGroup {
        let mut group = RepGroup::new("testgroup".into(), 1);
        add_electable_nodes(&mut group, 3);
        group
    }

    /// Group named "testgroup" holding five electable nodes.
    fn make_group_5() -> RepGroup {
        let mut group = RepGroup::new("testgroup".into(), 1);
        add_electable_nodes(&mut group, 5);
        group
    }

    /// Group named "g" holding `count` electable nodes.
    fn make_small_group(count: u32) -> RepGroup {
        let mut group = RepGroup::new("g".into(), 1);
        add_electable_nodes(&mut group, count);
        group
    }

    /// Spawns one `run_acceptor` thread per channel pair.
    ///
    /// Returns the proposer-side channels and the thread handles, in the same
    /// order as `pairs`. An acceptor error is mapped to `None`.
    fn spawn_acceptors(
        pairs: Vec<LocalChannelPair>,
        acceptor_name: &str,
        own_vlsn: u64,
        own_priority: u32,
        own_term: u64,
    ) -> (Vec<Arc<dyn Channel>>, AcceptorHandles) {
        pairs
            .into_iter()
            .map(|pair| {
                let proposer_side: Arc<dyn Channel> =
                    Arc::new(pair.channel_a);
                let acceptor_side: Arc<dyn Channel> =
                    Arc::new(pair.channel_b);
                let name = acceptor_name.to_string();
                let handle = std::thread::spawn(move || {
                    run_acceptor(
                        &*acceptor_side,
                        &name,
                        own_vlsn,
                        own_priority,
                        own_term,
                    )
                    .ok()
                    .flatten()
                });
                (proposer_side, handle)
            })
            .unzip()
    }

    /// Creates `count` fresh local channel pairs.
    fn channel_pairs(count: usize) -> Vec<LocalChannelPair> {
        std::iter::repeat_with(LocalChannelPair::new)
            .take(count)
            .collect()
    }

    #[test]
    fn test_election_majority_3_node_cluster() {
        let group = make_group_3();
        let (channels, acceptor_handles) =
            spawn_acceptors(channel_pairs(2), "node2", 50, 1, 1);

        let winner = run_election(1, "node1", &group, &channels, 100, 1, 1);
        acceptor_handles.into_iter().for_each(|h| {
            h.join().unwrap();
        });

        assert!(winner.is_some(), "expected election to succeed");
        assert_eq!(winner.unwrap(), 1, "node1 should win (higher VLSN)");
    }

    #[test]
    fn test_election_majority_5_node_cluster() {
        let group = make_group_5();
        // "peerN" is not a member of the group, so acceptor counter-proposals
        // cannot displace node1's proposal.
        let (channels, acceptor_handles) =
            spawn_acceptors(channel_pairs(4), "peerN", 50, 1, 1);

        let winner = run_election(1, "node1", &group, &channels, 200, 1, 1);
        acceptor_handles.into_iter().for_each(|h| {
            h.join().unwrap();
        });

        assert!(winner.is_some());
        assert_eq!(winner.unwrap(), 1);
    }

    #[test]
    fn test_election_no_quorum_no_peers() {
        let group = make_group_3();
        let outcome = run_election(1, "node1", &group, &[], 100, 1, 1);
        assert!(
            outcome.is_none(),
            "should fail: self alone does not reach quorum of 2"
        );
    }

    #[test]
    fn test_election_single_node_group() {
        let group = make_small_group(1);
        let outcome = run_election(1, "node1", &group, &[], 100, 1, 1);
        assert_eq!(outcome, Some(1));
    }

    #[test]
    fn test_acceptor_returns_own_suggestion_for_better_candidate() {
        let pair = LocalChannelPair::new();
        let proposer_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let acceptor_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
        // The acceptor advertises a much higher VLSN than the proposer.
        let acceptor = std::thread::spawn(move || {
            run_acceptor(&*acceptor_ch, "node2", 999, 1, 1).unwrap()
        });

        let group = make_small_group(2);
        let winner =
            run_election(1, "node1", &group, &[proposer_ch], 10, 1, 1);
        let accepted = acceptor.join().unwrap();

        assert!(accepted.is_some());
        assert_eq!(accepted.unwrap(), "node2");
        assert_eq!(winner, Some(2));
    }

    #[test]
    fn test_election_best_candidate_wins() {
        let group = make_small_group(2);

        let pair = LocalChannelPair::new();
        let proposer_ch: Arc<dyn Channel> = Arc::new(pair.channel_a);
        let acceptor_ch: Arc<dyn Channel> = Arc::new(pair.channel_b);
        let acceptor = std::thread::spawn(move || {
            run_acceptor(&*acceptor_ch, "node2", 999, 1, 1).unwrap()
        });

        let winner =
            run_election(1, "node1", &group, &[proposer_ch], 10, 1, 1);
        let accepted = acceptor.join().unwrap();

        assert!(accepted.is_some());
        assert_eq!(winner, Some(2), "node2 should win with higher VLSN");
    }
}