1use anyhow::{anyhow, Result};
22use parking_lot::{Mutex, RwLock};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
/// Identifier of a node in the Raft cluster.
pub type NodeId = u64;

/// Position of an entry in the replicated log. Index 0 is used as "no entry"
/// (an empty log reports a last index of 0 and lookups for 0 return nothing).
pub type LogIndex = u64;

/// Raft term number.
pub type Term = u64;
37
/// A vector record as stored in the index state machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorEntry {
    /// Key under which the entry is stored; an upsert with the same id replaces it.
    pub vector_id: String,
    /// Vector components, compared by cosine similarity during search.
    pub vector: Vec<f32>,
    /// Free-form string key/value metadata.
    pub metadata: HashMap<String, String>,
    /// Insertion timestamp supplied by the caller.
    /// NOTE(review): unit/epoch is not visible in this file — confirm.
    pub inserted_at: u64,
}
50
/// A command carried by a log entry and applied to the index state machine.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexCommand {
    /// Insert the entry, replacing any existing entry with the same `vector_id`.
    Upsert(VectorEntry),
    /// Remove the entry with this id (removing a missing id is not an error).
    Delete { vector_id: String },
    /// Rebuild request; the state machine in this file only logs and counts it.
    Rebuild,
    /// Replace the metadata of an existing entry; ignored if the id is unknown.
    UpdateMetadata {
        vector_id: String,
        metadata: HashMap<String, String>,
    },
    /// Command with no effect on the index; a new leader appends one on election.
    NoOp,
}
68
/// One entry of the replicated log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// Position of this entry in the log (the first appended entry gets 1).
    pub index: LogIndex,
    /// Term of the leader that created the entry.
    pub term: Term,
    /// Command to apply to the state machine once the entry is committed.
    pub command: IndexCommand,
    /// Optional client identifier passed to `propose`; stored but not
    /// interpreted by the code in this file.
    pub client_id: Option<String>,
}
81
/// Role a node currently plays in the Raft protocol.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Accepts entries from a leader; every node starts in this role.
    Follower,
    /// Has started an election and is collecting votes.
    Candidate,
    /// Won an election; the only role allowed to accept proposals.
    Leader,
}
89
90impl std::fmt::Display for NodeRole {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 Self::Follower => write!(f, "Follower"),
94 Self::Candidate => write!(f, "Candidate"),
95 Self::Leader => write!(f, "Leader"),
96 }
97 }
98}
99
/// `AppendEntries` RPC sent by a leader, used both for replication and as a heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesRequest {
    /// Term of the sending leader.
    pub term: Term,
    /// Id of the sending leader; the receiver records it as the current leader.
    pub leader_id: NodeId,
    /// Index of the entry immediately preceding `entries` (0 if none).
    pub prev_log_index: LogIndex,
    /// Term of the entry at `prev_log_index` (0 if none).
    pub prev_log_term: Term,
    /// Entries to append; empty for a pure heartbeat.
    pub entries: Vec<LogEntry>,
    /// Commit index of the leader at the time the request was built.
    pub leader_commit: LogIndex,
}
116
/// Reply to an [`AppendEntriesRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
    /// Term of the responder, so a stale leader can step down.
    pub term: Term,
    /// `true` if the entries were accepted.
    pub success: bool,
    /// Id of the responding node.
    pub node_id: NodeId,
    /// On a log mismatch, the index the leader should retry from; `None` on
    /// success and on stale-term rejections.
    pub conflict_index: Option<LogIndex>,
}
129
/// `RequestVote` RPC sent by a candidate.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteRequest {
    /// Term the candidate is campaigning for.
    pub term: Term,
    /// Id of the candidate asking for the vote.
    pub candidate_id: NodeId,
    /// Index of the candidate's last log entry (0 for an empty log).
    pub last_log_index: LogIndex,
    /// Term of the candidate's last log entry (0 for an empty log).
    pub last_log_term: Term,
}
142
/// Reply to a [`RequestVoteRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponse {
    /// Term of the voter after processing the request.
    pub term: Term,
    /// `true` if the voter granted its vote to the candidate.
    pub vote_granted: bool,
    /// Id of the voting node.
    pub node_id: NodeId,
}
153
/// Static configuration of a Raft node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftConfig {
    /// Id of this node.
    pub node_id: NodeId,
    /// Ids of all cluster members; the quorum size is derived from its length.
    pub cluster_nodes: Vec<NodeId>,
    /// Heartbeat interval in milliseconds (not read by the code in this file).
    pub heartbeat_interval_ms: u64,
    /// Lower bound of the election timeout in milliseconds (not read by the
    /// code in this file).
    pub election_timeout_min_ms: u64,
    /// Upper bound of the election timeout in milliseconds; this is the value
    /// `election_timeout_elapsed` compares against.
    pub election_timeout_max_ms: u64,
    /// Maximum number of log entries placed in one `AppendEntries` request.
    pub max_entries_per_batch: usize,
    /// Snapshot switch (not read by the code in this file).
    pub enable_snapshots: bool,
    /// Snapshot threshold (not read by the code in this file).
    /// NOTE(review): presumably a log-length threshold — confirm.
    pub snapshot_threshold: usize,
    /// RPC retry budget (not read by the code in this file).
    pub max_rpc_retries: usize,
}
175
176impl RaftConfig {
177 pub fn single_node(node_id: NodeId) -> Self {
179 Self {
180 node_id,
181 cluster_nodes: vec![node_id],
182 heartbeat_interval_ms: 150,
183 election_timeout_min_ms: 300,
184 election_timeout_max_ms: 600,
185 max_entries_per_batch: 100,
186 enable_snapshots: true,
187 snapshot_threshold: 10_000,
188 max_rpc_retries: 3,
189 }
190 }
191
192 pub fn three_node_cluster(node_id: NodeId) -> Self {
194 Self {
195 node_id,
196 cluster_nodes: vec![1, 2, 3],
197 heartbeat_interval_ms: 150,
198 election_timeout_min_ms: 300,
199 election_timeout_max_ms: 600,
200 max_entries_per_batch: 100,
201 enable_snapshots: true,
202 snapshot_threshold: 10_000,
203 max_rpc_retries: 3,
204 }
205 }
206
207 pub fn quorum_size(&self) -> usize {
209 self.cluster_nodes.len() / 2 + 1
210 }
211}
212
213impl Default for RaftConfig {
214 fn default() -> Self {
215 Self::single_node(1)
216 }
217}
218
/// Point-in-time snapshot of a node's counters, as returned by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RaftStats {
    /// Current term of the node.
    pub current_term: Term,
    /// Role rendered through `NodeRole`'s `Display` impl (e.g. `"Leader"`).
    pub role: String,
    /// Leader this node currently knows about, if any.
    pub current_leader: Option<NodeId>,
    /// Number of entries in the local log.
    pub log_length: usize,
    /// Highest log index known to be committed.
    pub commit_index: LogIndex,
    /// Highest log index applied to the state machine.
    pub last_applied: LogIndex,
    /// Number of elections this node has started.
    pub elections_participated: u64,
    /// Number of times this node became leader.
    pub terms_as_leader: u64,
    /// Number of commands counted as applied by the state machine.
    pub operations_applied: u64,
    /// Number of vectors currently stored.
    pub vector_count: usize,
    /// Number of `AppendEntries` requests built by this node.
    pub rpcs_sent: u64,
    /// Number of RPC requests and responses handled by this node.
    pub rpcs_received: u64,
}
247
/// In-memory vector index that committed log commands are applied to.
#[derive(Debug, Default)]
struct IndexStateMachine {
    /// Stored entries keyed by `vector_id`.
    vectors: HashMap<String, VectorEntry>,
    /// Count of applied commands (`NoOp` and metadata updates for unknown ids
    /// are not counted).
    operations_applied: u64,
}
256
257impl IndexStateMachine {
258 fn apply(&mut self, command: &IndexCommand) {
260 match command {
261 IndexCommand::Upsert(entry) => {
262 self.vectors.insert(entry.vector_id.clone(), entry.clone());
263 self.operations_applied += 1;
264 debug!("Applied Upsert for vector '{}'", entry.vector_id);
265 }
266 IndexCommand::Delete { vector_id } => {
267 self.vectors.remove(vector_id);
268 self.operations_applied += 1;
269 debug!("Applied Delete for vector '{}'", vector_id);
270 }
271 IndexCommand::UpdateMetadata {
272 vector_id,
273 metadata,
274 } => {
275 if let Some(entry) = self.vectors.get_mut(vector_id) {
276 entry.metadata.clone_from(metadata);
277 self.operations_applied += 1;
278 }
279 }
280 IndexCommand::Rebuild => {
281 debug!("Applied Rebuild command");
282 self.operations_applied += 1;
283 }
284 IndexCommand::NoOp => {
285 }
287 }
288 }
289
290 fn len(&self) -> usize {
291 self.vectors.len()
292 }
293
294 fn get(&self, vector_id: &str) -> Option<&VectorEntry> {
295 self.vectors.get(vector_id)
296 }
297
298 fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
299 let mut similarities: Vec<(String, f32)> = self
300 .vectors
301 .iter()
302 .filter_map(|(id, entry)| {
303 if entry.vector.len() != query.len() {
304 return None;
305 }
306 let dot: f32 = entry
307 .vector
308 .iter()
309 .zip(query.iter())
310 .map(|(a, b)| a * b)
311 .sum();
312 let na: f32 = entry.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
313 let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
314 let sim = if na < 1e-9 || nb < 1e-9 {
315 0.0
316 } else {
317 dot / (na * nb)
318 };
319 Some((id.clone(), sim))
320 })
321 .collect();
322
323 similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
324 similarities.truncate(k);
325 similarities
326 }
327}
328
/// Raft state grouped as "persistent": term, vote and log.
///
/// NOTE(review): nothing in this file writes it to durable storage — it is
/// only held in memory here; confirm whether persistence happens elsewhere.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PersistentState {
    /// Latest term this node has seen (starts at 0).
    pub current_term: Term,
    /// Candidate this node voted for in `current_term`, if any.
    pub voted_for: Option<NodeId>,
    /// The replicated log, in append order.
    pub log: Vec<LogEntry>,
}
339
340impl PersistentState {
341 fn last_log_index(&self) -> LogIndex {
342 self.log.last().map(|e| e.index).unwrap_or(0)
343 }
344
345 fn last_log_term(&self) -> Term {
346 self.log.last().map(|e| e.term).unwrap_or(0)
347 }
348
349 fn get_entry(&self, index: LogIndex) -> Option<&LogEntry> {
350 if index == 0 {
351 return None;
352 }
353 self.log.iter().find(|e| e.index == index)
355 }
356
357 fn truncate_from(&mut self, from_index: LogIndex) {
358 self.log.retain(|e| e.index < from_index);
359 }
360}
361
/// A single Raft node replicating an in-memory vector index.
///
/// Every piece of mutable state sits behind its own lock, so all methods take
/// `&self`. Networking is not part of this type: callers deliver requests and
/// responses by calling the `handle_*` / `process_*` methods.
#[derive(Debug)]
pub struct RaftIndexNode {
    /// Static node configuration.
    config: RaftConfig,
    /// Term, vote and log.
    persistent: Arc<RwLock<PersistentState>>,
    /// Current role of this node.
    role: Arc<Mutex<NodeRole>>,
    /// Leader this node currently knows about.
    current_leader: Arc<Mutex<Option<NodeId>>>,
    /// Highest log index known to be committed.
    commit_index: Arc<Mutex<LogIndex>>,
    /// Highest log index applied to the state machine.
    last_applied: Arc<Mutex<LogIndex>>,
    /// Leader-side: next log index to send to each peer.
    next_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// Leader-side: highest log index known to be stored on each peer.
    match_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// The replicated vector index.
    state_machine: Arc<RwLock<IndexStateMachine>>,
    /// Votes collected in the election this node is running.
    votes_received: Arc<Mutex<HashMap<NodeId, bool>>>,
    /// Last time the election timer was reset.
    last_heartbeat: Arc<Mutex<Instant>>,
    /// Initialised to the default stats; not read by the code in this file.
    stats: Arc<Mutex<RaftStats>>,
    /// Number of elections started by this node.
    elections_participated: Arc<Mutex<u64>>,
    /// Number of times this node became leader.
    terms_as_leader: Arc<Mutex<u64>>,
    /// Number of `AppendEntries` requests built.
    rpcs_sent: Arc<Mutex<u64>>,
    /// Number of RPC requests and responses handled.
    rpcs_received: Arc<Mutex<u64>>,
}
402
403impl RaftIndexNode {
404 pub fn new(config: RaftConfig) -> Self {
406 let node_id = config.node_id;
407 let cluster_nodes: Vec<NodeId> = config.cluster_nodes.clone();
408
409 let next_index: HashMap<NodeId, LogIndex> = cluster_nodes
410 .iter()
411 .filter(|&&n| n != node_id)
412 .map(|&n| (n, 1))
413 .collect();
414
415 let match_index: HashMap<NodeId, LogIndex> = cluster_nodes
416 .iter()
417 .filter(|&&n| n != node_id)
418 .map(|&n| (n, 0))
419 .collect();
420
421 info!(
422 "Raft node {} initialized in cluster {:?}",
423 node_id, cluster_nodes
424 );
425
426 Self {
427 config,
428 persistent: Arc::new(RwLock::new(PersistentState::default())),
429 role: Arc::new(Mutex::new(NodeRole::Follower)),
430 current_leader: Arc::new(Mutex::new(None)),
431 commit_index: Arc::new(Mutex::new(0)),
432 last_applied: Arc::new(Mutex::new(0)),
433 next_index: Arc::new(Mutex::new(next_index)),
434 match_index: Arc::new(Mutex::new(match_index)),
435 state_machine: Arc::new(RwLock::new(IndexStateMachine::default())),
436 votes_received: Arc::new(Mutex::new(HashMap::new())),
437 last_heartbeat: Arc::new(Mutex::new(Instant::now())),
438 stats: Arc::new(Mutex::new(RaftStats::default())),
439 elections_participated: Arc::new(Mutex::new(0)),
440 terms_as_leader: Arc::new(Mutex::new(0)),
441 rpcs_sent: Arc::new(Mutex::new(0)),
442 rpcs_received: Arc::new(Mutex::new(0)),
443 }
444 }
445
446 pub fn start_election(&self) -> RequestVoteRequest {
448 let mut persistent = self.persistent.write();
449 persistent.current_term += 1;
450 let new_term = persistent.current_term;
451 persistent.voted_for = Some(self.config.node_id);
452
453 *self.role.lock() = NodeRole::Candidate;
454 let mut votes = self.votes_received.lock();
455 votes.clear();
456 votes.insert(self.config.node_id, true); *self.elections_participated.lock() += 1;
459
460 info!(
461 "Node {} starting election for term {}",
462 self.config.node_id, new_term
463 );
464
465 RequestVoteRequest {
466 term: new_term,
467 candidate_id: self.config.node_id,
468 last_log_index: persistent.last_log_index(),
469 last_log_term: persistent.last_log_term(),
470 }
471 }
472
473 pub fn handle_request_vote(&self, request: RequestVoteRequest) -> RequestVoteResponse {
475 *self.rpcs_received.lock() += 1;
476 let mut persistent = self.persistent.write();
477
478 if request.term > persistent.current_term {
480 persistent.current_term = request.term;
481 persistent.voted_for = None;
482 *self.role.lock() = NodeRole::Follower;
483 }
484
485 let vote_granted = if request.term < persistent.current_term {
486 false
488 } else {
489 let already_voted = persistent
490 .voted_for
491 .map(|v| v != request.candidate_id)
492 .unwrap_or(false);
493
494 if already_voted {
495 false
496 } else {
497 let our_last_index = persistent.last_log_index();
499 let our_last_term = persistent.last_log_term();
500
501 let log_ok = request.last_log_term > our_last_term
502 || (request.last_log_term == our_last_term
503 && request.last_log_index >= our_last_index);
504
505 if log_ok {
506 persistent.voted_for = Some(request.candidate_id);
507 *self.last_heartbeat.lock() = Instant::now();
508 true
509 } else {
510 false
511 }
512 }
513 };
514
515 debug!(
516 "Node {} {:?} vote to {} for term {}",
517 self.config.node_id,
518 if vote_granted { "grants" } else { "denies" },
519 request.candidate_id,
520 request.term
521 );
522
523 RequestVoteResponse {
524 term: persistent.current_term,
525 vote_granted,
526 node_id: self.config.node_id,
527 }
528 }
529
530 pub fn process_vote_response(&self, response: RequestVoteResponse) -> bool {
534 *self.rpcs_received.lock() += 1;
535 let persistent = self.persistent.read();
536
537 if response.term > persistent.current_term {
539 drop(persistent);
540 let mut p = self.persistent.write();
541 p.current_term = response.term;
542 p.voted_for = None;
543 *self.role.lock() = NodeRole::Follower;
544 return false;
545 }
546
547 if *self.role.lock() != NodeRole::Candidate {
549 return false;
550 }
551
552 if response.term != persistent.current_term {
553 return false;
554 }
555
556 if response.vote_granted {
557 let mut votes = self.votes_received.lock();
558 votes.insert(response.node_id, true);
559 let vote_count = votes.values().filter(|&&v| v).count();
560
561 if vote_count >= self.config.quorum_size() {
562 drop(votes);
564 drop(persistent);
565 self.become_leader();
566 return true;
567 }
568 }
569 false
570 }
571
572 fn become_leader(&self) {
574 let term = self.persistent.read().current_term;
575 *self.role.lock() = NodeRole::Leader;
576 *self.current_leader.lock() = Some(self.config.node_id);
577 *self.terms_as_leader.lock() += 1;
578
579 let last_log_index = self.persistent.read().last_log_index();
581 let mut next_idx = self.next_index.lock();
582 let mut match_idx = self.match_index.lock();
583
584 for &peer in &self.config.cluster_nodes {
585 if peer != self.config.node_id {
586 next_idx.insert(peer, last_log_index + 1);
587 match_idx.insert(peer, 0);
588 }
589 }
590
591 info!(
592 "Node {} became leader for term {}",
593 self.config.node_id, term
594 );
595
596 drop(next_idx);
598 drop(match_idx);
599 let _ = self.append_entry(IndexCommand::NoOp, None);
600 }
601
602 pub fn handle_append_entries(&self, request: AppendEntriesRequest) -> AppendEntriesResponse {
604 *self.rpcs_received.lock() += 1;
605 let mut persistent = self.persistent.write();
606
607 if request.term > persistent.current_term {
609 persistent.current_term = request.term;
610 persistent.voted_for = None;
611 *self.role.lock() = NodeRole::Follower;
612 }
613
614 if request.term < persistent.current_term {
616 return AppendEntriesResponse {
617 term: persistent.current_term,
618 success: false,
619 node_id: self.config.node_id,
620 conflict_index: None,
621 };
622 }
623
624 *self.last_heartbeat.lock() = Instant::now();
626 *self.current_leader.lock() = Some(request.leader_id);
627 *self.role.lock() = NodeRole::Follower;
628
629 if request.prev_log_index > 0 {
631 let entry = persistent.get_entry(request.prev_log_index);
632 match entry {
633 None => {
634 return AppendEntriesResponse {
636 term: persistent.current_term,
637 success: false,
638 node_id: self.config.node_id,
639 conflict_index: Some(persistent.last_log_index() + 1),
640 };
641 }
642 Some(e) if e.term != request.prev_log_term => {
643 let conflict_index = e.index;
645 return AppendEntriesResponse {
646 term: persistent.current_term,
647 success: false,
648 node_id: self.config.node_id,
649 conflict_index: Some(conflict_index),
650 };
651 }
652 _ => {}
653 }
654 }
655
656 for entry in &request.entries {
658 let existing = persistent.get_entry(entry.index).cloned();
659 match existing {
660 Some(e) if e.term != entry.term => {
661 persistent.truncate_from(entry.index);
663 persistent.log.push(entry.clone());
664 }
665 None => {
666 persistent.log.push(entry.clone());
667 }
668 _ => {} }
670 }
671
672 let prev_commit = *self.commit_index.lock();
674 if request.leader_commit > prev_commit {
675 let new_commit = request.leader_commit.min(persistent.last_log_index());
676 drop(persistent);
677 *self.commit_index.lock() = new_commit;
678 self.apply_committed_entries();
679 }
680
681 AppendEntriesResponse {
682 term: self.persistent.read().current_term,
683 success: true,
684 node_id: self.config.node_id,
685 conflict_index: None,
686 }
687 }
688
689 pub fn process_append_entries_response(
691 &self,
692 peer_id: NodeId,
693 response: AppendEntriesResponse,
694 entries_sent_count: usize,
695 ) {
696 *self.rpcs_received.lock() += 1;
697 let current_term = self.persistent.read().current_term;
698
699 if response.term > current_term {
700 let mut p = self.persistent.write();
701 p.current_term = response.term;
702 p.voted_for = None;
703 *self.role.lock() = NodeRole::Follower;
704 return;
705 }
706
707 if *self.role.lock() != NodeRole::Leader {
708 return;
709 }
710
711 if response.success {
712 let mut next_idx = self.next_index.lock();
713 let mut match_idx = self.match_index.lock();
714
715 let new_next =
716 next_idx.get(&peer_id).copied().unwrap_or(1) + entries_sent_count as LogIndex;
717
718 next_idx.insert(peer_id, new_next);
719 match_idx.insert(peer_id, new_next - 1);
720 drop(next_idx);
721 drop(match_idx);
722
723 self.try_advance_commit_index();
725 } else {
726 let mut next_idx = self.next_index.lock();
728 if let Some(conflict) = response.conflict_index {
729 next_idx.insert(peer_id, conflict);
730 } else {
731 let current = next_idx.get(&peer_id).copied().unwrap_or(1);
732 if current > 1 {
733 next_idx.insert(peer_id, current - 1);
734 }
735 }
736 }
737 }
738
739 fn try_advance_commit_index(&self) {
741 let persistent = self.persistent.read();
742 let current_term = persistent.current_term;
743 let last_log_index = persistent.last_log_index();
744 drop(persistent);
745
746 let match_idx = self.match_index.lock();
747 let mut commit = *self.commit_index.lock();
748
749 for n in (commit + 1)..=last_log_index {
750 let p = self.persistent.read();
751 let entry_term = p.get_entry(n).map(|e| e.term).unwrap_or(0);
752 drop(p);
753
754 if entry_term != current_term {
756 continue;
757 }
758
759 let replication_count = 1 + match_idx.values().filter(|&&m| m >= n).count();
762
763 if replication_count >= self.config.quorum_size() {
764 commit = n;
765 }
766 }
767 drop(match_idx);
768
769 let old_commit = *self.commit_index.lock();
770 if commit > old_commit {
771 *self.commit_index.lock() = commit;
772 self.apply_committed_entries();
773 }
774 }
775
776 fn apply_committed_entries(&self) {
778 let commit = *self.commit_index.lock();
779 let mut last = *self.last_applied.lock();
780
781 while last < commit {
782 last += 1;
783 let persistent = self.persistent.read();
784 let entry = persistent.get_entry(last).cloned();
785 drop(persistent);
786
787 if let Some(entry) = entry {
788 let mut sm = self.state_machine.write();
789 sm.apply(&entry.command);
790 debug!("Node {} applied log entry {}", self.config.node_id, last);
791 }
792 }
793
794 *self.last_applied.lock() = last;
795 }
796
797 pub fn propose(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
801 if *self.role.lock() != NodeRole::Leader {
802 let leader = self.current_leader.lock().map(|l| l.to_string());
803 return Err(anyhow!(
804 "Not the leader. Current leader: {:?}",
805 leader.unwrap_or_else(|| "unknown".to_string())
806 ));
807 }
808 self.append_entry(command, client_id)
809 }
810
811 fn append_entry(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
813 let mut persistent = self.persistent.write();
814 let term = persistent.current_term;
815 let index = persistent.last_log_index() + 1;
816
817 let entry = LogEntry {
818 index,
819 term,
820 command,
821 client_id,
822 };
823
824 persistent.log.push(entry);
825 info!(
826 "Node {} appended log entry {} in term {}",
827 self.config.node_id, index, term
828 );
829 Ok(index)
830 }
831
832 pub fn create_append_entries_request(&self, peer_id: NodeId) -> Result<AppendEntriesRequest> {
834 if *self.role.lock() != NodeRole::Leader {
835 return Err(anyhow!("Not the leader"));
836 }
837
838 let persistent = self.persistent.read();
839 let next_idx = self.next_index.lock();
840 let next = next_idx.get(&peer_id).copied().unwrap_or(1);
841
842 let prev_log_index = next.saturating_sub(1);
843 let prev_log_term = if prev_log_index > 0 {
844 persistent
845 .get_entry(prev_log_index)
846 .map(|e| e.term)
847 .unwrap_or(0)
848 } else {
849 0
850 };
851
852 let entries: Vec<LogEntry> = persistent
853 .log
854 .iter()
855 .filter(|e| e.index >= next)
856 .take(self.config.max_entries_per_batch)
857 .cloned()
858 .collect();
859
860 let commit = *self.commit_index.lock();
861
862 *self.rpcs_sent.lock() += 1;
863
864 Ok(AppendEntriesRequest {
865 term: persistent.current_term,
866 leader_id: self.config.node_id,
867 prev_log_index,
868 prev_log_term,
869 entries,
870 leader_commit: commit,
871 })
872 }
873
874 pub fn force_commit_single_node(&self) {
878 if self.config.cluster_nodes.len() != 1 {
879 warn!("force_commit_single_node called on multi-node cluster");
880 return;
881 }
882 let last_index = self.persistent.read().last_log_index();
883 *self.commit_index.lock() = last_index;
884 self.apply_committed_entries();
885 }
886
    /// Current role of this node.
    pub fn role(&self) -> NodeRole {
        *self.role.lock()
    }
891
    /// Current term of this node.
    pub fn current_term(&self) -> Term {
        self.persistent.read().current_term
    }
896
    /// Leader this node currently knows about, if any.
    pub fn current_leader(&self) -> Option<NodeId> {
        *self.current_leader.lock()
    }
901
902 pub fn is_leader(&self) -> bool {
904 *self.role.lock() == NodeRole::Leader
905 }
906
    /// Number of entries in the local log (committed or not).
    pub fn log_length(&self) -> usize {
        self.persistent.read().log.len()
    }
911
    /// Highest log index known to be committed.
    pub fn commit_index(&self) -> LogIndex {
        *self.commit_index.lock()
    }
916
    /// Highest log index applied to the state machine.
    pub fn last_applied(&self) -> LogIndex {
        *self.last_applied.lock()
    }
921
    /// Number of vectors in the local state machine (applied entries only).
    pub fn vector_count(&self) -> usize {
        self.state_machine.read().len()
    }
926
    /// Returns a copy of the stored entry with this id, if it has been applied
    /// to the local state machine.
    pub fn get_vector(&self, vector_id: &str) -> Option<VectorEntry> {
        self.state_machine.read().get(vector_id).cloned()
    }
931
    /// Returns up to `k` `(vector_id, cosine_similarity)` pairs from the local
    /// state machine, best match first. Vectors whose dimension differs from
    /// `query` are skipped.
    pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
        self.state_machine.read().search_similar(query, k)
    }
936
937 pub fn get_stats(&self) -> RaftStats {
939 let persistent = self.persistent.read();
940 RaftStats {
941 current_term: persistent.current_term,
942 role: self.role().to_string(),
943 current_leader: *self.current_leader.lock(),
944 log_length: persistent.log.len(),
945 commit_index: *self.commit_index.lock(),
946 last_applied: *self.last_applied.lock(),
947 elections_participated: *self.elections_participated.lock(),
948 terms_as_leader: *self.terms_as_leader.lock(),
949 operations_applied: self.state_machine.read().operations_applied,
950 vector_count: self.state_machine.read().len(),
951 rpcs_sent: *self.rpcs_sent.lock(),
952 rpcs_received: *self.rpcs_received.lock(),
953 }
954 }
955
956 pub fn election_timeout_elapsed(&self) -> bool {
958 let elapsed = self.last_heartbeat.lock().elapsed();
959 elapsed > Duration::from_millis(self.config.election_timeout_max_ms)
960 }
961
    /// Resets the election timer to the current instant.
    pub fn reset_heartbeat(&self) {
        *self.last_heartbeat.lock() = Instant::now();
    }
966}
967
/// In-process cluster of Raft nodes that exchange messages through direct
/// method calls instead of a network.
pub struct ClusterSimulator {
    /// The simulated nodes; the node at position `i` has id `i + 1`.
    pub nodes: Vec<RaftIndexNode>,
}
972
973impl ClusterSimulator {
974 pub fn new(n: usize) -> Result<Self> {
976 let cluster_nodes: Vec<NodeId> = (1..=(n as NodeId)).collect();
977
978 let nodes = cluster_nodes
979 .iter()
980 .map(|&id| {
981 let config = RaftConfig {
982 node_id: id,
983 cluster_nodes: cluster_nodes.clone(),
984 heartbeat_interval_ms: 50,
985 election_timeout_min_ms: 150,
986 election_timeout_max_ms: 300,
987 max_entries_per_batch: 10,
988 enable_snapshots: false,
989 snapshot_threshold: 1000,
990 max_rpc_retries: 2,
991 };
992 RaftIndexNode::new(config)
993 })
994 .collect();
995
996 Ok(Self { nodes })
997 }
998
999 pub fn elect_leader(&self, leader_idx: usize) {
1001 let vote_request = self.nodes[leader_idx].start_election();
1003
1004 let mut all_won = false;
1006 for (i, node) in self.nodes.iter().enumerate() {
1007 if i == leader_idx {
1008 continue;
1009 }
1010 let response = node.handle_request_vote(vote_request.clone());
1011 if self.nodes[leader_idx].process_vote_response(response) {
1012 all_won = true;
1013 }
1014 }
1015
1016 if all_won || self.nodes[leader_idx].is_leader() {
1018 for (i, node) in self.nodes.iter().enumerate() {
1019 if i == leader_idx {
1020 continue;
1021 }
1022 if let Ok(ae_req) =
1023 self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1024 {
1025 let response = node.handle_append_entries(ae_req.clone());
1026 self.nodes[leader_idx].process_append_entries_response(
1027 node.config.node_id,
1028 response,
1029 ae_req.entries.len(),
1030 );
1031 }
1032 }
1033 }
1034 }
1035
1036 pub fn replicate_all(&self) -> Result<()> {
1038 let leader_idx = self
1039 .nodes
1040 .iter()
1041 .position(|n| n.is_leader())
1042 .ok_or_else(|| anyhow!("No leader elected"))?;
1043
1044 for (i, node) in self.nodes.iter().enumerate() {
1045 if i == leader_idx {
1046 continue;
1047 }
1048 if let Ok(ae_req) =
1049 self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1050 {
1051 let entries_len = ae_req.entries.len();
1052 let response = node.handle_append_entries(ae_req);
1053 self.nodes[leader_idx].process_append_entries_response(
1054 node.config.node_id,
1055 response,
1056 entries_len,
1057 );
1058 }
1059 }
1060 Ok(())
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067 use anyhow::Result;
1068
1069 fn make_vector_entry(id: &str, vec: Vec<f32>) -> VectorEntry {
1070 VectorEntry {
1071 vector_id: id.to_string(),
1072 vector: vec,
1073 metadata: HashMap::new(),
1074 inserted_at: 0,
1075 }
1076 }
1077
1078 #[test]
1079 fn test_raft_config_single_node() {
1080 let config = RaftConfig::single_node(1);
1081 assert_eq!(config.node_id, 1);
1082 assert_eq!(config.cluster_nodes, vec![1]);
1083 assert_eq!(config.quorum_size(), 1);
1084 }
1085
1086 #[test]
1087 fn test_raft_config_three_node() {
1088 let config = RaftConfig::three_node_cluster(1);
1089 assert_eq!(config.quorum_size(), 2);
1090 }
1091
1092 #[test]
1093 fn test_node_starts_as_follower() {
1094 let config = RaftConfig::single_node(1);
1095 let node = RaftIndexNode::new(config);
1096 assert_eq!(node.role(), NodeRole::Follower);
1097 assert_eq!(node.current_term(), 0);
1098 }
1099
1100 #[test]
1101 fn test_single_node_becomes_leader() {
1102 let config = RaftConfig::single_node(1);
1103 let node = RaftIndexNode::new(config);
1104
1105 let vote_req = node.start_election();
1106 assert_eq!(vote_req.term, 1);
1107 assert_eq!(node.current_term(), 1);
1108
1109 let won = node.process_vote_response(RequestVoteResponse {
1111 term: 1,
1112 vote_granted: true,
1113 node_id: 1,
1114 });
1115
1116 assert!(node.is_leader() || won);
1118 }
1119
1120 #[test]
1121 fn test_single_node_leader_force_commit() -> Result<()> {
1122 let config = RaftConfig::single_node(1);
1123 let node = RaftIndexNode::new(config);
1124
1125 node.start_election();
1127 let _ = node.process_vote_response(RequestVoteResponse {
1129 term: node.current_term(),
1130 vote_granted: true,
1131 node_id: 1,
1132 });
1133
1134 if !node.is_leader() {
1135 *node.role.lock() = NodeRole::Leader;
1137 *node.current_leader.lock() = Some(1);
1138 }
1139
1140 let entry = make_vector_entry("v1", vec![1.0, 2.0, 3.0]);
1141 node.propose(IndexCommand::Upsert(entry), None)?;
1142 node.force_commit_single_node();
1143
1144 assert_eq!(node.vector_count(), 1);
1145 assert!(node.get_vector("v1").is_some());
1146 Ok(())
1147 }
1148
1149 #[test]
1150 fn test_propose_fails_when_not_leader() {
1151 let config = RaftConfig::three_node_cluster(1);
1152 let node = RaftIndexNode::new(config);
1153 let result = node.propose(IndexCommand::NoOp, None);
1155 assert!(result.is_err(), "Should fail to propose when not leader");
1156 }
1157
1158 #[test]
1159 fn test_request_vote_grants_to_newer_term() {
1160 let config = RaftConfig::three_node_cluster(2);
1161 let voter = RaftIndexNode::new(config);
1162
1163 let req = RequestVoteRequest {
1164 term: 5,
1165 candidate_id: 1,
1166 last_log_index: 10,
1167 last_log_term: 5,
1168 };
1169
1170 let response = voter.handle_request_vote(req);
1171 assert!(response.vote_granted, "Should grant vote to higher term");
1172 assert_eq!(response.term, 5);
1173 }
1174
1175 #[test]
1176 fn test_request_vote_rejects_stale_term() {
1177 let config = RaftConfig::three_node_cluster(2);
1178 let voter = RaftIndexNode::new(config);
1179
1180 voter.persistent.write().current_term = 5;
1182
1183 let req = RequestVoteRequest {
1184 term: 3, candidate_id: 1,
1186 last_log_index: 0,
1187 last_log_term: 0,
1188 };
1189
1190 let response = voter.handle_request_vote(req);
1191 assert!(!response.vote_granted, "Should reject stale term vote");
1192 assert_eq!(response.term, 5);
1193 }
1194
1195 #[test]
1196 fn test_request_vote_rejects_duplicate_vote() {
1197 let config = RaftConfig::three_node_cluster(2);
1198 let voter = RaftIndexNode::new(config);
1199
1200 let req1 = RequestVoteRequest {
1201 term: 1,
1202 candidate_id: 1,
1203 last_log_index: 0,
1204 last_log_term: 0,
1205 };
1206
1207 let req2 = RequestVoteRequest {
1208 term: 1,
1209 candidate_id: 3, last_log_index: 0,
1211 last_log_term: 0,
1212 };
1213
1214 let r1 = voter.handle_request_vote(req1);
1215 assert!(r1.vote_granted, "First vote should be granted");
1216
1217 let r2 = voter.handle_request_vote(req2);
1218 assert!(
1219 !r2.vote_granted,
1220 "Duplicate vote in same term should be rejected"
1221 );
1222 }
1223
1224 #[test]
1225 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1226 fn test_append_entries_heartbeat() {
1227 let config = RaftConfig::three_node_cluster(2);
1228 let follower = RaftIndexNode::new(config);
1229
1230 let heartbeat = AppendEntriesRequest {
1231 term: 1,
1232 leader_id: 1,
1233 prev_log_index: 0,
1234 prev_log_term: 0,
1235 entries: vec![],
1236 leader_commit: 0,
1237 };
1238
1239 let response = follower.handle_append_entries(heartbeat);
1240 assert!(response.success, "Heartbeat should succeed");
1241 assert_eq!(follower.current_leader(), Some(1));
1242 }
1243
1244 #[test]
1245 fn test_append_entries_stale_term() {
1246 let config = RaftConfig::three_node_cluster(2);
1247 let follower = RaftIndexNode::new(config);
1248 follower.persistent.write().current_term = 5;
1249
1250 let request = AppendEntriesRequest {
1251 term: 3, leader_id: 1,
1253 prev_log_index: 0,
1254 prev_log_term: 0,
1255 entries: vec![],
1256 leader_commit: 0,
1257 };
1258
1259 let response = follower.handle_append_entries(request);
1260 assert!(!response.success, "Stale term should be rejected");
1261 assert_eq!(response.term, 5);
1262 }
1263
1264 #[test]
1265 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1266 fn test_cluster_simulator_election() -> Result<()> {
1267 let sim = ClusterSimulator::new(3)?;
1268 sim.elect_leader(0);
1269
1270 let leaders: Vec<_> = sim.nodes.iter().filter(|n| n.is_leader()).collect();
1272 assert!(!leaders.is_empty(), "At least one node should be leader");
1273 Ok(())
1274 }
1275
1276 #[test]
1277 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1278 fn test_cluster_simulator_replication() -> Result<()> {
1279 let sim = ClusterSimulator::new(3)?;
1280 sim.elect_leader(0);
1281
1282 let leader_idx = sim
1283 .nodes
1284 .iter()
1285 .position(|n| n.is_leader())
1286 .expect("no leader found");
1287 let entry = make_vector_entry("v1", vec![1.0, 0.0, 0.0]);
1288 sim.nodes[leader_idx].propose(IndexCommand::Upsert(entry), None)?;
1289
1290 sim.replicate_all()?;
1291
1292 let leader = &sim.nodes[leader_idx];
1294 leader.force_commit_single_node();
1295 let vec = leader.get_vector("v1");
1297 assert!(vec.is_some() || leader.log_length() > 0);
1298 Ok(())
1299 }
1300
1301 #[test]
1302 fn test_delete_command() -> Result<()> {
1303 let config = RaftConfig::single_node(1);
1304 let node = RaftIndexNode::new(config);
1305
1306 node.start_election();
1308 let _ = node.process_vote_response(RequestVoteResponse {
1309 term: node.current_term(),
1310 vote_granted: true,
1311 node_id: 1,
1312 });
1313
1314 if !node.is_leader() {
1315 *node.role.lock() = NodeRole::Leader;
1316 *node.current_leader.lock() = Some(1);
1317 }
1318
1319 let entry = make_vector_entry("v1", vec![1.0]);
1321 node.propose(IndexCommand::Upsert(entry), None)?;
1322 node.force_commit_single_node();
1323 assert_eq!(node.vector_count(), 1);
1324
1325 node.propose(
1326 IndexCommand::Delete {
1327 vector_id: "v1".to_string(),
1328 },
1329 None,
1330 )?;
1331 node.force_commit_single_node();
1332 assert_eq!(node.vector_count(), 0);
1333 Ok(())
1334 }
1335
/// An `UpdateMetadata` command proposed after an `Upsert` must be visible on
/// the stored vector once both entries are committed.
#[test]
fn test_update_metadata_command() -> Result<()> {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Skip the election and mark the node as leader directly.
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // First log entry: the vector itself.
    let upsert = IndexCommand::Upsert(make_vector_entry("v1", vec![1.0, 2.0]));
    node.propose(upsert, None)?;

    // Second log entry: metadata carrying a single `tag` key.
    let metadata = HashMap::from([("tag".to_string(), "important".to_string())]);
    let update = IndexCommand::UpdateMetadata {
        vector_id: "v1".to_string(),
        metadata,
    };
    node.propose(update, None)?;

    // Both entries are committed by one call.
    node.force_commit_single_node();

    let stored = node.get_vector("v1").expect("v1 not found");
    let tag = stored.metadata.get("tag").map(String::as_str);
    assert_eq!(tag, Some("important"));

    Ok(())
}
1362
/// Searching with a query equal to one stored vector must rank that vector
/// first with a score of (approximately) 1.0.
#[test]
fn test_search_similar() -> Result<()> {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Skip the election and mark the node as leader directly.
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // Three unit vectors, one per axis, proposed in order v1, v2, v3.
    let fixtures = [
        ("v1", vec![1.0, 0.0, 0.0]),
        ("v2", vec![0.0, 1.0, 0.0]),
        ("v3", vec![0.0, 0.0, 1.0]),
    ];
    for (id, coords) in fixtures {
        node.propose(IndexCommand::Upsert(make_vector_entry(id, coords)), None)?;
    }
    node.force_commit_single_node();

    // Query with v1's own coordinates, asking for the top 2 results.
    let results = node.search_similar(&[1.0, 0.0, 0.0], 2);
    assert!(!results.is_empty());

    let best = &results[0];
    assert_eq!(best.0, "v1");
    // Compare the score to 1.0 with a small tolerance instead of exact
    // floating-point equality.
    assert!((best.1 - 1.0).abs() < 1e-5);

    Ok(())
}
1392
/// `get_stats()` must report the leader role and a non-empty log after one
/// committed proposal.
#[test]
fn test_stats_populated() -> Result<()> {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Skip the election and mark the node as leader directly.
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // A single no-op entry is enough to make the log non-empty.
    node.propose(IndexCommand::NoOp, None)?;
    node.force_commit_single_node();

    let snapshot = node.get_stats();
    assert_eq!(snapshot.role, "Leader");
    assert!(snapshot.log_length > 0);

    Ok(())
}
1408
/// Every successful `propose` must grow `log_length()` by exactly one,
/// starting from an empty log.
#[test]
fn test_raft_log_length_increases() -> Result<()> {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Skip the election and mark the node as leader directly.
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    assert_eq!(node.log_length(), 0);

    // (log length expected after the proposal, command to propose)
    let steps = [(1, IndexCommand::NoOp), (2, IndexCommand::Rebuild)];
    for (expected_len, command) in steps {
        node.propose(command, None)?;
        assert_eq!(node.log_length(), expected_len);
    }

    Ok(())
}
1426
/// `PersistentState::default()` must describe a node with no history:
/// term 0, no vote cast, an empty log, and zero last-log index and term.
#[test]
fn test_persistent_state_default() {
    let fresh = PersistentState::default();

    // Plain fields.
    assert_eq!(fresh.current_term, 0);
    assert!(fresh.voted_for.is_none());
    assert!(fresh.log.is_empty());

    // Accessors evaluated on the empty log.
    let last_index = fresh.last_log_index();
    assert_eq!(last_index, 0);
    let last_term = fresh.last_log_term();
    assert_eq!(last_term, 0);
}
1436
/// The `Display` output of each `NodeRole` variant must equal the variant's
/// name.
#[test]
fn test_node_role_display() {
    let expectations = [
        (NodeRole::Follower, "Follower"),
        (NodeRole::Candidate, "Candidate"),
        (NodeRole::Leader, "Leader"),
    ];

    for (role, rendered) in expectations {
        assert_eq!(role.to_string(), rendered);
    }
}
1443
/// A node that was just constructed must not report its election timeout
/// as elapsed.
#[test]
fn test_election_timeout_not_elapsed_immediately() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Checked right after construction, with no waiting in between.
    let timed_out = node.election_timeout_elapsed();
    assert!(!timed_out);
}
1451
/// Immediately after `reset_heartbeat()` the election timeout must not be
/// reported as elapsed.
#[test]
fn test_reset_heartbeat() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    node.reset_heartbeat();

    // Checked right after the reset, with no waiting in between.
    let timed_out = node.election_timeout_elapsed();
    assert!(!timed_out);
}
1460
/// A freshly created follower must accept an `AppendEntriesRequest`
/// carrying one entry on top of an empty log and store that entry.
#[test]
fn test_append_entries_appends_new_log_entries() {
    let follower = RaftIndexNode::new(RaftConfig::three_node_cluster(2));

    // One no-op entry at index 1 / term 1, sent by leader 1. The previous
    // index and term are both 0 because nothing precedes the entry.
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![LogEntry {
            index: 1,
            term: 1,
            command: IndexCommand::NoOp,
            client_id: None,
        }],
        leader_commit: 1,
    };

    let accepted = follower.handle_append_entries(request).success;
    assert!(accepted);
    assert_eq!(follower.log_length(), 1);
}
1486
/// A follower that receives an entry together with `leader_commit` covering
/// that entry must apply it: `last_applied()` advances to the entry's index
/// and the upserted vector becomes visible through `vector_count()`.
#[test]
fn test_commit_advances_last_applied() {
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);

    let entry = LogEntry {
        index: 1,
        term: 1,
        command: IndexCommand::Upsert(make_vector_entry("v1", vec![1.0])),
        client_id: None,
    };

    // `leader_commit` equals the index of the only entry, so the test
    // expects the follower to apply it while handling this request.
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![entry],
        leader_commit: 1,
    };

    // Check the response instead of discarding it: a rejected request would
    // otherwise only surface as a confusing `last_applied` mismatch below.
    let response = follower.handle_append_entries(request);
    assert!(response.success);

    assert_eq!(follower.last_applied(), 1);
    assert_eq!(follower.vector_count(), 1);
}
1513}