1use anyhow::{anyhow, Result};
22use parking_lot::{Mutex, RwLock};
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::sync::Arc;
26use std::time::{Duration, Instant};
27use tracing::{debug, info, warn};
28
/// Identifier of a cluster member.
pub type NodeId = u64;

/// Position of an entry in the replicated log. Real entries start at 1; 0 means
/// "no entry" (e.g. the position before the first entry).
pub type LogIndex = u64;

/// Raft election term. Every node starts in term 0, and each election increments it.
pub type Term = u64;
37
/// A vector stored in the replicated index, keyed by `vector_id`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VectorEntry {
    /// Key under which the state machine stores the vector; an upsert with an
    /// existing id replaces the previous entry.
    pub vector_id: String,
    /// Vector components. Similarity search only compares vectors whose length
    /// equals the query's.
    pub vector: Vec<f32>,
    /// Free-form string metadata, replaced wholesale by `IndexCommand::UpdateMetadata`.
    pub metadata: HashMap<String, String>,
    /// Caller-supplied insertion time. Presumably a timestamp; the units are not
    /// established in this file (TODO confirm). The index itself never reads it.
    pub inserted_at: u64,
}
50
/// A state-machine operation carried inside a replicated log entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum IndexCommand {
    /// Insert the vector, replacing any existing entry with the same `vector_id`.
    Upsert(VectorEntry),
    /// Remove the vector with this id; has no effect if the id is absent.
    Delete { vector_id: String },
    /// Request an index rebuild. The in-memory state machine only logs and counts it.
    Rebuild,
    /// Replace the metadata of an existing vector; ignored if the id is unknown.
    UpdateMetadata {
        vector_id: String,
        metadata: HashMap<String, String>,
    },
    /// Entry with no effect on the index; a newly elected leader appends one.
    NoOp,
}
68
/// One entry of the replicated Raft log.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// 1-based position of the entry in the log.
    pub index: LogIndex,
    /// Term of the leader that created the entry.
    pub term: Term,
    /// Operation applied to the vector index once the entry is committed.
    pub command: IndexCommand,
    /// Optional caller identifier passed through from `propose`. The log does not
    /// interpret it.
    pub client_id: Option<String>,
}
81
/// The Raft role a node currently plays.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeRole {
    /// Passive replica that accepts entries from the leader; every node starts here.
    Follower,
    /// Node that has started an election and is collecting votes.
    Candidate,
    /// Node that won an election and accepts client proposals.
    Leader,
}
89
90impl std::fmt::Display for NodeRole {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 Self::Follower => write!(f, "Follower"),
94 Self::Candidate => write!(f, "Candidate"),
95 Self::Leader => write!(f, "Leader"),
96 }
97 }
98}
99
/// AppendEntries RPC sent by the leader. It replicates log entries and, when
/// `entries` is empty, serves as a heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesRequest {
    /// Leader's current term.
    pub term: Term,
    /// Id of the sending leader; followers record it as the current leader.
    pub leader_id: NodeId,
    /// Index of the entry immediately preceding `entries` (0 when `entries` starts the log).
    pub prev_log_index: LogIndex,
    /// Term of the leader's entry at `prev_log_index` (0 when `prev_log_index` is 0).
    pub prev_log_term: Term,
    /// Entries to store, in log order; empty for a heartbeat.
    pub entries: Vec<LogEntry>,
    /// Leader's commit index when the request was built.
    pub leader_commit: LogIndex,
}
116
/// Follower's reply to an [`AppendEntriesRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AppendEntriesResponse {
    /// Responder's current term; a value higher than the leader's makes it step down.
    pub term: Term,
    /// Whether the follower's log matched at `prev_log_index` and the entries were accepted.
    pub success: bool,
    /// Id of the responding node.
    pub node_id: NodeId,
    /// On a log-mismatch rejection, the index the leader should retry from (it
    /// becomes the peer's `next_index`). `None` on success and on stale-term rejection.
    pub conflict_index: Option<LogIndex>,
}
129
/// RequestVote RPC sent by a candidate to every other cluster member.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteRequest {
    /// Candidate's term; it was incremented when the election started.
    pub term: Term,
    /// Id of the node asking for the vote.
    pub candidate_id: NodeId,
    /// Index of the candidate's last log entry (0 if its log is empty).
    pub last_log_index: LogIndex,
    /// Term of the candidate's last log entry (0 if its log is empty).
    pub last_log_term: Term,
}
142
/// Voter's reply to a [`RequestVoteRequest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RequestVoteResponse {
    /// Voter's current term after processing the request.
    pub term: Term,
    /// Whether the voter granted its vote for this term.
    pub vote_granted: bool,
    /// Id of the voting node.
    pub node_id: NodeId,
}
153
/// Static configuration of one Raft node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RaftConfig {
    /// This node's id.
    pub node_id: NodeId,
    /// Every cluster member, including this node; quorum size is derived from its length.
    pub cluster_nodes: Vec<NodeId>,
    /// Heartbeat period in milliseconds. NOTE(review): not read by the code in this file.
    pub heartbeat_interval_ms: u64,
    /// Lower bound of the election timeout in milliseconds. NOTE(review): not read by
    /// the code in this file.
    pub election_timeout_min_ms: u64,
    /// Election timeout in milliseconds; `election_timeout_elapsed` compares against it.
    pub election_timeout_max_ms: u64,
    /// Maximum number of log entries put into a single AppendEntries request.
    pub max_entries_per_batch: usize,
    /// Snapshot toggle. NOTE(review): not read by the code in this file.
    pub enable_snapshots: bool,
    /// Log length that would trigger a snapshot. NOTE(review): not read by the code in this file.
    pub snapshot_threshold: usize,
    /// RPC retry budget. NOTE(review): not read by the code in this file.
    pub max_rpc_retries: usize,
}
175
176impl RaftConfig {
177 pub fn single_node(node_id: NodeId) -> Self {
179 Self {
180 node_id,
181 cluster_nodes: vec![node_id],
182 heartbeat_interval_ms: 150,
183 election_timeout_min_ms: 300,
184 election_timeout_max_ms: 600,
185 max_entries_per_batch: 100,
186 enable_snapshots: true,
187 snapshot_threshold: 10_000,
188 max_rpc_retries: 3,
189 }
190 }
191
192 pub fn three_node_cluster(node_id: NodeId) -> Self {
194 Self {
195 node_id,
196 cluster_nodes: vec![1, 2, 3],
197 heartbeat_interval_ms: 150,
198 election_timeout_min_ms: 300,
199 election_timeout_max_ms: 600,
200 max_entries_per_batch: 100,
201 enable_snapshots: true,
202 snapshot_threshold: 10_000,
203 max_rpc_retries: 3,
204 }
205 }
206
207 pub fn quorum_size(&self) -> usize {
209 self.cluster_nodes.len() / 2 + 1
210 }
211}
212
213impl Default for RaftConfig {
214 fn default() -> Self {
215 Self::single_node(1)
216 }
217}
218
/// Point-in-time snapshot of a node's Raft and index counters, built by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct RaftStats {
    /// Node's current term.
    pub current_term: Term,
    /// Role rendered through `NodeRole`'s `Display` (e.g. `"Leader"`).
    pub role: String,
    /// Leader this node currently knows about, if any.
    pub current_leader: Option<NodeId>,
    /// Number of entries in the log.
    pub log_length: usize,
    /// Highest log index known to be committed.
    pub commit_index: LogIndex,
    /// Highest log index applied to the vector index.
    pub last_applied: LogIndex,
    /// Number of elections this node has started.
    pub elections_participated: u64,
    /// Number of times this node has become leader.
    pub terms_as_leader: u64,
    /// State-machine operation counter (see `IndexStateMachine::apply`).
    pub operations_applied: u64,
    /// Number of vectors currently stored.
    pub vector_count: usize,
    /// Number of AppendEntries requests this node has built.
    pub rpcs_sent: u64,
    /// Number of RPC requests and responses this node has processed.
    pub rpcs_received: u64,
}
247
/// In-memory vector index that committed log entries are applied to.
#[derive(Debug, Default)]
struct IndexStateMachine {
    /// Stored vectors keyed by `vector_id`.
    vectors: HashMap<String, VectorEntry>,
    /// Number of commands counted by `apply`. `NoOp` entries, and metadata updates
    /// for unknown ids, are not counted.
    operations_applied: u64,
}
256
257impl IndexStateMachine {
258 fn apply(&mut self, command: &IndexCommand) {
260 match command {
261 IndexCommand::Upsert(entry) => {
262 self.vectors.insert(entry.vector_id.clone(), entry.clone());
263 self.operations_applied += 1;
264 debug!("Applied Upsert for vector '{}'", entry.vector_id);
265 }
266 IndexCommand::Delete { vector_id } => {
267 self.vectors.remove(vector_id);
268 self.operations_applied += 1;
269 debug!("Applied Delete for vector '{}'", vector_id);
270 }
271 IndexCommand::UpdateMetadata {
272 vector_id,
273 metadata,
274 } => {
275 if let Some(entry) = self.vectors.get_mut(vector_id) {
276 entry.metadata.clone_from(metadata);
277 self.operations_applied += 1;
278 }
279 }
280 IndexCommand::Rebuild => {
281 debug!("Applied Rebuild command");
282 self.operations_applied += 1;
283 }
284 IndexCommand::NoOp => {
285 }
287 }
288 }
289
290 fn len(&self) -> usize {
291 self.vectors.len()
292 }
293
294 fn get(&self, vector_id: &str) -> Option<&VectorEntry> {
295 self.vectors.get(vector_id)
296 }
297
298 fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
299 let mut similarities: Vec<(String, f32)> = self
300 .vectors
301 .iter()
302 .filter_map(|(id, entry)| {
303 if entry.vector.len() != query.len() {
304 return None;
305 }
306 let dot: f32 = entry
307 .vector
308 .iter()
309 .zip(query.iter())
310 .map(|(a, b)| a * b)
311 .sum();
312 let na: f32 = entry.vector.iter().map(|x| x * x).sum::<f32>().sqrt();
313 let nb: f32 = query.iter().map(|x| x * x).sum::<f32>().sqrt();
314 let sim = if na < 1e-9 || nb < 1e-9 {
315 0.0
316 } else {
317 dot / (na * nb)
318 };
319 Some((id.clone(), sim))
320 })
321 .collect();
322
323 similarities.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
324 similarities.truncate(k);
325 similarities
326 }
327}
328
/// Term, vote and log: the part of a node's state that Raft treats as persistent.
///
/// NOTE(review): in this file it only lives in memory; no code here writes it to
/// durable storage. The serde derives presumably exist for that purpose — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PersistentState {
    /// Latest term this node has seen.
    pub current_term: Term,
    /// Candidate this node voted for in `current_term`, if any.
    pub voted_for: Option<NodeId>,
    /// Log entries in ascending index order.
    pub log: Vec<LogEntry>,
}
339
340impl PersistentState {
341 fn last_log_index(&self) -> LogIndex {
342 self.log.last().map(|e| e.index).unwrap_or(0)
343 }
344
345 fn last_log_term(&self) -> Term {
346 self.log.last().map(|e| e.term).unwrap_or(0)
347 }
348
349 fn get_entry(&self, index: LogIndex) -> Option<&LogEntry> {
350 if index == 0 {
351 return None;
352 }
353 self.log.iter().find(|e| e.index == index)
355 }
356
357 fn truncate_from(&mut self, from_index: LogIndex) {
358 self.log.retain(|e| e.index < from_index);
359 }
360}
361
/// A single Raft participant that replicates [`IndexCommand`]s and applies committed
/// ones to an in-memory vector index.
///
/// RPCs are plain method calls: `handle_*` runs on the receiver and `process_*` on
/// the sender. No networking or timers are driven from this type. Each piece of
/// state sits behind its own `parking_lot` lock.
#[derive(Debug)]
pub struct RaftIndexNode {
    /// Static configuration (membership, timeouts, batch size).
    config: RaftConfig,
    /// Term, vote and log.
    persistent: Arc<RwLock<PersistentState>>,
    /// Current Raft role.
    role: Arc<Mutex<NodeRole>>,
    /// Leader known for the current term, if any.
    current_leader: Arc<Mutex<Option<NodeId>>>,
    /// Highest log index known to be committed.
    commit_index: Arc<Mutex<LogIndex>>,
    /// Highest log index applied to `state_machine`.
    last_applied: Arc<Mutex<LogIndex>>,
    /// Leader bookkeeping: next log index to send to each peer.
    next_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// Leader bookkeeping: highest log index known to be replicated on each peer.
    match_index: Arc<Mutex<HashMap<NodeId, LogIndex>>>,
    /// The vector index that committed entries are applied to.
    state_machine: Arc<RwLock<IndexStateMachine>>,
    /// Votes gathered in the current election, keyed by voter id.
    votes_received: Arc<Mutex<HashMap<NodeId, bool>>>,
    /// Last time a leader was heard from or a vote was granted; the election
    /// timeout is measured from here.
    last_heartbeat: Arc<Mutex<Instant>>,
    /// NOTE(review): initialised in `new` but not read by any method visible here;
    /// `get_stats` builds a fresh snapshot instead.
    stats: Arc<Mutex<RaftStats>>,
    /// Number of elections this node has started.
    elections_participated: Arc<Mutex<u64>>,
    /// Number of times this node has become leader.
    terms_as_leader: Arc<Mutex<u64>>,
    /// Number of AppendEntries requests built by this node (vote requests are not counted).
    rpcs_sent: Arc<Mutex<u64>>,
    /// Number of RPC requests and responses processed by this node.
    rpcs_received: Arc<Mutex<u64>>,
}
402
403impl RaftIndexNode {
404 pub fn new(config: RaftConfig) -> Self {
406 let node_id = config.node_id;
407 let cluster_nodes: Vec<NodeId> = config.cluster_nodes.clone();
408
409 let next_index: HashMap<NodeId, LogIndex> = cluster_nodes
410 .iter()
411 .filter(|&&n| n != node_id)
412 .map(|&n| (n, 1))
413 .collect();
414
415 let match_index: HashMap<NodeId, LogIndex> = cluster_nodes
416 .iter()
417 .filter(|&&n| n != node_id)
418 .map(|&n| (n, 0))
419 .collect();
420
421 info!(
422 "Raft node {} initialized in cluster {:?}",
423 node_id, cluster_nodes
424 );
425
426 Self {
427 config,
428 persistent: Arc::new(RwLock::new(PersistentState::default())),
429 role: Arc::new(Mutex::new(NodeRole::Follower)),
430 current_leader: Arc::new(Mutex::new(None)),
431 commit_index: Arc::new(Mutex::new(0)),
432 last_applied: Arc::new(Mutex::new(0)),
433 next_index: Arc::new(Mutex::new(next_index)),
434 match_index: Arc::new(Mutex::new(match_index)),
435 state_machine: Arc::new(RwLock::new(IndexStateMachine::default())),
436 votes_received: Arc::new(Mutex::new(HashMap::new())),
437 last_heartbeat: Arc::new(Mutex::new(Instant::now())),
438 stats: Arc::new(Mutex::new(RaftStats::default())),
439 elections_participated: Arc::new(Mutex::new(0)),
440 terms_as_leader: Arc::new(Mutex::new(0)),
441 rpcs_sent: Arc::new(Mutex::new(0)),
442 rpcs_received: Arc::new(Mutex::new(0)),
443 }
444 }
445
446 pub fn start_election(&self) -> RequestVoteRequest {
448 let mut persistent = self.persistent.write();
449 persistent.current_term += 1;
450 let new_term = persistent.current_term;
451 persistent.voted_for = Some(self.config.node_id);
452
453 *self.role.lock() = NodeRole::Candidate;
454 let mut votes = self.votes_received.lock();
455 votes.clear();
456 votes.insert(self.config.node_id, true); *self.elections_participated.lock() += 1;
459
460 info!(
461 "Node {} starting election for term {}",
462 self.config.node_id, new_term
463 );
464
465 RequestVoteRequest {
466 term: new_term,
467 candidate_id: self.config.node_id,
468 last_log_index: persistent.last_log_index(),
469 last_log_term: persistent.last_log_term(),
470 }
471 }
472
473 pub fn handle_request_vote(&self, request: RequestVoteRequest) -> RequestVoteResponse {
475 *self.rpcs_received.lock() += 1;
476 let mut persistent = self.persistent.write();
477
478 if request.term > persistent.current_term {
480 persistent.current_term = request.term;
481 persistent.voted_for = None;
482 *self.role.lock() = NodeRole::Follower;
483 }
484
485 let vote_granted = if request.term < persistent.current_term {
486 false
488 } else {
489 let already_voted = persistent
490 .voted_for
491 .map(|v| v != request.candidate_id)
492 .unwrap_or(false);
493
494 if already_voted {
495 false
496 } else {
497 let our_last_index = persistent.last_log_index();
499 let our_last_term = persistent.last_log_term();
500
501 let log_ok = request.last_log_term > our_last_term
502 || (request.last_log_term == our_last_term
503 && request.last_log_index >= our_last_index);
504
505 if log_ok {
506 persistent.voted_for = Some(request.candidate_id);
507 *self.last_heartbeat.lock() = Instant::now();
508 true
509 } else {
510 false
511 }
512 }
513 };
514
515 debug!(
516 "Node {} {:?} vote to {} for term {}",
517 self.config.node_id,
518 if vote_granted { "grants" } else { "denies" },
519 request.candidate_id,
520 request.term
521 );
522
523 RequestVoteResponse {
524 term: persistent.current_term,
525 vote_granted,
526 node_id: self.config.node_id,
527 }
528 }
529
530 pub fn process_vote_response(&self, response: RequestVoteResponse) -> bool {
534 *self.rpcs_received.lock() += 1;
535 let persistent = self.persistent.read();
536
537 if response.term > persistent.current_term {
539 drop(persistent);
540 let mut p = self.persistent.write();
541 p.current_term = response.term;
542 p.voted_for = None;
543 *self.role.lock() = NodeRole::Follower;
544 return false;
545 }
546
547 if *self.role.lock() != NodeRole::Candidate {
549 return false;
550 }
551
552 if response.term != persistent.current_term {
553 return false;
554 }
555
556 if response.vote_granted {
557 let mut votes = self.votes_received.lock();
558 votes.insert(response.node_id, true);
559 let vote_count = votes.values().filter(|&&v| v).count();
560
561 if vote_count >= self.config.quorum_size() {
562 drop(votes);
564 drop(persistent);
565 self.become_leader();
566 return true;
567 }
568 }
569 false
570 }
571
572 fn become_leader(&self) {
574 let term = self.persistent.read().current_term;
575 *self.role.lock() = NodeRole::Leader;
576 *self.current_leader.lock() = Some(self.config.node_id);
577 *self.terms_as_leader.lock() += 1;
578
579 let last_log_index = self.persistent.read().last_log_index();
581 let mut next_idx = self.next_index.lock();
582 let mut match_idx = self.match_index.lock();
583
584 for &peer in &self.config.cluster_nodes {
585 if peer != self.config.node_id {
586 next_idx.insert(peer, last_log_index + 1);
587 match_idx.insert(peer, 0);
588 }
589 }
590
591 info!(
592 "Node {} became leader for term {}",
593 self.config.node_id, term
594 );
595
596 drop(next_idx);
598 drop(match_idx);
599 let _ = self.append_entry(IndexCommand::NoOp, None);
600 }
601
602 pub fn handle_append_entries(&self, request: AppendEntriesRequest) -> AppendEntriesResponse {
604 *self.rpcs_received.lock() += 1;
605 let mut persistent = self.persistent.write();
606
607 if request.term > persistent.current_term {
609 persistent.current_term = request.term;
610 persistent.voted_for = None;
611 *self.role.lock() = NodeRole::Follower;
612 }
613
614 if request.term < persistent.current_term {
616 return AppendEntriesResponse {
617 term: persistent.current_term,
618 success: false,
619 node_id: self.config.node_id,
620 conflict_index: None,
621 };
622 }
623
624 *self.last_heartbeat.lock() = Instant::now();
626 *self.current_leader.lock() = Some(request.leader_id);
627 *self.role.lock() = NodeRole::Follower;
628
629 if request.prev_log_index > 0 {
631 let entry = persistent.get_entry(request.prev_log_index);
632 match entry {
633 None => {
634 return AppendEntriesResponse {
636 term: persistent.current_term,
637 success: false,
638 node_id: self.config.node_id,
639 conflict_index: Some(persistent.last_log_index() + 1),
640 };
641 }
642 Some(e) if e.term != request.prev_log_term => {
643 let conflict_index = e.index;
645 return AppendEntriesResponse {
646 term: persistent.current_term,
647 success: false,
648 node_id: self.config.node_id,
649 conflict_index: Some(conflict_index),
650 };
651 }
652 _ => {}
653 }
654 }
655
656 for entry in &request.entries {
658 let existing = persistent.get_entry(entry.index).cloned();
659 match existing {
660 Some(e) if e.term != entry.term => {
661 persistent.truncate_from(entry.index);
663 persistent.log.push(entry.clone());
664 }
665 None => {
666 persistent.log.push(entry.clone());
667 }
668 _ => {} }
670 }
671
672 let prev_commit = *self.commit_index.lock();
674 if request.leader_commit > prev_commit {
675 let new_commit = request.leader_commit.min(persistent.last_log_index());
676 drop(persistent);
677 *self.commit_index.lock() = new_commit;
678 self.apply_committed_entries();
679 }
680
681 AppendEntriesResponse {
682 term: self.persistent.read().current_term,
683 success: true,
684 node_id: self.config.node_id,
685 conflict_index: None,
686 }
687 }
688
689 pub fn process_append_entries_response(
691 &self,
692 peer_id: NodeId,
693 response: AppendEntriesResponse,
694 entries_sent_count: usize,
695 ) {
696 *self.rpcs_received.lock() += 1;
697 let current_term = self.persistent.read().current_term;
698
699 if response.term > current_term {
700 let mut p = self.persistent.write();
701 p.current_term = response.term;
702 p.voted_for = None;
703 *self.role.lock() = NodeRole::Follower;
704 return;
705 }
706
707 if *self.role.lock() != NodeRole::Leader {
708 return;
709 }
710
711 if response.success {
712 let mut next_idx = self.next_index.lock();
713 let mut match_idx = self.match_index.lock();
714
715 let new_next =
716 next_idx.get(&peer_id).copied().unwrap_or(1) + entries_sent_count as LogIndex;
717
718 next_idx.insert(peer_id, new_next);
719 match_idx.insert(peer_id, new_next - 1);
720 drop(next_idx);
721 drop(match_idx);
722
723 self.try_advance_commit_index();
725 } else {
726 let mut next_idx = self.next_index.lock();
728 if let Some(conflict) = response.conflict_index {
729 next_idx.insert(peer_id, conflict);
730 } else {
731 let current = next_idx.get(&peer_id).copied().unwrap_or(1);
732 if current > 1 {
733 next_idx.insert(peer_id, current - 1);
734 }
735 }
736 }
737 }
738
739 fn try_advance_commit_index(&self) {
741 let persistent = self.persistent.read();
742 let current_term = persistent.current_term;
743 let last_log_index = persistent.last_log_index();
744 drop(persistent);
745
746 let match_idx = self.match_index.lock();
747 let mut commit = *self.commit_index.lock();
748
749 for n in (commit + 1)..=last_log_index {
750 let p = self.persistent.read();
751 let entry_term = p.get_entry(n).map(|e| e.term).unwrap_or(0);
752 drop(p);
753
754 if entry_term != current_term {
756 continue;
757 }
758
759 let replication_count = 1 + match_idx.values().filter(|&&m| m >= n).count();
762
763 if replication_count >= self.config.quorum_size() {
764 commit = n;
765 }
766 }
767 drop(match_idx);
768
769 let old_commit = *self.commit_index.lock();
770 if commit > old_commit {
771 *self.commit_index.lock() = commit;
772 self.apply_committed_entries();
773 }
774 }
775
776 fn apply_committed_entries(&self) {
778 let commit = *self.commit_index.lock();
779 let mut last = *self.last_applied.lock();
780
781 while last < commit {
782 last += 1;
783 let persistent = self.persistent.read();
784 let entry = persistent.get_entry(last).cloned();
785 drop(persistent);
786
787 if let Some(entry) = entry {
788 let mut sm = self.state_machine.write();
789 sm.apply(&entry.command);
790 debug!("Node {} applied log entry {}", self.config.node_id, last);
791 }
792 }
793
794 *self.last_applied.lock() = last;
795 }
796
797 pub fn propose(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
801 if *self.role.lock() != NodeRole::Leader {
802 let leader = self.current_leader.lock().map(|l| l.to_string());
803 return Err(anyhow!(
804 "Not the leader. Current leader: {:?}",
805 leader.unwrap_or_else(|| "unknown".to_string())
806 ));
807 }
808 self.append_entry(command, client_id)
809 }
810
811 fn append_entry(&self, command: IndexCommand, client_id: Option<String>) -> Result<LogIndex> {
813 let mut persistent = self.persistent.write();
814 let term = persistent.current_term;
815 let index = persistent.last_log_index() + 1;
816
817 let entry = LogEntry {
818 index,
819 term,
820 command,
821 client_id,
822 };
823
824 persistent.log.push(entry);
825 info!(
826 "Node {} appended log entry {} in term {}",
827 self.config.node_id, index, term
828 );
829 Ok(index)
830 }
831
832 pub fn create_append_entries_request(&self, peer_id: NodeId) -> Result<AppendEntriesRequest> {
834 if *self.role.lock() != NodeRole::Leader {
835 return Err(anyhow!("Not the leader"));
836 }
837
838 let persistent = self.persistent.read();
839 let next_idx = self.next_index.lock();
840 let next = next_idx.get(&peer_id).copied().unwrap_or(1);
841
842 let prev_log_index = next.saturating_sub(1);
843 let prev_log_term = if prev_log_index > 0 {
844 persistent
845 .get_entry(prev_log_index)
846 .map(|e| e.term)
847 .unwrap_or(0)
848 } else {
849 0
850 };
851
852 let entries: Vec<LogEntry> = persistent
853 .log
854 .iter()
855 .filter(|e| e.index >= next)
856 .take(self.config.max_entries_per_batch)
857 .cloned()
858 .collect();
859
860 let commit = *self.commit_index.lock();
861
862 *self.rpcs_sent.lock() += 1;
863
864 Ok(AppendEntriesRequest {
865 term: persistent.current_term,
866 leader_id: self.config.node_id,
867 prev_log_index,
868 prev_log_term,
869 entries,
870 leader_commit: commit,
871 })
872 }
873
874 pub fn force_commit_single_node(&self) {
878 if self.config.cluster_nodes.len() != 1 {
879 warn!("force_commit_single_node called on multi-node cluster");
880 return;
881 }
882 let last_index = self.persistent.read().last_log_index();
883 *self.commit_index.lock() = last_index;
884 self.apply_committed_entries();
885 }
886
887 pub fn role(&self) -> NodeRole {
889 *self.role.lock()
890 }
891
892 pub fn current_term(&self) -> Term {
894 self.persistent.read().current_term
895 }
896
897 pub fn current_leader(&self) -> Option<NodeId> {
899 *self.current_leader.lock()
900 }
901
902 pub fn is_leader(&self) -> bool {
904 *self.role.lock() == NodeRole::Leader
905 }
906
907 pub fn log_length(&self) -> usize {
909 self.persistent.read().log.len()
910 }
911
912 pub fn commit_index(&self) -> LogIndex {
914 *self.commit_index.lock()
915 }
916
917 pub fn last_applied(&self) -> LogIndex {
919 *self.last_applied.lock()
920 }
921
922 pub fn vector_count(&self) -> usize {
924 self.state_machine.read().len()
925 }
926
927 pub fn get_vector(&self, vector_id: &str) -> Option<VectorEntry> {
929 self.state_machine.read().get(vector_id).cloned()
930 }
931
932 pub fn search_similar(&self, query: &[f32], k: usize) -> Vec<(String, f32)> {
934 self.state_machine.read().search_similar(query, k)
935 }
936
937 pub fn get_stats(&self) -> RaftStats {
939 let persistent = self.persistent.read();
940 RaftStats {
941 current_term: persistent.current_term,
942 role: self.role().to_string(),
943 current_leader: *self.current_leader.lock(),
944 log_length: persistent.log.len(),
945 commit_index: *self.commit_index.lock(),
946 last_applied: *self.last_applied.lock(),
947 elections_participated: *self.elections_participated.lock(),
948 terms_as_leader: *self.terms_as_leader.lock(),
949 operations_applied: self.state_machine.read().operations_applied,
950 vector_count: self.state_machine.read().len(),
951 rpcs_sent: *self.rpcs_sent.lock(),
952 rpcs_received: *self.rpcs_received.lock(),
953 }
954 }
955
956 pub fn election_timeout_elapsed(&self) -> bool {
958 let elapsed = self.last_heartbeat.lock().elapsed();
959 elapsed > Duration::from_millis(self.config.election_timeout_max_ms)
960 }
961
962 pub fn reset_heartbeat(&self) {
964 *self.last_heartbeat.lock() = Instant::now();
965 }
966}
967
/// Synchronous, in-process harness that wires several [`RaftIndexNode`]s together
/// through direct method calls (presumably for tests and demos).
pub struct ClusterSimulator {
    /// The simulated nodes. When built with `ClusterSimulator::new`, the node at
    /// position `i` has id `i + 1`.
    pub nodes: Vec<RaftIndexNode>,
}
972
973impl ClusterSimulator {
974 pub fn new(n: usize) -> Result<Self> {
976 let cluster_nodes: Vec<NodeId> = (1..=(n as NodeId)).collect();
977
978 let nodes = cluster_nodes
979 .iter()
980 .map(|&id| {
981 let config = RaftConfig {
982 node_id: id,
983 cluster_nodes: cluster_nodes.clone(),
984 heartbeat_interval_ms: 50,
985 election_timeout_min_ms: 150,
986 election_timeout_max_ms: 300,
987 max_entries_per_batch: 10,
988 enable_snapshots: false,
989 snapshot_threshold: 1000,
990 max_rpc_retries: 2,
991 };
992 RaftIndexNode::new(config)
993 })
994 .collect();
995
996 Ok(Self { nodes })
997 }
998
999 pub fn elect_leader(&self, leader_idx: usize) {
1001 let vote_request = self.nodes[leader_idx].start_election();
1003
1004 let mut all_won = false;
1006 for (i, node) in self.nodes.iter().enumerate() {
1007 if i == leader_idx {
1008 continue;
1009 }
1010 let response = node.handle_request_vote(vote_request.clone());
1011 if self.nodes[leader_idx].process_vote_response(response) {
1012 all_won = true;
1013 }
1014 }
1015
1016 if all_won || self.nodes[leader_idx].is_leader() {
1018 for (i, node) in self.nodes.iter().enumerate() {
1019 if i == leader_idx {
1020 continue;
1021 }
1022 if let Ok(ae_req) =
1023 self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1024 {
1025 let response = node.handle_append_entries(ae_req.clone());
1026 self.nodes[leader_idx].process_append_entries_response(
1027 node.config.node_id,
1028 response,
1029 ae_req.entries.len(),
1030 );
1031 }
1032 }
1033 }
1034 }
1035
1036 pub fn replicate_all(&self) -> Result<()> {
1038 let leader_idx = self
1039 .nodes
1040 .iter()
1041 .position(|n| n.is_leader())
1042 .ok_or_else(|| anyhow!("No leader elected"))?;
1043
1044 for (i, node) in self.nodes.iter().enumerate() {
1045 if i == leader_idx {
1046 continue;
1047 }
1048 if let Ok(ae_req) =
1049 self.nodes[leader_idx].create_append_entries_request(node.config.node_id)
1050 {
1051 let entries_len = ae_req.entries.len();
1052 let response = node.handle_append_entries(ae_req);
1053 self.nodes[leader_idx].process_append_entries_response(
1054 node.config.node_id,
1055 response,
1056 entries_len,
1057 );
1058 }
1059 }
1060 Ok(())
1061 }
1062}
1063
1064#[cfg(test)]
1065mod tests {
1066 use super::*;
1067
1068 fn make_vector_entry(id: &str, vec: Vec<f32>) -> VectorEntry {
1069 VectorEntry {
1070 vector_id: id.to_string(),
1071 vector: vec,
1072 metadata: HashMap::new(),
1073 inserted_at: 0,
1074 }
1075 }
1076
1077 #[test]
1078 fn test_raft_config_single_node() {
1079 let config = RaftConfig::single_node(1);
1080 assert_eq!(config.node_id, 1);
1081 assert_eq!(config.cluster_nodes, vec![1]);
1082 assert_eq!(config.quorum_size(), 1);
1083 }
1084
1085 #[test]
1086 fn test_raft_config_three_node() {
1087 let config = RaftConfig::three_node_cluster(1);
1088 assert_eq!(config.quorum_size(), 2);
1089 }
1090
1091 #[test]
1092 fn test_node_starts_as_follower() {
1093 let config = RaftConfig::single_node(1);
1094 let node = RaftIndexNode::new(config);
1095 assert_eq!(node.role(), NodeRole::Follower);
1096 assert_eq!(node.current_term(), 0);
1097 }
1098
1099 #[test]
1100 fn test_single_node_becomes_leader() {
1101 let config = RaftConfig::single_node(1);
1102 let node = RaftIndexNode::new(config);
1103
1104 let vote_req = node.start_election();
1105 assert_eq!(vote_req.term, 1);
1106 assert_eq!(node.current_term(), 1);
1107
1108 let won = node.process_vote_response(RequestVoteResponse {
1110 term: 1,
1111 vote_granted: true,
1112 node_id: 1,
1113 });
1114
1115 assert!(node.is_leader() || won);
1117 }
1118
1119 #[test]
1120 fn test_single_node_leader_force_commit() {
1121 let config = RaftConfig::single_node(1);
1122 let node = RaftIndexNode::new(config);
1123
1124 node.start_election();
1126 let _ = node.process_vote_response(RequestVoteResponse {
1128 term: node.current_term(),
1129 vote_granted: true,
1130 node_id: 1,
1131 });
1132
1133 if !node.is_leader() {
1134 *node.role.lock() = NodeRole::Leader;
1136 *node.current_leader.lock() = Some(1);
1137 }
1138
1139 let entry = make_vector_entry("v1", vec![1.0, 2.0, 3.0]);
1140 node.propose(IndexCommand::Upsert(entry), None).unwrap();
1141 node.force_commit_single_node();
1142
1143 assert_eq!(node.vector_count(), 1);
1144 assert!(node.get_vector("v1").is_some());
1145 }
1146
1147 #[test]
1148 fn test_propose_fails_when_not_leader() {
1149 let config = RaftConfig::three_node_cluster(1);
1150 let node = RaftIndexNode::new(config);
1151 let result = node.propose(IndexCommand::NoOp, None);
1153 assert!(result.is_err(), "Should fail to propose when not leader");
1154 }
1155
1156 #[test]
1157 fn test_request_vote_grants_to_newer_term() {
1158 let config = RaftConfig::three_node_cluster(2);
1159 let voter = RaftIndexNode::new(config);
1160
1161 let req = RequestVoteRequest {
1162 term: 5,
1163 candidate_id: 1,
1164 last_log_index: 10,
1165 last_log_term: 5,
1166 };
1167
1168 let response = voter.handle_request_vote(req);
1169 assert!(response.vote_granted, "Should grant vote to higher term");
1170 assert_eq!(response.term, 5);
1171 }
1172
1173 #[test]
1174 fn test_request_vote_rejects_stale_term() {
1175 let config = RaftConfig::three_node_cluster(2);
1176 let voter = RaftIndexNode::new(config);
1177
1178 voter.persistent.write().current_term = 5;
1180
1181 let req = RequestVoteRequest {
1182 term: 3, candidate_id: 1,
1184 last_log_index: 0,
1185 last_log_term: 0,
1186 };
1187
1188 let response = voter.handle_request_vote(req);
1189 assert!(!response.vote_granted, "Should reject stale term vote");
1190 assert_eq!(response.term, 5);
1191 }
1192
1193 #[test]
1194 fn test_request_vote_rejects_duplicate_vote() {
1195 let config = RaftConfig::three_node_cluster(2);
1196 let voter = RaftIndexNode::new(config);
1197
1198 let req1 = RequestVoteRequest {
1199 term: 1,
1200 candidate_id: 1,
1201 last_log_index: 0,
1202 last_log_term: 0,
1203 };
1204
1205 let req2 = RequestVoteRequest {
1206 term: 1,
1207 candidate_id: 3, last_log_index: 0,
1209 last_log_term: 0,
1210 };
1211
1212 let r1 = voter.handle_request_vote(req1);
1213 assert!(r1.vote_granted, "First vote should be granted");
1214
1215 let r2 = voter.handle_request_vote(req2);
1216 assert!(
1217 !r2.vote_granted,
1218 "Duplicate vote in same term should be rejected"
1219 );
1220 }
1221
1222 #[test]
1223 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1224 fn test_append_entries_heartbeat() {
1225 let config = RaftConfig::three_node_cluster(2);
1226 let follower = RaftIndexNode::new(config);
1227
1228 let heartbeat = AppendEntriesRequest {
1229 term: 1,
1230 leader_id: 1,
1231 prev_log_index: 0,
1232 prev_log_term: 0,
1233 entries: vec![],
1234 leader_commit: 0,
1235 };
1236
1237 let response = follower.handle_append_entries(heartbeat);
1238 assert!(response.success, "Heartbeat should succeed");
1239 assert_eq!(follower.current_leader(), Some(1));
1240 }
1241
1242 #[test]
1243 fn test_append_entries_stale_term() {
1244 let config = RaftConfig::three_node_cluster(2);
1245 let follower = RaftIndexNode::new(config);
1246 follower.persistent.write().current_term = 5;
1247
1248 let request = AppendEntriesRequest {
1249 term: 3, leader_id: 1,
1251 prev_log_index: 0,
1252 prev_log_term: 0,
1253 entries: vec![],
1254 leader_commit: 0,
1255 };
1256
1257 let response = follower.handle_append_entries(request);
1258 assert!(!response.success, "Stale term should be rejected");
1259 assert_eq!(response.term, 5);
1260 }
1261
1262 #[test]
1263 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1264 fn test_cluster_simulator_election() {
1265 let sim = ClusterSimulator::new(3).unwrap();
1266 sim.elect_leader(0);
1267
1268 let leaders: Vec<_> = sim.nodes.iter().filter(|n| n.is_leader()).collect();
1270 assert!(!leaders.is_empty(), "At least one node should be leader");
1271 }
1272
1273 #[test]
1274 #[ignore = "slow network simulation test - run explicitly with cargo test -- --ignored"]
1275 fn test_cluster_simulator_replication() {
1276 let sim = ClusterSimulator::new(3).unwrap();
1277 sim.elect_leader(0);
1278
1279 let leader_idx = sim.nodes.iter().position(|n| n.is_leader()).unwrap();
1280 let entry = make_vector_entry("v1", vec![1.0, 0.0, 0.0]);
1281 sim.nodes[leader_idx]
1282 .propose(IndexCommand::Upsert(entry), None)
1283 .unwrap();
1284
1285 sim.replicate_all().unwrap();
1286
1287 let leader = &sim.nodes[leader_idx];
1289 leader.force_commit_single_node();
1290 let vec = leader.get_vector("v1");
1292 assert!(vec.is_some() || leader.log_length() > 0);
1293 }
1294
1295 #[test]
1296 fn test_delete_command() {
1297 let config = RaftConfig::single_node(1);
1298 let node = RaftIndexNode::new(config);
1299
1300 node.start_election();
1302 let _ = node.process_vote_response(RequestVoteResponse {
1303 term: node.current_term(),
1304 vote_granted: true,
1305 node_id: 1,
1306 });
1307
1308 if !node.is_leader() {
1309 *node.role.lock() = NodeRole::Leader;
1310 *node.current_leader.lock() = Some(1);
1311 }
1312
1313 let entry = make_vector_entry("v1", vec![1.0]);
1315 node.propose(IndexCommand::Upsert(entry), None).unwrap();
1316 node.force_commit_single_node();
1317 assert_eq!(node.vector_count(), 1);
1318
1319 node.propose(
1320 IndexCommand::Delete {
1321 vector_id: "v1".to_string(),
1322 },
1323 None,
1324 )
1325 .unwrap();
1326 node.force_commit_single_node();
1327 assert_eq!(node.vector_count(), 0);
1328 }
1329
1330 #[test]
1331 fn test_update_metadata_command() {
1332 let config = RaftConfig::single_node(1);
1333 let node = RaftIndexNode::new(config);
1334
1335 *node.role.lock() = NodeRole::Leader;
1336 *node.current_leader.lock() = Some(1);
1337
1338 let entry = make_vector_entry("v1", vec![1.0, 2.0]);
1339 node.propose(IndexCommand::Upsert(entry), None).unwrap();
1340
1341 let mut new_meta = HashMap::new();
1342 new_meta.insert("tag".to_string(), "important".to_string());
1343 node.propose(
1344 IndexCommand::UpdateMetadata {
1345 vector_id: "v1".to_string(),
1346 metadata: new_meta,
1347 },
1348 None,
1349 )
1350 .unwrap();
1351 node.force_commit_single_node();
1352
1353 let stored = node.get_vector("v1").unwrap();
1354 assert_eq!(stored.metadata.get("tag"), Some(&"important".to_string()));
1355 }
1356
#[test]
fn test_search_similar() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // Three axis-aligned vectors, proposed in order v1, v2, v3.
    let axes = vec![
        ("v1", vec![1.0, 0.0, 0.0]),
        ("v2", vec![0.0, 1.0, 0.0]),
        ("v3", vec![0.0, 0.0, 1.0]),
    ];
    for (id, components) in axes {
        node.propose(IndexCommand::Upsert(make_vector_entry(id, components)), None)
            .unwrap();
    }
    node.force_commit_single_node();

    // Query with exactly v1's vector and ask for the top two matches.
    let results = node.search_similar(&[1.0, 0.0, 0.0], 2);
    assert!(!results.is_empty());

    let top = &results[0];
    assert_eq!(top.0, "v1");
    // The query equals v1, so its score is expected to be ~1.0.
    assert!((top.1 - 1.0).abs() < 1e-5);
}
1388
#[test]
fn test_stats_populated() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // One committed no-op is enough for the log to be non-empty.
    node.propose(IndexCommand::NoOp, None).unwrap();
    node.force_commit_single_node();

    let snapshot = node.get_stats();
    assert_eq!(snapshot.role, "Leader");
    assert!(snapshot.log_length > 0);
}
1403
#[test]
fn test_raft_log_length_increases() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));
    *node.role.lock() = NodeRole::Leader;
    *node.current_leader.lock() = Some(1);

    // A fresh node starts with an empty log.
    assert_eq!(node.log_length(), 0);

    // Each proposal appends exactly one entry, even without committing.
    let steps = vec![(1, IndexCommand::NoOp), (2, IndexCommand::Rebuild)];
    for (expected_len, command) in steps {
        node.propose(command, None).unwrap();
        assert_eq!(node.log_length(), expected_len);
    }
}
1420
#[test]
fn test_persistent_state_default() {
    // The default persistent state is term 0, no vote cast and an empty log.
    let fresh = PersistentState::default();

    assert_eq!(fresh.current_term, 0);
    assert!(fresh.voted_for.is_none());
    assert!(fresh.log.is_empty());

    // With an empty log, the last index and last term both report 0.
    assert_eq!(fresh.last_log_index(), 0);
    assert_eq!(fresh.last_log_term(), 0);
}
1430
#[test]
fn test_node_role_display() {
    // Each role's `Display` output is the bare variant name.
    let expectations = [
        (NodeRole::Follower, "Follower"),
        (NodeRole::Candidate, "Candidate"),
        (NodeRole::Leader, "Leader"),
    ];
    for (role, label) in expectations.iter() {
        assert_eq!(role.to_string(), *label);
    }
}
1437
#[test]
fn test_election_timeout_not_elapsed_immediately() {
    // Right after construction the election timeout must not yet have fired.
    let fresh_node = RaftIndexNode::new(RaftConfig::single_node(1));
    let timed_out = fresh_node.election_timeout_elapsed();
    assert!(!timed_out);
}
1445
#[test]
fn test_reset_heartbeat() {
    let node = RaftIndexNode::new(RaftConfig::single_node(1));

    // Immediately after a heartbeat reset, the election timeout must not
    // report as elapsed.
    node.reset_heartbeat();
    let timed_out = node.election_timeout_elapsed();
    assert!(!timed_out);
}
1454
#[test]
fn test_append_entries_appends_new_log_entries() {
    // Node 2 of a three-node cluster acts as the follower under test.
    let follower = RaftIndexNode::new(RaftConfig::three_node_cluster(2));

    // Leader 1, term 1, ships the very first log entry. prev_log_index and
    // prev_log_term are 0 because nothing precedes it.
    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![LogEntry {
            index: 1,
            term: 1,
            command: IndexCommand::NoOp,
            client_id: None,
        }],
        leader_commit: 1,
    };

    let reply = follower.handle_append_entries(request);
    assert!(reply.success);
    assert_eq!(follower.log_length(), 1);
}
1480
#[test]
fn test_commit_advances_last_applied() {
    // Node 2 of a three-node cluster receives an upsert from leader 1. The
    // upsert is already committed on the leader (leader_commit = 1).
    let config = RaftConfig::three_node_cluster(2);
    let follower = RaftIndexNode::new(config);

    let entry = LogEntry {
        index: 1,
        term: 1,
        command: IndexCommand::Upsert(make_vector_entry("v1", vec![1.0])),
        client_id: None,
    };

    let request = AppendEntriesRequest {
        term: 1,
        leader_id: 1,
        prev_log_index: 0,
        prev_log_term: 0,
        entries: vec![entry],
        leader_commit: 1,
    };

    // Check the follower accepted the RPC before inspecting apply state.
    // Otherwise a rejection would surface as a confusing `last_applied`
    // mismatch instead of pointing at the real failure.
    let response = follower.handle_append_entries(request);
    assert!(
        response.success,
        "follower rejected a consistent AppendEntries request"
    );

    // The committed entry must have been applied to the vector index.
    assert_eq!(follower.last_applied(), 1);
    assert_eq!(follower.vector_count(), 1);
}
1507}