use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use spin::Mutex;
/// Role a node currently plays in the consensus protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeRole {
    /// Passive participant; the state every node starts in.
    Follower,
    /// Node that has started an election and is collecting votes.
    Candidate,
    /// Node that won an election for the current term.
    Leader,
}

impl NodeRole {
    /// Returns the variant name as a static string, suitable for log output.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Follower => "Follower",
            Self::Candidate => "Candidate",
            Self::Leader => "Leader",
        }
    }
}
/// One entry of the replicated log.
#[derive(Debug, Clone)]
pub struct LogEntry {
/// 1-based position of the entry in the log (`append_entry` assigns
/// `log.len() + 1`).
pub index: u64,
/// Term under which the entry was created.
pub term: u64,
/// Operation carried by this entry.
pub command: Command,
/// Starts as `false` in `append_entry`; set to `true` by
/// `commit_entries` once the entry is replicated on a majority.
pub committed: bool,
}
/// Operation recorded in a [`LogEntry`].
///
/// This file only stores and compares commands; how they are applied is not
/// visible here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Command {
/// Write request.
Write {
// NOTE(review): presumably a byte offset and byte length - confirm with
// the code that applies commands.
offset: u64,
size: u64,
},
/// Delete request identified by an offset.
Delete {
offset: u64,
},
/// Snapshot request identified by an id.
Snapshot {
id: u64,
},
/// Entry with no payload; `become_leader` appends one right after an
/// election is won.
NoOp,
}
/// Bookkeeping the local node keeps about one peer in the cluster.
#[derive(Debug, Clone)]
pub struct ClusterNode {
    /// Unique identifier of the peer.
    pub id: u64,
    /// Address of the peer; stored and logged, never parsed in this file.
    pub address: String,
    /// Set to `true` on construction and on every heartbeat. Nothing in this
    /// file clears it, and [`ClusterNode::is_alive`] does not consult it.
    pub alive: bool,
    /// Timestamp of the most recent heartbeat (0 until the first one).
    pub last_heartbeat: u64,
    /// Index of the next log entry to send to this peer.
    pub next_index: u64,
    /// Highest log index known to be replicated on this peer.
    pub match_index: u64,
}

impl ClusterNode {
    /// Creates a peer record that is marked alive, has never sent a
    /// heartbeat, and has nothing replicated yet (`next_index` 1,
    /// `match_index` 0).
    pub fn new(id: u64, address: String) -> Self {
        Self {
            id,
            address,
            alive: true,
            last_heartbeat: 0,
            next_index: 1,
            match_index: 0,
        }
    }

    /// Records a heartbeat received at `timestamp` and marks the peer alive.
    pub fn heartbeat(&mut self, timestamp: u64) {
        self.last_heartbeat = timestamp;
        self.alive = true;
    }

    /// Returns `true` while `current_time` is strictly before
    /// `last_heartbeat + timeout_ms`.
    ///
    /// `current_time` must use the same clock and unit as the timestamps
    /// passed to [`ClusterNode::heartbeat`].
    pub fn is_alive(&self, current_time: u64, timeout_ms: u64) -> bool {
        // A deadline that does not fit in u64 lies beyond every representable
        // time, so the peer is still inside its window. The unchecked `+`
        // would panic in debug builds and wrap around in release builds.
        match self.last_heartbeat.checked_add(timeout_ms) {
            Some(deadline) => current_time < deadline,
            None => true,
        }
    }
}
/// Message a candidate sends to ask a peer for its vote.
#[derive(Debug, Clone)]
pub struct VoteRequest {
/// Term the candidate is campaigning for.
pub term: u64,
/// Identifier of the candidate requesting the vote.
pub candidate_id: u64,
/// Index of the candidate's last log entry; the receiver compares it with
/// its own log length when the last terms are equal.
pub last_log_index: u64,
/// Term of the candidate's last log entry; the receiver compares it with
/// the term of its own last entry (0 for an empty log).
pub last_log_term: u64,
}
/// Reply to a [`VoteRequest`].
#[derive(Debug, Clone)]
pub struct VoteResponse {
/// Current term of the responder, after processing the request.
pub term: u64,
/// `true` when the responder gave its vote to the candidate.
pub vote_granted: bool,
}
/// Message a leader sends to replicate log entries; with an empty `entries`
/// list it acts as a heartbeat.
#[derive(Debug, Clone)]
pub struct AppendEntriesRequest {
/// Term of the sending leader.
pub term: u64,
/// Identifier of the sending leader; the receiver records it as its
/// known leader.
pub leader_id: u64,
/// Index of the log entry immediately preceding `entries`; 0 means there
/// is no preceding entry to check.
pub prev_log_index: u64,
/// Term the receiver must hold at `prev_log_index` for the request to be
/// accepted.
pub prev_log_term: u64,
/// Entries to store on the receiver.
pub entries: Vec<LogEntry>,
/// Commit index of the leader; the receiver advances its own commit index
/// toward this value.
pub leader_commit: u64,
}
/// Reply to an [`AppendEntriesRequest`].
#[derive(Debug, Clone)]
pub struct AppendEntriesResponse {
/// Current term of the responder, after processing the request.
pub term: u64,
/// `true` when the request passed the term and previous-entry checks.
pub success: bool,
/// Log index reported back by the responder; see
/// `RaftConsensus::handle_append_entries` for how it is computed.
pub match_index: u64,
}
/// Counters maintained by [`RaftConsensus`]; all start at zero.
#[derive(Debug, Clone, Default)]
pub struct DistributedStats {
/// Elections started by this node (`start_election`).
pub elections: u64,
/// Elections this node won (`become_leader`).
pub successful_elections: u64,
/// Entries appended through `append_entry`.
pub log_entries: u64,
/// Entries marked committed by `commit_entries`.
pub committed_entries: u64,
/// Heartbeats recorded by `send_heartbeat` while this node was leader.
pub heartbeats_sent: u64,
}
// Single shared consensus state, created on first use and guarded by a
// `spin::Mutex`. The `Distributed` facade below locks it for each call.
lazy_static! {
static ref RAFT_CONSENSUS: Mutex<RaftConsensus> = Mutex::new(RaftConsensus::new());
}
/// State of the local node in the consensus protocol.
pub struct RaftConsensus {
/// Identifier of the local node (defaults to 1; see `set_node_id`).
node_id: u64,
/// Role the local node currently plays.
role: NodeRole,
/// Latest term this node has seen.
current_term: u64,
/// Candidate this node voted for in `current_term`, if any.
voted_for: Option<u64>,
/// Replicated log; the entry with index `i` is stored at position `i - 1`.
log: Vec<LogEntry>,
/// Highest log index known to be committed.
commit_index: u64,
/// Initialised to 0 and not read or written anywhere else in this file.
last_applied: u64,
/// Peers of the local node keyed by id; the local node itself is counted
/// separately (`nodes.len() + 1`) when computing majorities.
nodes: BTreeMap<u64, ClusterNode>,
/// Leader this node currently knows about, if any.
leader_id: Option<u64>,
/// Time without leader contact after which `check_election_timeout`
/// reports a timeout; same unit as the timestamps passed in by callers.
election_timeout: u64,
/// Initialised to 250 and not read anywhere else in this file.
heartbeat_interval: u64,
/// Timestamp of the last election start, accepted AppendEntries, or sent
/// heartbeat.
last_heartbeat_time: u64,
/// Votes collected in the election in progress, including the self-vote.
votes_received: u64,
/// Running counters exposed through `stats()`.
stats: DistributedStats,
}
impl Default for RaftConsensus {
fn default() -> Self {
Self::new()
}
}
impl RaftConsensus {
    /// Creates the state of a freshly booted node: follower, term 0, empty
    /// log, no known peers and no known leader.
    ///
    /// `election_timeout` and `heartbeat_interval` are expressed in the same
    /// unit as the timestamps callers pass to the time-aware methods.
    pub fn new() -> Self {
        Self {
            node_id: 1,
            role: NodeRole::Follower,
            current_term: 0,
            voted_for: None,
            log: Vec::new(),
            commit_index: 0,
            last_applied: 0,
            nodes: BTreeMap::new(),
            leader_id: None,
            election_timeout: 1000,
            heartbeat_interval: 250,
            last_heartbeat_time: 0,
            votes_received: 0,
            stats: DistributedStats::default(),
        }
    }

    /// Sets the identifier of the local node.
    pub fn set_node_id(&mut self, id: u64) {
        self.node_id = id;
    }

    /// Registers (or replaces) a peer, keyed by its id.
    ///
    /// The local node must not be added here: it is counted separately when
    /// majorities are computed.
    pub fn add_node(&mut self, node: ClusterNode) {
        crate::lcpfs_println!("[ RAFT ] Added node {} at {}", node.id, node.address);
        self.nodes.insert(node.id, node);
    }

    /// Index of the last entry in the local log (0 when the log is empty).
    fn last_log_index(&self) -> u64 {
        self.log.len() as u64
    }

    /// Number of votes or replicas that forms a strict majority of the
    /// cluster (all peers plus the local node).
    fn majority(&self) -> u64 {
        (self.nodes.len() as u64 + 1) / 2 + 1
    }

    /// Adopts a newer `term` and falls back to follower. The vote and the
    /// known leader both belonged to the old term, so they are cleared.
    fn step_down(&mut self, term: u64) {
        self.current_term = term;
        self.role = NodeRole::Follower;
        self.voted_for = None;
        self.leader_id = None;
    }

    /// Starts a new election: bumps the term, becomes candidate, votes for
    /// itself and restarts the election timer at `timestamp`.
    ///
    /// NOTE(review): in a cluster without peers the self-vote is already a
    /// majority, but leadership is only evaluated in `handle_vote_response`,
    /// so a single-node cluster stays `Candidate`.
    pub fn start_election(&mut self, timestamp: u64) {
        self.current_term += 1;
        self.role = NodeRole::Candidate;
        self.voted_for = Some(self.node_id);
        self.votes_received = 1;
        self.stats.elections += 1;
        crate::lcpfs_println!(
            "[ RAFT ] Node {} starting election for term {}",
            self.node_id,
            self.current_term
        );
        self.last_heartbeat_time = timestamp;
    }

    /// Processes a vote request from a candidate.
    ///
    /// A higher term in the request makes this node step down first. The
    /// vote is granted only when the request is for the current term, this
    /// node has not voted for somebody else in that term, and the
    /// candidate's log is at least as up to date as the local one (higher
    /// last term, or equal last term and at least the same length).
    ///
    /// The response carries this node's term after processing.
    pub fn handle_vote_request(&mut self, request: VoteRequest) -> VoteResponse {
        if request.term > self.current_term {
            self.step_down(request.term);
        }

        let free_to_vote = request.term == self.current_term
            && (self.voted_for.is_none() || self.voted_for == Some(request.candidate_id));

        let own_last_term = self.log.last().map_or(0, |entry| entry.term);
        let own_last_index = self.last_log_index();
        let candidate_log_current = request.last_log_term > own_last_term
            || (request.last_log_term == own_last_term
                && request.last_log_index >= own_last_index);

        let vote_granted = free_to_vote && candidate_log_current;
        if vote_granted {
            self.voted_for = Some(request.candidate_id);
        }

        VoteResponse {
            term: self.current_term,
            vote_granted,
        }
    }

    /// Processes the reply to one of this node's vote requests.
    ///
    /// Ignored unless this node is still a candidate. A higher term makes
    /// the node step down; a reply from an older term is discarded so that
    /// votes from a previous election are not counted toward the current
    /// one. Reaching a strict majority promotes the node to leader.
    ///
    /// NOTE(review): `VoteResponse` does not identify the voter, so a
    /// duplicated reply from the same peer is counted twice; callers must
    /// deliver each reply at most once.
    pub fn handle_vote_response(&mut self, response: VoteResponse) {
        if self.role != NodeRole::Candidate {
            return;
        }
        if response.term > self.current_term {
            self.step_down(response.term);
            return;
        }
        if response.term < self.current_term || !response.vote_granted {
            return;
        }
        self.votes_received += 1;
        if self.votes_received >= self.majority() {
            self.become_leader();
        }
    }

    /// Switches to the leader role, resets per-peer replication progress and
    /// appends a `NoOp` entry for the new term.
    fn become_leader(&mut self) {
        self.role = NodeRole::Leader;
        self.leader_id = Some(self.node_id);
        self.stats.successful_elections += 1;
        crate::lcpfs_println!(
            "[ RAFT ] Node {} became leader for term {}",
            self.node_id,
            self.current_term
        );
        // Computed before the NoOp is appended, so the NoOp is the first
        // entry sent to every peer.
        let next_index = self.last_log_index() + 1;
        for peer in self.nodes.values_mut() {
            peer.next_index = next_index;
            peer.match_index = 0;
        }
        let term = self.current_term;
        self.append_entry(Command::NoOp, term);
    }

    /// Appends an uncommitted entry for `command` at the end of the local
    /// log and returns its 1-based index.
    pub fn append_entry(&mut self, command: Command, term: u64) -> u64 {
        let index = self.last_log_index() + 1;
        self.log.push(LogEntry {
            index,
            term,
            command,
            committed: false,
        });
        self.stats.log_entries += 1;
        index
    }

    /// Processes an AppendEntries request (replication or heartbeat).
    ///
    /// * A request from an older term is rejected without touching the
    ///   election timer or the known leader.
    /// * A request from the current term (after adopting a higher one)
    ///   restarts the election timer at `timestamp`, records the sender as
    ///   leader and makes a candidate fall back to follower.
    /// * The request succeeds when `prev_log_index` is 0 or the local log
    ///   holds an entry with `prev_log_term` at that index.
    /// * On success, entries are applied in order. An entry already present
    ///   with the same term is kept; a conflicting entry and everything
    ///   after it is replaced. Processing stops at the first entry whose
    ///   `index` does not directly follow the previous one.
    /// * The commit index only moves forward, up to
    ///   `min(leader_commit, last entry verified by this request)`.
    ///
    /// `match_index` in the response is the last index verified against the
    /// leader on success, and the local log length (as a hint) on failure.
    pub fn handle_append_entries(
        &mut self,
        request: AppendEntriesRequest,
        timestamp: u64,
    ) -> AppendEntriesResponse {
        if request.term > self.current_term {
            self.step_down(request.term);
        }

        if request.term < self.current_term {
            // Stale leader: it must not keep this node from starting an
            // election, nor be recorded as the current leader.
            return AppendEntriesResponse {
                term: self.current_term,
                success: false,
                match_index: self.last_log_index(),
            };
        }

        // The sender is the leader of the current term.
        if self.role == NodeRole::Candidate {
            self.role = NodeRole::Follower;
        }
        self.last_heartbeat_time = timestamp;
        self.leader_id = Some(request.leader_id);

        let log_len = self.last_log_index();
        let prev_matches = request.prev_log_index == 0
            || (request.prev_log_index <= log_len
                && self.log[(request.prev_log_index - 1) as usize].term
                    == request.prev_log_term);
        if !prev_matches {
            return AppendEntriesResponse {
                term: self.current_term,
                success: false,
                match_index: log_len,
            };
        }

        // Last index known to agree with the leader's log.
        let mut last_verified = request.prev_log_index;
        for entry in request.entries {
            // Entries must be contiguous; this also rejects index 0, which
            // previously underflowed when converted to a log position.
            if entry.index != last_verified + 1 {
                break;
            }
            // `last_verified <= log.len()` holds here, so the cast is lossless.
            let slot = last_verified as usize;
            let already_present = self
                .log
                .get(slot)
                .map_or(false, |existing| existing.term == entry.term);
            if !already_present {
                // Drops a conflicting suffix; a no-op when appending at the end.
                self.log.truncate(slot);
                self.log.push(entry);
            }
            last_verified += 1;
        }

        let new_commit = request.leader_commit.min(last_verified);
        if new_commit > self.commit_index {
            self.commit_index = new_commit;
        }

        AppendEntriesResponse {
            term: self.current_term,
            success: true,
            match_index: last_verified,
        }
    }

    /// Records that a heartbeat round was sent at `timestamp`. Does nothing
    /// unless this node is the leader.
    pub fn send_heartbeat(&mut self, timestamp: u64) {
        if self.role != NodeRole::Leader {
            return;
        }
        self.last_heartbeat_time = timestamp;
        self.stats.heartbeats_sent += 1;
    }

    /// Returns `true` when more than `election_timeout` has elapsed since
    /// the last leader contact. A leader never times out.
    pub fn check_election_timeout(&mut self, current_time: u64) -> bool {
        if self.role == NodeRole::Leader {
            return false;
        }
        // Saturating: a deadline beyond u64::MAX can never be exceeded, and
        // the unchecked sum would panic or wrap around.
        let deadline = self.last_heartbeat_time.saturating_add(self.election_timeout);
        current_time > deadline
    }

    /// Advances the commit index over every entry that is stored on a
    /// strict majority of the cluster (the local node counts as one
    /// replica), marking those entries committed. Leader only.
    ///
    /// NOTE(review): Raft additionally requires the entry at the new commit
    /// index to belong to the leader's current term; that rule is not
    /// enforced here.
    pub fn commit_entries(&mut self) {
        if self.role != NodeRole::Leader {
            return;
        }
        let majority = self.majority();
        let last_index = self.last_log_index();
        while self.commit_index < last_index {
            let candidate = self.commit_index + 1;
            let replicas = 1 + self
                .nodes
                .values()
                .filter(|peer| peer.match_index >= candidate)
                .count() as u64;
            // Replica counts cannot grow with the index, so the first index
            // without a majority ends the scan.
            if replicas < majority {
                break;
            }
            self.commit_index = candidate;
            if let Some(entry) = self.log.get_mut((candidate - 1) as usize) {
                entry.committed = true;
                self.stats.committed_entries += 1;
            }
        }
    }

    /// Current role of the local node.
    pub fn role(&self) -> NodeRole {
        self.role
    }

    /// Leader the local node currently knows about, if any.
    pub fn leader(&self) -> Option<u64> {
        self.leader_id
    }

    /// Snapshot of the running counters.
    pub fn stats(&self) -> DistributedStats {
        self.stats.clone()
    }
}
/// Stateless facade over the shared [`RaftConsensus`] instance. Every call
/// takes the global lock for the duration of that single operation.
pub struct Distributed;

impl Distributed {
    /// Registers a peer with the shared consensus state.
    pub fn add_node(node: ClusterNode) {
        RAFT_CONSENSUS.lock().add_node(node);
    }

    /// Starts an election on the shared consensus state at `timestamp`.
    pub fn start_election(timestamp: u64) {
        RAFT_CONSENSUS.lock().start_election(timestamp);
    }

    /// Appends `command` under `term` to the shared log and returns the
    /// index assigned to the new entry.
    pub fn append_entry(command: Command, term: u64) -> u64 {
        let mut guard = RAFT_CONSENSUS.lock();
        let index = guard.append_entry(command, term);
        index
    }

    /// Current role of the local node.
    pub fn role() -> NodeRole {
        let guard = RAFT_CONSENSUS.lock();
        let current = guard.role();
        current
    }

    /// Snapshot of the consensus counters.
    pub fn stats() -> DistributedStats {
        let guard = RAFT_CONSENSUS.lock();
        let snapshot = guard.stats();
        snapshot
    }
}
// Unit tests. They build private `RaftConsensus` instances (never the shared
// global) and read or write private fields directly through `super::*`.
#[cfg(test)]
mod tests {
use super::*;
// Role names are the variant names.
#[test]
fn test_node_role() {
assert_eq!(NodeRole::Follower.name(), "Follower");
assert_eq!(NodeRole::Leader.name(), "Leader");
}
// A new peer record starts alive with next_index 1.
#[test]
fn test_cluster_node() {
let node = ClusterNode::new(1, "192.168.1.1:8000".into());
assert_eq!(node.id, 1);
assert!(node.alive);
assert_eq!(node.next_index, 1);
}
// A heartbeat at t=1000 with a timeout of 1000 keeps the peer alive at
// t=1500 but not at t=2500.
#[test]
fn test_heartbeat() {
let mut node = ClusterNode::new(1, "192.168.1.1:8000".into());
node.heartbeat(1000);
assert_eq!(node.last_heartbeat, 1000);
assert!(node.is_alive(1500, 1000)); assert!(!node.is_alive(2500, 1000)); }
// Fresh state: follower, term 0, no vote, no leader.
#[test]
fn test_initial_state() {
let raft = RaftConsensus::new();
assert_eq!(raft.role, NodeRole::Follower);
assert_eq!(raft.current_term, 0);
assert!(raft.voted_for.is_none());
assert!(raft.leader_id.is_none());
}
// Starting an election bumps the term and records the self-vote.
#[test]
fn test_start_election() {
let mut raft = RaftConsensus::new();
raft.start_election(1000);
assert_eq!(raft.role, NodeRole::Candidate);
assert_eq!(raft.current_term, 1);
assert_eq!(raft.voted_for, Some(1));
assert_eq!(raft.votes_received, 1);
}
// A request from a newer term with an equally empty log is granted.
#[test]
fn test_vote_request() {
let mut raft = RaftConsensus::new();
let request = VoteRequest {
term: 1,
candidate_id: 2,
last_log_index: 0,
last_log_term: 0,
};
let response = raft.handle_vote_request(request);
assert!(response.vote_granted);
assert_eq!(raft.voted_for, Some(2));
}
// Three-node cluster: the self-vote plus one granted vote is a majority.
#[test]
fn test_election_win() {
let mut raft = RaftConsensus::new();
raft.add_node(ClusterNode::new(2, "192.168.1.2:8000".into()));
raft.add_node(ClusterNode::new(3, "192.168.1.3:8000".into()));
raft.start_election(1000);
raft.handle_vote_response(VoteResponse {
term: 1,
vote_granted: true,
});
assert_eq!(raft.role, NodeRole::Leader);
assert_eq!(raft.leader_id, Some(1));
}
// The first appended entry gets index 1 and bumps the log counter.
#[test]
fn test_log_append() {
let mut raft = RaftConsensus::new();
let index = raft.append_entry(
Command::Write {
offset: 0,
size: 4096,
},
1,
);
assert_eq!(index, 1);
assert_eq!(raft.log.len(), 1);
assert_eq!(raft.stats.log_entries, 1);
}
// An empty AppendEntries from a newer term is accepted, and the sender
// becomes the known leader.
#[test]
fn test_append_entries_heartbeat() {
let mut raft = RaftConsensus::new();
let request = AppendEntriesRequest {
term: 1,
leader_id: 2,
prev_log_index: 0,
prev_log_term: 0,
entries: Vec::new(),
leader_commit: 0,
};
let response = raft.handle_append_entries(request, 1000);
assert!(response.success);
assert_eq!(raft.current_term, 1);
assert_eq!(raft.leader_id, Some(2));
}
// Both peers report match_index 2, so both local entries are committed.
// The role is forced to Leader without an election, so current_term stays 0
// while the entries carry term 1.
#[test]
fn test_commit_entries() {
let mut raft = RaftConsensus::new();
raft.role = NodeRole::Leader;
let mut node2 = ClusterNode::new(2, "192.168.1.2:8000".into());
let mut node3 = ClusterNode::new(3, "192.168.1.3:8000".into());
node2.match_index = 2;
node3.match_index = 2;
raft.add_node(node2);
raft.add_node(node3);
raft.append_entry(
Command::Write {
offset: 0,
size: 4096,
},
1,
);
raft.append_entry(
Command::Write {
offset: 4096,
size: 4096,
},
1,
);
raft.commit_entries();
assert_eq!(raft.commit_index, 2);
assert_eq!(raft.stats.committed_entries, 2);
}
// With the default timeout of 1000 and last contact at t=1000, t=1500 is
// within the window and t=2500 is past it.
#[test]
fn test_election_timeout() {
let mut raft = RaftConsensus::new();
raft.last_heartbeat_time = 1000;
assert!(!raft.check_election_timeout(1500)); assert!(raft.check_election_timeout(2500)); }
// A leader that sees a vote request from a newer term becomes a follower.
#[test]
fn test_higher_term_step_down() {
let mut raft = RaftConsensus::new();
raft.role = NodeRole::Leader;
raft.current_term = 1;
let request = VoteRequest {
term: 2,
candidate_id: 2,
last_log_index: 0,
last_log_term: 0,
};
raft.handle_vote_request(request);
assert_eq!(raft.role, NodeRole::Follower);
assert_eq!(raft.current_term, 2);
}
// NoOp commands are stored like any other entry.
#[test]
fn test_no_op_entry() {
let mut raft = RaftConsensus::new();
let index = raft.append_entry(Command::NoOp, 1);
assert_eq!(index, 1);
assert_eq!(raft.log[0].command, Command::NoOp);
}
// Counters reflect one election and one appended entry.
#[test]
fn test_statistics() {
let mut raft = RaftConsensus::new();
raft.start_election(1000);
raft.append_entry(
Command::Write {
offset: 0,
size: 4096,
},
1,
);
let stats = raft.stats();
assert_eq!(stats.elections, 1);
assert_eq!(stats.log_entries, 1);
}
}