use std::time::{Duration, Instant};
use tracing::info;
use crate::error::{RaftError, Result};
use crate::log::RaftLog;
use crate::message::{AppendEntriesRequest, LogEntry};
use crate::state::{HardState, LeaderState, NodeRole, VolatileState};
use crate::storage::LogStorage;
#[derive(Debug, Clone)]
pub struct RaftConfig {
pub node_id: u64,
pub group_id: u64,
pub peers: Vec<u64>,
pub election_timeout_min: Duration,
pub election_timeout_max: Duration,
pub heartbeat_interval: Duration,
}
impl RaftConfig {
pub fn cluster_size(&self) -> usize {
self.peers.len() + 1
}
pub fn quorum(&self) -> usize {
self.cluster_size() / 2 + 1
}
}
#[derive(Debug, Default)]
pub struct Ready {
pub hard_state: Option<HardState>,
pub messages: Vec<(u64, AppendEntriesRequest)>,
pub vote_requests: Vec<(u64, crate::message::RequestVoteRequest)>,
pub committed_entries: Vec<LogEntry>,
pub snapshots_needed: Vec<u64>,
}
impl Ready {
pub fn is_empty(&self) -> bool {
self.hard_state.is_none()
&& self.messages.is_empty()
&& self.vote_requests.is_empty()
&& self.committed_entries.is_empty()
&& self.snapshots_needed.is_empty()
}
}
pub struct RaftNode<S: LogStorage> {
pub(super) config: RaftConfig,
pub(super) role: NodeRole,
pub(super) hard_state: HardState,
pub(super) volatile: VolatileState,
pub(super) leader_state: Option<LeaderState>,
pub(super) log: RaftLog<S>,
pub(super) election_deadline: Instant,
pub(super) heartbeat_deadline: Instant,
pub(super) votes_received: Vec<u64>,
pub(super) ready: Ready,
pub(super) leader_id: u64,
}
impl<S: LogStorage> RaftNode<S> {
pub fn new(config: RaftConfig, storage: S) -> Self {
let now = Instant::now();
Self {
log: RaftLog::new(storage),
role: NodeRole::Follower,
hard_state: HardState::new(),
volatile: VolatileState::new(),
leader_state: None,
election_deadline: now + config.election_timeout_max,
heartbeat_deadline: now,
votes_received: Vec::new(),
ready: Ready::default(),
leader_id: 0,
config,
}
}
pub fn restore(&mut self) -> Result<()> {
self.hard_state = self.log.storage().load_hard_state()?;
self.log.restore()?;
self.reset_election_timeout();
Ok(())
}
pub fn node_id(&self) -> u64 {
self.config.node_id
}
pub fn group_id(&self) -> u64 {
self.config.group_id
}
pub fn role(&self) -> NodeRole {
self.role
}
pub fn leader_id(&self) -> u64 {
self.leader_id
}
pub fn current_term(&self) -> u64 {
self.hard_state.current_term
}
pub fn commit_index(&self) -> u64 {
self.volatile.commit_index
}
pub fn last_applied(&self) -> u64 {
self.volatile.last_applied
}
pub fn election_deadline_override(&mut self, deadline: Instant) {
self.election_deadline = deadline;
}
pub fn take_ready(&mut self) -> Ready {
std::mem::take(&mut self.ready)
}
pub fn advance_applied(&mut self, applied_to: u64) {
self.volatile.last_applied = applied_to;
}
pub fn match_index_for(&self, peer: u64) -> Option<u64> {
self.leader_state
.as_ref()
.map(|ls| ls.match_index_for(peer))
}
pub fn log_snapshot_index(&self) -> u64 {
self.log.snapshot_index()
}
pub fn log_snapshot_term(&self) -> u64 {
self.log.snapshot_term()
}
pub fn peers(&self) -> &[u64] {
&self.config.peers
}
pub fn set_peers(&mut self, new_peers: Vec<u64>) {
let last_index = self.log.last_index();
if let Some(ref mut leader) = self.leader_state {
for &peer in &new_peers {
if !self.config.peers.contains(&peer) {
leader.add_peer(peer, last_index);
info!(
node = self.config.node_id,
group = self.config.group_id,
peer,
"added peer to leader tracking"
);
}
}
for &peer in &self.config.peers {
if !new_peers.contains(&peer) {
leader.remove_peer(peer);
info!(
node = self.config.node_id,
group = self.config.group_id,
peer,
"removed peer from leader tracking"
);
}
}
}
self.config.peers = new_peers;
}
pub fn add_peer(&mut self, peer: u64) {
if peer == self.config.node_id || self.config.peers.contains(&peer) {
return;
}
let mut new_peers = self.config.peers.clone();
new_peers.push(peer);
self.set_peers(new_peers);
}
pub fn remove_peer(&mut self, peer: u64) {
if !self.config.peers.contains(&peer) {
return;
}
let new_peers: Vec<u64> = self
.config
.peers
.iter()
.copied()
.filter(|&id| id != peer)
.collect();
self.set_peers(new_peers);
}
pub fn tick(&mut self) {
let now = Instant::now();
match self.role {
NodeRole::Follower | NodeRole::Candidate => {
if now >= self.election_deadline {
self.start_election();
}
}
NodeRole::Leader => {
if now >= self.heartbeat_deadline {
self.replicate_to_all();
self.heartbeat_deadline = now + self.config.heartbeat_interval;
}
}
NodeRole::Learner => {
}
}
}
pub fn propose(&mut self, data: Vec<u8>) -> Result<u64> {
if self.role != NodeRole::Leader {
return Err(RaftError::NotLeader {
leader_hint: if self.leader_id != 0 {
Some(self.leader_id)
} else {
None
},
});
}
let index = self.log.last_index() + 1;
let entry = LogEntry {
term: self.hard_state.current_term,
index,
data,
};
self.log.append(entry)?;
self.replicate_to_all();
if self.config.cluster_size() == 1 {
self.volatile.commit_index = index;
self.collect_committed_entries();
}
Ok(index)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::MemStorage;
fn test_config(node_id: u64, peers: Vec<u64>) -> RaftConfig {
RaftConfig {
node_id,
group_id: 1,
peers,
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
heartbeat_interval: Duration::from_millis(50),
}
}
#[test]
fn single_node_election() {
let config = test_config(1, vec![]);
let mut node = RaftNode::new(config, MemStorage::new());
node.election_deadline = Instant::now() - Duration::from_millis(1);
node.tick();
assert_eq!(node.role(), NodeRole::Leader);
assert_eq!(node.current_term(), 1);
assert_eq!(node.leader_id(), 1);
}
#[test]
fn single_node_propose_and_commit() {
let config = test_config(1, vec![]);
let mut node = RaftNode::new(config, MemStorage::new());
node.election_deadline = Instant::now() - Duration::from_millis(1);
node.tick();
assert_eq!(node.role(), NodeRole::Leader);
let ready = node.take_ready();
assert!(!ready.committed_entries.is_empty());
node.advance_applied(ready.committed_entries.last().unwrap().index);
let idx = node.propose(b"hello".to_vec()).unwrap();
assert_eq!(idx, 2);
let ready = node.take_ready();
assert_eq!(ready.committed_entries.len(), 1);
assert_eq!(ready.committed_entries[0].data, b"hello");
}
#[test]
fn propose_as_follower_fails() {
let config = test_config(1, vec![2, 3]);
let node = &mut RaftNode::new(config, MemStorage::new());
let err = node.propose(b"data".to_vec()).unwrap_err();
assert!(matches!(err, RaftError::NotLeader { .. }));
}
#[test]
fn quorum_calculation() {
let config3 = test_config(1, vec![2, 3]);
assert_eq!(config3.quorum(), 2);
let config5 = RaftConfig {
node_id: 1,
group_id: 1,
peers: vec![2, 3, 4, 5],
election_timeout_min: Duration::from_millis(150),
election_timeout_max: Duration::from_millis(300),
heartbeat_interval: Duration::from_millis(50),
};
assert_eq!(config5.quorum(), 3);
}
#[test]
fn snapshot_needed_after_compaction() {
let config = test_config(1, vec![2, 3]);
let mut node = RaftNode::new(config, MemStorage::new());
node.election_deadline = Instant::now() - Duration::from_millis(1);
node.tick();
let _ready = node.take_ready();
let resp = crate::message::RequestVoteResponse {
term: 1,
vote_granted: true,
};
node.handle_request_vote_response(2, &resp);
assert_eq!(node.role(), NodeRole::Leader);
let _ = node.take_ready();
for i in 0..9 {
node.propose(vec![i]).unwrap();
}
let _ = node.take_ready();
node.log.apply_snapshot(8, 1);
node.replicate_to_all();
let ready = node.take_ready();
assert!(
!ready.snapshots_needed.is_empty(),
"expected snapshots_needed to be non-empty"
);
}
}