Skip to main content

oxirs_vec/distributed/
raft_index.rs

1//! Raft-based distributed index consensus for vector stores
2//!
3//! This module implements the Raft consensus protocol for distributed
4//! vector index management. It provides:
5//! - Leader election among index nodes
6//! - Replicated log for index mutations (insertions, deletions, updates)
7//! - Consistent reads via quorum
8//! - Automatic failover and leader re-election
9//!
10//! # Design
11//!
12//! Each node in the cluster participates in Raft. Index mutations (vector
13//! insertions/deletions) are proposed as log entries. Once a majority of
14//! nodes acknowledge an entry, it is committed and applied to the local
15//! in-memory index.
16//!
17//! # Pure Rust
18//!
19//! This module is 100% Pure Rust - no CUDA or FFI dependencies.
20
21use anyhow::{anyhow, Result};
22use parking_lot::{Mutex, RwLock};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
29/// Node ID type for Raft cluster members
30pub type NodeId = u64;
31
32/// Log index (1-based, 0 means no entry)
33pub type LogIndex = u64;
34
35/// Term number
36pub type Term = u64;
37
38/// A vector entry stored in the distributed index
39#[derive(Debug, Clone, Serialize, Deserialize)]
40pub struct VectorEntry {
41    /// Unique identifier for this vector
42    pub vector_id: String,
43    /// Vector data
44    pub vector: Vec<f32>,
45    /// Associated metadata
46    pub metadata: HashMap<String, String>,
47    /// Timestamp of insertion
48    pub inserted_at: u64,
49}
50
51/// Commands that can be applied to the replicated state machine
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub enum IndexCommand {
54    /// Insert or update a vector
55    Upsert(VectorEntry),
56    /// Delete a vector by ID
57    Delete { vector_id: String },
58    /// Rebuild the index (triggers background rebuild)
59    Rebuild,
60    /// Update metadata for a vector
61    UpdateMetadata {
62        vector_id: String,
63        metadata: HashMap<String, String>,
64    },
65    /// No-op entry for leadership heartbeat and linearization
66    NoOp,
67}
68
69/// A single entry in the replicated log
70#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct LogEntry {
72    /// Index of this entry in the log (1-based)
73    pub index: LogIndex,
74    /// Term when this entry was created
75    pub term: Term,
76    /// The command to be applied
77    pub command: IndexCommand,
78    /// Client request ID for deduplication
79    pub client_id: Option<String>,
80}
81
82/// Raft node role
83#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
84pub enum NodeRole {
85    Follower,
86    Candidate,
87    Leader,
88}
89
90impl std::fmt::Display for NodeRole {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            Self::Follower => write!(f, "Follower"),
94            Self::Candidate => write!(f, "Candidate"),
95            Self::Leader => write!(f, "Leader"),
96        }
97    }
98}
99
100/// AppendEntries RPC request
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct AppendEntriesRequest {
103    /// Leader's term
104    pub term: Term,
105    /// Leader's ID
106    pub leader_id: NodeId,
107    /// Log index immediately preceding new entries
108    pub prev_log_index: LogIndex,
109    /// Term of prev_log_index entry
110    pub prev_log_term: Term,
111    /// New log entries to store (empty for heartbeat)
112    pub entries: Vec<LogEntry>,
113    /// Leader's commit index
114    pub leader_commit: LogIndex,
115}
116
117/// AppendEntries RPC response
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct AppendEntriesResponse {
120    /// Current term for leader to update itself
121    pub term: Term,
122    /// True if follower contained entry matching prev_log_index and prev_log_term
123    pub success: bool,
124    /// The responding node's ID
125    pub node_id: NodeId,
126    /// Conflict index for fast log rollback (optimization)
127    pub conflict_index: Option<LogIndex>,
128}
129
130/// RequestVote RPC request
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct RequestVoteRequest {
133    /// Candidate's term
134    pub term: Term,
135    /// Candidate requesting vote
136    pub candidate_id: NodeId,
137    /// Index of candidate's last log entry
138    pub last_log_index: LogIndex,
139    /// Term of candidate's last log entry
140    pub last_log_term: Term,
141}
142
143/// RequestVote RPC response
144#[derive(Debug, Clone, Serialize, Deserialize)]
145pub struct RequestVoteResponse {
146    /// Current term for candidate to update itself
147    pub term: Term,
148    /// True means candidate received vote
149    pub vote_granted: bool,
150    /// The responding node's ID
151    pub node_id: NodeId,
152}
153
154/// Raft node configuration
155#[derive(Debug, Clone, Serialize, Deserialize)]
156pub struct RaftConfig {
157    /// This node's ID
158    pub node_id: NodeId,
159    /// All node IDs in the cluster (including self)
160    pub cluster_nodes: Vec<NodeId>,
161    /// Heartbeat interval in milliseconds
162    pub heartbeat_interval_ms: u64,
163    /// Election timeout range (min, max) in milliseconds
164    pub election_timeout_min_ms: u64,
165    pub election_timeout_max_ms: u64,
166    /// Maximum log entries per AppendEntries batch
167    pub max_entries_per_batch: usize,
168    /// Enable log compaction via snapshotting
169    pub enable_snapshots: bool,
170    /// Snapshot threshold (entries before snapshot)
171    pub snapshot_threshold: usize,
172    /// Maximum retries for failed RPCs
173    pub max_rpc_retries: usize,
174}
175
176impl RaftConfig {
177    /// Create a single-node cluster configuration (useful for testing)
178    pub fn single_node(node_id: NodeId) -> Self {
179        Self {
180            node_id,
181            cluster_nodes: vec![node_id],
182            heartbeat_interval_ms: 150,
183            election_timeout_min_ms: 300,
184            election_timeout_max_ms: 600,
185            max_entries_per_batch: 100,
186            enable_snapshots: true,
187            snapshot_threshold: 10_000,
188            max_rpc_retries: 3,
189        }
190    }
191
192    /// Create a three-node cluster configuration
193    pub fn three_node_cluster(node_id: NodeId) -> Self {
194        Self {
195            node_id,
196            cluster_nodes: vec![1, 2, 3],
197            heartbeat_interval_ms: 150,
198            election_timeout_min_ms: 300,
199            election_timeout_max_ms: 600,
200            max_entries_per_batch: 100,
201            enable_snapshots: true,
202            snapshot_threshold: 10_000,
203            max_rpc_retries: 3,
204        }
205    }
206
207    /// Get the quorum size (majority)
208    pub fn quorum_size(&self) -> usize {
209        self.cluster_nodes.len() / 2 + 1
210    }
211}
212
213impl Default for RaftConfig {
214    fn default() -> Self {
215        Self::single_node(1)
216    }
217}
218
219/// Statistics for the Raft node
220#[derive(Debug, Clone, Default, Serialize, Deserialize)]
221pub struct RaftStats {
222    /// Current term
223    pub current_term: Term,
224    /// Current role
225    pub role: String,
226    /// Current leader ID (if known)
227    pub current_leader: Option<NodeId>,
228    /// Total log entries
229    pub log_length: usize,
230    /// Commit index
231    pub commit_index: LogIndex,
232    /// Last applied index
233    pub last_applied: LogIndex,
234    /// Number of elections participated in
235    pub elections_participated: u64,
236    /// Number of terms this node was leader
237    pub terms_as_leader: u64,
238    /// Number of index operations applied
239    pub operations_applied: u64,
240    /// Number of vectors in the distributed index
241    pub vector_count: usize,
242    /// Number of RPC messages sent
243    pub rpcs_sent: u64,
244    /// Number of RPC messages received
245    pub rpcs_received: u64,
246}
247
248/// The in-memory state machine: the actual vector index
249#[derive(Debug, Default)]
250struct IndexStateMachine {
251    /// All vectors stored in this index shard
252    vectors: HashMap<String, VectorEntry>,
253    /// Number of operations applied
254    operations_applied: u64,
255}
256
257impl IndexStateMachine {
258    /// Apply a command to the state machine
259    fn apply(&mut self, command: &IndexCommand) {
260        match command {
261            IndexCommand::Upsert(entry) => {
262                self.vectors.insert(entry.vector_id.clone(), entry.clone());
263                self.operations_applied += 1;
264                debug!("Applied Upsert for vector '{}'", entry.vector_id);
265            }
266            IndexCommand::Delete { vector_id } => {
267                self.vectors.remove(vector_id);
268                self.operations_applied += 1;
269                debug!("Applied Delete for vector '{}'", vector_id);
270            }
271            IndexCommand::UpdateMetadata {
272                vector_id,
273                metadata,
274            } => {
275                if let Some(entry) = self.vectors.get_mut(vector_id) {
276                    entry.metadata.clone_from(metadata);
277                    self.operations_applied += 1;
278                }
279            }
280            IndexCommand::Rebuild => {
281                debug!("Applied Rebuild command");
282                self.operations_applied += 1;
283            }
284            IndexCommand::NoOp => {
285                // No-op doesn't increment operations
286            }
287        }
288    }
289
290    fn len(&self) -> usize {
291        self.vectors.len()
292    }
293
294    fn get(&self, vector_id: &str) -> Option<&VectorEntry> {
295        self.vectors.get(vector_id)
296    }
297
298    fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
299        let mut similarities: Vec<(String, f32)> = self
300            .vectors
301            .iter()
302            .filter_map(|(id, entry)| {
303                if entry.vector.len() != query.len() {
304                    return None;
305                }
306                let dot: f32 = entry
307                    .vector
308                    .iter()
309                    .zip(query.iter())
310                    .map(|(a, b)| a * b)
311                    .sum();
312                let na: f32 = entry.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
313                let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
314                let sim = if na < 1e-9 || nb < 1e-9 {
315                    0.0
316                } else {
317                    dot / (na * nb)
318                };
319                Some((id.clone(), sim))
320            })
321            .collect();
322
323        similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
324        similarities.truncate(k);
325        similarities
326    }
327}
328
329/// Persistent state for Raft node (must survive restarts)
330#[derive(Debug, Clone, Serialize, Deserialize, Default)]
331pub struct PersistentState {
332    /// Latest term this server has seen
333    pub current_term: Term,
334    /// CandidateId that received vote in current term
335    pub voted_for: Option<NodeId>,
336    /// Log entries
337    pub log: Vec<LogEntry>,
338}
339
340impl PersistentState {
341    fn last_log_index(&self) -> LogIndex {
342        self.log.last().map(|e| e.index).unwrap_or(0)
343    }
344
345    fn last_log_term(&self) -> Term {
346        self.log.last().map(|e| e.term).unwrap_or(0)
347    }
348
349    fn get_entry(&self, index: LogIndex) -> Option<&LogEntry> {
350        if index == 0 {
351            return None;
352        }
353        // Log entries are 1-indexed; find by scanning
354        self.log.iter().find(|e| e.index == index)
355    }
356
357    fn truncate_from(&mut self, from_index: LogIndex) {
358        self.log.retain(|e| e.index < from_index);
359    }
360}
361
362/// Raft node implementation for distributed vector index
363///
364/// This implements the core Raft protocol. In a production deployment,
365/// RPCs would be sent over the network (e.g., gRPC or HTTP/2). Here
366/// we provide the state machine logic and expose methods for injecting
367/// simulated or actual network messages.
368#[derive(Debug)]
369pub struct RaftIndexNode {
370    config: RaftConfig,
371    /// Persistent state (term, vote, log)
372    persistent: Arc<RwLock<PersistentState>>,
373    /// Current role
374    role: Arc<Mutex<NodeRole>>,
375    /// Current leader (known)
376    current_leader: Arc<Mutex<Option<NodeId>>>,
377    /// Commit index (highest log entry known to be committed)
378    commit_index: Arc<Mutex<LogIndex>>,
379    /// Last applied (highest log entry applied to state machine)
380    last_applied: Arc<Mutex<LogIndex>>,
381    /// Next index to send to each follower (leader only)
382    next_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
383    /// Highest log index known to be replicated on each follower (leader only)
384    match_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
385    /// The actual state machine (vector index)
386    state_machine: Arc<RwLock<IndexStateMachine>>,
387    /// Votes received in current election
388    votes_received: Arc<Mutex<HashMap<NodeId, bool>>>,
389    /// Election timeout tracking
390    last_heartbeat: Arc<Mutex<Instant>>,
391    /// Statistics
392    stats: Arc<Mutex<RaftStats>>,
393    /// Number of elections participated in
394    elections_participated: Arc<Mutex<u64>>,
395    /// Number of terms as leader
396    terms_as_leader: Arc<Mutex<u64>>,
397    /// Total RPCs sent
398    rpcs_sent: Arc<Mutex<u64>>,
399    /// Total RPCs received
400    rpcs_received: Arc<Mutex<u64>>,
401}
402
403impl RaftIndexNode {
404    /// Create a new Raft index node
405    pub fn new(config: RaftConfig) -> Self {
406        let node_id = config.node_id;
407        let cluster_nodes: Vec<NodeId> = config.cluster_nodes.clone();
408
409        let next_index: HashMap<NodeId, LogIndex> = cluster_nodes
410            .iter()
411            .filter(|&&n| n != node_id)
412            .map(|&n| (n, 1))
413            .collect();
414
415        let match_index: HashMap<NodeId, LogIndex> = cluster_nodes
416            .iter()
417            .filter(|&&n| n != node_id)
418            .map(|&n| (n, 0))
419            .collect();
420
421        info!(
422            "Raft node {} initialized in cluster {:?}",
423            node_id, cluster_nodes
424        );
425
426        Self {
427            config,
428            persistent: Arc::new(RwLock::new(PersistentState::default())),
429            role: Arc::new(Mutex::new(NodeRole::Follower)),
430            current_leader: Arc::new(Mutex::new(None)),
431            commit_index: Arc::new(Mutex::new(0)),
432            last_applied: Arc::new(Mutex::new(0)),
433            next_index: Arc::new(Mutex::new(next_index)),
434            match_index: Arc::new(Mutex::new(match_index)),
435            state_machine: Arc::new(RwLock::new(IndexStateMachine::default())),
436            votes_received: Arc::new(Mutex::new(HashMap::new())),
437            last_heartbeat: Arc::new(Mutex::new(Instant::now())),
438            stats: Arc::new(Mutex::new(RaftStats::default())),
439            elections_participated: Arc::new(Mutex::new(0)),
440            terms_as_leader: Arc::new(Mutex::new(0)),
441            rpcs_sent: Arc::new(Mutex::new(0)),
442            rpcs_received: Arc::new(Mutex::new(0)),
443        }
444    }
445
446    /// Start an election (become candidate)
447    pub fn start_election(&self) -> RequestVoteRequest {
448        let mut persistent = self.persistent.write();
449        persistent.current_term += 1;
450        let new_term = persistent.current_term;
451        persistent.voted_for = Some(self.config.node_id);
452
453        *self.role.lock() = NodeRole::Candidate;
454        let mut votes = self.votes_received.lock();
455        votes.clear();
456        votes.insert(self.config.node_id, true); // Vote for self
457
458        *self.elections_participated.lock() += 1;
459
460        info!(
461            "Node {} starting election for term {}",
462            self.config.node_id, new_term
463        );
464
465        RequestVoteRequest {
466            term: new_term,
467            candidate_id: self.config.node_id,
468            last_log_index: persistent.last_log_index(),
469            last_log_term: persistent.last_log_term(),
470        }
471    }
472
473    /// Handle a RequestVote RPC from a candidate
474    pub fn handle_request_vote(&self, request: RequestVoteRequest) -> RequestVoteResponse {
475        *self.rpcs_received.lock() += 1;
476        let mut persistent = self.persistent.write();
477
478        // If we see a higher term, update and become follower
479        if request.term > persistent.current_term {
480            persistent.current_term = request.term;
481            persistent.voted_for = None;
482            *self.role.lock() = NodeRole::Follower;
483        }
484
485        let vote_granted = if request.term < persistent.current_term {
486            // Stale term, reject
487            false
488        } else {
489            let already_voted = persistent
490                .voted_for
491                .map(|v| v != request.candidate_id)
492                .unwrap_or(false);
493
494            if already_voted {
495                false
496            } else {
497                // Grant vote if candidate's log is at least as up-to-date
498                let our_last_index = persistent.last_log_index();
499                let our_last_term = persistent.last_log_term();
500
501                let log_ok = request.last_log_term > our_last_term
502                    || (request.last_log_term == our_last_term
503                        && request.last_log_index >= our_last_index);
504
505                if log_ok {
506                    persistent.voted_for = Some(request.candidate_id);
507                    *self.last_heartbeat.lock() = Instant::now();
508                    true
509                } else {
510                    false
511                }
512            }
513        };
514
515        debug!(
516            "Node {} {:?} vote to {} for term {}",
517            self.config.node_id,
518            if vote_granted { "grants" } else { "denies" },
519            request.candidate_id,
520            request.term
521        );
522
523        RequestVoteResponse {
524            term: persistent.current_term,
525            vote_granted,
526            node_id: self.config.node_id,
527        }
528    }
529
530    /// Process a vote response from a peer
531    ///
532    /// Returns `true` if this node just won the election.
533    pub fn process_vote_response(&self, response: RequestVoteResponse) -> bool {
534        *self.rpcs_received.lock() += 1;
535        let persistent = self.persistent.read();
536
537        // If we see a higher term, become follower
538        if response.term > persistent.current_term {
539            drop(persistent);
540            let mut p = self.persistent.write();
541            p.current_term = response.term;
542            p.voted_for = None;
543            *self.role.lock() = NodeRole::Follower;
544            return false;
545        }
546
547        // Only count votes if still a candidate in the same term
548        if *self.role.lock() != NodeRole::Candidate {
549            return false;
550        }
551
552        if response.term != persistent.current_term {
553            return false;
554        }
555
556        if response.vote_granted {
557            let mut votes = self.votes_received.lock();
558            votes.insert(response.node_id, true);
559            let vote_count = votes.values().filter(|&&v| v).count();
560
561            if vote_count >= self.config.quorum_size() {
562                // Won election!
563                drop(votes);
564                drop(persistent);
565                self.become_leader();
566                return true;
567            }
568        }
569        false
570    }
571
572    /// Transition to leader state
573    fn become_leader(&self) {
574        let term = self.persistent.read().current_term;
575        *self.role.lock() = NodeRole::Leader;
576        *self.current_leader.lock() = Some(self.config.node_id);
577        *self.terms_as_leader.lock() += 1;
578
579        // Initialize next_index and match_index for all followers
580        let last_log_index = self.persistent.read().last_log_index();
581        let mut next_idx = self.next_index.lock();
582        let mut match_idx = self.match_index.lock();
583
584        for &peer in &self.config.cluster_nodes {
585            if peer != self.config.node_id {
586                next_idx.insert(peer, last_log_index + 1);
587                match_idx.insert(peer, 0);
588            }
589        }
590
591        info!(
592            "Node {} became leader for term {}",
593            self.config.node_id, term
594        );
595
596        // Append no-op to establish leadership
597        drop(next_idx);
598        drop(match_idx);
599        let _ = self.append_entry(IndexCommand::NoOp, None);
600    }
601
602    /// Handle AppendEntries RPC (from leader)
603    pub fn handle_append_entries(&self, request: AppendEntriesRequest) -> AppendEntriesResponse {
604        *self.rpcs_received.lock() += 1;
605        let mut persistent = self.persistent.write();
606
607        // If we see a higher term, become follower
608        if request.term > persistent.current_term {
609            persistent.current_term = request.term;
610            persistent.voted_for = None;
611            *self.role.lock() = NodeRole::Follower;
612        }
613
614        // Reply false if term < currentTerm
615        if request.term < persistent.current_term {
616            return AppendEntriesResponse {
617                term: persistent.current_term,
618                success: false,
619                node_id: self.config.node_id,
620                conflict_index: None,
621            };
622        }
623
624        // Reset election timer since we heard from a valid leader
625        *self.last_heartbeat.lock() = Instant::now();
626        *self.current_leader.lock() = Some(request.leader_id);
627        *self.role.lock() = NodeRole::Follower;
628
629        // Check prev_log consistency
630        if request.prev_log_index > 0 {
631            let entry = persistent.get_entry(request.prev_log_index);
632            match entry {
633                None => {
634                    // Don't have that entry
635                    return AppendEntriesResponse {
636                        term: persistent.current_term,
637                        success: false,
638                        node_id: self.config.node_id,
639                        conflict_index: Some(persistent.last_log_index() + 1),
640                    };
641                }
642                Some(e) if e.term != request.prev_log_term => {
643                    // Conflicting entry
644                    let conflict_index = e.index;
645                    return AppendEntriesResponse {
646                        term: persistent.current_term,
647                        success: false,
648                        node_id: self.config.node_id,
649                        conflict_index: Some(conflict_index),
650                    };
651                }
652                _ => {}
653            }
654        }
655
656        // Append new entries, removing conflicting ones
657        for entry in &request.entries {
658            let existing = persistent.get_entry(entry.index).cloned();
659            match existing {
660                Some(e) if e.term != entry.term => {
661                    // Conflict: truncate log from here
662                    persistent.truncate_from(entry.index);
663                    persistent.log.push(entry.clone());
664                }
665                None => {
666                    persistent.log.push(entry.clone());
667                }
668                _ => {} // Entry already present and matches
669            }
670        }
671
672        // Update commit index
673        let prev_commit = *self.commit_index.lock();
674        if request.leader_commit > prev_commit {
675            let new_commit = request.leader_commit.min(persistent.last_log_index());
676            drop(persistent);
677            *self.commit_index.lock() = new_commit;
678            self.apply_committed_entries();
679        }
680
681        AppendEntriesResponse {
682            term: self.persistent.read().current_term,
683            success: true,
684            node_id: self.config.node_id,
685            conflict_index: None,
686        }
687    }
688
689    /// Process AppendEntries response from a follower (leader only)
690    pub fn process_append_entries_response(
691        &self,
692        peer_id: NodeId,
693        response: AppendEntriesResponse,
694        entries_sent_count: usize,
695    ) {
696        *self.rpcs_received.lock() += 1;
697        let current_term = self.persistent.read().current_term;
698
699        if response.term > current_term {
700            let mut p = self.persistent.write();
701            p.current_term = response.term;
702            p.voted_for = None;
703            *self.role.lock() = NodeRole::Follower;
704            return;
705        }
706
707        if *self.role.lock() != NodeRole::Leader {
708            return;
709        }
710
711        if response.success {
712            let mut next_idx = self.next_index.lock();
713            let mut match_idx = self.match_index.lock();
714
715            let new_next =
716                next_idx.get(&peer_id).copied().unwrap_or(1) + entries_sent_count as LogIndex;
717
718            next_idx.insert(peer_id, new_next);
719            match_idx.insert(peer_id, new_next - 1);
720            drop(next_idx);
721            drop(match_idx);
722
723            // Try to advance commit index
724            self.try_advance_commit_index();
725        } else {
726            // Decrement next_index for this follower
727            let mut next_idx = self.next_index.lock();
728            if let Some(conflict) = response.conflict_index {
729                next_idx.insert(peer_id, conflict);
730            } else {
731                let current = next_idx.get(&peer_id).copied().unwrap_or(1);
732                if current > 1 {
733                    next_idx.insert(peer_id, current - 1);
734                }
735            }
736        }
737    }
738
739    /// Try to advance the commit index based on match_index replication
740    fn try_advance_commit_index(&self) {
741        let persistent = self.persistent.read();
742        let current_term = persistent.current_term;
743        let last_log_index = persistent.last_log_index();
744        drop(persistent);
745
746        let match_idx = self.match_index.lock();
747        let mut commit = *self.commit_index.lock();
748
749        for n in (commit + 1)..=last_log_index {
750            let p = self.persistent.read();
751            let entry_term = p.get_entry(n).map(|e| e.term).unwrap_or(0);
752            drop(p);
753
754            // Only commit entries from current term (safety requirement)
755            if entry_term != current_term {
756                continue;
757            }
758
759            // Count replications
760            let replication_count = 1 + // self
761                match_idx.values().filter(|&&m| m >= n).count();
762
763            if replication_count >= self.config.quorum_size() {
764                commit = n;
765            }
766        }
767        drop(match_idx);
768
769        let old_commit = *self.commit_index.lock();
770        if commit > old_commit {
771            *self.commit_index.lock() = commit;
772            self.apply_committed_entries();
773        }
774    }
775
776    /// Apply all committed but not yet applied log entries to state machine
777    fn apply_committed_entries(&self) {
778        let commit = *self.commit_index.lock();
779        let mut last = *self.last_applied.lock();
780
781        while last < commit {
782            last += 1;
783            let persistent = self.persistent.read();
784            let entry = persistent.get_entry(last).cloned();
785            drop(persistent);
786
787            if let Some(entry) = entry {
788                let mut sm = self.state_machine.write();
789                sm.apply(&entry.command);
790                debug!("Node {} applied log entry {}", self.config.node_id, last);
791            }
792        }
793
794        *self.last_applied.lock() = last;
795    }
796
797    /// Propose a new command (leader only)
798    ///
799    /// Returns the log index of the proposed entry, or an error if not leader.
800    pub fn propose(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
801        if *self.role.lock() != NodeRole::Leader {
802            let leader = self.current_leader.lock().map(|l| l.to_string());
803            return Err(anyhow!(
804                "Not the leader. Current leader: {:?}",
805                leader.unwrap_or_else(|| "unknown".to_string())
806            ));
807        }
808        self.append_entry(command, client_id)
809    }
810
811    /// Append an entry to the leader's log
812    fn append_entry(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
813        let mut persistent = self.persistent.write();
814        let term = persistent.current_term;
815        let index = persistent.last_log_index() + 1;
816
817        let entry = LogEntry {
818            index,
819            term,
820            command,
821            client_id,
822        };
823
824        persistent.log.push(entry);
825        info!(
826            "Node {} appended log entry {} in term {}",
827            self.config.node_id, index, term
828        );
829        Ok(index)
830    }
831
832    /// Create an AppendEntries request for a specific follower
833    pub fn create_append_entries_request(&self, peer_id: NodeId) -> Result<AppendEntriesRequest> {
834        if *self.role.lock() != NodeRole::Leader {
835            return Err(anyhow!("Not the leader"));
836        }
837
838        let persistent = self.persistent.read();
839        let next_idx = self.next_index.lock();
840        let next = next_idx.get(&peer_id).copied().unwrap_or(1);
841
842        let prev_log_index = next.saturating_sub(1);
843        let prev_log_term = if prev_log_index > 0 {
844            persistent
845                .get_entry(prev_log_index)
846                .map(|e| e.term)
847                .unwrap_or(0)
848        } else {
849            0
850        };
851
852        let entries: Vec<LogEntry> = persistent
853            .log
854            .iter()
855            .filter(|e| e.index >= next)
856            .take(self.config.max_entries_per_batch)
857            .cloned()
858            .collect();
859
860        let commit = *self.commit_index.lock();
861
862        *self.rpcs_sent.lock() += 1;
863
864        Ok(AppendEntriesRequest {
865            term: persistent.current_term,
866            leader_id: self.config.node_id,
867            prev_log_index,
868            prev_log_term,
869            entries,
870            leader_commit: commit,
871        })
872    }
873
874    /// Force commit a single-node cluster (for testing)
875    ///
876    /// In a single-node cluster, entries are immediately committed.
877    pub fn force_commit_single_node(&self) {
878        if self.config.cluster_nodes.len() != 1 {
879            warn!("force_commit_single_node called on multi-node cluster");
880            return;
881        }
882        let last_index = self.persistent.read().last_log_index();
883        *self.commit_index.lock() = last_index;
884        self.apply_committed_entries();
885    }
886
887    /// Get the current role
888    pub fn role(&self) -> NodeRole {
889        *self.role.lock()
890    }
891
892    /// Get the current term
893    pub fn current_term(&self) -> Term {
894        self.persistent.read().current_term
895    }
896
897    /// Get the current leader ID (if known)
898    pub fn current_leader(&self) -> Option<NodeId> {
899        *self.current_leader.lock()
900    }
901
902    /// Check if this node is the leader
903    pub fn is_leader(&self) -> bool {
904        *self.role.lock() == NodeRole::Leader
905    }
906
907    /// Get the number of entries in the log
908    pub fn log_length(&self) -> usize {
909        self.persistent.read().log.len()
910    }
911
912    /// Get the commit index
913    pub fn commit_index(&self) -> LogIndex {
914        *self.commit_index.lock()
915    }
916
917    /// Get the last applied index
918    pub fn last_applied(&self) -> LogIndex {
919        *self.last_applied.lock()
920    }
921
922    /// Get the number of vectors in the state machine
923    pub fn vector_count(&self) -> usize {
924        self.state_machine.read().len()
925    }
926
927    /// Get a vector from the state machine (read only)
928    pub fn get_vector(&self, vector_id: &str) -> Option<VectorEntry> {
929        self.state_machine.read().get(vector_id).cloned()
930    }
931
932    /// Search for similar vectors in the state machine
933    pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
934        self.state_machine.read().search_similar(query, k)
935    }
936
937    /// Get current statistics
938    pub fn get_stats(&self) -> RaftStats {
939        let persistent = self.persistent.read();
940        RaftStats {
941            current_term: persistent.current_term,
942            role: self.role().to_string(),
943            current_leader: *self.current_leader.lock(),
944            log_length: persistent.log.len(),
945            commit_index: *self.commit_index.lock(),
946            last_applied: *self.last_applied.lock(),
947            elections_participated: *self.elections_participated.lock(),
948            terms_as_leader: *self.terms_as_leader.lock(),
949            operations_applied: self.state_machine.read().operations_applied,
950            vector_count: self.state_machine.read().len(),
951            rpcs_sent: *self.rpcs_sent.lock(),
952            rpcs_received: *self.rpcs_received.lock(),
953        }
954    }
955
956    /// Check if election timeout has elapsed
957    pub fn election_timeout_elapsed(&self) -> bool {
958        let elapsed = self.last_heartbeat.lock().elapsed();
959        elapsed > Duration::from_millis(self.config.election_timeout_max_ms)
960    }
961
962    /// Reset the heartbeat timer (call when receiving valid messages from leader)
963    pub fn reset_heartbeat(&self) {
964        *self.last_heartbeat.lock() = Instant::now();
965    }
966}
967
968/// Helper to simulate a two-node cluster interaction for testing
969pub struct ClusterSimulator {
970    pub nodes: Vec<RaftIndexNode>,
971}
972
973impl ClusterSimulator {
974    /// Create a simulated cluster of N nodes
975    pub fn new(n: usize) -> Result<Self> {
976        let cluster_nodes: Vec<NodeId> = (1..=(n as NodeId)).collect();
977
978        let nodes = cluster_nodes
979            .iter()
980            .map(|&id| {
981                let config = RaftConfig {
982                    node_id: id,
983                    cluster_nodes: cluster_nodes.clone(),
984                    heartbeat_interval_ms: 50,
985                    election_timeout_min_ms: 150,
986                    election_timeout_max_ms: 300,
987                    max_entries_per_batch: 10,
988                    enable_snapshots: false,
989                    snapshot_threshold: 1000,
990                    max_rpc_retries: 2,
991                };
992                RaftIndexNode::new(config)
993            })
994            .collect();
995
996        Ok(Self { nodes })
997    }
998
999    /// Elect node at index `leader_idx` as leader
1000    pub fn elect_leader(&self, leader_idx: usize) {
1001        // Start election on the chosen node
1002        let vote_request = self.nodes[leader_idx].start_election();
1003
1004        // Collect votes from all other nodes
1005        let mut all_won = false;
1006        for (i, node) in self.nodes.iter().enumerate() {
1007            if i == leader_idx {
1008                continue;
1009            }
1010            let response = node.handle_request_vote(vote_request.clone());
1011            if self.nodes[leader_idx].process_vote_response(response) {
1012                all_won = true;
1013            }
1014        }
1015
1016        // If won, send initial heartbeats
1017        if all_won || self.nodes[leader_idx].is_leader() {
1018            for (i, node) in self.nodes.iter().enumerate() {
1019                if i == leader_idx {
1020                    continue;
1021                }
1022                if let Ok(ae_req) =
1023                    self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1024                {
1025                    let response = node.handle_append_entries(ae_req.clone());
1026                    self.nodes[leader_idx].process_append_entries_response(
1027                        node.config.node_id,
1028                        response,
1029                        ae_req.entries.len(),
1030                    );
1031                }
1032            }
1033        }
1034    }
1035
1036    /// Replicate all pending entries from leader to all followers
1037    pub fn replicate_all(&self) -> Result<()> {
1038        let leader_idx = self
1039            .nodes
1040            .iter()
1041            .position(|n| n.is_leader())
1042            .ok_or_else(|| anyhow!("No leader elected"))?;
1043
1044        for (i, node) in self.nodes.iter().enumerate() {
1045            if i == leader_idx {
1046                continue;
1047            }
1048            if let Ok(ae_req) =
1049                self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1050            {
1051                let entries_len = ae_req.entries.len();
1052                let response = node.handle_append_entries(ae_req);
1053                self.nodes[leader_idx].process_append_entries_response(
1054                    node.config.node_id,
1055                    response,
1056                    entries_len,
1057                );
1058            }
1059        }
1060        Ok(())
1061    }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067    use anyhow::Result;
1068
1069    fn make_vector_entry(id: &str, vec: Vec<f32>) -> VectorEntry {
1070        VectorEntry {
1071            vector_id: id.to_string(),
1072            vector: vec,
1073            metadata: HashMap::new(),
1074            inserted_at: 0,
1075        }
1076    }
1077
1078    #[test]
1079    fn test_raft_config_single_node() {
1080        let config = RaftConfig::single_node(1);
1081        assert_eq!(config.node_id, 1);
1082        assert_eq!(config.cluster_nodes, vec![1]);
1083        assert_eq!(config.quorum_size(), 1);
1084    }
1085
1086    #[test]
1087    fn test_raft_config_three_node() {
1088        let config = RaftConfig::three_node_cluster(1);
1089        assert_eq!(config.quorum_size(), 2);
1090    }
1091
1092    #[test]
1093    fn test_node_starts_as_follower() {
1094        let config = RaftConfig::single_node(1);
1095        let node = RaftIndexNode::new(config);
1096        assert_eq!(node.role(), NodeRole::Follower);
1097        assert_eq!(node.current_term(), 0);
1098    }
1099
1100    #[test]
1101    fn test_single_node_becomes_leader() {
1102        let config = RaftConfig::single_node(1);
1103        let node = RaftIndexNode::new(config);
1104
1105        let vote_req = node.start_election();
1106        assert_eq!(vote_req.term, 1);
1107        assert_eq!(node.current_term(), 1);
1108
1109        // Single-node cluster wins immediately
1110        let won = node.process_vote_response(RequestVoteResponse {
1111            term: 1,
1112            vote_granted: true,
1113            node_id: 1,
1114        });
1115
1116        // Single node has quorum of 1, self-vote should win
1117        assert!(node.is_leader() || won);
1118    }
1119
1120    #[test]
1121    fn test_single_node_leader_force_commit() -> Result<()> {
1122        let config = RaftConfig::single_node(1);
1123        let node = RaftIndexNode::new(config);
1124
1125        // Make node leader directly
1126        node.start_election();
1127        // In single node, the self-vote should win
1128        let _ = node.process_vote_response(RequestVoteResponse {
1129            term: node.current_term(),
1130            vote_granted: true,
1131            node_id: 1,
1132        });
1133
1134        if !node.is_leader() {
1135            // Manually set role for testing
1136            *node.role.lock() = NodeRole::Leader;
1137            *node.current_leader.lock() = Some(1);
1138        }
1139
1140        let entry = make_vector_entry("v1", vec![1.0, 2.0, 3.0]);
1141        node.propose(IndexCommand::Upsert(entry), None)?;
1142        node.force_commit_single_node();
1143
1144        assert_eq!(node.vector_count(), 1);
1145        assert!(node.get_vector("v1").is_some());
1146        Ok(())
1147    }
1148
1149    #[test]
1150    fn test_propose_fails_when_not_leader() {
1151        let config = RaftConfig::three_node_cluster(1);
1152        let node = RaftIndexNode::new(config);
1153        // Node is follower, proposing should fail
1154        let result = node.propose(IndexCommand::NoOp, None);
1155        assert!(result.is_err(), "Should fail to propose when not leader");
1156    }
1157
1158    #[test]
1159    fn test_request_vote_grants_to_newer_term() {
1160        let config = RaftConfig::three_node_cluster(2);
1161        let voter = RaftIndexNode::new(config);
1162
1163        let req = RequestVoteRequest {
1164            term: 5,
1165            candidate_id: 1,
1166            last_log_index: 10,
1167            last_log_term: 5,
1168        };
1169
1170        let response = voter.handle_request_vote(req);
1171        assert!(response.vote_granted, "Should grant vote to higher term");
1172        assert_eq!(response.term, 5);
1173    }
1174
1175    #[test]
1176    fn test_request_vote_rejects_stale_term() {
1177        let config = RaftConfig::three_node_cluster(2);
1178        let voter = RaftIndexNode::new(config);
1179
1180        // Set voter's current term to 5
1181        voter.persistent.write().current_term = 5;
1182
1183        let req = RequestVoteRequest {
1184            term: 3, // Stale term
1185            candidate_id: 1,
1186            last_log_index: 0,
1187            last_log_term: 0,
1188        };
1189
1190        let response = voter.handle_request_vote(req);
1191        assert!(!response.vote_granted, "Should reject stale term vote");
1192        assert_eq!(response.term, 5);
1193    }
1194
1195    #[test]
1196    fn test_request_vote_rejects_duplicate_vote() {
1197        let config = RaftConfig::three_node_cluster(2);
1198        let voter = RaftIndexNode::new(config);
1199
1200        let req1 = RequestVoteRequest {
1201            term: 1,
1202            candidate_id: 1,
1203            last_log_index: 0,
1204            last_log_term: 0,
1205        };
1206
1207        let req2 = RequestVoteRequest {
1208            term: 1,
1209            candidate_id: 3, // Different candidate, same term
1210            last_log_index: 0,
1211            last_log_term: 0,
1212        };
1213
1214        let r1 = voter.handle_request_vote(req1);
1215        assert!(r1.vote_granted, "First vote should be granted");
1216
1217        let r2 = voter.handle_request_vote(req2);
1218        assert!(
1219            !r2.vote_granted,
1220            "Duplicate vote in same term should be rejected"
1221        );
1222    }
1223
1224    #[test]
1225    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1226    fn test_append_entries_heartbeat() {
1227        let config = RaftConfig::three_node_cluster(2);
1228        let follower = RaftIndexNode::new(config);
1229
1230        let heartbeat = AppendEntriesRequest {
1231            term: 1,
1232            leader_id: 1,
1233            prev_log_index: 0,
1234            prev_log_term: 0,
1235            entries: vec![],
1236            leader_commit: 0,
1237        };
1238
1239        let response = follower.handle_append_entries(heartbeat);
1240        assert!(response.success, "Heartbeat should succeed");
1241        assert_eq!(follower.current_leader(), Some(1));
1242    }
1243
1244    #[test]
1245    fn test_append_entries_stale_term() {
1246        let config = RaftConfig::three_node_cluster(2);
1247        let follower = RaftIndexNode::new(config);
1248        follower.persistent.write().current_term = 5;
1249
1250        let request = AppendEntriesRequest {
1251            term: 3, // Stale
1252            leader_id: 1,
1253            prev_log_index: 0,
1254            prev_log_term: 0,
1255            entries: vec![],
1256            leader_commit: 0,
1257        };
1258
1259        let response = follower.handle_append_entries(request);
1260        assert!(!response.success, "Stale term should be rejected");
1261        assert_eq!(response.term, 5);
1262    }
1263
1264    #[test]
1265    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1266    fn test_cluster_simulator_election() -> Result<()> {
1267        let sim = ClusterSimulator::new(3)?;
1268        sim.elect_leader(0);
1269
1270        // At least one node should be leader
1271        let leaders: Vec<_> = sim.nodes.iter().filter(|n| n.is_leader()).collect();
1272        assert!(!leaders.is_empty(), "At least one node should be leader");
1273        Ok(())
1274    }
1275
1276    #[test]
1277    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1278    fn test_cluster_simulator_replication() -> Result<()> {
1279        let sim = ClusterSimulator::new(3)?;
1280        sim.elect_leader(0);
1281
1282        let leader_idx = sim
1283            .nodes
1284            .iter()
1285            .position(|n| n.is_leader())
1286            .expect("no leader found");
1287        let entry = make_vector_entry("v1", vec![1.0, 0.0, 0.0]);
1288        sim.nodes[leader_idx].propose(IndexCommand::Upsert(entry), None)?;
1289
1290        sim.replicate_all()?;
1291
1292        // All nodes should eventually have the entry committed
1293        let leader = &sim.nodes[leader_idx];
1294        leader.force_commit_single_node();
1295        // Leader should have the vector
1296        let vec = leader.get_vector("v1");
1297        assert!(vec.is_some() || leader.log_length() > 0);
1298        Ok(())
1299    }
1300
1301    #[test]
1302    fn test_delete_command() -> Result<()> {
1303        let config = RaftConfig::single_node(1);
1304        let node = RaftIndexNode::new(config);
1305
1306        // Become leader
1307        node.start_election();
1308        let _ = node.process_vote_response(RequestVoteResponse {
1309            term: node.current_term(),
1310            vote_granted: true,
1311            node_id: 1,
1312        });
1313
1314        if !node.is_leader() {
1315            *node.role.lock() = NodeRole::Leader;
1316            *node.current_leader.lock() = Some(1);
1317        }
1318
1319        // Insert then delete
1320        let entry = make_vector_entry("v1", vec![1.0]);
1321        node.propose(IndexCommand::Upsert(entry), None)?;
1322        node.force_commit_single_node();
1323        assert_eq!(node.vector_count(), 1);
1324
1325        node.propose(
1326            IndexCommand::Delete {
1327                vector_id: "v1".to_string(),
1328            },
1329            None,
1330        )?;
1331        node.force_commit_single_node();
1332        assert_eq!(node.vector_count(), 0);
1333        Ok(())
1334    }
1335
1336    #[test]
1337    fn test_update_metadata_command() -> Result<()> {
1338        let config = RaftConfig::single_node(1);
1339        let node = RaftIndexNode::new(config);
1340
1341        *node.role.lock() = NodeRole::Leader;
1342        *node.current_leader.lock() = Some(1);
1343
1344        let entry = make_vector_entry("v1", vec![1.0, 2.0]);
1345        node.propose(IndexCommand::Upsert(entry), None)?;
1346
1347        let mut new_meta = HashMap::new();
1348        new_meta.insert("tag".to_string(), "important".to_string());
1349        node.propose(
1350            IndexCommand::UpdateMetadata {
1351                vector_id: "v1".to_string(),
1352                metadata: new_meta,
1353            },
1354            None,
1355        )?;
1356        node.force_commit_single_node();
1357
1358        let stored = node.get_vector("v1").expect("v1 not found");
1359        assert_eq!(stored.metadata.get("tag"), Some(&"important".to_string()));
1360        Ok(())
1361    }
1362
1363    #[test]
1364    fn test_search_similar() -> Result<()> {
1365        let config = RaftConfig::single_node(1);
1366        let node = RaftIndexNode::new(config);
1367
1368        *node.role.lock() = NodeRole::Leader;
1369        *node.current_leader.lock() = Some(1);
1370
1371        node.propose(
1372            IndexCommand::Upsert(make_vector_entry("v1", vec![1.0, 0.0, 0.0])),
1373            None,
1374        )?;
1375        node.propose(
1376            IndexCommand::Upsert(make_vector_entry("v2", vec![0.0, 1.0, 0.0])),
1377            None,
1378        )?;
1379        node.propose(
1380            IndexCommand::Upsert(make_vector_entry("v3", vec![0.0, 0.0, 1.0])),
1381            None,
1382        )?;
1383        node.force_commit_single_node();
1384
1385        let results = node.search_similar(&[1.0, 0.0, 0.0], 2);
1386        assert!(!results.is_empty());
1387        // First result should be v1 with similarity ~1.0
1388        assert_eq!(results[0].0, "v1");
1389        assert!((results[0].1 - 1.0).abs() < 1e-5);
1390        Ok(())
1391    }
1392
1393    #[test]
1394    fn test_stats_populated() -> Result<()> {
1395        let config = RaftConfig::single_node(1);
1396        let node = RaftIndexNode::new(config);
1397
1398        *node.role.lock() = NodeRole::Leader;
1399        *node.current_leader.lock() = Some(1);
1400        node.propose(IndexCommand::NoOp, None)?;
1401        node.force_commit_single_node();
1402
1403        let stats = node.get_stats();
1404        assert_eq!(stats.role, "Leader");
1405        assert!(stats.log_length > 0);
1406        Ok(())
1407    }
1408
1409    #[test]
1410    fn test_raft_log_length_increases() -> Result<()> {
1411        let config = RaftConfig::single_node(1);
1412        let node = RaftIndexNode::new(config);
1413
1414        *node.role.lock() = NodeRole::Leader;
1415        *node.current_leader.lock() = Some(1);
1416
1417        assert_eq!(node.log_length(), 0);
1418
1419        node.propose(IndexCommand::NoOp, None)?;
1420        assert_eq!(node.log_length(), 1);
1421
1422        node.propose(IndexCommand::Rebuild, None)?;
1423        assert_eq!(node.log_length(), 2);
1424        Ok(())
1425    }
1426
1427    #[test]
1428    fn test_persistent_state_default() {
1429        let state = PersistentState::default();
1430        assert_eq!(state.current_term, 0);
1431        assert!(state.voted_for.is_none());
1432        assert!(state.log.is_empty());
1433        assert_eq!(state.last_log_index(), 0);
1434        assert_eq!(state.last_log_term(), 0);
1435    }
1436
1437    #[test]
1438    fn test_node_role_display() {
1439        assert_eq!(NodeRole::Follower.to_string(), "Follower");
1440        assert_eq!(NodeRole::Candidate.to_string(), "Candidate");
1441        assert_eq!(NodeRole::Leader.to_string(), "Leader");
1442    }
1443
1444    #[test]
1445    fn test_election_timeout_not_elapsed_immediately() {
1446        let config = RaftConfig::single_node(1);
1447        let node = RaftIndexNode::new(config);
1448        // Freshly created node should not have elapsed election timeout
1449        assert!(!node.election_timeout_elapsed());
1450    }
1451
1452    #[test]
1453    fn test_reset_heartbeat() {
1454        let config = RaftConfig::single_node(1);
1455        let node = RaftIndexNode::new(config);
1456        // Resetting heartbeat should keep timeout from elapsing
1457        node.reset_heartbeat();
1458        assert!(!node.election_timeout_elapsed());
1459    }
1460
1461    #[test]
1462    fn test_append_entries_appends_new_log_entries() {
1463        let config = RaftConfig::three_node_cluster(2);
1464        let follower = RaftIndexNode::new(config);
1465
1466        let entry = LogEntry {
1467            index: 1,
1468            term: 1,
1469            command: IndexCommand::NoOp,
1470            client_id: None,
1471        };
1472
1473        let request = AppendEntriesRequest {
1474            term: 1,
1475            leader_id: 1,
1476            prev_log_index: 0,
1477            prev_log_term: 0,
1478            entries: vec![entry],
1479            leader_commit: 1,
1480        };
1481
1482        let response = follower.handle_append_entries(request);
1483        assert!(response.success);
1484        assert_eq!(follower.log_length(), 1);
1485    }
1486
1487    #[test]
1488    fn test_commit_advances_last_applied() {
1489        let config = RaftConfig::three_node_cluster(2);
1490        let follower = RaftIndexNode::new(config);
1491
1492        let entry = LogEntry {
1493            index: 1,
1494            term: 1,
1495            command: IndexCommand::Upsert(make_vector_entry("v1", vec![1.0])),
1496            client_id: None,
1497        };
1498
1499        let request = AppendEntriesRequest {
1500            term: 1,
1501            leader_id: 1,
1502            prev_log_index: 0,
1503            prev_log_term: 0,
1504            entries: vec![entry],
1505            leader_commit: 1, // Leader has committed this
1506        };
1507
1508        follower.handle_append_entries(request);
1509
1510        assert_eq!(follower.last_applied(), 1);
1511        assert_eq!(follower.vector_count(), 1);
1512    }
1513}