Skip to main content

oxirs_vec/distributed/
raft_index.rs

1//! Raft-based distributed index consensus for vector stores
2//!
3//! This module implements the Raft consensus protocol for distributed
4//! vector index management. It provides:
5//! - Leader election among index nodes
6//! - Replicated log for index mutations (insertions, deletions, updates)
7//! - Consistent reads via quorum
8//! - Automatic failover and leader re-election
9//!
10//! # Design
11//!
12//! Each node in the cluster participates in Raft. Index mutations (vector
13//! insertions/deletions) are proposed as log entries. Once a majority of
14//! nodes acknowledge an entry, it is committed and applied to the local
15//! in-memory index.
16//!
17//! # Pure Rust
18//!
19//! This module is 100% Pure Rust - no CUDA or FFI dependencies.
20
21use anyhow::{anyhow, Result};
22use parking_lot::{Mutex, RwLock};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
/// Node ID type for Raft cluster members
pub type NodeId = u64;

/// Index into the replicated log (1-based; 0 means "no entry")
pub type LogIndex = u64;

/// Raft election term number (monotonically increasing)
pub type Term = u64;
37
/// A vector entry stored in the distributed index
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorEntry {
    /// Unique identifier for this vector
    pub vector_id: String,
    /// Dense vector data (dimensionality is not enforced here; entries whose
    /// length differs from the query are skipped at search time)
    pub vector: Vec<f32>,
    /// Associated metadata (free-form key/value pairs)
    pub metadata: HashMap<String, String>,
    /// Timestamp of insertion
    /// NOTE(review): units (seconds vs. millis since epoch) are chosen by the
    /// caller; nothing in this module sets or interprets this field
    pub inserted_at: u64,
}
50
/// Commands that can be applied to the replicated state machine
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexCommand {
    /// Insert or update a vector (last write wins on the same `vector_id`)
    Upsert(VectorEntry),
    /// Delete a vector by ID (deleting a missing ID is a silent no-op)
    Delete { vector_id: String },
    /// Rebuild the index (triggers background rebuild)
    Rebuild,
    /// Update metadata for a vector; ignored if the vector does not exist
    UpdateMetadata {
        vector_id: String,
        metadata: HashMap<String, String>,
    },
    /// No-op entry for leadership heartbeat and linearization.
    /// Unlike the other variants, applying it does not increment the
    /// state machine's `operations_applied` counter.
    NoOp,
}
68
/// A single entry in the replicated log
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Index of this entry in the log (1-based)
    pub index: LogIndex,
    /// Term when this entry was created
    pub term: Term,
    /// The command to be applied
    pub command: IndexCommand,
    /// Client request ID for deduplication
    /// NOTE(review): dedup enforcement is not implemented in this module;
    /// the ID is stored but never checked here
    pub client_id: Option<String>,
}
81
/// Raft node role
///
/// A node is in exactly one role at any time; transitions follow the
/// standard Raft state machine (Follower -> Candidate -> Leader, with
/// any role falling back to Follower on seeing a higher term).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Passive replica; responds to leader/candidate RPCs
    Follower,
    /// Running an election after an election timeout
    Candidate,
    /// Accepts proposals and drives log replication
    Leader,
}
89
90impl std::fmt::Display for NodeRole {
91    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92        match self {
93            Self::Follower => write!(f, "Follower"),
94            Self::Candidate => write!(f, "Candidate"),
95            Self::Leader => write!(f, "Leader"),
96        }
97    }
98}
99
/// AppendEntries RPC request
///
/// Sent by the leader both to replicate log entries and as a heartbeat
/// (with an empty `entries` vector).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesRequest {
    /// Leader's term
    pub term: Term,
    /// Leader's ID
    pub leader_id: NodeId,
    /// Log index immediately preceding new entries
    pub prev_log_index: LogIndex,
    /// Term of prev_log_index entry
    pub prev_log_term: Term,
    /// New log entries to store (empty for heartbeat)
    pub entries: Vec<LogEntry>,
    /// Leader's commit index
    pub leader_commit: LogIndex,
}

/// AppendEntries RPC response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
    /// Current term for leader to update itself
    pub term: Term,
    /// True if follower contained entry matching prev_log_index and prev_log_term
    pub success: bool,
    /// The responding node's ID
    pub node_id: NodeId,
    /// Conflict index for fast log rollback (optimization); set only on
    /// rejection, pointing at where the leader should resume
    pub conflict_index: Option<LogIndex>,
}

/// RequestVote RPC request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteRequest {
    /// Candidate's term
    pub term: Term,
    /// Candidate requesting vote
    pub candidate_id: NodeId,
    /// Index of candidate's last log entry
    pub last_log_index: LogIndex,
    /// Term of candidate's last log entry
    pub last_log_term: Term,
}

/// RequestVote RPC response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponse {
    /// Current term for candidate to update itself
    pub term: Term,
    /// True means candidate received vote
    pub vote_granted: bool,
    /// The responding node's ID
    pub node_id: NodeId,
}
153
/// Raft node configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftConfig {
    /// This node's ID
    pub node_id: NodeId,
    /// All node IDs in the cluster (including self)
    pub cluster_nodes: Vec<NodeId>,
    /// Heartbeat interval in milliseconds
    pub heartbeat_interval_ms: u64,
    /// Lower bound of the randomized election timeout, in milliseconds
    pub election_timeout_min_ms: u64,
    /// Upper bound of the randomized election timeout, in milliseconds
    pub election_timeout_max_ms: u64,
    /// Maximum log entries per AppendEntries batch
    pub max_entries_per_batch: usize,
    /// Enable log compaction via snapshotting
    /// NOTE(review): snapshotting itself is not implemented in this module;
    /// the flag and threshold are currently unused here
    pub enable_snapshots: bool,
    /// Snapshot threshold (entries before snapshot)
    pub snapshot_threshold: usize,
    /// Maximum retries for failed RPCs
    pub max_rpc_retries: usize,
}
175
176impl RaftConfig {
177    /// Create a single-node cluster configuration (useful for testing)
178    pub fn single_node(node_id: NodeId) -> Self {
179        Self {
180            node_id,
181            cluster_nodes: vec![node_id],
182            heartbeat_interval_ms: 150,
183            election_timeout_min_ms: 300,
184            election_timeout_max_ms: 600,
185            max_entries_per_batch: 100,
186            enable_snapshots: true,
187            snapshot_threshold: 10_000,
188            max_rpc_retries: 3,
189        }
190    }
191
192    /// Create a three-node cluster configuration
193    pub fn three_node_cluster(node_id: NodeId) -> Self {
194        Self {
195            node_id,
196            cluster_nodes: vec![1, 2, 3],
197            heartbeat_interval_ms: 150,
198            election_timeout_min_ms: 300,
199            election_timeout_max_ms: 600,
200            max_entries_per_batch: 100,
201            enable_snapshots: true,
202            snapshot_threshold: 10_000,
203            max_rpc_retries: 3,
204        }
205    }
206
207    /// Get the quorum size (majority)
208    pub fn quorum_size(&self) -> usize {
209        self.cluster_nodes.len() / 2 + 1
210    }
211}
212
impl Default for RaftConfig {
    /// Defaults to a single-node cluster with node ID 1
    /// (see [`RaftConfig::single_node`]).
    fn default() -> Self {
        Self::single_node(1)
    }
}
218
/// Statistics for the Raft node (a point-in-time snapshot for observability)
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RaftStats {
    /// Current term
    pub current_term: Term,
    /// Current role, rendered via `NodeRole`'s `Display` impl
    pub role: String,
    /// Current leader ID (if known)
    pub current_leader: Option<NodeId>,
    /// Total log entries
    pub log_length: usize,
    /// Commit index
    pub commit_index: LogIndex,
    /// Last applied index
    pub last_applied: LogIndex,
    /// Number of elections participated in
    pub elections_participated: u64,
    /// Number of terms this node was leader
    pub terms_as_leader: u64,
    /// Number of index operations applied
    pub operations_applied: u64,
    /// Number of vectors in the distributed index
    pub vector_count: usize,
    /// Number of RPC messages sent
    pub rpcs_sent: u64,
    /// Number of RPC messages received
    pub rpcs_received: u64,
}
247
/// The in-memory state machine: the actual vector index.
///
/// Only committed log entries are applied here, so its contents always
/// reflect a majority-acknowledged prefix of the log.
#[derive(Debug, Default)]
struct IndexStateMachine {
    /// All vectors stored in this index shard, keyed by `vector_id`
    vectors: HashMap<String, VectorEntry>,
    /// Number of operations applied (NoOp entries excluded)
    operations_applied: u64,
}
256
257impl IndexStateMachine {
258    /// Apply a command to the state machine
259    fn apply(&mut self, command: &IndexCommand) {
260        match command {
261            IndexCommand::Upsert(entry) => {
262                self.vectors.insert(entry.vector_id.clone(), entry.clone());
263                self.operations_applied += 1;
264                debug!("Applied Upsert for vector '{}'", entry.vector_id);
265            }
266            IndexCommand::Delete { vector_id } => {
267                self.vectors.remove(vector_id);
268                self.operations_applied += 1;
269                debug!("Applied Delete for vector '{}'", vector_id);
270            }
271            IndexCommand::UpdateMetadata {
272                vector_id,
273                metadata,
274            } => {
275                if let Some(entry) = self.vectors.get_mut(vector_id) {
276                    entry.metadata.clone_from(metadata);
277                    self.operations_applied += 1;
278                }
279            }
280            IndexCommand::Rebuild => {
281                debug!("Applied Rebuild command");
282                self.operations_applied += 1;
283            }
284            IndexCommand::NoOp => {
285                // No-op doesn't increment operations
286            }
287        }
288    }
289
290    fn len(&self) -> usize {
291        self.vectors.len()
292    }
293
294    fn get(&self, vector_id: &str) -> Option<&VectorEntry> {
295        self.vectors.get(vector_id)
296    }
297
298    fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
299        let mut similarities: Vec<(String, f32)> = self
300            .vectors
301            .iter()
302            .filter_map(|(id, entry)| {
303                if entry.vector.len() != query.len() {
304                    return None;
305                }
306                let dot: f32 = entry
307                    .vector
308                    .iter()
309                    .zip(query.iter())
310                    .map(|(a, b)| a * b)
311                    .sum();
312                let na: f32 = entry.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
313                let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
314                let sim = if na < 1e-9 || nb < 1e-9 {
315                    0.0
316                } else {
317                    dot / (na * nb)
318                };
319                Some((id.clone(), sim))
320            })
321            .collect();
322
323        similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
324        similarities.truncate(k);
325        similarities
326    }
327}
328
/// Persistent state for Raft node (must survive restarts)
///
/// NOTE(review): despite the name, no durable-storage backend is wired up in
/// this module — the state lives only in memory. `Serialize`/`Deserialize`
/// are derived so a caller can persist it externally.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PersistentState {
    /// Latest term this server has seen
    pub current_term: Term,
    /// CandidateId that received vote in current term
    pub voted_for: Option<NodeId>,
    /// Log entries (1-based indices carried inside each entry)
    pub log: Vec<LogEntry>,
}
339
340impl PersistentState {
341    fn last_log_index(&self) -> LogIndex {
342        self.log.last().map(|e| e.index).unwrap_or(0)
343    }
344
345    fn last_log_term(&self) -> Term {
346        self.log.last().map(|e| e.term).unwrap_or(0)
347    }
348
349    fn get_entry(&self, index: LogIndex) -> Option<&LogEntry> {
350        if index == 0 {
351            return None;
352        }
353        // Log entries are 1-indexed; find by scanning
354        self.log.iter().find(|e| e.index == index)
355    }
356
357    fn truncate_from(&mut self, from_index: LogIndex) {
358        self.log.retain(|e| e.index < from_index);
359    }
360}
361
/// Raft node implementation for distributed vector index
///
/// This implements the core Raft protocol. In a production deployment,
/// RPCs would be sent over the network (e.g., gRPC or HTTP/2). Here
/// we provide the state machine logic and expose methods for injecting
/// simulated or actual network messages.
///
/// Every field is individually `Arc`-wrapped behind its own lock.
/// NOTE(review): the struct itself is not `Clone`; the per-field `Arc`s
/// suggest intended cross-thread sharing — confirm against callers.
#[derive(Debug)]
pub struct RaftIndexNode {
    config: RaftConfig,
    /// Persistent state (term, vote, log)
    persistent: Arc<RwLock<PersistentState>>,
    /// Current role
    role: Arc<Mutex<NodeRole>>,
    /// Current leader (known)
    current_leader: Arc<Mutex<Option<NodeId>>>,
    /// Commit index (highest log entry known to be committed)
    commit_index: Arc<Mutex<LogIndex>>,
    /// Last applied (highest log entry applied to state machine)
    last_applied: Arc<Mutex<LogIndex>>,
    /// Next index to send to each follower (leader only)
    next_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// Highest log index known to be replicated on each follower (leader only)
    match_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// The actual state machine (vector index)
    state_machine: Arc<RwLock<IndexStateMachine>>,
    /// Votes received in current election
    votes_received: Arc<Mutex<HashMap<NodeId, bool>>>,
    /// Election timeout tracking (last time we heard from a valid leader
    /// or granted a vote)
    last_heartbeat: Arc<Mutex<Instant>>,
    /// Statistics
    stats: Arc<Mutex<RaftStats>>,
    /// Number of elections participated in
    elections_participated: Arc<Mutex<u64>>,
    /// Number of terms as leader
    terms_as_leader: Arc<Mutex<u64>>,
    /// Total RPCs sent
    rpcs_sent: Arc<Mutex<u64>>,
    /// Total RPCs received
    rpcs_received: Arc<Mutex<u64>>,
}
402
403impl RaftIndexNode {
404    /// Create a new Raft index node
405    pub fn new(config: RaftConfig) -> Self {
406        let node_id = config.node_id;
407        let cluster_nodes: Vec<NodeId> = config.cluster_nodes.clone();
408
409        let next_index: HashMap<NodeId, LogIndex> = cluster_nodes
410            .iter()
411            .filter(|&&n| n != node_id)
412            .map(|&n| (n, 1))
413            .collect();
414
415        let match_index: HashMap<NodeId, LogIndex> = cluster_nodes
416            .iter()
417            .filter(|&&n| n != node_id)
418            .map(|&n| (n, 0))
419            .collect();
420
421        info!(
422            "Raft node {} initialized in cluster {:?}",
423            node_id, cluster_nodes
424        );
425
426        Self {
427            config,
428            persistent: Arc::new(RwLock::new(PersistentState::default())),
429            role: Arc::new(Mutex::new(NodeRole::Follower)),
430            current_leader: Arc::new(Mutex::new(None)),
431            commit_index: Arc::new(Mutex::new(0)),
432            last_applied: Arc::new(Mutex::new(0)),
433            next_index: Arc::new(Mutex::new(next_index)),
434            match_index: Arc::new(Mutex::new(match_index)),
435            state_machine: Arc::new(RwLock::new(IndexStateMachine::default())),
436            votes_received: Arc::new(Mutex::new(HashMap::new())),
437            last_heartbeat: Arc::new(Mutex::new(Instant::now())),
438            stats: Arc::new(Mutex::new(RaftStats::default())),
439            elections_participated: Arc::new(Mutex::new(0)),
440            terms_as_leader: Arc::new(Mutex::new(0)),
441            rpcs_sent: Arc::new(Mutex::new(0)),
442            rpcs_received: Arc::new(Mutex::new(0)),
443        }
444    }
445
446    /// Start an election (become candidate)
447    pub fn start_election(&self) -> RequestVoteRequest {
448        let mut persistent = self.persistent.write();
449        persistent.current_term += 1;
450        let new_term = persistent.current_term;
451        persistent.voted_for = Some(self.config.node_id);
452
453        *self.role.lock() = NodeRole::Candidate;
454        let mut votes = self.votes_received.lock();
455        votes.clear();
456        votes.insert(self.config.node_id, true); // Vote for self
457
458        *self.elections_participated.lock() += 1;
459
460        info!(
461            "Node {} starting election for term {}",
462            self.config.node_id, new_term
463        );
464
465        RequestVoteRequest {
466            term: new_term,
467            candidate_id: self.config.node_id,
468            last_log_index: persistent.last_log_index(),
469            last_log_term: persistent.last_log_term(),
470        }
471    }
472
    /// Handle a RequestVote RPC from a candidate.
    ///
    /// Implements the Raft receiver rules: reject stale terms, vote at most
    /// once per term, and grant only to candidates whose log is at least as
    /// up-to-date as ours. Granting a vote also resets the election timer.
    pub fn handle_request_vote(&self, request: RequestVoteRequest) -> RequestVoteResponse {
        *self.rpcs_received.lock() += 1;
        let mut persistent = self.persistent.write();

        // If we see a higher term, update and become follower
        if request.term > persistent.current_term {
            persistent.current_term = request.term;
            persistent.voted_for = None;
            *self.role.lock() = NodeRole::Follower;
        }

        let vote_granted = if request.term < persistent.current_term {
            // Stale term, reject
            false
        } else {
            // A repeated request from the candidate we already voted for is
            // granted again (idempotent); only a prior vote for a *different*
            // candidate blocks this one.
            let already_voted = persistent
                .voted_for
                .map(|v| v != request.candidate_id)
                .unwrap_or(false);

            if already_voted {
                false
            } else {
                // Grant vote if candidate's log is at least as up-to-date:
                // a higher last term wins outright; equal last terms compare
                // last index.
                let our_last_index = persistent.last_log_index();
                let our_last_term = persistent.last_log_term();

                let log_ok = request.last_log_term > our_last_term
                    || (request.last_log_term == our_last_term
                        && request.last_log_index >= our_last_index);

                if log_ok {
                    persistent.voted_for = Some(request.candidate_id);
                    // Granting a vote counts as hearing from a live peer,
                    // so reset our election timeout.
                    *self.last_heartbeat.lock() = Instant::now();
                    true
                } else {
                    false
                }
            }
        };

        debug!(
            "Node {} {:?} vote to {} for term {}",
            self.config.node_id,
            if vote_granted { "grants" } else { "denies" },
            request.candidate_id,
            request.term
        );

        RequestVoteResponse {
            term: persistent.current_term,
            vote_granted,
            node_id: self.config.node_id,
        }
    }
529
    /// Process a vote response from a peer.
    ///
    /// Returns `true` if this node just won the election.
    pub fn process_vote_response(&self, response: RequestVoteResponse) -> bool {
        *self.rpcs_received.lock() += 1;
        let persistent = self.persistent.read();

        // If we see a higher term, become follower.
        if response.term > persistent.current_term {
            // Upgrade read -> write: the read guard must be dropped first
            // because parking_lot locks are not reentrant.
            drop(persistent);
            let mut p = self.persistent.write();
            p.current_term = response.term;
            p.voted_for = None;
            *self.role.lock() = NodeRole::Follower;
            return false;
        }

        // Only count votes if still a candidate in the same term
        if *self.role.lock() != NodeRole::Candidate {
            return false;
        }

        // Stale response from an earlier election round: ignore.
        if response.term != persistent.current_term {
            return false;
        }

        if response.vote_granted {
            let mut votes = self.votes_received.lock();
            votes.insert(response.node_id, true);
            let vote_count = votes.values().filter(|&&v| v).count();

            if vote_count >= self.config.quorum_size() {
                // Won election! Release our locks before transitioning,
                // since become_leader re-acquires persistent state.
                drop(votes);
                drop(persistent);
                self.become_leader();
                return true;
            }
        }
        false
    }
571
572    /// Transition to leader state
573    fn become_leader(&self) {
574        let term = self.persistent.read().current_term;
575        *self.role.lock() = NodeRole::Leader;
576        *self.current_leader.lock() = Some(self.config.node_id);
577        *self.terms_as_leader.lock() += 1;
578
579        // Initialize next_index and match_index for all followers
580        let last_log_index = self.persistent.read().last_log_index();
581        let mut next_idx = self.next_index.lock();
582        let mut match_idx = self.match_index.lock();
583
584        for &peer in &self.config.cluster_nodes {
585            if peer != self.config.node_id {
586                next_idx.insert(peer, last_log_index + 1);
587                match_idx.insert(peer, 0);
588            }
589        }
590
591        info!(
592            "Node {} became leader for term {}",
593            self.config.node_id, term
594        );
595
596        // Append no-op to establish leadership
597        drop(next_idx);
598        drop(match_idx);
599        let _ = self.append_entry(IndexCommand::NoOp, None);
600    }
601
602    /// Handle AppendEntries RPC (from leader)
603    pub fn handle_append_entries(&self, request: AppendEntriesRequest) -> AppendEntriesResponse {
604        *self.rpcs_received.lock() += 1;
605        let mut persistent = self.persistent.write();
606
607        // If we see a higher term, become follower
608        if request.term > persistent.current_term {
609            persistent.current_term = request.term;
610            persistent.voted_for = None;
611            *self.role.lock() = NodeRole::Follower;
612        }
613
614        // Reply false if term < currentTerm
615        if request.term < persistent.current_term {
616            return AppendEntriesResponse {
617                term: persistent.current_term,
618                success: false,
619                node_id: self.config.node_id,
620                conflict_index: None,
621            };
622        }
623
624        // Reset election timer since we heard from a valid leader
625        *self.last_heartbeat.lock() = Instant::now();
626        *self.current_leader.lock() = Some(request.leader_id);
627        *self.role.lock() = NodeRole::Follower;
628
629        // Check prev_log consistency
630        if request.prev_log_index > 0 {
631            let entry = persistent.get_entry(request.prev_log_index);
632            match entry {
633                None => {
634                    // Don't have that entry
635                    return AppendEntriesResponse {
636                        term: persistent.current_term,
637                        success: false,
638                        node_id: self.config.node_id,
639                        conflict_index: Some(persistent.last_log_index() + 1),
640                    };
641                }
642                Some(e) if e.term != request.prev_log_term => {
643                    // Conflicting entry
644                    let conflict_index = e.index;
645                    return AppendEntriesResponse {
646                        term: persistent.current_term,
647                        success: false,
648                        node_id: self.config.node_id,
649                        conflict_index: Some(conflict_index),
650                    };
651                }
652                _ => {}
653            }
654        }
655
656        // Append new entries, removing conflicting ones
657        for entry in &request.entries {
658            let existing = persistent.get_entry(entry.index).cloned();
659            match existing {
660                Some(e) if e.term != entry.term => {
661                    // Conflict: truncate log from here
662                    persistent.truncate_from(entry.index);
663                    persistent.log.push(entry.clone());
664                }
665                None => {
666                    persistent.log.push(entry.clone());
667                }
668                _ => {} // Entry already present and matches
669            }
670        }
671
672        // Update commit index
673        let prev_commit = *self.commit_index.lock();
674        if request.leader_commit > prev_commit {
675            let new_commit = request.leader_commit.min(persistent.last_log_index());
676            drop(persistent);
677            *self.commit_index.lock() = new_commit;
678            self.apply_committed_entries();
679        }
680
681        AppendEntriesResponse {
682            term: self.persistent.read().current_term,
683            success: true,
684            node_id: self.config.node_id,
685            conflict_index: None,
686        }
687    }
688
689    /// Process AppendEntries response from a follower (leader only)
690    pub fn process_append_entries_response(
691        &self,
692        peer_id: NodeId,
693        response: AppendEntriesResponse,
694        entries_sent_count: usize,
695    ) {
696        *self.rpcs_received.lock() += 1;
697        let current_term = self.persistent.read().current_term;
698
699        if response.term > current_term {
700            let mut p = self.persistent.write();
701            p.current_term = response.term;
702            p.voted_for = None;
703            *self.role.lock() = NodeRole::Follower;
704            return;
705        }
706
707        if *self.role.lock() != NodeRole::Leader {
708            return;
709        }
710
711        if response.success {
712            let mut next_idx = self.next_index.lock();
713            let mut match_idx = self.match_index.lock();
714
715            let new_next =
716                next_idx.get(&peer_id).copied().unwrap_or(1) + entries_sent_count as LogIndex;
717
718            next_idx.insert(peer_id, new_next);
719            match_idx.insert(peer_id, new_next - 1);
720            drop(next_idx);
721            drop(match_idx);
722
723            // Try to advance commit index
724            self.try_advance_commit_index();
725        } else {
726            // Decrement next_index for this follower
727            let mut next_idx = self.next_index.lock();
728            if let Some(conflict) = response.conflict_index {
729                next_idx.insert(peer_id, conflict);
730            } else {
731                let current = next_idx.get(&peer_id).copied().unwrap_or(1);
732                if current > 1 {
733                    next_idx.insert(peer_id, current - 1);
734                }
735            }
736        }
737    }
738
739    /// Try to advance the commit index based on match_index replication
740    fn try_advance_commit_index(&self) {
741        let persistent = self.persistent.read();
742        let current_term = persistent.current_term;
743        let last_log_index = persistent.last_log_index();
744        drop(persistent);
745
746        let match_idx = self.match_index.lock();
747        let mut commit = *self.commit_index.lock();
748
749        for n in (commit + 1)..=last_log_index {
750            let p = self.persistent.read();
751            let entry_term = p.get_entry(n).map(|e| e.term).unwrap_or(0);
752            drop(p);
753
754            // Only commit entries from current term (safety requirement)
755            if entry_term != current_term {
756                continue;
757            }
758
759            // Count replications
760            let replication_count = 1 + // self
761                match_idx.values().filter(|&&m| m >= n).count();
762
763            if replication_count >= self.config.quorum_size() {
764                commit = n;
765            }
766        }
767        drop(match_idx);
768
769        let old_commit = *self.commit_index.lock();
770        if commit > old_commit {
771            *self.commit_index.lock() = commit;
772            self.apply_committed_entries();
773        }
774    }
775
776    /// Apply all committed but not yet applied log entries to state machine
777    fn apply_committed_entries(&self) {
778        let commit = *self.commit_index.lock();
779        let mut last = *self.last_applied.lock();
780
781        while last < commit {
782            last += 1;
783            let persistent = self.persistent.read();
784            let entry = persistent.get_entry(last).cloned();
785            drop(persistent);
786
787            if let Some(entry) = entry {
788                let mut sm = self.state_machine.write();
789                sm.apply(&entry.command);
790                debug!("Node {} applied log entry {}", self.config.node_id, last);
791            }
792        }
793
794        *self.last_applied.lock() = last;
795    }
796
797    /// Propose a new command (leader only)
798    ///
799    /// Returns the log index of the proposed entry, or an error if not leader.
800    pub fn propose(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
801        if *self.role.lock() != NodeRole::Leader {
802            let leader = self.current_leader.lock().map(|l| l.to_string());
803            return Err(anyhow!(
804                "Not the leader. Current leader: {:?}",
805                leader.unwrap_or_else(|| "unknown".to_string())
806            ));
807        }
808        self.append_entry(command, client_id)
809    }
810
811    /// Append an entry to the leader's log
812    fn append_entry(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
813        let mut persistent = self.persistent.write();
814        let term = persistent.current_term;
815        let index = persistent.last_log_index() + 1;
816
817        let entry = LogEntry {
818            index,
819            term,
820            command,
821            client_id,
822        };
823
824        persistent.log.push(entry);
825        info!(
826            "Node {} appended log entry {} in term {}",
827            self.config.node_id, index, term
828        );
829        Ok(index)
830    }
831
832    /// Create an AppendEntries request for a specific follower
833    pub fn create_append_entries_request(&self, peer_id: NodeId) -> Result<AppendEntriesRequest> {
834        if *self.role.lock() != NodeRole::Leader {
835            return Err(anyhow!("Not the leader"));
836        }
837
838        let persistent = self.persistent.read();
839        let next_idx = self.next_index.lock();
840        let next = next_idx.get(&peer_id).copied().unwrap_or(1);
841
842        let prev_log_index = next.saturating_sub(1);
843        let prev_log_term = if prev_log_index > 0 {
844            persistent
845                .get_entry(prev_log_index)
846                .map(|e| e.term)
847                .unwrap_or(0)
848        } else {
849            0
850        };
851
852        let entries: Vec<LogEntry> = persistent
853            .log
854            .iter()
855            .filter(|e| e.index >= next)
856            .take(self.config.max_entries_per_batch)
857            .cloned()
858            .collect();
859
860        let commit = *self.commit_index.lock();
861
862        *self.rpcs_sent.lock() += 1;
863
864        Ok(AppendEntriesRequest {
865            term: persistent.current_term,
866            leader_id: self.config.node_id,
867            prev_log_index,
868            prev_log_term,
869            entries,
870            leader_commit: commit,
871        })
872    }
873
874    /// Force commit a single-node cluster (for testing)
875    ///
876    /// In a single-node cluster, entries are immediately committed.
877    pub fn force_commit_single_node(&self) {
878        if self.config.cluster_nodes.len() != 1 {
879            warn!("force_commit_single_node called on multi-node cluster");
880            return;
881        }
882        let last_index = self.persistent.read().last_log_index();
883        *self.commit_index.lock() = last_index;
884        self.apply_committed_entries();
885    }
886
887    /// Get the current role
888    pub fn role(&self) -> NodeRole {
889        *self.role.lock()
890    }
891
892    /// Get the current term
893    pub fn current_term(&self) -> Term {
894        self.persistent.read().current_term
895    }
896
897    /// Get the current leader ID (if known)
898    pub fn current_leader(&self) -> Option<NodeId> {
899        *self.current_leader.lock()
900    }
901
902    /// Check if this node is the leader
903    pub fn is_leader(&self) -> bool {
904        *self.role.lock() == NodeRole::Leader
905    }
906
907    /// Get the number of entries in the log
908    pub fn log_length(&self) -> usize {
909        self.persistent.read().log.len()
910    }
911
912    /// Get the commit index
913    pub fn commit_index(&self) -> LogIndex {
914        *self.commit_index.lock()
915    }
916
917    /// Get the last applied index
918    pub fn last_applied(&self) -> LogIndex {
919        *self.last_applied.lock()
920    }
921
922    /// Get the number of vectors in the state machine
923    pub fn vector_count(&self) -> usize {
924        self.state_machine.read().len()
925    }
926
927    /// Get a vector from the state machine (read only)
928    pub fn get_vector(&self, vector_id: &str) -> Option<VectorEntry> {
929        self.state_machine.read().get(vector_id).cloned()
930    }
931
932    /// Search for similar vectors in the state machine
933    pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
934        self.state_machine.read().search_similar(query, k)
935    }
936
937    /// Get current statistics
938    pub fn get_stats(&self) -> RaftStats {
939        let persistent = self.persistent.read();
940        RaftStats {
941            current_term: persistent.current_term,
942            role: self.role().to_string(),
943            current_leader: *self.current_leader.lock(),
944            log_length: persistent.log.len(),
945            commit_index: *self.commit_index.lock(),
946            last_applied: *self.last_applied.lock(),
947            elections_participated: *self.elections_participated.lock(),
948            terms_as_leader: *self.terms_as_leader.lock(),
949            operations_applied: self.state_machine.read().operations_applied,
950            vector_count: self.state_machine.read().len(),
951            rpcs_sent: *self.rpcs_sent.lock(),
952            rpcs_received: *self.rpcs_received.lock(),
953        }
954    }
955
956    /// Check if election timeout has elapsed
957    pub fn election_timeout_elapsed(&self) -> bool {
958        let elapsed = self.last_heartbeat.lock().elapsed();
959        elapsed > Duration::from_millis(self.config.election_timeout_max_ms)
960    }
961
962    /// Reset the heartbeat timer (call when receiving valid messages from leader)
963    pub fn reset_heartbeat(&self) {
964        *self.last_heartbeat.lock() = Instant::now();
965    }
966}
967
/// Helper to simulate an N-node cluster interaction for testing
///
/// Drives elections and log replication between in-process nodes with
/// direct method calls — no real networking is involved.
pub struct ClusterSimulator {
    // Cluster members; `nodes[i]` is created with node ID `i + 1`
    // (IDs are assigned 1..=n in `new`).
    pub nodes: Vec<RaftIndexNode>,
}
972
973impl ClusterSimulator {
974    /// Create a simulated cluster of N nodes
975    pub fn new(n: usize) -> Result<Self> {
976        let cluster_nodes: Vec<NodeId> = (1..=(n as NodeId)).collect();
977
978        let nodes = cluster_nodes
979            .iter()
980            .map(|&id| {
981                let config = RaftConfig {
982                    node_id: id,
983                    cluster_nodes: cluster_nodes.clone(),
984                    heartbeat_interval_ms: 50,
985                    election_timeout_min_ms: 150,
986                    election_timeout_max_ms: 300,
987                    max_entries_per_batch: 10,
988                    enable_snapshots: false,
989                    snapshot_threshold: 1000,
990                    max_rpc_retries: 2,
991                };
992                RaftIndexNode::new(config)
993            })
994            .collect();
995
996        Ok(Self { nodes })
997    }
998
999    /// Elect node at index `leader_idx` as leader
1000    pub fn elect_leader(&self, leader_idx: usize) {
1001        // Start election on the chosen node
1002        let vote_request = self.nodes[leader_idx].start_election();
1003
1004        // Collect votes from all other nodes
1005        let mut all_won = false;
1006        for (i, node) in self.nodes.iter().enumerate() {
1007            if i == leader_idx {
1008                continue;
1009            }
1010            let response = node.handle_request_vote(vote_request.clone());
1011            if self.nodes[leader_idx].process_vote_response(response) {
1012                all_won = true;
1013            }
1014        }
1015
1016        // If won, send initial heartbeats
1017        if all_won || self.nodes[leader_idx].is_leader() {
1018            for (i, node) in self.nodes.iter().enumerate() {
1019                if i == leader_idx {
1020                    continue;
1021                }
1022                if let Ok(ae_req) =
1023                    self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1024                {
1025                    let response = node.handle_append_entries(ae_req.clone());
1026                    self.nodes[leader_idx].process_append_entries_response(
1027                        node.config.node_id,
1028                        response,
1029                        ae_req.entries.len(),
1030                    );
1031                }
1032            }
1033        }
1034    }
1035
1036    /// Replicate all pending entries from leader to all followers
1037    pub fn replicate_all(&self) -> Result<()> {
1038        let leader_idx = self
1039            .nodes
1040            .iter()
1041            .position(|n| n.is_leader())
1042            .ok_or_else(|| anyhow!("No leader elected"))?;
1043
1044        for (i, node) in self.nodes.iter().enumerate() {
1045            if i == leader_idx {
1046                continue;
1047            }
1048            if let Ok(ae_req) =
1049                self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1050            {
1051                let entries_len = ae_req.entries.len();
1052                let response = node.handle_append_entries(ae_req);
1053                self.nodes[leader_idx].process_append_entries_response(
1054                    node.config.node_id,
1055                    response,
1056                    entries_len,
1057                );
1058            }
1059        }
1060        Ok(())
1061    }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066    use super::*;
1067
1068    fn make_vector_entry(id: &str, vec: Vec<f32>) -> VectorEntry {
1069        VectorEntry {
1070            vector_id: id.to_string(),
1071            vector: vec,
1072            metadata: HashMap::new(),
1073            inserted_at: 0,
1074        }
1075    }
1076
1077    #[test]
1078    fn test_raft_config_single_node() {
1079        let config = RaftConfig::single_node(1);
1080        assert_eq!(config.node_id, 1);
1081        assert_eq!(config.cluster_nodes, vec![1]);
1082        assert_eq!(config.quorum_size(), 1);
1083    }
1084
1085    #[test]
1086    fn test_raft_config_three_node() {
1087        let config = RaftConfig::three_node_cluster(1);
1088        assert_eq!(config.quorum_size(), 2);
1089    }
1090
1091    #[test]
1092    fn test_node_starts_as_follower() {
1093        let config = RaftConfig::single_node(1);
1094        let node = RaftIndexNode::new(config);
1095        assert_eq!(node.role(), NodeRole::Follower);
1096        assert_eq!(node.current_term(), 0);
1097    }
1098
1099    #[test]
1100    fn test_single_node_becomes_leader() {
1101        let config = RaftConfig::single_node(1);
1102        let node = RaftIndexNode::new(config);
1103
1104        let vote_req = node.start_election();
1105        assert_eq!(vote_req.term, 1);
1106        assert_eq!(node.current_term(), 1);
1107
1108        // Single-node cluster wins immediately
1109        let won = node.process_vote_response(RequestVoteResponse {
1110            term: 1,
1111            vote_granted: true,
1112            node_id: 1,
1113        });
1114
1115        // Single node has quorum of 1, self-vote should win
1116        assert!(node.is_leader() || won);
1117    }
1118
1119    #[test]
1120    fn test_single_node_leader_force_commit() {
1121        let config = RaftConfig::single_node(1);
1122        let node = RaftIndexNode::new(config);
1123
1124        // Make node leader directly
1125        node.start_election();
1126        // In single node, the self-vote should win
1127        let _ = node.process_vote_response(RequestVoteResponse {
1128            term: node.current_term(),
1129            vote_granted: true,
1130            node_id: 1,
1131        });
1132
1133        if !node.is_leader() {
1134            // Manually set role for testing
1135            *node.role.lock() = NodeRole::Leader;
1136            *node.current_leader.lock() = Some(1);
1137        }
1138
1139        let entry = make_vector_entry("v1", vec![1.0, 2.0, 3.0]);
1140        node.propose(IndexCommand::Upsert(entry), None).unwrap();
1141        node.force_commit_single_node();
1142
1143        assert_eq!(node.vector_count(), 1);
1144        assert!(node.get_vector("v1").is_some());
1145    }
1146
1147    #[test]
1148    fn test_propose_fails_when_not_leader() {
1149        let config = RaftConfig::three_node_cluster(1);
1150        let node = RaftIndexNode::new(config);
1151        // Node is follower, proposing should fail
1152        let result = node.propose(IndexCommand::NoOp, None);
1153        assert!(result.is_err(), "Should fail to propose when not leader");
1154    }
1155
1156    #[test]
1157    fn test_request_vote_grants_to_newer_term() {
1158        let config = RaftConfig::three_node_cluster(2);
1159        let voter = RaftIndexNode::new(config);
1160
1161        let req = RequestVoteRequest {
1162            term: 5,
1163            candidate_id: 1,
1164            last_log_index: 10,
1165            last_log_term: 5,
1166        };
1167
1168        let response = voter.handle_request_vote(req);
1169        assert!(response.vote_granted, "Should grant vote to higher term");
1170        assert_eq!(response.term, 5);
1171    }
1172
1173    #[test]
1174    fn test_request_vote_rejects_stale_term() {
1175        let config = RaftConfig::three_node_cluster(2);
1176        let voter = RaftIndexNode::new(config);
1177
1178        // Set voter's current term to 5
1179        voter.persistent.write().current_term = 5;
1180
1181        let req = RequestVoteRequest {
1182            term: 3, // Stale term
1183            candidate_id: 1,
1184            last_log_index: 0,
1185            last_log_term: 0,
1186        };
1187
1188        let response = voter.handle_request_vote(req);
1189        assert!(!response.vote_granted, "Should reject stale term vote");
1190        assert_eq!(response.term, 5);
1191    }
1192
1193    #[test]
1194    fn test_request_vote_rejects_duplicate_vote() {
1195        let config = RaftConfig::three_node_cluster(2);
1196        let voter = RaftIndexNode::new(config);
1197
1198        let req1 = RequestVoteRequest {
1199            term: 1,
1200            candidate_id: 1,
1201            last_log_index: 0,
1202            last_log_term: 0,
1203        };
1204
1205        let req2 = RequestVoteRequest {
1206            term: 1,
1207            candidate_id: 3, // Different candidate, same term
1208            last_log_index: 0,
1209            last_log_term: 0,
1210        };
1211
1212        let r1 = voter.handle_request_vote(req1);
1213        assert!(r1.vote_granted, "First vote should be granted");
1214
1215        let r2 = voter.handle_request_vote(req2);
1216        assert!(
1217            !r2.vote_granted,
1218            "Duplicate vote in same term should be rejected"
1219        );
1220    }
1221
1222    #[test]
1223    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1224    fn test_append_entries_heartbeat() {
1225        let config = RaftConfig::three_node_cluster(2);
1226        let follower = RaftIndexNode::new(config);
1227
1228        let heartbeat = AppendEntriesRequest {
1229            term: 1,
1230            leader_id: 1,
1231            prev_log_index: 0,
1232            prev_log_term: 0,
1233            entries: vec![],
1234            leader_commit: 0,
1235        };
1236
1237        let response = follower.handle_append_entries(heartbeat);
1238        assert!(response.success, "Heartbeat should succeed");
1239        assert_eq!(follower.current_leader(), Some(1));
1240    }
1241
1242    #[test]
1243    fn test_append_entries_stale_term() {
1244        let config = RaftConfig::three_node_cluster(2);
1245        let follower = RaftIndexNode::new(config);
1246        follower.persistent.write().current_term = 5;
1247
1248        let request = AppendEntriesRequest {
1249            term: 3, // Stale
1250            leader_id: 1,
1251            prev_log_index: 0,
1252            prev_log_term: 0,
1253            entries: vec![],
1254            leader_commit: 0,
1255        };
1256
1257        let response = follower.handle_append_entries(request);
1258        assert!(!response.success, "Stale term should be rejected");
1259        assert_eq!(response.term, 5);
1260    }
1261
    #[test]
    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
    fn test_cluster_simulator_election() {
        let sim = ClusterSimulator::new(3).unwrap();
        sim.elect_leader(0);

        // At least one node should be leader
        let leaders: Vec<_> = sim.nodes.iter().filter(|n| n.is_leader()).collect();
        assert!(!leaders.is_empty(), "At least one node should be leader");
    }

    #[test]
    #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
    fn test_cluster_simulator_replication() {
        let sim = ClusterSimulator::new(3).unwrap();
        sim.elect_leader(0);

        let leader_idx = sim.nodes.iter().position(|n| n.is_leader()).unwrap();
        let entry = make_vector_entry("v1", vec![1.0, 0.0, 0.0]);
        sim.nodes[leader_idx]
            .propose(IndexCommand::Upsert(entry), None)
            .unwrap();

        sim.replicate_all().unwrap();

        // All nodes should eventually have the entry committed
        let leader = &sim.nodes[leader_idx];
        // NOTE(review): force_commit_single_node is a warned no-op on a
        // 3-node cluster (it returns early when cluster size != 1), so this
        // call commits nothing here. The assertion below can only pass via
        // the log_length() arm; consider asserting on commit_index() after
        // replicate_all() instead — TODO confirm against replication logic.
        leader.force_commit_single_node();
        // Leader should have the vector
        let vec = leader.get_vector("v1");
        assert!(vec.is_some() || leader.log_length() > 0);
    }
1294
1295    #[test]
1296    fn test_delete_command() {
1297        let config = RaftConfig::single_node(1);
1298        let node = RaftIndexNode::new(config);
1299
1300        // Become leader
1301        node.start_election();
1302        let _ = node.process_vote_response(RequestVoteResponse {
1303            term: node.current_term(),
1304            vote_granted: true,
1305            node_id: 1,
1306        });
1307
1308        if !node.is_leader() {
1309            *node.role.lock() = NodeRole::Leader;
1310            *node.current_leader.lock() = Some(1);
1311        }
1312
1313        // Insert then delete
1314        let entry = make_vector_entry("v1", vec![1.0]);
1315        node.propose(IndexCommand::Upsert(entry), None).unwrap();
1316        node.force_commit_single_node();
1317        assert_eq!(node.vector_count(), 1);
1318
1319        node.propose(
1320            IndexCommand::Delete {
1321                vector_id: "v1".to_string(),
1322            },
1323            None,
1324        )
1325        .unwrap();
1326        node.force_commit_single_node();
1327        assert_eq!(node.vector_count(), 0);
1328    }
1329
1330    #[test]
1331    fn test_update_metadata_command() {
1332        let config = RaftConfig::single_node(1);
1333        let node = RaftIndexNode::new(config);
1334
1335        *node.role.lock() = NodeRole::Leader;
1336        *node.current_leader.lock() = Some(1);
1337
1338        let entry = make_vector_entry("v1", vec![1.0, 2.0]);
1339        node.propose(IndexCommand::Upsert(entry), None).unwrap();
1340
1341        let mut new_meta = HashMap::new();
1342        new_meta.insert("tag".to_string(), "important".to_string());
1343        node.propose(
1344            IndexCommand::UpdateMetadata {
1345                vector_id: "v1".to_string(),
1346                metadata: new_meta,
1347            },
1348            None,
1349        )
1350        .unwrap();
1351        node.force_commit_single_node();
1352
1353        let stored = node.get_vector("v1").unwrap();
1354        assert_eq!(stored.metadata.get("tag"), Some(&"important".to_string()));
1355    }
1356
1357    #[test]
1358    fn test_search_similar() {
1359        let config = RaftConfig::single_node(1);
1360        let node = RaftIndexNode::new(config);
1361
1362        *node.role.lock() = NodeRole::Leader;
1363        *node.current_leader.lock() = Some(1);
1364
1365        node.propose(
1366            IndexCommand::Upsert(make_vector_entry("v1", vec![1.0, 0.0, 0.0])),
1367            None,
1368        )
1369        .unwrap();
1370        node.propose(
1371            IndexCommand::Upsert(make_vector_entry("v2", vec![0.0, 1.0, 0.0])),
1372            None,
1373        )
1374        .unwrap();
1375        node.propose(
1376            IndexCommand::Upsert(make_vector_entry("v3", vec![0.0, 0.0, 1.0])),
1377            None,
1378        )
1379        .unwrap();
1380        node.force_commit_single_node();
1381
1382        let results = node.search_similar(&[1.0, 0.0, 0.0], 2);
1383        assert!(!results.is_empty());
1384        // First result should be v1 with similarity ~1.0
1385        assert_eq!(results[0].0, "v1");
1386        assert!((results[0].1 - 1.0).abs() < 1e-5);
1387    }
1388
1389    #[test]
1390    fn test_stats_populated() {
1391        let config = RaftConfig::single_node(1);
1392        let node = RaftIndexNode::new(config);
1393
1394        *node.role.lock() = NodeRole::Leader;
1395        *node.current_leader.lock() = Some(1);
1396        node.propose(IndexCommand::NoOp, None).unwrap();
1397        node.force_commit_single_node();
1398
1399        let stats = node.get_stats();
1400        assert_eq!(stats.role, "Leader");
1401        assert!(stats.log_length > 0);
1402    }
1403
1404    #[test]
1405    fn test_raft_log_length_increases() {
1406        let config = RaftConfig::single_node(1);
1407        let node = RaftIndexNode::new(config);
1408
1409        *node.role.lock() = NodeRole::Leader;
1410        *node.current_leader.lock() = Some(1);
1411
1412        assert_eq!(node.log_length(), 0);
1413
1414        node.propose(IndexCommand::NoOp, None).unwrap();
1415        assert_eq!(node.log_length(), 1);
1416
1417        node.propose(IndexCommand::Rebuild, None).unwrap();
1418        assert_eq!(node.log_length(), 2);
1419    }
1420
1421    #[test]
1422    fn test_persistent_state_default() {
1423        let state = PersistentState::default();
1424        assert_eq!(state.current_term, 0);
1425        assert!(state.voted_for.is_none());
1426        assert!(state.log.is_empty());
1427        assert_eq!(state.last_log_index(), 0);
1428        assert_eq!(state.last_log_term(), 0);
1429    }
1430
1431    #[test]
1432    fn test_node_role_display() {
1433        assert_eq!(NodeRole::Follower.to_string(), "Follower");
1434        assert_eq!(NodeRole::Candidate.to_string(), "Candidate");
1435        assert_eq!(NodeRole::Leader.to_string(), "Leader");
1436    }
1437
1438    #[test]
1439    fn test_election_timeout_not_elapsed_immediately() {
1440        let config = RaftConfig::single_node(1);
1441        let node = RaftIndexNode::new(config);
1442        // Freshly created node should not have elapsed election timeout
1443        assert!(!node.election_timeout_elapsed());
1444    }
1445
1446    #[test]
1447    fn test_reset_heartbeat() {
1448        let config = RaftConfig::single_node(1);
1449        let node = RaftIndexNode::new(config);
1450        // Resetting heartbeat should keep timeout from elapsing
1451        node.reset_heartbeat();
1452        assert!(!node.election_timeout_elapsed());
1453    }
1454
1455    #[test]
1456    fn test_append_entries_appends_new_log_entries() {
1457        let config = RaftConfig::three_node_cluster(2);
1458        let follower = RaftIndexNode::new(config);
1459
1460        let entry = LogEntry {
1461            index: 1,
1462            term: 1,
1463            command: IndexCommand::NoOp,
1464            client_id: None,
1465        };
1466
1467        let request = AppendEntriesRequest {
1468            term: 1,
1469            leader_id: 1,
1470            prev_log_index: 0,
1471            prev_log_term: 0,
1472            entries: vec![entry],
1473            leader_commit: 1,
1474        };
1475
1476        let response = follower.handle_append_entries(request);
1477        assert!(response.success);
1478        assert_eq!(follower.log_length(), 1);
1479    }
1480
1481    #[test]
1482    fn test_commit_advances_last_applied() {
1483        let config = RaftConfig::three_node_cluster(2);
1484        let follower = RaftIndexNode::new(config);
1485
1486        let entry = LogEntry {
1487            index: 1,
1488            term: 1,
1489            command: IndexCommand::Upsert(make_vector_entry("v1", vec![1.0])),
1490            client_id: None,
1491        };
1492
1493        let request = AppendEntriesRequest {
1494            term: 1,
1495            leader_id: 1,
1496            prev_log_index: 0,
1497            prev_log_term: 0,
1498            entries: vec![entry],
1499            leader_commit: 1, // Leader has committed this
1500        };
1501
1502        follower.handle_append_entries(request);
1503
1504        assert_eq!(follower.last_applied(), 1);
1505        assert_eq!(follower.vector_count(), 1);
1506    }
1507}