use tracing::info;
use crate::state::NodeRole;
use crate::storage::LogStorage;
use super::core::RaftNode;
impl<S: LogStorage> RaftNode<S> {
pub(super) fn set_voters(&mut self, new_voters: Vec<u64>) {
let last_index = self.log.last_index();
if let Some(ref mut leader) = self.leader_state {
for &peer in &new_voters {
if !self.config.peers.contains(&peer) && !self.config.learners.contains(&peer) {
leader.add_peer(peer, last_index);
info!(
node = self.config.node_id,
group = self.config.group_id,
peer,
"added voter to leader tracking"
);
}
}
for &peer in &self.config.peers {
if !new_voters.contains(&peer) && !self.config.learners.contains(&peer) {
leader.remove_peer(peer);
info!(
node = self.config.node_id,
group = self.config.group_id,
peer,
"removed voter from leader tracking"
);
}
}
}
self.config.peers = new_voters;
}
/// Adds `peer` as a voter.
///
/// Does nothing when `peer` is this node's own id or is already listed as
/// a peer or a learner. Otherwise the new voter list (current peers plus
/// `peer`, appended last) is installed through `set_voters`.
pub fn add_peer(&mut self, peer: u64) {
    let is_self = peer == self.config.node_id;
    let already_member =
        self.config.peers.contains(&peer) || self.config.learners.contains(&peer);
    if is_self || already_member {
        return;
    }

    let voters: Vec<u64> = self
        .config
        .peers
        .iter()
        .copied()
        .chain(std::iter::once(peer))
        .collect();
    self.set_voters(voters);
}
/// Removes `peer` from the voter list.
///
/// Does nothing when `peer` is not currently a peer. Otherwise the
/// remaining peers (original order preserved) are installed through
/// `set_voters`.
pub fn remove_peer(&mut self, peer: u64) {
    if !self.config.peers.contains(&peer) {
        return;
    }

    let mut remaining = self.config.peers.clone();
    remaining.retain(|&id| id != peer);
    self.set_voters(remaining);
}
/// Registers `peer` as a learner.
///
/// Does nothing when `peer` is this node's own id or is already listed as
/// a peer or a learner. If leader state is present, the learner starts
/// being tracked there, seeded with the log's current last index. The id
/// is then appended to `config.learners`.
pub fn add_learner(&mut self, peer: u64) {
    let cfg = &self.config;
    let known = cfg
        .peers
        .iter()
        .chain(cfg.learners.iter())
        .any(|&id| id == peer);
    if peer == cfg.node_id || known {
        return;
    }

    let tail = self.log.last_index();
    if let Some(leader) = self.leader_state.as_mut() {
        leader.add_peer(peer, tail);
    }
    self.config.learners.push(peer);

    info!(
        node = self.config.node_id,
        group = self.config.group_id,
        peer,
        "added learner peer"
    );
}
/// Removes `peer` from the learner list.
///
/// Does nothing when `peer` is not a learner. Otherwise the leader state
/// (if present) stops tracking it and every occurrence of the id is
/// dropped from `config.learners`.
pub fn remove_learner(&mut self, peer: u64) {
    let is_learner = self.config.learners.iter().any(|&id| id == peer);
    if !is_learner {
        return;
    }

    if let Some(leader) = self.leader_state.as_mut() {
        leader.remove_peer(peer);
    }
    self.config.learners.retain(|&id| id != peer);

    info!(
        node = self.config.node_id,
        group = self.config.group_id,
        peer,
        "removed learner peer"
    );
}
/// Moves `peer` from the learner list to the voter list.
///
/// Returns `false` (and changes nothing) when `peer` is not a learner.
/// Otherwise the id is removed from `config.learners`, appended to
/// `config.peers` if not already there, and `true` is returned. Leader
/// tracking is not touched by this method.
pub fn promote_learner(&mut self, peer: u64) -> bool {
    let learners = &mut self.config.learners;
    let before = learners.len();
    learners.retain(|&id| id != peer);
    // An unchanged length means `peer` was never a learner.
    if learners.len() == before {
        return false;
    }

    if !self.config.peers.contains(&peer) {
        self.config.peers.push(peer);
    }

    info!(
        node = self.config.node_id,
        group = self.config.group_id,
        peer,
        "promoted learner to voter"
    );
    true
}
/// Switches this node from the learner role to the follower role.
///
/// Does nothing unless the current role is `NodeRole::Learner`. On
/// promotion, `config.starts_as_learner` is also cleared.
pub fn promote_self_to_voter(&mut self) {
    if self.role != NodeRole::Learner {
        return;
    }

    self.config.starts_as_learner = false;
    self.role = NodeRole::Follower;
    info!(
        node = self.config.node_id,
        group = self.config.group_id,
        "promoted self from learner to follower"
    );
}
}
#[cfg(test)]
mod tests {
use crate::node::config::RaftConfig;
use crate::node::core::RaftNode;
use crate::state::NodeRole;
use crate::storage::MemStorage;
use std::time::{Duration, Instant};
/// Builds a test configuration for `node_id` with the given voter `peers`,
/// no learners, group 0, and fixed election/heartbeat timings.
fn cfg(node_id: u64, peers: Vec<u64>) -> RaftConfig {
    let ms = Duration::from_millis;
    RaftConfig {
        node_id,
        group_id: 0,
        peers,
        learners: Vec::new(),
        starts_as_learner: false,
        election_timeout_min: ms(150),
        election_timeout_max: ms(300),
        heartbeat_interval: ms(50),
    }
}
/// Pushes the election deadline 1 ms into the past, ticks the node once,
/// and discards whatever output that produced.
///
/// The single-node test below relies on this leaving the node as leader.
fn force_leader(node: &mut RaftNode<MemStorage>) {
    let expired = Instant::now() - Duration::from_millis(1);
    node.election_deadline_override(expired);
    node.tick();
    let _ = node.take_ready();
}
/// Adding a learner must leave both quorum and cluster size untouched.
#[test]
fn add_learner_does_not_change_quorum() {
    let storage = MemStorage::new();
    let mut node = RaftNode::new(cfg(1, vec![2, 3]), storage);
    assert_eq!(node.config.quorum(), 2);

    node.add_learner(4);

    assert_eq!(node.learners(), &[4]);
    assert_eq!(node.config.quorum(), 2);
    assert_eq!(node.config.cluster_size(), 3);
}
/// Promoting a learner moves it into the voter list and grows the cluster
/// size from 2 to 3; the quorum asserted here stays at 2 throughout.
#[test]
fn promote_learner_grows_quorum() {
    let storage = MemStorage::new();
    let mut node = RaftNode::new(cfg(1, vec![2]), storage);
    assert_eq!(node.config.quorum(), 2);

    // A learner alone does not affect quorum.
    node.add_learner(3);
    assert_eq!(node.config.quorum(), 2);

    assert!(node.promote_learner(3));

    assert_eq!(node.voters(), &[2, 3]);
    assert!(node.learners().is_empty());
    assert_eq!(node.config.cluster_size(), 3);
    assert_eq!(node.config.quorum(), 2);
}
/// A learner that was added can be removed again, leaving the list empty.
#[test]
fn remove_learner_drops_peer() {
    let storage = MemStorage::new();
    let mut node = RaftNode::new(cfg(1, vec![2]), storage);

    node.add_learner(3);
    assert_eq!(node.learners(), &[3]);

    node.remove_learner(3);
    assert!(node.learners().is_empty());
}
/// On a leader, a freshly added learner is tracked with match index 0 and
/// is included among the targets of the next replication round.
#[test]
fn add_learner_on_leader_starts_tracking() {
    let storage = MemStorage::new();
    let mut node = RaftNode::new(cfg(1, vec![]), storage);
    force_leader(&mut node);
    assert_eq!(node.role(), NodeRole::Leader);

    // Put one entry in the log and drain the output it generates.
    let _ = node.propose(b"x".to_vec()).unwrap();
    let _ = node.take_ready();

    node.add_learner(2);
    assert_eq!(node.match_index_for(2), Some(0));

    node.replicate_to_all();
    let ready = node.take_ready();
    let targets: Vec<u64> = ready.messages.iter().map(|(peer, _)| *peer).collect();
    assert!(
        targets.contains(&2),
        "learner should receive AE, got {targets:?}"
    );
}
/// A node configured to start as a learner becomes a follower after
/// `promote_self_to_voter`.
#[test]
fn promote_self_flips_role() {
    let config = RaftConfig {
        starts_as_learner: true,
        ..cfg(2, vec![1])
    };
    let mut node = RaftNode::new(config, MemStorage::new());
    assert_eq!(node.role(), NodeRole::Learner);

    node.promote_self_to_voter();

    assert_eq!(node.role(), NodeRole::Follower);
}
}