use crate::error::{TsdbError, TsdbResult};
use crate::replication::raft_state::{
    AppendEntriesArgs, RaftRole, RaftState, RequestVoteArgs, TsdbCommand,
};
use std::collections::{HashMap, HashSet, VecDeque};

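/// A single time-series write: a timestamped value for a named metric, plus
/// optional key/value tags.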
#[derive(Debug, Clone, PartialEq)]
pub struct WriteEntry {
    /// Timestamp of the datapoint, e.g. milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Metric name; doubles as the series identifier when replicated.
    pub metric_name: String,
    /// Sampled value.
    pub value: f64,
    /// Optional key/value tags attached to the write.
    pub tags: HashMap<String, String>,
}

impl WriteEntry {
    /// Creates an entry with no tags.
    pub fn new(timestamp: i64, metric_name: impl Into<String>, value: f64) -> Self {
        Self {
            timestamp,
            metric_name: metric_name.into(),
            value,
            tags: HashMap::new(),
        }
    }

    /// Adds a single tag, builder-style.
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.tags.insert(key.into(), value.into());
        self
    }

    /// Merges a batch of tags into the entry, builder-style.
    pub fn with_tags(mut self, tags: HashMap<String, String>) -> Self {
        self.tags.extend(tags);
        self
    }

    /// Converts the entry into the command form that travels through the Raft
    /// log. Note that tags are not carried in the replicated command.
    pub fn to_command(&self) -> TsdbCommand {
        TsdbCommand::WriteDatapoint {
            series_id: self.metric_name.clone(),
            timestamp: self.timestamp,
            value: self.value,
        }
    }
}

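/// Messages exchanged between simulated nodes: the two Raft RPCs plus the
/// positive replies this simulation needs (negative replies are never sent).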
#[derive(Debug, Clone)]
enum RaftMessage {
    /// RequestVote RPC from a candidate.
    Vote(RequestVoteArgs),
    /// AppendEntries RPC (heartbeat and log replication) from a leader.
    Append(AppendEntriesArgs),
    /// Positive reply to a vote request.
    VoteGranted {
        term: u64,
    },
    /// Positive reply to an append, reporting how far the follower's log now
    /// matches the leader's.
    AppendAck {
        term: u64,
        match_index: u64,
    },
}

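/// A message in flight on the simulated network bus.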
#[derive(Debug)]
struct Envelope {
    sender: String,
    recipient: String,
    message: RaftMessage,
}

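/// A single TSDB node participating in Raft replication.
///
/// Wraps the core `RaftState` with simulation-level concerns: reachability
/// (for partition tests), an election timer measured in ticks, vote tallying
/// while campaigning, and a count of committed datapoint writes.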
#[derive(Debug)]
pub struct TsdbRaftNode {
    pub id: String,
    /// Core Raft state: term, role, log, and indices.
    pub(super) state: RaftState,
    /// Whether the node can currently send or receive messages.
    reachable: bool,
    /// Ticks since this node last accepted a leader append or started an election.
    pub(super) election_ticks: u32,
    /// Number of silent ticks after which the node starts an election.
    pub(super) election_timeout: u32,
    /// Peers that granted this node a vote in the current candidacy.
    votes_for_me: HashSet<String>,
    /// Count of `WriteDatapoint` commands this node has applied.
    committed_writes: u64,
}

impl TsdbRaftNode {
    /// Creates a follower with an empty log and the given election timeout.
    pub fn new(id: impl Into<String>, election_timeout: u32) -> Self {
        let id = id.into();
        let state = RaftState::new(id.clone());
        Self {
            id,
            state,
            reachable: true,
            election_ticks: 0,
            election_timeout,
            votes_for_me: HashSet::new(),
            committed_writes: 0,
        }
    }

    pub fn role(&self) -> RaftRole {
        self.state.role.clone()
    }

    pub fn current_term(&self) -> u64 {
        self.state.current_term
    }

    pub fn commit_index(&self) -> u64 {
        self.state.commit_index
    }

    /// Number of proposed entries, not counting the log's index-0 sentinel.
    pub fn log_len(&self) -> usize {
        self.state.log.len().saturating_sub(1)
    }

    pub fn last_log_index(&self) -> u64 {
        self.state.last_log_index()
    }

    /// Simulates a network partition: the node stops sending and receiving.
    pub fn partition(&mut self) {
        self.reachable = false;
    }

    /// Reconnects a previously partitioned node.
    pub fn heal(&mut self) {
        self.reachable = true;
    }

    pub fn is_reachable(&self) -> bool {
        self.reachable
    }

    pub fn committed_writes(&self) -> u64 {
        self.committed_writes
    }

    /// Proposes a command for replication. Fails if this node is not the leader.
    pub fn propose(&mut self, cmd: TsdbCommand) -> TsdbResult<u64> {
        self.state
            .propose_command(cmd)
            .map_err(|e| TsdbError::Replication(e.to_string()))
    }

    /// Converts a `WriteEntry` to its command form and proposes it.
    pub fn propose_write_entry(&mut self, entry: &WriteEntry) -> TsdbResult<u64> {
        self.propose(entry.to_command())
    }

    /// Applies all newly committed entries and returns them, tallying
    /// datapoint writes along the way.
    pub fn drain_applied(&mut self) -> Vec<TsdbCommand> {
        let applied = self.state.apply_committed_entries();
        for cmd in &applied {
            if matches!(cmd, TsdbCommand::WriteDatapoint { .. }) {
                self.committed_writes += 1;
            }
        }
        applied
    }
}

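/// A simulated cluster of `TsdbRaftNode`s connected by an in-memory message
/// bus. Each call to `tick` advances the cluster one step: heartbeats go out,
/// queued messages are delivered, election timers advance, and the leader's
/// commit index is recomputed. Partitions can be injected per node.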
pub struct ReplicationGroup {
    nodes: HashMap<String, TsdbRaftNode>,
    /// Messages in flight, delivered during the next tick.
    bus: VecDeque<Envelope>,
    /// Total node count, including unreachable nodes; fixed at construction.
    cluster_size: usize,
}

impl ReplicationGroup {
    /// Builds a group from node ids. `timeout_override` supplies per-node
    /// election timeouts (in ticks); nodes without an override default to 10.
    pub fn new(ids: &[&str], timeout_override: Option<&[u32]>) -> Self {
        let mut nodes = HashMap::new();
        for (i, &id) in ids.iter().enumerate() {
            let timeout = timeout_override
                .and_then(|ts| ts.get(i).copied())
                .unwrap_or(10);
            nodes.insert(id.to_string(), TsdbRaftNode::new(id, timeout));
        }
        let cluster_size = nodes.len();
        Self {
            nodes,
            bus: VecDeque::new(),
            cluster_size,
        }
    }

    pub fn cluster_size(&self) -> usize {
        self.cluster_size
    }

    pub fn node(&self, id: &str) -> Option<&TsdbRaftNode> {
        self.nodes.get(id)
    }

    pub fn node_mut(&mut self, id: &str) -> Option<&mut TsdbRaftNode> {
        self.nodes.get_mut(id)
    }

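    /// Returns the id of the current reachable leader, if any.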
    pub fn leader_id(&self) -> Option<String> {
        self.nodes
            .values()
            .find(|n| n.role() == RaftRole::Leader && n.reachable)
            .map(|n| n.id.clone())
    }

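    /// Counts reachable leaders; the tests use this to detect split brain.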
    pub fn leader_count(&self) -> usize {
        self.nodes
            .values()
            .filter(|n| n.role() == RaftRole::Leader && n.reachable)
            .count()
    }

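    /// Majority size for this cluster: floor(n / 2) + 1.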
    pub fn quorum_size(&self) -> usize {
        self.cluster_size / 2 + 1
    }

    /// Partitions the named node away from the rest of the cluster.
    pub fn partition(&mut self, id: &str) {
        if let Some(n) = self.nodes.get_mut(id) {
            n.partition();
        }
    }

    /// Heals a previously partitioned node.
    pub fn heal(&mut self, id: &str) {
        if let Some(n) = self.nodes.get_mut(id) {
            n.heal();
        }
    }

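    /// Proposes a command on the current leader, returning the leader's id and
    /// the log index assigned to the entry. Fails if no leader is reachable.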
    pub fn propose(&mut self, cmd: TsdbCommand) -> TsdbResult<(String, u64)> {
        let leader_id = self
            .leader_id()
            .ok_or_else(|| TsdbError::Replication("no leader elected yet".into()))?;

        let node = self
            .nodes
            .get_mut(&leader_id)
            .ok_or_else(|| TsdbError::Replication("leader disappeared from nodes map".into()))?;

        let idx = node.propose(cmd)?;
        Ok((leader_id, idx))
    }

    /// Converts a `WriteEntry` to its command form and proposes it on the leader.
    pub fn propose_write_entry(&mut self, entry: &WriteEntry) -> TsdbResult<(String, u64)> {
        self.propose(entry.to_command())
    }

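    /// Proposes a write, then ticks the cluster until the leader's commit
    /// index reaches the new entry or `max_ticks` elapse.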
    pub fn propose_and_commit(
        &mut self,
        entry: &WriteEntry,
        max_ticks: u32,
    ) -> TsdbResult<(String, u64)> {
        let (leader_id, log_index) = self.propose_write_entry(entry)?;
        for _ in 0..max_ticks {
            self.tick();
            if let Some(leader) = self.nodes.get(&leader_id) {
                if leader.commit_index() >= log_index {
                    return Ok((leader_id, log_index));
                }
            }
        }
        Err(TsdbError::Replication(format!(
            "quorum commit not achieved for index {log_index} within {max_ticks} ticks"
        )))
    }

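    /// Advances the whole cluster by one simulated time step, in four phases:
    /// leader heartbeats, message delivery, election timers, and commit-index
    /// advancement.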
    pub fn tick(&mut self) {
        // Phase 1: the leader (if any) enqueues AppendEntries to its peers.
        self.send_leader_heartbeats();
        // Phase 2: deliver queued messages, including replies they generate.
        self.deliver_messages();
        // Phase 3: followers and candidates age their election timers.
        self.advance_election_timers();
        // Phase 4: the leader recomputes its commit index from match indices.
        self.advance_commit_indices();
    }

    /// Runs `tick` `n` times.
    pub fn tick_n(&mut self, n: u32) {
        for _ in 0..n {
            self.tick();
        }
    }

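    /// Drains the bus and hands each envelope to its recipient, collecting
    /// replies back onto the bus. Up to three rounds run per tick so that
    /// request/reply exchanges complete within a single tick. Envelopes to or
    /// from unreachable nodes are silently dropped.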
    fn deliver_messages(&mut self) {
        for _round in 0..3 {
            if self.bus.is_empty() {
                break;
            }
            let envelopes: Vec<Envelope> = self.bus.drain(..).collect();
            let mut replies: Vec<Envelope> = Vec::new();

            for env in envelopes {
                // Drop the message if the recipient is partitioned or unknown.
                let recipient_reachable = self
                    .nodes
                    .get(&env.recipient)
                    .map(|n| n.reachable)
                    .unwrap_or(false);
                if !recipient_reachable {
                    continue;
                }

                // Drop the message if the sender has since become unreachable.
                let sender_reachable = self
                    .nodes
                    .get(&env.sender)
                    .map(|n| n.reachable)
                    .unwrap_or(true);
                if !sender_reachable {
                    continue;
                }

                let recipient = match self.nodes.get_mut(&env.recipient) {
                    Some(n) => n,
                    None => continue,
                };

                match env.message {
                    RaftMessage::Vote(args) => {
                        let reply = recipient.state.handle_vote_request(&args);
                        if reply.vote_granted {
                            replies.push(Envelope {
                                sender: env.recipient.clone(),
                                recipient: env.sender.clone(),
                                message: RaftMessage::VoteGranted { term: reply.term },
                            });
                        }
                    }
                    RaftMessage::VoteGranted { term } => {
                        // Count the vote only if we are still campaigning in that term.
                        if recipient.state.current_term == term
                            && recipient.role() == RaftRole::Candidate
                        {
                            recipient.votes_for_me.insert(env.sender.clone());
                        }
                    }
                    RaftMessage::Append(args) => {
                        let reply = recipient.state.handle_append_entries(&args);
                        if reply.success {
                            // A valid append doubles as a heartbeat: reset the timer.
                            recipient.election_ticks = 0;
                            // Only acknowledge appends that actually carried entries.
                            if !args.entries.is_empty() {
                                replies.push(Envelope {
                                    sender: env.recipient.clone(),
                                    recipient: env.sender.clone(),
                                    message: RaftMessage::AppendAck {
                                        term: reply.term,
                                        match_index: recipient.state.last_log_index(),
                                    },
                                });
                            }
                        }
                    }
                    RaftMessage::AppendAck { term, match_index } => {
                        // An ack from a newer term demotes a stale leader.
                        if term > recipient.state.current_term {
                            recipient.state.become_follower(term);
                        } else if recipient.role() == RaftRole::Leader {
                            recipient
                                .state
                                .handle_append_success(&env.sender, match_index);
                        }
                    }
                }
            }

            for r in replies {
                self.bus.push_back(r);
            }
        }
    }

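    /// Ages election timers on reachable followers and candidates. A node
    /// whose timer expires becomes a candidate, votes for itself, and
    /// broadcasts vote requests; a candidate that has reached quorum is
    /// promoted to leader.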
    fn advance_election_timers(&mut self) {
        let quorum = self.quorum_size();

        let node_ids: Vec<String> = self.nodes.keys().cloned().collect();

        let mut to_start_election: Vec<String> = Vec::new();
        let mut to_promote: Vec<String> = Vec::new();

        for id in &node_ids {
            // Partitioned nodes neither time out nor campaign.
            let node = match self.nodes.get_mut(id) {
                Some(n) if n.reachable => n,
                _ => continue,
            };

            match node.role() {
                RaftRole::Leader => {
                    // Leaders do not run an election timer.
                    node.election_ticks = 0;
                }
                RaftRole::Follower | RaftRole::Candidate => {
                    node.election_ticks += 1;
                    if node.election_ticks >= node.election_timeout {
                        node.election_ticks = 0;
                        // The returned args are discarded here and rebuilt below,
                        // outside this mutable borrow.
                        let _vote_args = node.state.become_candidate();
                        node.votes_for_me.clear();
                        node.votes_for_me.insert(id.clone());
                        to_start_election.push(id.clone());
                    }

                    // Votes are tallied during message delivery; check quorum now.
                    if node.role() == RaftRole::Candidate && node.votes_for_me.len() >= quorum {
                        to_promote.push(id.clone());
                    }
                }
            }
        }

        // Broadcast vote requests for every node that just became a candidate.
        for candidate_id in &to_start_election {
            let vote_args = {
                let node = match self.nodes.get(candidate_id) {
                    Some(n) => n,
                    None => continue,
                };
                RequestVoteArgs {
                    term: node.state.current_term,
                    candidate_id: candidate_id.clone(),
                    last_log_index: node.state.last_log_index(),
                    last_log_term: node.state.log.last().map(|e| e.term).unwrap_or(0),
                }
            };

            let peers: Vec<String> = self
                .nodes
                .keys()
                .filter(|pid| pid.as_str() != candidate_id.as_str())
                .cloned()
                .collect();

            for peer in peers {
                self.bus.push_back(Envelope {
                    sender: candidate_id.clone(),
                    recipient: peer,
                    message: RaftMessage::Vote(vote_args.clone()),
                });
            }
        }

        // Promote candidates that have gathered a majority of votes.
        for candidate_id in &to_promote {
            let peers: Vec<String> = self
                .nodes
                .keys()
                .filter(|pid| pid.as_str() != candidate_id.as_str())
                .cloned()
                .collect();

            if let Some(node) = self.nodes.get_mut(candidate_id) {
                if node.role() == RaftRole::Candidate && node.votes_for_me.len() >= quorum {
                    node.state.become_leader(&peers);
                    node.votes_for_me.clear();
                }
            }
        }
    }

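    /// Enqueues an AppendEntries message from the current leader to every
    /// reachable peer. With no pending entries this acts as a pure heartbeat.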
    fn send_leader_heartbeats(&mut self) {
        let leader_id = match self.leader_id() {
            Some(id) => id,
            None => return,
        };

        let peers: Vec<String> = self
            .nodes
            .keys()
            .filter(|id| {
                id.as_str() != leader_id.as_str()
                    && self
                        .nodes
                        .get(id.as_str())
                        .map(|n| n.reachable)
                        .unwrap_or(false)
            })
            .cloned()
            .collect();

        let leader = match self.nodes.get_mut(&leader_id) {
            Some(n) => n,
            None => return,
        };

        for peer in &peers {
            if let Ok(args) = leader.state.build_append_entries(peer) {
                self.bus.push_back(Envelope {
                    sender: leader_id.clone(),
                    recipient: peer.clone(),
                    message: RaftMessage::Append(args),
                });
            }
        }
    }

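    /// Lets the leader advance its commit index based on the match indices
    /// reported by followers (quorum-replicated entries become committed).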
    fn advance_commit_indices(&mut self) {
        let leader_id = match self.leader_id() {
            Some(id) => id,
            None => return,
        };

        if let Some(leader) = self.nodes.get_mut(&leader_id) {
            leader.state.try_advance_commit_index(self.cluster_size);
        }
    }

    /// Ids of all nodes in the group, reachable or not.
    pub fn node_ids(&self) -> Vec<String> {
        self.nodes.keys().cloned().collect()
    }

    /// Number of messages currently queued on the bus.
    pub fn pending_messages(&self) -> usize {
        self.bus.len()
    }
}

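// Deterministic simulation tests. Most groups use distinct election timeouts
// so elections resolve predictably without real clocks or randomness.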
#[cfg(test)]
mod tests {
    use super::*;
    use crate::replication::raft_state::TsdbCommand;

    /// Three nodes with staggered timeouts so one node reliably times out first.
    fn three_node_group() -> ReplicationGroup {
        ReplicationGroup::new(&["node-0", "node-1", "node-2"], Some(&[3, 5, 7]))
    }

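    // --- construction and basic lookups ---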
    #[test]
    fn test_initial_all_followers() {
        let g = three_node_group();
        assert!(g.leader_id().is_none());
        assert!(g.nodes.values().all(|n| n.role() == RaftRole::Follower));
    }

    #[test]
    fn test_cluster_size_three() {
        let g = three_node_group();
        assert_eq!(g.cluster_size(), 3);
    }

    #[test]
    fn test_node_lookup() {
        let g = three_node_group();
        assert!(g.node("node-0").is_some());
        assert!(g.node("missing").is_none());
    }

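    // --- fresh node state ---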
    #[test]
    fn test_node_initial_log_len_is_zero() {
        let g = three_node_group();
        assert_eq!(g.node("node-1").expect("n").log_len(), 0);
    }

    #[test]
    fn test_node_initial_commit_index() {
        let g = three_node_group();
        assert_eq!(g.node("node-0").expect("n").commit_index(), 0);
    }

    #[test]
    fn test_node_last_log_index_zero_initially() {
        let g = three_node_group();
        assert_eq!(g.node("node-2").expect("n").last_log_index(), 0);
    }

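    // --- leader election ---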
    #[test]
    fn test_leader_elected_after_ticks() {
        let mut g = three_node_group();
        g.tick_n(20);
        assert_eq!(g.leader_count(), 1, "expected exactly one leader");
    }

    #[test]
    fn test_leader_has_term_at_least_one() {
        let mut g = three_node_group();
        g.tick_n(20);
        let term = g
            .leader_id()
            .and_then(|id| g.node(&id))
            .map(|n| n.current_term())
            .unwrap_or(0);
        assert!(term >= 1);
    }

    #[test]
    fn test_no_duplicate_leaders_during_election() {
        let mut g = three_node_group();
        g.tick_n(20);
        let leader_count_at_20 = g.leader_count();
        g.tick_n(30);
        assert!(g.leader_count() <= 1);
        if leader_count_at_20 == 1 {
            assert_eq!(g.leader_count(), 1);
        }
    }

    #[test]
    fn test_five_node_group_elects_leader() {
        let mut g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], Some(&[3, 6, 9, 12, 15]));
        g.tick_n(40);
        assert_eq!(
            g.leader_count(),
            1,
            "5-node group must elect exactly 1 leader"
        );
    }

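    // --- proposals ---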
    #[test]
    fn test_propose_on_leader_increments_log() {
        let mut g = three_node_group();
        g.tick_n(20);

        let cmd = TsdbCommand::WriteDatapoint {
            series_id: "temperature".into(),
            timestamp: 1_700_000_000_000,
            value: 21.3,
        };
        let result = g.propose(cmd);
        assert!(result.is_ok(), "propose should succeed when leader exists");
        let (_, idx) = result.expect("ok");
        assert!(idx >= 1);
    }

    #[test]
    fn test_multiple_proposals_monotone_indices() {
        let mut g = three_node_group();
        g.tick_n(20);

        let mut prev_idx = 0u64;
        for i in 0..5 {
            let cmd = TsdbCommand::WriteDatapoint {
                series_id: "s".into(),
                timestamp: i * 1_000,
                value: i as f64,
            };
            let (_, idx) = g.propose(cmd).expect("propose");
            assert!(idx > prev_idx, "indices must be strictly increasing");
            prev_idx = idx;
        }
    }

    #[test]
    fn test_propose_without_leader_returns_error() {
        let mut g = three_node_group();
        let cmd = TsdbCommand::WriteDatapoint {
            series_id: "s".into(),
            timestamp: 0,
            value: 0.0,
        };
        assert!(g.propose(cmd).is_err());
    }

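    // --- partitions ---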
    #[test]
    fn test_partition_reduces_leader_count() {
        let mut g = three_node_group();
        g.tick_n(20);

        let old_leader = g.leader_id().expect("leader must exist");
        g.partition(&old_leader);
        assert_eq!(
            g.leader_count(),
            0,
            "partitioned leader should not be counted as reachable leader"
        );
    }

    #[test]
    fn test_heal_partition_restores_node() {
        let mut g = three_node_group();
        g.partition("node-2");
        assert!(!g.node("node-2").expect("exists").is_reachable());
        g.heal("node-2");
        assert!(g.node("node-2").expect("exists").is_reachable());
    }

    #[test]
    fn test_remaining_nodes_can_elect_after_partition() {
        let mut g = three_node_group();
        g.tick_n(20);

        let old_leader = g.leader_id().expect("leader");
        g.partition(&old_leader);
        g.tick_n(30);

        let reachable_leaders = g
            .nodes
            .values()
            .filter(|n| n.reachable && n.role() == RaftRole::Leader)
            .count();
        assert!(reachable_leaders <= 1);
    }

    #[test]
    fn test_heal_partition_node_is_reachable() {
        let mut g = three_node_group();
        g.partition("node-1");
        g.heal("node-1");
        assert!(g.node("node-1").expect("n").is_reachable());
    }

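    // --- group construction and timeouts ---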
    #[test]
    fn test_replication_group_node_count() {
        let g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], None);
        assert_eq!(g.cluster_size(), 5);
    }

    #[test]
    fn test_replication_group_default_timeout() {
        let g = ReplicationGroup::new(&["x", "y"], None);
        assert_eq!(g.node("x").expect("x").election_timeout, 10);
        assert_eq!(g.node("y").expect("y").election_timeout, 10);
    }

    #[test]
    fn test_replication_group_custom_timeouts() {
        let g = ReplicationGroup::new(&["p", "q"], Some(&[3, 7]));
        assert_eq!(g.node("p").expect("p").election_timeout, 3);
        assert_eq!(g.node("q").expect("q").election_timeout, 7);
    }

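    // --- standalone TsdbRaftNode behaviour ---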
    #[test]
    fn test_tsdb_raft_node_propose_non_leader_error() {
        let mut node = TsdbRaftNode::new("solo", 5);
        let result = node.propose(TsdbCommand::DeleteSeries {
            series_id: "old".into(),
        });
        assert!(result.is_err());
    }

    #[test]
    fn test_tsdb_raft_node_initial_role_follower() {
        let node = TsdbRaftNode::new("n", 5);
        assert_eq!(node.role(), RaftRole::Follower);
    }

    #[test]
    fn test_tsdb_raft_node_partition_and_heal() {
        let mut node = TsdbRaftNode::new("n", 5);
        assert!(node.is_reachable());
        node.partition();
        assert!(!node.is_reachable());
        node.heal();
        assert!(node.is_reachable());
    }

    #[test]
    fn test_tsdb_raft_node_drain_applied_empty_initially() {
        let mut node = TsdbRaftNode::new("n", 5);
        assert!(node.drain_applied().is_empty());
    }

    #[test]
    fn test_tsdb_raft_node_initial_term() {
        let node = TsdbRaftNode::new("n", 5);
        assert_eq!(node.current_term(), 0);
    }

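    // --- leader stability ---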
    #[test]
    fn test_leader_stable_after_election() {
        let mut g = three_node_group();
        g.tick_n(20);

        let leader_at_20 = g.leader_id();
        assert!(leader_at_20.is_some(), "must have elected a leader");

        g.tick_n(10);
        assert_eq!(g.leader_count(), 1, "leader must remain stable");
    }

    #[test]
    fn test_leader_count_never_exceeds_one() {
        let mut g = ReplicationGroup::new(&["n0", "n1", "n2"], Some(&[4, 8, 12]));
        for _ in 0..40 {
            g.tick();
            assert!(
                g.leader_count() <= 1,
                "split brain detected: {} leaders",
                g.leader_count()
            );
        }
    }

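    // --- WriteEntry ---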
    #[test]
    fn test_write_entry_new() {
        let entry = WriteEntry::new(1_700_000_000_000, "cpu_usage", 42.5);
        assert_eq!(entry.timestamp, 1_700_000_000_000);
        assert_eq!(entry.metric_name, "cpu_usage");
        assert!((entry.value - 42.5).abs() < f64::EPSILON);
        assert!(entry.tags.is_empty());
    }

    #[test]
    fn test_write_entry_with_tag() {
        let entry = WriteEntry::new(1000, "mem", 8192.0)
            .with_tag("host", "srv-01")
            .with_tag("region", "eu-west");
        assert_eq!(entry.tags.len(), 2);
        assert_eq!(entry.tags["host"], "srv-01");
        assert_eq!(entry.tags["region"], "eu-west");
    }

    #[test]
    fn test_write_entry_with_tags_batch() {
        let mut tags = HashMap::new();
        tags.insert("env".to_string(), "prod".to_string());
        tags.insert("dc".to_string(), "us-east-1".to_string());

        let entry = WriteEntry::new(2000, "latency", 1.5).with_tags(tags);
        assert_eq!(entry.tags.len(), 2);
        assert_eq!(entry.tags["env"], "prod");
    }

    #[test]
    fn test_write_entry_to_command() {
        let entry = WriteEntry::new(5000, "temperature", 22.7);
        let cmd = entry.to_command();
        match cmd {
            TsdbCommand::WriteDatapoint {
                series_id,
                timestamp,
                value,
            } => {
                assert_eq!(series_id, "temperature");
                assert_eq!(timestamp, 5000);
                assert!((value - 22.7).abs() < f64::EPSILON);
            }
            other => panic!("expected WriteDatapoint, got {:?}", other),
        }
    }

    #[test]
    fn test_write_entry_clone_eq() {
        let a = WriteEntry::new(100, "x", 1.0);
        let b = a.clone();
        assert_eq!(a, b);
    }

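    // --- quorum sizing and quorum commit ---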
    #[test]
    fn test_quorum_size_three_node() {
        let g = three_node_group();
        assert_eq!(g.quorum_size(), 2);
    }

    #[test]
    fn test_quorum_size_five_node() {
        let g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], None);
        assert_eq!(g.quorum_size(), 3);
    }

    #[test]
    fn test_propose_write_entry_on_leader() {
        let mut g = three_node_group();
        g.tick_n(20);

        let entry = WriteEntry::new(1_700_000_000_000, "cpu", 88.0);
        let result = g.propose_write_entry(&entry);
        assert!(result.is_ok());
        let (leader_id, idx) = result.expect("propose");
        assert!(!leader_id.is_empty());
        assert!(idx >= 1);
    }

    #[test]
    fn test_propose_write_entry_no_leader() {
        let mut g = three_node_group();
        let entry = WriteEntry::new(0, "x", 0.0);
        assert!(g.propose_write_entry(&entry).is_err());
    }

    #[test]
    fn test_propose_and_commit_succeeds() {
        let mut g = three_node_group();
        g.tick_n(20);

        let entry = WriteEntry::new(1000, "sensor_a", 99.0);
        let result = g.propose_and_commit(&entry, 20);
        assert!(
            result.is_ok(),
            "quorum commit should succeed: {:?}",
            result.err()
        );
    }

    #[test]
    fn test_propose_and_commit_multiple_entries() {
        let mut g = three_node_group();
        g.tick_n(20);

        for i in 0..5 {
            let entry = WriteEntry::new(i * 1000, format!("metric_{i}"), i as f64);
            let result = g.propose_and_commit(&entry, 20);
            assert!(result.is_ok(), "commit {i} should succeed");
        }
    }

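    // --- cluster introspection ---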
    #[test]
    fn test_node_ids_returns_all() {
        let g = three_node_group();
        let ids = g.node_ids();
        assert_eq!(ids.len(), 3);
        assert!(ids.contains(&"node-0".to_string()));
        assert!(ids.contains(&"node-1".to_string()));
        assert!(ids.contains(&"node-2".to_string()));
    }

    #[test]
    fn test_pending_messages_initially_zero() {
        let g = three_node_group();
        assert_eq!(g.pending_messages(), 0);
    }

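    // --- write counters and per-node proposals ---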
    #[test]
    fn test_committed_writes_initially_zero() {
        let node = TsdbRaftNode::new("n", 5);
        assert_eq!(node.committed_writes(), 0);
    }

    #[test]
    fn test_propose_write_entry_on_node() {
        let mut g = three_node_group();
        g.tick_n(20);
        let leader_id = g.leader_id().expect("leader");
        let entry = WriteEntry::new(1000, "temp", 22.0);
        let node = g.node_mut(&leader_id).expect("node");
        let result = node.propose_write_entry(&entry);
        assert!(result.is_ok());
    }

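    // --- election timeout variations ---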
    #[test]
    fn test_randomized_timeouts_elect_fastest() {
        // n0's much shorter timeout lets it campaign first and win.
        let mut g = ReplicationGroup::new(&["n0", "n1", "n2"], Some(&[2, 10, 15]));
        g.tick_n(20);
        assert_eq!(g.leader_count(), 1);
    }

    #[test]
    fn test_even_timeouts_still_elect_one() {
        let mut g = ReplicationGroup::new(&["a", "b", "c"], Some(&[5, 5, 5]));
        g.tick_n(30);
        // Identical timeouts can split votes indefinitely, so assert only
        // safety (at most one leader), not liveness.
        assert!(g.leader_count() <= 1);
    }

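    // --- full-partition edge cases ---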
    #[test]
    fn test_all_partitioned_no_leader() {
        let mut g = three_node_group();
        g.tick_n(20);
        g.partition("node-0");
        g.partition("node-1");
        g.partition("node-2");
        assert_eq!(g.leader_count(), 0);
    }

    #[test]
    fn test_heal_all_after_partition() {
        let mut g = three_node_group();
        g.tick_n(20);
        g.partition("node-0");
        g.partition("node-1");
        g.partition("node-2");
        g.heal("node-0");
        g.heal("node-1");
        g.heal("node-2");
        g.tick_n(30);
        assert!(g.leader_count() <= 1);
    }
}