use anyhow::{anyhow, Result};
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Unique identifier of a node in the Raft cluster.
pub type NodeId = u64;
/// 1-based position of an entry in the replicated log (0 means "no entry").
pub type LogIndex = u64;
/// Raft election term number (monotonically increasing).
pub type Term = u64;
/// A single vector record replicated through the Raft log and stored in the
/// index state machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorEntry {
    /// Unique key of the vector within the index.
    pub vector_id: String,
    /// Dense embedding values.
    pub vector: Vec<f32>,
    /// Arbitrary string key/value metadata attached to the vector.
    pub metadata: HashMap<String, String>,
    /// Insertion timestamp; units/epoch are caller-defined (tests use 0) —
    /// NOTE(review): confirm intended units.
    pub inserted_at: u64,
}
/// Commands replicated through the Raft log and applied to the vector index.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexCommand {
    /// Insert or overwrite a vector entry.
    Upsert(VectorEntry),
    /// Remove a vector by id (no-op if absent).
    Delete { vector_id: String },
    /// Marker command; the state machine only logs it and bumps the counter.
    Rebuild,
    /// Replace the metadata map of an existing vector (skipped if absent).
    UpdateMetadata {
        vector_id: String,
        metadata: HashMap<String, String>,
    },
    /// Leader no-op appended on election win; not counted as an operation.
    NoOp,
}
/// One replicated log slot: a state-machine command plus Raft bookkeeping.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// 1-based position in the log.
    pub index: LogIndex,
    /// Term in which the leader created this entry.
    pub term: Term,
    /// Command to apply once the entry is committed.
    pub command: IndexCommand,
    /// Optional originating client id; not interpreted by this module.
    pub client_id: Option<String>,
}
/// The three Raft roles a node can occupy.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    Follower,
    Candidate,
    Leader,
}
impl std::fmt::Display for NodeRole {
    /// Renders the role exactly as its variant name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Follower => "Follower",
            Self::Candidate => "Candidate",
            Self::Leader => "Leader",
        };
        f.write_str(name)
    }
}
/// Leader → follower replication / heartbeat RPC (Raft AppendEntries).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesRequest {
    /// Leader's current term.
    pub term: Term,
    /// Lets followers learn/confirm who the leader is.
    pub leader_id: NodeId,
    /// Index of the entry immediately preceding `entries` (0 at log start).
    pub prev_log_index: LogIndex,
    /// Term of the entry at `prev_log_index` (0 when that index is 0).
    pub prev_log_term: Term,
    /// New entries to replicate; empty for a pure heartbeat.
    pub entries: Vec<LogEntry>,
    /// Leader's commit index, so followers can advance their own.
    pub leader_commit: LogIndex,
}
/// Follower's reply to an AppendEntries RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
    /// Responder's current term (lets a stale leader step down).
    pub term: Term,
    /// True when the consistency check passed and entries were accepted.
    pub success: bool,
    /// Id of the responding node.
    pub node_id: NodeId,
    /// On failure: hint telling the leader where to move next_index.
    pub conflict_index: Option<LogIndex>,
}
/// Candidate → peer vote solicitation (Raft RequestVote).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteRequest {
    /// Candidate's (freshly incremented) term.
    pub term: Term,
    /// Id of the candidate requesting the vote.
    pub candidate_id: NodeId,
    /// Index of the candidate's last log entry (for the up-to-date check).
    pub last_log_index: LogIndex,
    /// Term of the candidate's last log entry.
    pub last_log_term: Term,
}
/// Peer's reply to a RequestVote RPC.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponse {
    /// Responder's current term (lets a stale candidate step down).
    pub term: Term,
    /// True when the responder granted its vote for this term.
    pub vote_granted: bool,
    /// Id of the responding node.
    pub node_id: NodeId,
}
/// Static configuration for a Raft node: identity, membership and timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftConfig {
    /// This node's id; expected to appear in `cluster_nodes`.
    pub node_id: NodeId,
    /// Ids of every cluster member, including this node.
    pub cluster_nodes: Vec<NodeId>,
    /// Leader heartbeat period. NOTE(review): not consulted anywhere in this
    /// module — presumably read by an external driver loop; confirm.
    pub heartbeat_interval_ms: u64,
    /// Lower bound of the election timeout. NOTE(review): currently unused —
    /// `election_timeout_elapsed` only checks the max bound.
    pub election_timeout_min_ms: u64,
    /// Upper bound of the election timeout (the one actually enforced).
    pub election_timeout_max_ms: u64,
    /// Maximum log entries per AppendEntries batch.
    pub max_entries_per_batch: usize,
    /// Snapshot toggle. NOTE(review): no snapshotting is implemented here.
    pub enable_snapshots: bool,
    /// Log size intended to trigger a snapshot (unused in this module).
    pub snapshot_threshold: usize,
    /// RPC retry budget (unused in this module).
    pub max_rpc_retries: usize,
}
impl RaftConfig {
    /// Shared defaults for the preset constructors; only membership differs.
    fn with_membership(node_id: NodeId, cluster_nodes: Vec<NodeId>) -> Self {
        Self {
            node_id,
            cluster_nodes,
            heartbeat_interval_ms: 150,
            election_timeout_min_ms: 300,
            election_timeout_max_ms: 600,
            max_entries_per_batch: 100,
            enable_snapshots: true,
            snapshot_threshold: 10_000,
            max_rpc_retries: 3,
        }
    }
    /// Configuration for a standalone single-node cluster.
    pub fn single_node(node_id: NodeId) -> Self {
        Self::with_membership(node_id, vec![node_id])
    }
    /// Configuration for the fixed three-node cluster {1, 2, 3}.
    pub fn three_node_cluster(node_id: NodeId) -> Self {
        Self::with_membership(node_id, vec![1, 2, 3])
    }
    /// Majority size: floor(n / 2) + 1 voting members.
    pub fn quorum_size(&self) -> usize {
        self.cluster_nodes.len() / 2 + 1
    }
}
impl Default for RaftConfig {
fn default() -> Self {
Self::single_node(1)
}
}
/// Point-in-time observability snapshot of a node's Raft and index state,
/// assembled by `RaftIndexNode::get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RaftStats {
    /// Latest term this node has seen.
    pub current_term: Term,
    /// Role rendered via Display ("Follower"/"Candidate"/"Leader").
    pub role: String,
    /// Leader currently known to this node, if any.
    pub current_leader: Option<NodeId>,
    /// Number of entries in the local log.
    pub log_length: usize,
    /// Highest log index known to be committed.
    pub commit_index: LogIndex,
    /// Highest log index applied to the state machine.
    pub last_applied: LogIndex,
    /// Elections this node has started.
    pub elections_participated: u64,
    /// Times this node has won an election.
    pub terms_as_leader: u64,
    /// Commands applied to the index state machine (NoOp excluded).
    pub operations_applied: u64,
    /// Vectors currently stored in the index.
    pub vector_count: usize,
    /// AppendEntries requests built by this node (vote RPCs not counted).
    pub rpcs_sent: u64,
    /// RPC requests/responses processed by the handle_*/process_* methods.
    pub rpcs_received: u64,
}
/// The replicated state machine: a simple in-memory vector store that
/// committed `IndexCommand`s are applied to.
#[derive(Debug, Default)]
struct IndexStateMachine {
    /// Vectors keyed by `vector_id`.
    vectors: HashMap<String, VectorEntry>,
    /// Count of applied commands (NoOp excluded).
    operations_applied: u64,
}
impl IndexStateMachine {
    /// Applies a committed log command to the in-memory index.
    ///
    /// `NoOp` is intentionally ignored and does not count as an applied
    /// operation; `UpdateMetadata` on an unknown vector is silently skipped.
    fn apply(&mut self, command: &IndexCommand) {
        match command {
            IndexCommand::Upsert(entry) => {
                self.vectors.insert(entry.vector_id.clone(), entry.clone());
                self.operations_applied += 1;
                debug!("Applied Upsert for vector '{}'", entry.vector_id);
            }
            IndexCommand::Delete { vector_id } => {
                self.vectors.remove(vector_id);
                self.operations_applied += 1;
                debug!("Applied Delete for vector '{}'", vector_id);
            }
            IndexCommand::UpdateMetadata {
                vector_id,
                metadata,
            } => {
                if let Some(entry) = self.vectors.get_mut(vector_id) {
                    // clone_from reuses the existing map's allocation.
                    entry.metadata.clone_from(metadata);
                    self.operations_applied += 1;
                }
            }
            IndexCommand::Rebuild => {
                // Marker only: no index data changes.
                debug!("Applied Rebuild command");
                self.operations_applied += 1;
            }
            IndexCommand::NoOp => {}
        }
    }
    /// Number of vectors currently stored.
    fn len(&self) -> usize {
        self.vectors.len()
    }
    /// Looks up a vector by id.
    fn get(&self, vector_id: &str) -> Option<&VectorEntry> {
        self.vectors.get(vector_id)
    }
    /// Brute-force cosine-similarity search returning the top-`k`
    /// `(vector_id, similarity)` pairs, best first.
    ///
    /// Entries whose dimensionality differs from `query` are skipped, and a
    /// near-zero norm on either side yields similarity 0.0 instead of a
    /// division by ~0.
    fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
        // Hoisted out of the per-entry closure: the query norm is
        // loop-invariant, so compute it exactly once.
        let query_norm: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
        let mut similarities: Vec<(String, f32)> = self
            .vectors
            .iter()
            .filter_map(|(id, entry)| {
                if entry.vector.len() != query.len() {
                    return None;
                }
                let dot: f32 = entry
                    .vector
                    .iter()
                    .zip(query.iter())
                    .map(|(a, b)| a * b)
                    .sum();
                let entry_norm: f32 = entry.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
                let sim = if entry_norm < 1e-9 || query_norm < 1e-9 {
                    0.0
                } else {
                    dot / (entry_norm * query_norm)
                };
                Some((id.clone(), sim))
            })
            .collect();
        // Descending by similarity; the norms above are guarded, so NaN
        // cannot occur and the Equal fallback is only for exact ties.
        similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
        similarities.truncate(k);
        similarities
    }
}
/// State Raft requires to survive restarts: term, vote and the log.
///
/// NOTE(review): despite the name, this struct is only held in memory by
/// `RaftIndexNode` — nothing writes it to durable storage; confirm intent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PersistentState {
    /// Latest term this node has seen.
    pub current_term: Term,
    /// Candidate voted for in `current_term`, if any.
    pub voted_for: Option<NodeId>,
    /// The replicated log; entry indices are 1-based.
    pub log: Vec<LogEntry>,
}
impl PersistentState {
    /// Index of the last log entry, or 0 for an empty log.
    fn last_log_index(&self) -> LogIndex {
        self.log.last().map(|e| e.index).unwrap_or(0)
    }
    /// Term of the last log entry, or 0 for an empty log.
    fn last_log_term(&self) -> Term {
        self.log.last().map(|e| e.term).unwrap_or(0)
    }
    /// Returns the entry with the given 1-based log index, if present.
    ///
    /// The log is kept dense by `append`/`truncate_from` (entry `i` lives at
    /// slot `i - 1`), so an O(1) slot access is tried first; a linear scan
    /// remains as a fallback so behavior is unchanged even if the log were
    /// ever sparse.
    fn get_entry(&self, index: LogIndex) -> Option<&LogEntry> {
        if index == 0 {
            return None;
        }
        match self.log.get((index - 1) as usize) {
            Some(e) if e.index == index => Some(e),
            _ => self.log.iter().find(|e| e.index == index),
        }
    }
    /// Drops every entry with index >= `from_index` (conflict resolution).
    fn truncate_from(&mut self, from_index: LogIndex) {
        self.log.retain(|e| e.index < from_index);
    }
}
/// One Raft participant: consensus bookkeeping plus the replicated
/// in-memory vector index it drives.
///
/// Every mutable field sits behind its own lock so methods can be called
/// from multiple threads; guards are held in small scopes.
#[derive(Debug)]
pub struct RaftIndexNode {
    /// Static cluster membership and timing configuration.
    config: RaftConfig,
    /// Raft "persistent" state (term, vote, log). NOTE(review): kept only in
    /// memory — nothing persists it to disk; confirm intent.
    persistent: Arc<RwLock<PersistentState>>,
    /// Current role (Follower/Candidate/Leader).
    role: Arc<Mutex<NodeRole>>,
    /// Leader this node currently recognizes, if any.
    current_leader: Arc<Mutex<Option<NodeId>>>,
    /// Highest log index known to be committed.
    commit_index: Arc<Mutex<LogIndex>>,
    /// Highest log index applied to the state machine.
    last_applied: Arc<Mutex<LogIndex>>,
    /// Leader-only: next log index to send to each peer.
    next_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// Leader-only: highest log index known replicated on each peer.
    match_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// The applied vector index.
    state_machine: Arc<RwLock<IndexStateMachine>>,
    /// Candidate-only: votes granted to us in the current election.
    votes_received: Arc<Mutex<HashMap<NodeId, bool>>>,
    /// Last time we heard from a leader (or granted a vote).
    last_heartbeat: Arc<Mutex<Instant>>,
    /// NOTE(review): never read or written after construction — dead field?
    stats: Arc<Mutex<RaftStats>>,
    /// Elections this node has started.
    elections_participated: Arc<Mutex<u64>>,
    /// Times this node won an election.
    terms_as_leader: Arc<Mutex<u64>>,
    /// AppendEntries requests built (see `create_append_entries_request`).
    rpcs_sent: Arc<Mutex<u64>>,
    /// RPC requests/responses processed.
    rpcs_received: Arc<Mutex<u64>>,
}
impl RaftIndexNode {
/// Constructs a follower node in term 0 with empty log and index, and
/// per-peer replication maps initialized for every other cluster member.
pub fn new(config: RaftConfig) -> Self {
    let node_id = config.node_id;
    // Replication bookkeeping only tracks peers, never ourselves.
    let peers: Vec<NodeId> = config
        .cluster_nodes
        .iter()
        .copied()
        .filter(|&n| n != node_id)
        .collect();
    let next_index: HashMap<NodeId, LogIndex> = peers.iter().map(|&n| (n, 1)).collect();
    let match_index: HashMap<NodeId, LogIndex> = peers.iter().map(|&n| (n, 0)).collect();
    info!(
        "Raft node {} initialized in cluster {:?}",
        node_id, config.cluster_nodes
    );
    Self {
        config,
        persistent: Arc::new(RwLock::new(PersistentState::default())),
        role: Arc::new(Mutex::new(NodeRole::Follower)),
        current_leader: Arc::new(Mutex::new(None)),
        commit_index: Arc::new(Mutex::new(0)),
        last_applied: Arc::new(Mutex::new(0)),
        next_index: Arc::new(Mutex::new(next_index)),
        match_index: Arc::new(Mutex::new(match_index)),
        state_machine: Arc::new(RwLock::new(IndexStateMachine::default())),
        votes_received: Arc::new(Mutex::new(HashMap::new())),
        last_heartbeat: Arc::new(Mutex::new(Instant::now())),
        stats: Arc::new(Mutex::new(RaftStats::default())),
        elections_participated: Arc::new(Mutex::new(0)),
        terms_as_leader: Arc::new(Mutex::new(0)),
        rpcs_sent: Arc::new(Mutex::new(0)),
        rpcs_received: Arc::new(Mutex::new(0)),
    }
}
/// Transitions to candidate for a fresh term and returns the vote request
/// to broadcast to peers. The node votes for itself immediately.
pub fn start_election(&self) -> RequestVoteRequest {
    // The write guard on `persistent` is held for the whole transition so
    // the term bump, self-vote and request snapshot are consistent.
    let mut persistent = self.persistent.write();
    persistent.current_term += 1;
    let new_term = persistent.current_term;
    persistent.voted_for = Some(self.config.node_id);
    *self.role.lock() = NodeRole::Candidate;
    // Restart vote tallying with our own vote already counted.
    let mut votes = self.votes_received.lock();
    votes.clear();
    votes.insert(self.config.node_id, true);
    *self.elections_participated.lock() += 1;
    info!(
        "Node {} starting election for term {}",
        self.config.node_id, new_term
    );
    RequestVoteRequest {
        term: new_term,
        candidate_id: self.config.node_id,
        last_log_index: persistent.last_log_index(),
        last_log_term: persistent.last_log_term(),
    }
}
/// RequestVote RPC handler (Raft §5.2 / §5.4.1).
///
/// Grants the vote when the candidate's term is current, we have not
/// already voted for a different candidate this term, and the candidate's
/// log is at least as up-to-date as ours.
pub fn handle_request_vote(&self, request: RequestVoteRequest) -> RequestVoteResponse {
    *self.rpcs_received.lock() += 1;
    let mut persistent = self.persistent.write();
    // A newer term always forces us back to follower with a clean vote.
    if request.term > persistent.current_term {
        persistent.current_term = request.term;
        persistent.voted_for = None;
        *self.role.lock() = NodeRole::Follower;
    }
    let vote_granted = if request.term < persistent.current_term {
        // Stale candidate.
        false
    } else {
        // Re-granting to the same candidate in the same term is idempotent;
        // voting for a *different* candidate is what's forbidden.
        let already_voted = persistent
            .voted_for
            .map(|v| v != request.candidate_id)
            .unwrap_or(false);
        if already_voted {
            false
        } else {
            // Election restriction: candidate's log must be at least as
            // up-to-date (higher last term, or equal term and >= index).
            let our_last_index = persistent.last_log_index();
            let our_last_term = persistent.last_log_term();
            let log_ok = request.last_log_term > our_last_term
                || (request.last_log_term == our_last_term
                    && request.last_log_index >= our_last_index);
            if log_ok {
                persistent.voted_for = Some(request.candidate_id);
                // Granting a vote counts as hearing from a viable peer, so
                // push back our own election timeout.
                *self.last_heartbeat.lock() = Instant::now();
                true
            } else {
                false
            }
        }
    };
    // Fixed: `{}` instead of `{:?}` — Debug formatting printed the &str
    // with surrounding quotes (e.g. `Node 2 "grants" vote`).
    debug!(
        "Node {} {} vote to {} for term {}",
        self.config.node_id,
        if vote_granted { "grants" } else { "denies" },
        request.candidate_id,
        request.term
    );
    RequestVoteResponse {
        term: persistent.current_term,
        vote_granted,
        node_id: self.config.node_id,
    }
}
/// Tallies one RequestVote response. Returns `true` exactly when this
/// response is the one that reaches quorum and promotes us to leader.
pub fn process_vote_response(&self, response: RequestVoteResponse) -> bool {
    *self.rpcs_received.lock() += 1;
    let persistent = self.persistent.read();
    // A responder with a newer term means we lost this election: step down.
    if response.term > persistent.current_term {
        // Read guard must be released before upgrading to a write guard
        // (parking_lot locks are not reentrant/upgradable here).
        drop(persistent);
        let mut p = self.persistent.write();
        p.current_term = response.term;
        p.voted_for = None;
        *self.role.lock() = NodeRole::Follower;
        return false;
    }
    // Only a candidate counts votes.
    if *self.role.lock() != NodeRole::Candidate {
        return false;
    }
    // Ignore responses belonging to a previous election.
    if response.term != persistent.current_term {
        return false;
    }
    if response.vote_granted {
        let mut votes = self.votes_received.lock();
        votes.insert(response.node_id, true);
        let vote_count = votes.values().filter(|&&v| v).count();
        if vote_count >= self.config.quorum_size() {
            // Release our guards before become_leader re-locks them.
            drop(votes);
            drop(persistent);
            self.become_leader();
            return true;
        }
    }
    false
}
/// Promotes this node to leader: reinitializes per-peer replication state
/// and appends a NoOp entry to assert leadership in the new term.
fn become_leader(&self) {
    let term = self.persistent.read().current_term;
    *self.role.lock() = NodeRole::Leader;
    *self.current_leader.lock() = Some(self.config.node_id);
    *self.terms_as_leader.lock() += 1;
    let last_log_index = self.persistent.read().last_log_index();
    let mut next_idx = self.next_index.lock();
    let mut match_idx = self.match_index.lock();
    for &peer in &self.config.cluster_nodes {
        if peer != self.config.node_id {
            // Optimistically assume followers are caught up; conflicts are
            // resolved by AppendEntries backoff (Raft §5.3).
            next_idx.insert(peer, last_log_index + 1);
            match_idx.insert(peer, 0);
        }
    }
    info!(
        "Node {} became leader for term {}",
        self.config.node_id, term
    );
    // append_entry takes the persistent write lock; release the index
    // guards first to keep lock scopes minimal.
    drop(next_idx);
    drop(match_idx);
    let _ = self.append_entry(IndexCommand::NoOp, None);
}
/// AppendEntries RPC handler: heartbeat acknowledgement, log consistency
/// check, conflict truncation, append, and commit-index advancement.
///
/// Fixed a deadlock: the original kept the `persistent` write guard alive
/// while re-reading `self.persistent` for the success response whenever
/// `leader_commit <= commit_index` — parking_lot's `RwLock` is not
/// reentrant, so that path self-deadlocked. The guard is now explicitly
/// dropped before any further access.
pub fn handle_append_entries(&self, request: AppendEntriesRequest) -> AppendEntriesResponse {
    *self.rpcs_received.lock() += 1;
    let mut persistent = self.persistent.write();
    // Step down if the leader's term is newer than ours.
    if request.term > persistent.current_term {
        persistent.current_term = request.term;
        persistent.voted_for = None;
        *self.role.lock() = NodeRole::Follower;
    }
    // Reject stale leaders outright.
    if request.term < persistent.current_term {
        return AppendEntriesResponse {
            term: persistent.current_term,
            success: false,
            node_id: self.config.node_id,
            conflict_index: None,
        };
    }
    // Valid leader contact: reset the election timer and record the leader.
    *self.last_heartbeat.lock() = Instant::now();
    *self.current_leader.lock() = Some(request.leader_id);
    *self.role.lock() = NodeRole::Follower;
    // Consistency check on the entry immediately preceding the batch.
    if request.prev_log_index > 0 {
        match persistent.get_entry(request.prev_log_index) {
            None => {
                // We are missing that entry entirely: hint the leader to
                // back up to just past our log end.
                return AppendEntriesResponse {
                    term: persistent.current_term,
                    success: false,
                    node_id: self.config.node_id,
                    conflict_index: Some(persistent.last_log_index() + 1),
                };
            }
            Some(e) if e.term != request.prev_log_term => {
                // Term mismatch: point the leader at the conflicting slot.
                let conflict_index = e.index;
                return AppendEntriesResponse {
                    term: persistent.current_term,
                    success: false,
                    node_id: self.config.node_id,
                    conflict_index: Some(conflict_index),
                };
            }
            _ => {}
        }
    }
    // Append new entries, truncating our log on term conflicts (Raft §5.3).
    for entry in &request.entries {
        let existing = persistent.get_entry(entry.index).cloned();
        match existing {
            Some(e) if e.term != entry.term => {
                persistent.truncate_from(entry.index);
                persistent.log.push(entry.clone());
            }
            None => {
                persistent.log.push(entry.clone());
            }
            _ => {}
        }
    }
    // Capture what the response needs, then release the write guard BEFORE
    // touching self.persistent (or apply_committed_entries) again.
    let current_term = persistent.current_term;
    let last_log_index = persistent.last_log_index();
    drop(persistent);
    let prev_commit = *self.commit_index.lock();
    if request.leader_commit > prev_commit {
        let new_commit = request.leader_commit.min(last_log_index);
        *self.commit_index.lock() = new_commit;
        self.apply_committed_entries();
    }
    AppendEntriesResponse {
        term: current_term,
        success: true,
        node_id: self.config.node_id,
        conflict_index: None,
    }
}
/// Leader-side handling of an AppendEntries response from `peer_id`, where
/// `entries_sent_count` is how many entries the corresponding request
/// carried.
///
/// On success, advances the peer's `next_index`/`match_index`; on failure,
/// backs `next_index` off (using the follower's conflict hint when given).
/// `match_index` is now kept monotonic — a duplicated or reordered response
/// must never move the replication watermark backwards, or the commit rule
/// could stall on a regressed value.
pub fn process_append_entries_response(
    &self,
    peer_id: NodeId,
    response: AppendEntriesResponse,
    entries_sent_count: usize,
) {
    *self.rpcs_received.lock() += 1;
    // A newer term in the response means we are a stale leader: step down.
    let current_term = self.persistent.read().current_term;
    if response.term > current_term {
        let mut p = self.persistent.write();
        p.current_term = response.term;
        p.voted_for = None;
        *self.role.lock() = NodeRole::Follower;
        return;
    }
    // Replication state is only meaningful while we are leader.
    if *self.role.lock() != NodeRole::Leader {
        return;
    }
    if response.success {
        let mut next_idx = self.next_index.lock();
        let mut match_idx = self.match_index.lock();
        let new_next =
            next_idx.get(&peer_id).copied().unwrap_or(1) + entries_sent_count as LogIndex;
        next_idx.insert(peer_id, new_next);
        // Monotonic match_index (never decreases).
        let prev_match = match_idx.get(&peer_id).copied().unwrap_or(0);
        match_idx.insert(peer_id, (new_next - 1).max(prev_match));
        drop(next_idx);
        drop(match_idx);
        self.try_advance_commit_index();
    } else {
        // Rejected: back up next_index, preferring the follower's hint,
        // otherwise decrement by one (never below 1).
        let mut next_idx = self.next_index.lock();
        if let Some(conflict) = response.conflict_index {
            next_idx.insert(peer_id, conflict);
        } else {
            let current = next_idx.get(&peer_id).copied().unwrap_or(1);
            if current > 1 {
                next_idx.insert(peer_id, current - 1);
            }
        }
    }
}
/// Leader commit rule: advance commit_index to the highest log index `n`
/// that (a) belongs to the current term and (b) is replicated on a quorum
/// (counting the leader itself), then apply newly committed entries.
///
/// The log read guard is now acquired once for the whole scan instead of
/// once per candidate index, and the commit lock is read once up front.
fn try_advance_commit_index(&self) {
    let persistent = self.persistent.read();
    let current_term = persistent.current_term;
    let last_log_index = persistent.last_log_index();
    let match_idx = self.match_index.lock();
    let old_commit = *self.commit_index.lock();
    let mut commit = old_commit;
    for n in (old_commit + 1)..=last_log_index {
        // Only entries from the current term may be committed by counting
        // replicas (Raft §5.4.2); older entries commit transitively.
        let entry_term = persistent.get_entry(n).map(|e| e.term).unwrap_or(0);
        if entry_term != current_term {
            continue;
        }
        // The leader's own copy counts as one replica.
        let replication_count = 1 + match_idx.values().filter(|&&m| m >= n).count();
        if replication_count >= self.config.quorum_size() {
            commit = n;
        }
    }
    // Release all guards before apply_committed_entries re-locks.
    drop(match_idx);
    drop(persistent);
    if commit > old_commit {
        *self.commit_index.lock() = commit;
        self.apply_committed_entries();
    }
}
/// Applies log entries in `(last_applied, commit_index]` to the state
/// machine, in order, then records the new last_applied.
fn apply_committed_entries(&self) {
    let commit = *self.commit_index.lock();
    let mut last = *self.last_applied.lock();
    while last < commit {
        last += 1;
        // Re-acquire the log lock per entry so it is never held across the
        // state-machine write lock below.
        let persistent = self.persistent.read();
        let entry = persistent.get_entry(last).cloned();
        drop(persistent);
        // A missing entry is skipped but still advances last_applied.
        if let Some(entry) = entry {
            let mut sm = self.state_machine.write();
            sm.apply(&entry.command);
            debug!("Node {} applied log entry {}", self.config.node_id, last);
        }
    }
    *self.last_applied.lock() = last;
}
/// Client entry point: append `command` to the log if this node is the
/// leader, returning the new entry's log index.
///
/// # Errors
/// Returns an error naming the known leader (or "unknown") when called on
/// a non-leader node.
pub fn propose(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
    if *self.role.lock() != NodeRole::Leader {
        let leader = self.current_leader.lock().map(|l| l.to_string());
        // Fixed: `{}` instead of `{:?}` — Debug formatting wrapped the
        // String in quotes (`Current leader: "3"`).
        return Err(anyhow!(
            "Not the leader. Current leader: {}",
            leader.unwrap_or_else(|| "unknown".to_string())
        ));
    }
    self.append_entry(command, client_id)
}
/// Appends `command` to the local log under the current term and returns
/// the index assigned to the new entry.
fn append_entry(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
    let mut persistent = self.persistent.write();
    let term = persistent.current_term;
    let index = persistent.last_log_index() + 1;
    persistent.log.push(LogEntry {
        index,
        term,
        command,
        client_id,
    });
    info!(
        "Node {} appended log entry {} in term {}",
        self.config.node_id, index, term
    );
    Ok(index)
}
/// Builds the next AppendEntries RPC (heartbeat or replication batch) for
/// `peer_id`, based on that peer's `next_index`.
///
/// # Errors
/// Fails if this node is not currently the leader.
pub fn create_append_entries_request(&self, peer_id: NodeId) -> Result<AppendEntriesRequest> {
    if *self.role.lock() != NodeRole::Leader {
        return Err(anyhow!("Not the leader"));
    }
    let persistent = self.persistent.read();
    let next_idx = self.next_index.lock();
    let next = next_idx.get(&peer_id).copied().unwrap_or(1);
    let prev_log_index = next.saturating_sub(1);
    // Term of the entry immediately before the batch (0 at log start).
    let prev_log_term = if prev_log_index > 0 {
        persistent
            .get_entry(prev_log_index)
            .map(|e| e.term)
            .unwrap_or(0)
    } else {
        0
    };
    // Everything from `next` onward, capped at the configured batch size.
    let entries: Vec<LogEntry> = persistent
        .log
        .iter()
        .filter(|e| e.index >= next)
        .take(self.config.max_entries_per_batch)
        .cloned()
        .collect();
    let commit = *self.commit_index.lock();
    *self.rpcs_sent.lock() += 1;
    Ok(AppendEntriesRequest {
        term: persistent.current_term,
        leader_id: self.config.node_id,
        prev_log_index,
        prev_log_term,
        entries,
        leader_commit: commit,
    })
}
/// Test/bootstrap helper: on a single-node cluster, mark the whole log
/// committed and apply it. Warns and does nothing on multi-node clusters.
pub fn force_commit_single_node(&self) {
    if self.config.cluster_nodes.len() == 1 {
        let last_index = self.persistent.read().last_log_index();
        *self.commit_index.lock() = last_index;
        self.apply_committed_entries();
    } else {
        warn!("force_commit_single_node called on multi-node cluster");
    }
}
/// Current role of this node.
pub fn role(&self) -> NodeRole {
    *self.role.lock()
}
/// Latest term this node has seen.
pub fn current_term(&self) -> Term {
    self.persistent.read().current_term
}
/// Leader this node currently recognizes, if any.
pub fn current_leader(&self) -> Option<NodeId> {
    *self.current_leader.lock()
}
/// True when this node believes it is the leader.
pub fn is_leader(&self) -> bool {
    *self.role.lock() == NodeRole::Leader
}
/// Number of entries in the local log.
pub fn log_length(&self) -> usize {
    self.persistent.read().log.len()
}
/// Highest log index known to be committed.
pub fn commit_index(&self) -> LogIndex {
    *self.commit_index.lock()
}
/// Highest log index applied to the state machine.
pub fn last_applied(&self) -> LogIndex {
    *self.last_applied.lock()
}
/// Number of vectors in the applied index.
pub fn vector_count(&self) -> usize {
    self.state_machine.read().len()
}
/// Clone of the stored vector entry, if present.
pub fn get_vector(&self, vector_id: &str) -> Option<VectorEntry> {
    self.state_machine.read().get(vector_id).cloned()
}
/// Top-k cosine-similarity search over the applied index.
pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
    self.state_machine.read().search_similar(query, k)
}
/// Assembles an observability snapshot.
///
/// Each field is read under its own lock, so the snapshot is not a single
/// atomic view — adequate for metrics, not for consistency checks. The
/// state machine is now read once so `operations_applied` and
/// `vector_count` come from the same view (the original locked it twice).
pub fn get_stats(&self) -> RaftStats {
    let persistent = self.persistent.read();
    let sm = self.state_machine.read();
    RaftStats {
        current_term: persistent.current_term,
        role: self.role().to_string(),
        current_leader: *self.current_leader.lock(),
        log_length: persistent.log.len(),
        commit_index: *self.commit_index.lock(),
        last_applied: *self.last_applied.lock(),
        elections_participated: *self.elections_participated.lock(),
        terms_as_leader: *self.terms_as_leader.lock(),
        operations_applied: sm.operations_applied,
        vector_count: sm.len(),
        rpcs_sent: *self.rpcs_sent.lock(),
        rpcs_received: *self.rpcs_received.lock(),
    }
}
/// True when no leader contact (or granted vote) has been recorded within
/// the election timeout.
///
/// NOTE(review): only `election_timeout_max_ms` is consulted — the min
/// bound is never used, so timeouts are not randomized as the Raft paper
/// recommends for split-vote mitigation; confirm whether intentional.
pub fn election_timeout_elapsed(&self) -> bool {
    let elapsed = self.last_heartbeat.lock().elapsed();
    elapsed > Duration::from_millis(self.config.election_timeout_max_ms)
}
/// Restarts the election timer (e.g. after any leader contact).
pub fn reset_heartbeat(&self) {
    *self.last_heartbeat.lock() = Instant::now();
}
}
/// In-process test harness that wires several `RaftIndexNode`s together and
/// delivers RPCs synchronously between them (no real network).
pub struct ClusterSimulator {
    /// Nodes indexed 0..n, with node ids 1..=n.
    pub nodes: Vec<RaftIndexNode>,
}
impl ClusterSimulator {
pub fn new(n: usize) -> Result<Self> {
let cluster_nodes: Vec<NodeId> = (1..=(n as NodeId)).collect();
let nodes = cluster_nodes
.iter()
.map(|&id| {
let config = RaftConfig {
node_id: id,
cluster_nodes: cluster_nodes.clone(),
heartbeat_interval_ms: 50,
election_timeout_min_ms: 150,
election_timeout_max_ms: 300,
max_entries_per_batch: 10,
enable_snapshots: false,
snapshot_threshold: 1000,
max_rpc_retries: 2,
};
RaftIndexNode::new(config)
})
.collect();
Ok(Self { nodes })
}
pub fn elect_leader(&self, leader_idx: usize) {
let vote_request = self.nodes[leader_idx].start_election();
let mut all_won = false;
for (i, node) in self.nodes.iter().enumerate() {
if i == leader_idx {
continue;
}
let response = node.handle_request_vote(vote_request.clone());
if self.nodes[leader_idx].process_vote_response(response) {
all_won = true;
}
}
if all_won || self.nodes[leader_idx].is_leader() {
for (i, node) in self.nodes.iter().enumerate() {
if i == leader_idx {
continue;
}
if let Ok(ae_req) =
self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
{
let response = node.handle_append_entries(ae_req.clone());
self.nodes[leader_idx].process_append_entries_response(
node.config.node_id,
response,
ae_req.entries.len(),
);
}
}
}
}
pub fn replicate_all(&self) -> Result<()> {
let leader_idx = self
.nodes
.iter()
.position(|n| n.is_leader())
.ok_or_else(|| anyhow!("No leader elected"))?;
for (i, node) in self.nodes.iter().enumerate() {
if i == leader_idx {
continue;
}
if let Ok(ae_req) =
self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
{
let entries_len = ae_req.entries.len();
let response = node.handle_append_entries(ae_req);
self.nodes[leader_idx].process_append_entries_response(
node.config.node_id,
response,
entries_len,
);
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
/// Builds a minimal `VectorEntry` fixture with empty metadata.
fn make_vector_entry(id: &str, vec: Vec<f32>) -> VectorEntry {
    VectorEntry {
        vector_id: id.to_string(),
        vector: vec,
        metadata: HashMap::new(),
        inserted_at: 0,
    }
}
// A one-node cluster is its own quorum.
#[test]
fn test_raft_config_single_node() {
    let config = RaftConfig::single_node(1);
    assert_eq!(config.node_id, 1);
    assert_eq!(config.cluster_nodes, vec![1]);
    assert_eq!(config.quorum_size(), 1);
}
// Three nodes need two for a majority.
#[test]
fn test_raft_config_three_node() {
    let config = RaftConfig::three_node_cluster(1);
    assert_eq!(config.quorum_size(), 2);
}
// New nodes boot as followers in term 0.
#[test]
fn test_node_starts_as_follower() {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    assert_eq!(node.role(), NodeRole::Follower);
    assert_eq!(node.current_term(), 0);
}
// A single node wins with just its own vote (quorum = 1).
#[test]
fn test_single_node_becomes_leader() {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    let vote_req = node.start_election();
    assert_eq!(vote_req.term, 1);
    assert_eq!(node.current_term(), 1);
    let won = node.process_vote_response(RequestVoteResponse {
        term: 1,
        vote_granted: true,
        node_id: 1,
    });
    assert!(node.is_leader() || won);
}
// End-to-end on one node: propose, force-commit, observe in the index.
#[test]
fn test_single_node_leader_force_commit() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    node.start_election();
    let _ = node.process_vote_response(RequestVoteResponse {
        term: node.current_term(),
        vote_granted: true,
        node_id: 1,
    });
    // Fallback: force leadership if self-vote handling did not promote us.
    if !node.is_leader() {
        *node.role.lock() = NodeRole::Leader;
        *node.current_leader.lock() = Some(1);
    }
    let entry = make_vector_entry("v1", vec![1.0, 2.0, 3.0]);
    node.propose(IndexCommand::Upsert(entry), None)?;
    node.force_commit_single_node();
    assert_eq!(node.vector_count(), 1);
    assert!(node.get_vector("v1").is_some());
    Ok(())
}
// Followers must refuse client proposals.
#[test]
fn test_propose_fails_when_not_leader() {
    let config = RaftConfig::three_node_cluster(1);
    let node = RaftIndexNode::new(config);
    let result = node.propose(IndexCommand::NoOp, None);
    assert!(result.is_err(), "Should fail to propose when not leader");
}
// A request with a higher term both updates our term and earns a vote.
#[test]
fn test_request_vote_grants_to_newer_term() {
    let config = RaftConfig::three_node_cluster(2);
    let voter = RaftIndexNode::new(config);
    let req = RequestVoteRequest {
        term: 5,
        candidate_id: 1,
        last_log_index: 10,
        last_log_term: 5,
    };
    let response = voter.handle_request_vote(req);
    assert!(response.vote_granted, "Should grant vote to higher term");
    assert_eq!(response.term, 5);
}
// A candidate behind our term is refused and told the newer term.
#[test]
fn test_request_vote_rejects_stale_term() {
    let config = RaftConfig::three_node_cluster(2);
    let voter = RaftIndexNode::new(config);
    voter.persistent.write().current_term = 5;
    let req = RequestVoteRequest {
        term: 3,
        candidate_id: 1,
        last_log_index: 0,
        last_log_term: 0,
    };
    let response = voter.handle_request_vote(req);
    assert!(!response.vote_granted, "Should reject stale term vote");
    assert_eq!(response.term, 5);
}
// One vote per term: a second, different candidate is refused.
#[test]
fn test_request_vote_rejects_duplicate_vote() {
    let config = RaftConfig::three_node_cluster(2);
    let voter = RaftIndexNode::new(config);
    let req1 = RequestVoteRequest {
        term: 1,
        candidate_id: 1,
        last_log_index: 0,
        last_log_term: 0,
    };
    let req2 = RequestVoteRequest {
        term: 1,
        candidate_id: 3,
        last_log_index: 0,
        last_log_term: 0,
    };
    let r1 = voter.handle_request_vote(req1);
    assert!(r1.vote_granted, "First vote should be granted");
    let r2 = voter.handle_request_vote(req2);
    assert!(
        !r2.vote_granted,
        "Duplicate vote in same term should be rejected"
    );
}
// An empty AppendEntries acts as a heartbeat and installs the leader.
#[test]
#[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
fn test_append_entries_heartbeat() {
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);
    let heartbeat = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![],
        leader_commit: 0,
    };
    let response = follower.handle_append_entries(heartbeat);
    assert!(response.success, "Heartbeat should succeed");
    assert_eq!(follower.current_leader(), Some(1));
}
// AppendEntries from a stale leader is refused with our newer term.
#[test]
fn test_append_entries_stale_term() {
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);
    follower.persistent.write().current_term = 5;
    let request = AppendEntriesRequest {
        term: 3,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![],
        leader_commit: 0,
    };
    let response = follower.handle_append_entries(request);
    assert!(!response.success, "Stale term should be rejected");
    assert_eq!(response.term, 5);
}
// Full election round through the synchronous simulator.
#[test]
#[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
fn test_cluster_simulator_election() -> Result<()> {
    let sim = ClusterSimulator::new(3)?;
    sim.elect_leader(0);
    let leaders: Vec<_> = sim.nodes.iter().filter(|n| n.is_leader()).collect();
    assert!(!leaders.is_empty(), "At least one node should be leader");
    Ok(())
}
// Election plus one replication round propagates a proposed entry.
#[test]
#[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
fn test_cluster_simulator_replication() -> Result<()> {
    let sim = ClusterSimulator::new(3)?;
    sim.elect_leader(0);
    let leader_idx = sim
        .nodes
        .iter()
        .position(|n| n.is_leader())
        .expect("no leader found");
    let entry = make_vector_entry("v1", vec![1.0, 0.0, 0.0]);
    sim.nodes[leader_idx].propose(IndexCommand::Upsert(entry), None)?;
    sim.replicate_all()?;
    let leader = &sim.nodes[leader_idx];
    // force_commit_single_node warns and no-ops on a 3-node cluster, hence
    // only the weaker disjunction below is asserted.
    leader.force_commit_single_node();
    let vec = leader.get_vector("v1");
    assert!(vec.is_some() || leader.log_length() > 0);
    Ok(())
}
// Upsert then Delete leaves an empty index.
#[test]
fn test_delete_command() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    node.start_election();
    let _ = node.process_vote_response(RequestVoteResponse {
        term: node.current_term(),
        vote_granted: true,
        node_id: 1,
    });
    // Fallback: force leadership if self-vote handling did not promote us.
    if !node.is_leader() {
        *node.role.lock() = NodeRole::Leader;
        *node.current_leader.lock() = Some(1);
    }
    let entry = make_vector_entry("v1", vec![1.0]);
    node.propose(IndexCommand::Upsert(entry), None)?;
    node.force_commit_single_node();
    assert_eq!(node.vector_count(), 1);
    node.propose(
        IndexCommand::Delete {
            vector_id: "v1".to_string(),
        },
        None,
    )?;
    node.force_commit_single_node();
    assert_eq!(node.vector_count(), 0);
    Ok(())
}
// UpdateMetadata replaces the metadata map of a stored vector.
#[test]
fn test_update_metadata_command() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);
    let entry = make_vector_entry("v1", vec![1.0, 2.0]);
    node.propose(IndexCommand::Upsert(entry), None)?;
    let mut new_meta = HashMap::new();
    new_meta.insert("tag".to_string(), "important".to_string());
    node.propose(
        IndexCommand::UpdateMetadata {
            vector_id: "v1".to_string(),
            metadata: new_meta,
        },
        None,
    )?;
    node.force_commit_single_node();
    let stored = node.get_vector("v1").expect("v1 not found");
    assert_eq!(stored.metadata.get("tag"), Some(&"important".to_string()));
    Ok(())
}
// Cosine search ranks the identical basis vector first with sim ~ 1.
#[test]
fn test_search_similar() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);
    node.propose(
        IndexCommand::Upsert(make_vector_entry("v1", vec![1.0, 0.0, 0.0])),
        None,
    )?;
    node.propose(
        IndexCommand::Upsert(make_vector_entry("v2", vec![0.0, 1.0, 0.0])),
        None,
    )?;
    node.propose(
        IndexCommand::Upsert(make_vector_entry("v3", vec![0.0, 0.0, 1.0])),
        None,
    )?;
    node.force_commit_single_node();
    let results = node.search_similar(&[1.0, 0.0, 0.0], 2);
    assert!(!results.is_empty());
    assert_eq!(results[0].0, "v1");
    assert!((results[0].1 - 1.0).abs() < 1e-5);
    Ok(())
}
// get_stats reflects role and log growth.
#[test]
fn test_stats_populated() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);
    node.propose(IndexCommand::NoOp, None)?;
    node.force_commit_single_node();
    let stats = node.get_stats();
    assert_eq!(stats.role, "Leader");
    assert!(stats.log_length > 0);
    Ok(())
}
// Each successful propose appends exactly one log entry.
#[test]
fn test_raft_log_length_increases() -> Result<()> {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);
    assert_eq!(node.log_length(), 0);
    node.propose(IndexCommand::NoOp, None)?;
    assert_eq!(node.log_length(), 1);
    node.propose(IndexCommand::Rebuild, None)?;
    assert_eq!(node.log_length(), 2);
    Ok(())
}
// Fresh persistent state: term 0, no vote, empty log.
#[test]
fn test_persistent_state_default() {
    let state = PersistentState::default();
    assert_eq!(state.current_term, 0);
    assert!(state.voted_for.is_none());
    assert!(state.log.is_empty());
    assert_eq!(state.last_log_index(), 0);
    assert_eq!(state.last_log_term(), 0);
}
// Display must match the variant names exactly.
#[test]
fn test_node_role_display() {
    assert_eq!(NodeRole::Follower.to_string(), "Follower");
    assert_eq!(NodeRole::Candidate.to_string(), "Candidate");
    assert_eq!(NodeRole::Leader.to_string(), "Leader");
}
// The election timer starts freshly reset at construction.
#[test]
fn test_election_timeout_not_elapsed_immediately() {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    assert!(!node.election_timeout_elapsed());
}
// Resetting the heartbeat restarts the election timer.
#[test]
fn test_reset_heartbeat() {
    let config = RaftConfig::single_node(1);
    let node = RaftIndexNode::new(config);
    node.reset_heartbeat();
    assert!(!node.election_timeout_elapsed());
}
// A follower stores entries delivered by AppendEntries.
#[test]
fn test_append_entries_appends_new_log_entries() {
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);
    let entry = LogEntry {
        index: 1,
        term: 1,
        command: IndexCommand::NoOp,
        client_id: None,
    };
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![entry],
        leader_commit: 1,
    };
    let response = follower.handle_append_entries(request);
    assert!(response.success);
    assert_eq!(follower.log_length(), 1);
}
// leader_commit in the request drives apply on the follower.
#[test]
fn test_commit_advances_last_applied() {
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);
    let entry = LogEntry {
        index: 1,
        term: 1,
        command: IndexCommand::Upsert(make_vector_entry("v1", vec![1.0])),
        client_id: None,
    };
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![entry],
        leader_commit: 1,
    };
    follower.handle_append_entries(request);
    assert_eq!(follower.last_applied(), 1);
    assert_eq!(follower.vector_count(), 1);
}
}