// oxirs_tsdb/replication/replication_group.rs
1//! Simulated Raft replication group for TSDB testing and single-process
2//! embedded deployments.
3//!
4//! [`ReplicationGroup`] manages a set of [`TsdbRaftNode`] instances that
5//! communicate through in-process message queues rather than over a real
6//! network.  This makes the implementation useful for:
7//!
8//! 1. **Unit tests** -- deterministic leader election and log replication
9//!    without spawning threads or sockets.
10//! 2. **Embedded deployments** -- lightweight consensus for a small (3-5)
11//!    node cluster within a single process or machine.
12//!
13//! ## Architecture
14//!
15//! ```text
16//!  ┌──────────────────────────────────────────────────┐
17//!  │                 ReplicationGroup                 │
18//!  │                                                  │
19//!  │  ┌──────────────┐    ┌──────────────┐           │
20//!  │  │ TsdbRaftNode │◄──►│ TsdbRaftNode │  ···      │
21//!  │  │   (leader)   │    │  (follower)  │           │
22//!  │  └──────────────┘    └──────────────┘           │
23//!  │         │                    │                   │
24//!  │         └────────────────────┘                   │
25//!  │           in-process message passing             │
26//!  └──────────────────────────────────────────────────┘
27//! ```
28//!
29//! ## References
30//!
31//! - Ongaro, D. & Ousterhout, J. (2014). *In Search of an Understandable
32//!   Consensus Algorithm*. USENIX ATC '14.
33//!   <https://raft.github.io/raft.pdf>
34
35use crate::error::{TsdbError, TsdbResult};
36use crate::replication::raft_state::{
37    AppendEntriesArgs, RaftRole, RaftState, RequestVoteArgs, TsdbCommand,
38};
39use std::collections::{HashMap, HashSet, VecDeque};
40
41// =============================================================================
42// WriteEntry -- time-series write entry for quorum replication
43// =============================================================================
44
/// A time-series write entry that is replicated through the Raft log.
///
/// This is the high-level application-facing write primitive; it is
/// internally converted to a [`TsdbCommand::WriteDatapoint`] before being
/// proposed to the Raft log.  Note that `tags` are not carried by
/// [`WriteEntry::to_command`] and therefore are not replicated.
#[derive(Debug, Clone, PartialEq)]
pub struct WriteEntry {
    /// Unix epoch milliseconds.
    pub timestamp: i64,
    /// Metric / series name; used as the replicated `series_id`.
    pub metric_name: String,
    /// Observed measurement value.
    pub value: f64,
    /// Optional tag key-value pairs for dimensionality.
    pub tags: HashMap<String, String>,
}
61
62impl WriteEntry {
63    /// Create a new write entry with no tags.
64    pub fn new(timestamp: i64, metric_name: impl Into<String>, value: f64) -> Self {
65        Self {
66            timestamp,
67            metric_name: metric_name.into(),
68            value,
69            tags: HashMap::new(),
70        }
71    }
72
73    /// Builder: add a single tag.
74    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
75        self.tags.insert(key.into(), value.into());
76        self
77    }
78
79    /// Builder: add multiple tags at once.
80    pub fn with_tags(mut self, tags: HashMap<String, String>) -> Self {
81        self.tags.extend(tags);
82        self
83    }
84
85    /// Convert to a [`TsdbCommand::WriteDatapoint`] for Raft replication.
86    pub fn to_command(&self) -> TsdbCommand {
87        TsdbCommand::WriteDatapoint {
88            series_id: self.metric_name.clone(),
89            timestamp: self.timestamp,
90            value: self.value,
91        }
92    }
93}
94
95// =============================================================================
96// Message types exchanged between nodes
97// =============================================================================
98
/// An RPC message that one node sends to another.
///
/// Sender and recipient identities travel on the enclosing [`Envelope`],
/// so reply variants only need to carry term / progress information.
#[derive(Debug, Clone)]
enum RaftMessage {
    /// `RequestVote` RPC (Raft paper, section 5.2).
    Vote(RequestVoteArgs),
    /// `AppendEntries` RPC (sections 5.3 / 5.4) including heartbeats.
    Append(AppendEntriesArgs),
    /// Vote-granted reply; the voter is the envelope `sender` and the
    /// candidate is the envelope `recipient`.
    VoteGranted {
        /// The term of the voter when the vote was cast.
        term: u64,
    },
    /// AppendEntries success acknowledgement; the follower is the envelope
    /// `sender` and the leader is the envelope `recipient`.
    AppendAck {
        /// Term of the follower.
        term: u64,
        /// The follower's last log index after applying entries.
        match_index: u64,
    },
}
119
/// A queued outbound message from `sender` to `recipient`.
#[derive(Debug)]
struct Envelope {
    /// ID of the node that produced the message.
    sender: String,
    /// ID of the node the message is addressed to.
    recipient: String,
    /// The Raft RPC (or reply) being carried.
    message: RaftMessage,
}
127
128// =============================================================================
129// TsdbRaftNode
130// =============================================================================
131
/// A single Raft participant that wraps [`RaftState`] and maintains its own
/// inbound message queue.
///
/// All I/O and timer operations are handled by the enclosing
/// [`ReplicationGroup`]; the node itself is purely reactive.
#[derive(Debug)]
pub struct TsdbRaftNode {
    /// Node identifier (must be unique within the group).
    pub id: String,
    /// Core Raft state machine.
    pub(super) state: RaftState,
    /// Whether this node is currently reachable (simulates network partition).
    reachable: bool,
    /// Number of election timeouts elapsed without receiving a valid leader
    /// heartbeat.  Used by the group to trigger elections.
    pub(super) election_ticks: u32,
    /// Election timeout threshold in ticks.  Assigned at construction by the
    /// group (deterministic per-node values stand in for randomised timeouts).
    pub(super) election_timeout: u32,
    /// Set of peer IDs that have granted a vote to this node in the current term.
    votes_for_me: HashSet<String>,
    /// Total number of write entries committed via quorum on this node.
    committed_writes: u64,
}
155
156impl TsdbRaftNode {
157    /// Create a new node with the given election timeout (in ticks).
158    pub fn new(id: impl Into<String>, election_timeout: u32) -> Self {
159        let id = id.into();
160        let state = RaftState::new(id.clone());
161        Self {
162            id,
163            state,
164            reachable: true,
165            election_ticks: 0,
166            election_timeout,
167            votes_for_me: HashSet::new(),
168            committed_writes: 0,
169        }
170    }
171
172    /// Return the current Raft role.
173    pub fn role(&self) -> RaftRole {
174        self.state.role.clone()
175    }
176
177    /// Return the current term.
178    pub fn current_term(&self) -> u64 {
179        self.state.current_term
180    }
181
182    /// Return the commit index.
183    pub fn commit_index(&self) -> u64 {
184        self.state.commit_index
185    }
186
187    /// Return the number of log entries (excluding the sentinel at index 0).
188    pub fn log_len(&self) -> usize {
189        // Subtract sentinel entry at position 0
190        self.state.log.len().saturating_sub(1)
191    }
192
193    /// Return the last log index (0 when no user entries have been appended).
194    pub fn last_log_index(&self) -> u64 {
195        self.state.last_log_index()
196    }
197
198    /// Mark this node as unreachable (network partition).
199    pub fn partition(&mut self) {
200        self.reachable = false;
201    }
202
203    /// Restore network reachability.
204    pub fn heal(&mut self) {
205        self.reachable = true;
206    }
207
208    /// Whether the node is currently reachable.
209    pub fn is_reachable(&self) -> bool {
210        self.reachable
211    }
212
213    /// Total committed writes on this node.
214    pub fn committed_writes(&self) -> u64 {
215        self.committed_writes
216    }
217
218    /// Propose a write command on this node.
219    ///
220    /// Returns the log index if this node is the leader, or an error
221    /// otherwise.
222    pub fn propose(&mut self, cmd: TsdbCommand) -> TsdbResult<u64> {
223        self.state
224            .propose_command(cmd)
225            .map_err(|e| TsdbError::Replication(e.to_string()))
226    }
227
228    /// Propose a [`WriteEntry`] on this node (convenience wrapper).
229    pub fn propose_write_entry(&mut self, entry: &WriteEntry) -> TsdbResult<u64> {
230        self.propose(entry.to_command())
231    }
232
233    /// Return applied commands (committed but not yet consumed).
234    pub fn drain_applied(&mut self) -> Vec<TsdbCommand> {
235        let applied = self.state.apply_committed_entries();
236        // Count write datapoints in applied entries
237        for cmd in &applied {
238            if matches!(cmd, TsdbCommand::WriteDatapoint { .. }) {
239                self.committed_writes += 1;
240            }
241        }
242        applied
243    }
244}
245
246// =============================================================================
247// ReplicationGroup
248// =============================================================================
249
/// A simulated Raft cluster of [`TsdbRaftNode`]s.
///
/// All message delivery, election timeout tracking, and commit-index
/// advancement are driven by calls to [`ReplicationGroup::tick`].
pub struct ReplicationGroup {
    /// All participants, keyed by node ID.
    nodes: HashMap<String, TsdbRaftNode>,
    /// Outbound message bus: envelopes waiting to be delivered.
    bus: VecDeque<Envelope>,
    /// Number of nodes in this group (fixed at construction).
    cluster_size: usize,
}
261
262impl ReplicationGroup {
263    /// Create a group with the given node IDs and (optionally distinct)
264    /// election timeouts.
265    ///
266    /// Pass `timeout_override = None` to assign uniform timeouts (10 ticks).
267    /// Pass a slice of per-node timeouts to make elections deterministic.
268    pub fn new(ids: &[&str], timeout_override: Option<&[u32]>) -> Self {
269        let mut nodes = HashMap::new();
270        for (i, &id) in ids.iter().enumerate() {
271            let timeout = timeout_override
272                .and_then(|ts| ts.get(i).copied())
273                .unwrap_or(10);
274            nodes.insert(id.to_string(), TsdbRaftNode::new(id, timeout));
275        }
276        let cluster_size = nodes.len();
277        Self {
278            nodes,
279            bus: VecDeque::new(),
280            cluster_size,
281        }
282    }
283
284    /// Return the number of nodes in the group.
285    pub fn cluster_size(&self) -> usize {
286        self.cluster_size
287    }
288
289    /// Return a reference to a node by ID.
290    pub fn node(&self, id: &str) -> Option<&TsdbRaftNode> {
291        self.nodes.get(id)
292    }
293
294    /// Return a mutable reference to a node by ID.
295    pub fn node_mut(&mut self, id: &str) -> Option<&mut TsdbRaftNode> {
296        self.nodes.get_mut(id)
297    }
298
299    /// Find the current leader, if any.
300    ///
301    /// Returns `None` if no node is in the `Leader` role.
302    pub fn leader_id(&self) -> Option<String> {
303        self.nodes
304            .values()
305            .find(|n| n.role() == RaftRole::Leader && n.reachable)
306            .map(|n| n.id.clone())
307    }
308
309    /// Return the number of nodes currently acting as leader.
310    ///
311    /// In a healthy cluster this should always be <= 1.
312    pub fn leader_count(&self) -> usize {
313        self.nodes
314            .values()
315            .filter(|n| n.role() == RaftRole::Leader && n.reachable)
316            .count()
317    }
318
319    /// Return the quorum size (majority) for this cluster.
320    pub fn quorum_size(&self) -> usize {
321        self.cluster_size / 2 + 1
322    }
323
324    /// Partition a node (simulates network isolation).
325    pub fn partition(&mut self, id: &str) {
326        if let Some(n) = self.nodes.get_mut(id) {
327            n.partition();
328        }
329    }
330
331    /// Heal a partitioned node.
332    pub fn heal(&mut self, id: &str) {
333        if let Some(n) = self.nodes.get_mut(id) {
334            n.heal();
335        }
336    }
337
338    /// Propose a write command on the current leader.
339    ///
340    /// Returns `(leader_id, log_index)` or an error if there is no leader.
341    pub fn propose(&mut self, cmd: TsdbCommand) -> TsdbResult<(String, u64)> {
342        let leader_id = self
343            .leader_id()
344            .ok_or_else(|| TsdbError::Replication("no leader elected yet".into()))?;
345
346        let node = self
347            .nodes
348            .get_mut(&leader_id)
349            .ok_or_else(|| TsdbError::Replication("leader disappeared from nodes map".into()))?;
350
351        let idx = node.propose(cmd)?;
352        Ok((leader_id, idx))
353    }
354
355    /// Propose a [`WriteEntry`] via the current leader (convenience wrapper).
356    ///
357    /// Returns `(leader_id, log_index)` or an error.
358    pub fn propose_write_entry(&mut self, entry: &WriteEntry) -> TsdbResult<(String, u64)> {
359        self.propose(entry.to_command())
360    }
361
362    /// Propose a [`WriteEntry`] and wait for quorum commit by ticking.
363    ///
364    /// Returns `(leader_id, log_index)` after up to `max_ticks` ticks.
365    /// Returns an error if the entry is not committed within `max_ticks`.
366    pub fn propose_and_commit(
367        &mut self,
368        entry: &WriteEntry,
369        max_ticks: u32,
370    ) -> TsdbResult<(String, u64)> {
371        let (leader_id, log_index) = self.propose_write_entry(entry)?;
372        for _ in 0..max_ticks {
373            self.tick();
374            if let Some(leader) = self.nodes.get(&leader_id) {
375                if leader.commit_index() >= log_index {
376                    return Ok((leader_id, log_index));
377                }
378            }
379        }
380        Err(TsdbError::Replication(format!(
381            "quorum commit not achieved for index {log_index} within {max_ticks} ticks"
382        )))
383    }
384
385    // ── Tick ──────────────────────────────────────────────────────────────────
386
387    /// Advance the simulation by one logical tick.
388    ///
389    /// Each tick:
390    /// 1. Sends leader heartbeats first (so they enter the bus).
391    /// 2. Delivers all pending messages from the bus.
392    /// 3. Increments election timers on followers/candidates.
393    /// 4. Advances commit indices on the leader.
394    ///
395    /// The order matters: heartbeats are sent first so they can be delivered
396    /// in the same tick, preventing unnecessary election timeouts.
397    pub fn tick(&mut self) {
398        // Send heartbeats first so they're in the bus for delivery
399        self.send_leader_heartbeats();
400        // Deliver all pending messages (heartbeats + any other messages)
401        self.deliver_messages();
402        // Now advance election timers (followers that received heartbeats
403        // had their timers reset during delivery)
404        self.advance_election_timers();
405        // Advance commit indices based on match_index state
406        self.advance_commit_indices();
407    }
408
409    /// Run `n` ticks in sequence.
410    pub fn tick_n(&mut self, n: u32) {
411        for _ in 0..n {
412            self.tick();
413        }
414    }
415
416    // ── Message delivery ──────────────────────────────────────────────────────
417
418    /// Deliver all pending messages from the bus.
419    ///
420    /// For each `RequestVote`, the recipient processes it and, if it grants
421    /// the vote, sends a `VoteGranted` back.  For each `AppendEntries`, the
422    /// recipient processes it and sends an `AppendAck` back if successful.
423    fn deliver_messages(&mut self) {
424        // We may need multiple rounds of delivery for replies to propagate.
425        for _round in 0..3 {
426            if self.bus.is_empty() {
427                break;
428            }
429            let envelopes: Vec<Envelope> = self.bus.drain(..).collect();
430            let mut replies: Vec<Envelope> = Vec::new();
431
432            for env in envelopes {
433                // Drop messages to unreachable nodes
434                let recipient_reachable = self
435                    .nodes
436                    .get(&env.recipient)
437                    .map(|n| n.reachable)
438                    .unwrap_or(false);
439                if !recipient_reachable {
440                    continue;
441                }
442
443                // Drop messages from unreachable senders (partitioned)
444                let sender_reachable = self
445                    .nodes
446                    .get(&env.sender)
447                    .map(|n| n.reachable)
448                    .unwrap_or(true);
449                if !sender_reachable {
450                    continue;
451                }
452
453                let recipient = match self.nodes.get_mut(&env.recipient) {
454                    Some(n) => n,
455                    None => continue,
456                };
457
458                match env.message {
459                    RaftMessage::Vote(args) => {
460                        let reply = recipient.state.handle_vote_request(&args);
461                        if reply.vote_granted {
462                            replies.push(Envelope {
463                                sender: env.recipient.clone(),
464                                recipient: env.sender.clone(),
465                                message: RaftMessage::VoteGranted { term: reply.term },
466                            });
467                        }
468                    }
469                    RaftMessage::VoteGranted { term } => {
470                        if recipient.state.current_term == term
471                            && recipient.role() == RaftRole::Candidate
472                        {
473                            recipient.votes_for_me.insert(env.sender.clone());
474                        }
475                    }
476                    RaftMessage::Append(args) => {
477                        let reply = recipient.state.handle_append_entries(&args);
478                        if reply.success {
479                            // Reset election timer -- valid heartbeat/append received
480                            recipient.election_ticks = 0;
481                            if !args.entries.is_empty() {
482                                replies.push(Envelope {
483                                    sender: env.recipient.clone(),
484                                    recipient: env.sender.clone(),
485                                    message: RaftMessage::AppendAck {
486                                        term: reply.term,
487                                        match_index: recipient.state.last_log_index(),
488                                    },
489                                });
490                            }
491                        }
492                    }
493                    RaftMessage::AppendAck { term, match_index } => {
494                        // Step down if reply carries a higher term (Raft safety)
495                        if term > recipient.state.current_term {
496                            recipient.state.become_follower(term);
497                        } else if recipient.role() == RaftRole::Leader {
498                            recipient
499                                .state
500                                .handle_append_success(&env.sender, match_index);
501                        }
502                    }
503                }
504            }
505
506            for r in replies {
507                self.bus.push_back(r);
508            }
509        }
510    }
511
512    // ── Election timers ───────────────────────────────────────────────────────
513
514    /// Increment election timers; trigger elections for timed-out
515    /// followers/candidates and promote candidates with a quorum.
516    fn advance_election_timers(&mut self) {
517        let quorum = self.cluster_size / 2 + 1;
518
519        let node_ids: Vec<String> = self.nodes.keys().cloned().collect();
520
521        let mut to_start_election: Vec<String> = Vec::new();
522        let mut to_promote: Vec<String> = Vec::new();
523
524        for id in &node_ids {
525            let node = match self.nodes.get_mut(id) {
526                Some(n) if n.reachable => n,
527                _ => continue,
528            };
529
530            match node.role() {
531                RaftRole::Leader => {
532                    // Leaders don't time out
533                    node.election_ticks = 0;
534                }
535                RaftRole::Follower | RaftRole::Candidate => {
536                    node.election_ticks += 1;
537                    if node.election_ticks >= node.election_timeout {
538                        node.election_ticks = 0;
539                        let _vote_args = node.state.become_candidate();
540                        node.votes_for_me.clear();
541                        node.votes_for_me.insert(id.clone());
542                        to_start_election.push(id.clone());
543                    }
544
545                    // Check quorum for existing candidates
546                    if node.role() == RaftRole::Candidate && node.votes_for_me.len() >= quorum {
547                        to_promote.push(id.clone());
548                    }
549                }
550            }
551        }
552
553        // Broadcast RequestVote for nodes starting elections
554        for candidate_id in &to_start_election {
555            let vote_args = {
556                let node = match self.nodes.get(candidate_id) {
557                    Some(n) => n,
558                    None => continue,
559                };
560                RequestVoteArgs {
561                    term: node.state.current_term,
562                    candidate_id: candidate_id.clone(),
563                    last_log_index: node.state.last_log_index(),
564                    last_log_term: node.state.log.last().map(|e| e.term).unwrap_or(0),
565                }
566            };
567
568            let peers: Vec<String> = self
569                .nodes
570                .keys()
571                .filter(|pid| pid.as_str() != candidate_id.as_str())
572                .cloned()
573                .collect();
574
575            for peer in peers {
576                self.bus.push_back(Envelope {
577                    sender: candidate_id.clone(),
578                    recipient: peer,
579                    message: RaftMessage::Vote(vote_args.clone()),
580                });
581            }
582        }
583
584        // Promote candidates with quorum
585        for candidate_id in &to_promote {
586            let peers: Vec<String> = self
587                .nodes
588                .keys()
589                .filter(|pid| pid.as_str() != candidate_id.as_str())
590                .cloned()
591                .collect();
592
593            if let Some(node) = self.nodes.get_mut(candidate_id) {
594                if node.role() == RaftRole::Candidate && node.votes_for_me.len() >= quorum {
595                    node.state.become_leader(&peers);
596                    node.votes_for_me.clear();
597                }
598            }
599        }
600    }
601
602    // ── Leader heartbeats ─────────────────────────────────────────────────────
603
604    /// Leader sends heartbeat `AppendEntries` to all reachable peers.
605    fn send_leader_heartbeats(&mut self) {
606        let leader_id = match self.leader_id() {
607            Some(id) => id,
608            None => return,
609        };
610
611        let peers: Vec<String> = self
612            .nodes
613            .keys()
614            .filter(|id| {
615                id.as_str() != leader_id.as_str()
616                    && self
617                        .nodes
618                        .get(id.as_str())
619                        .map(|n| n.reachable)
620                        .unwrap_or(false)
621            })
622            .cloned()
623            .collect();
624
625        let leader = match self.nodes.get_mut(&leader_id) {
626            Some(n) => n,
627            None => return,
628        };
629
630        for peer in &peers {
631            if let Ok(args) = leader.state.build_append_entries(peer) {
632                self.bus.push_back(Envelope {
633                    sender: leader_id.clone(),
634                    recipient: peer.clone(),
635                    message: RaftMessage::Append(args),
636                });
637            }
638        }
639    }
640
641    // ── Commit index advancement ───────────────────────────────────────────────
642
643    /// Advance the leader's commit index based on follower acknowledgements.
644    fn advance_commit_indices(&mut self) {
645        let leader_id = match self.leader_id() {
646            Some(id) => id,
647            None => return,
648        };
649
650        if let Some(leader) = self.nodes.get_mut(&leader_id) {
651            leader.state.try_advance_commit_index(self.cluster_size);
652        }
653    }
654
655    /// Return all node IDs in the cluster.
656    pub fn node_ids(&self) -> Vec<String> {
657        self.nodes.keys().cloned().collect()
658    }
659
660    /// Return the number of pending messages on the bus.
661    pub fn pending_messages(&self) -> usize {
662        self.bus.len()
663    }
664}
665
666// =============================================================================
667// Tests
668// =============================================================================
669
670#[cfg(test)]
671mod tests {
672    use super::*;
673    use crate::replication::raft_state::TsdbCommand;
674
675    /// Build a deterministic 3-node group:
676    /// node-0 times out first (3 ticks), node-1 at 5, node-2 at 7.
677    fn three_node_group() -> ReplicationGroup {
678        ReplicationGroup::new(&["node-0", "node-1", "node-2"], Some(&[3, 5, 7]))
679    }
680
681    // ── Initial state ────────────────────────────────────────────────────────
682
683    #[test]
684    fn test_initial_all_followers() {
685        let g = three_node_group();
686        assert!(g.leader_id().is_none());
687        assert!(g.nodes.values().all(|n| n.role() == RaftRole::Follower));
688    }
689
690    #[test]
691    fn test_cluster_size_three() {
692        let g = three_node_group();
693        assert_eq!(g.cluster_size(), 3);
694    }
695
696    #[test]
697    fn test_node_lookup() {
698        let g = three_node_group();
699        assert!(g.node("node-0").is_some());
700        assert!(g.node("missing").is_none());
701    }
702
703    // ── Log length baseline ───────────────────────────────────────────────────
704
705    #[test]
706    fn test_node_initial_log_len_is_zero() {
707        let g = three_node_group();
708        assert_eq!(g.node("node-1").expect("n").log_len(), 0);
709    }
710
711    #[test]
712    fn test_node_initial_commit_index() {
713        let g = three_node_group();
714        assert_eq!(g.node("node-0").expect("n").commit_index(), 0);
715    }
716
717    #[test]
718    fn test_node_last_log_index_zero_initially() {
719        let g = three_node_group();
720        assert_eq!(g.node("node-2").expect("n").last_log_index(), 0);
721    }
722
723    // ── Election ─────────────────────────────────────────────────────────────
724
725    #[test]
726    fn test_leader_elected_after_ticks() {
727        let mut g = three_node_group();
728        g.tick_n(20);
729        assert_eq!(g.leader_count(), 1, "expected exactly one leader");
730    }
731
732    #[test]
733    fn test_leader_has_term_at_least_one() {
734        let mut g = three_node_group();
735        g.tick_n(20);
736        let term = g
737            .leader_id()
738            .and_then(|id| g.node(&id))
739            .map(|n| n.current_term())
740            .unwrap_or(0);
741        assert!(term >= 1);
742    }
743
744    #[test]
745    fn test_no_duplicate_leaders_during_election() {
746        let mut g = three_node_group();
747        g.tick_n(20);
748        let leader_count_at_20 = g.leader_count();
749        g.tick_n(30);
750        assert!(g.leader_count() <= 1);
751        if leader_count_at_20 == 1 {
752            assert_eq!(g.leader_count(), 1);
753        }
754    }
755
756    #[test]
757    fn test_five_node_group_elects_leader() {
758        let mut g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], Some(&[3, 6, 9, 12, 15]));
759        g.tick_n(40);
760        assert_eq!(
761            g.leader_count(),
762            1,
763            "5-node group must elect exactly 1 leader"
764        );
765    }
766
767    // ── Log replication ───────────────────────────────────────────────────────
768
769    #[test]
770    fn test_propose_on_leader_increments_log() {
771        let mut g = three_node_group();
772        g.tick_n(20);
773
774        let cmd = TsdbCommand::WriteDatapoint {
775            series_id: "temperature".into(),
776            timestamp: 1_700_000_000_000,
777            value: 21.3,
778        };
779        let result = g.propose(cmd);
780        assert!(result.is_ok(), "propose should succeed when leader exists");
781        let (_, idx) = result.expect("ok");
782        assert!(idx >= 1);
783    }
784
785    #[test]
786    fn test_multiple_proposals_monotone_indices() {
787        let mut g = three_node_group();
788        g.tick_n(20);
789
790        let mut prev_idx = 0u64;
791        for i in 0..5 {
792            let cmd = TsdbCommand::WriteDatapoint {
793                series_id: "s".into(),
794                timestamp: i * 1_000,
795                value: i as f64,
796            };
797            let (_, idx) = g.propose(cmd).expect("propose");
798            assert!(idx > prev_idx, "indices must be strictly increasing");
799            prev_idx = idx;
800        }
801    }
802
803    #[test]
804    fn test_propose_without_leader_returns_error() {
805        let mut g = three_node_group();
806        let cmd = TsdbCommand::WriteDatapoint {
807            series_id: "s".into(),
808            timestamp: 0,
809            value: 0.0,
810        };
811        assert!(g.propose(cmd).is_err());
812    }
813
814    // ── Partition and recovery ────────────────────────────────────────────────
815
816    #[test]
817    fn test_partition_reduces_leader_count() {
818        let mut g = three_node_group();
819        g.tick_n(20);
820
821        let old_leader = g.leader_id().expect("leader must exist");
822        g.partition(&old_leader);
823        assert_eq!(
824            g.leader_count(),
825            0,
826            "partitioned leader should not be counted as reachable leader"
827        );
828    }
829
830    #[test]
831    fn test_heal_partition_restores_node() {
832        let mut g = three_node_group();
833        g.partition("node-2");
834        assert!(!g.node("node-2").expect("exists").is_reachable());
835        g.heal("node-2");
836        assert!(g.node("node-2").expect("exists").is_reachable());
837    }
838
839    #[test]
840    fn test_remaining_nodes_can_elect_after_partition() {
841        let mut g = three_node_group();
842        g.tick_n(20);
843
844        let old_leader = g.leader_id().expect("leader");
845        g.partition(&old_leader);
846        g.tick_n(30);
847
848        let reachable_leaders = g
849            .nodes
850            .values()
851            .filter(|n| n.reachable && n.role() == RaftRole::Leader)
852            .count();
853        assert!(reachable_leaders <= 1);
854    }
855
856    #[test]
857    fn test_heal_partition_node_is_reachable() {
858        let mut g = three_node_group();
859        g.partition("node-1");
860        g.heal("node-1");
861        assert!(g.node("node-1").expect("n").is_reachable());
862    }
863
864    // ── Node state queries ────────────────────────────────────────────────────
865
866    #[test]
867    fn test_replication_group_node_count() {
868        let g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], None);
869        assert_eq!(g.cluster_size(), 5);
870    }
871
872    #[test]
873    fn test_replication_group_default_timeout() {
874        let g = ReplicationGroup::new(&["x", "y"], None);
875        assert_eq!(g.node("x").expect("x").election_timeout, 10);
876        assert_eq!(g.node("y").expect("y").election_timeout, 10);
877    }
878
879    #[test]
880    fn test_replication_group_custom_timeouts() {
881        let g = ReplicationGroup::new(&["p", "q"], Some(&[3, 7]));
882        assert_eq!(g.node("p").expect("p").election_timeout, 3);
883        assert_eq!(g.node("q").expect("q").election_timeout, 7);
884    }
885
886    // ── TsdbRaftNode unit tests ───────────────────────────────────────────────
887
888    #[test]
889    fn test_tsdb_raft_node_propose_non_leader_error() {
890        let mut node = TsdbRaftNode::new("solo", 5);
891        let result = node.propose(TsdbCommand::DeleteSeries {
892            series_id: "old".into(),
893        });
894        assert!(result.is_err());
895    }
896
897    #[test]
898    fn test_tsdb_raft_node_initial_role_follower() {
899        let node = TsdbRaftNode::new("n", 5);
900        assert_eq!(node.role(), RaftRole::Follower);
901    }
902
903    #[test]
904    fn test_tsdb_raft_node_partition_and_heal() {
905        let mut node = TsdbRaftNode::new("n", 5);
906        assert!(node.is_reachable());
907        node.partition();
908        assert!(!node.is_reachable());
909        node.heal();
910        assert!(node.is_reachable());
911    }
912
913    #[test]
914    fn test_tsdb_raft_node_drain_applied_empty_initially() {
915        let mut node = TsdbRaftNode::new("n", 5);
916        assert!(node.drain_applied().is_empty());
917    }
918
919    #[test]
920    fn test_tsdb_raft_node_initial_term() {
921        let node = TsdbRaftNode::new("n", 5);
922        assert_eq!(node.current_term(), 0);
923    }
924
925    // ── Leader stability ──────────────────────────────────────────────────────
926
927    #[test]
928    fn test_leader_stable_after_election() {
929        let mut g = three_node_group();
930        g.tick_n(20);
931
932        let leader_at_20 = g.leader_id();
933        assert!(leader_at_20.is_some(), "must have elected a leader");
934
935        g.tick_n(10);
936        assert_eq!(g.leader_count(), 1, "leader must remain stable");
937    }
938
939    #[test]
940    fn test_leader_count_never_exceeds_one() {
941        let mut g = ReplicationGroup::new(&["n0", "n1", "n2"], Some(&[4, 8, 12]));
942        for _ in 0..40 {
943            g.tick();
944            assert!(
945                g.leader_count() <= 1,
946                "split brain detected: {} leaders",
947                g.leader_count()
948            );
949        }
950    }
951
952    // ── WriteEntry tests ──────────────────────────────────────────────────────
953
954    #[test]
955    fn test_write_entry_new() {
956        let entry = WriteEntry::new(1_700_000_000_000, "cpu_usage", 42.5);
957        assert_eq!(entry.timestamp, 1_700_000_000_000);
958        assert_eq!(entry.metric_name, "cpu_usage");
959        assert!((entry.value - 42.5).abs() < f64::EPSILON);
960        assert!(entry.tags.is_empty());
961    }
962
963    #[test]
964    fn test_write_entry_with_tag() {
965        let entry = WriteEntry::new(1000, "mem", 8192.0)
966            .with_tag("host", "srv-01")
967            .with_tag("region", "eu-west");
968        assert_eq!(entry.tags.len(), 2);
969        assert_eq!(entry.tags["host"], "srv-01");
970        assert_eq!(entry.tags["region"], "eu-west");
971    }
972
973    #[test]
974    fn test_write_entry_with_tags_batch() {
975        let mut tags = HashMap::new();
976        tags.insert("env".to_string(), "prod".to_string());
977        tags.insert("dc".to_string(), "us-east-1".to_string());
978
979        let entry = WriteEntry::new(2000, "latency", 1.5).with_tags(tags);
980        assert_eq!(entry.tags.len(), 2);
981        assert_eq!(entry.tags["env"], "prod");
982    }
983
984    #[test]
985    fn test_write_entry_to_command() {
986        let entry = WriteEntry::new(5000, "temperature", 22.7);
987        let cmd = entry.to_command();
988        match cmd {
989            TsdbCommand::WriteDatapoint {
990                series_id,
991                timestamp,
992                value,
993            } => {
994                assert_eq!(series_id, "temperature");
995                assert_eq!(timestamp, 5000);
996                assert!((value - 22.7).abs() < f64::EPSILON);
997            }
998            other => panic!("expected WriteDatapoint, got {:?}", other),
999        }
1000    }
1001
1002    #[test]
1003    fn test_write_entry_clone_eq() {
1004        let a = WriteEntry::new(100, "x", 1.0);
1005        let b = a.clone();
1006        assert_eq!(a, b);
1007    }
1008
1009    // ── Quorum commit tests ─────────────────────────────────────────────────
1010
1011    #[test]
1012    fn test_quorum_size_three_node() {
1013        let g = three_node_group();
1014        assert_eq!(g.quorum_size(), 2);
1015    }
1016
1017    #[test]
1018    fn test_quorum_size_five_node() {
1019        let g = ReplicationGroup::new(&["a", "b", "c", "d", "e"], None);
1020        assert_eq!(g.quorum_size(), 3);
1021    }
1022
1023    #[test]
1024    fn test_propose_write_entry_on_leader() {
1025        let mut g = three_node_group();
1026        g.tick_n(20);
1027
1028        let entry = WriteEntry::new(1_700_000_000_000, "cpu", 88.0);
1029        let result = g.propose_write_entry(&entry);
1030        assert!(result.is_ok());
1031        let (leader_id, idx) = result.expect("propose");
1032        assert!(!leader_id.is_empty());
1033        assert!(idx >= 1);
1034    }
1035
1036    #[test]
1037    fn test_propose_write_entry_no_leader() {
1038        let mut g = three_node_group();
1039        let entry = WriteEntry::new(0, "x", 0.0);
1040        assert!(g.propose_write_entry(&entry).is_err());
1041    }
1042
1043    #[test]
1044    fn test_propose_and_commit_succeeds() {
1045        let mut g = three_node_group();
1046        g.tick_n(20);
1047
1048        let entry = WriteEntry::new(1000, "sensor_a", 99.0);
1049        let result = g.propose_and_commit(&entry, 20);
1050        assert!(
1051            result.is_ok(),
1052            "quorum commit should succeed: {:?}",
1053            result.err()
1054        );
1055    }
1056
1057    #[test]
1058    fn test_propose_and_commit_multiple_entries() {
1059        let mut g = three_node_group();
1060        g.tick_n(20);
1061
1062        for i in 0..5 {
1063            let entry = WriteEntry::new(i * 1000, format!("metric_{i}"), i as f64);
1064            let result = g.propose_and_commit(&entry, 20);
1065            assert!(result.is_ok(), "commit {i} should succeed");
1066        }
1067    }
1068
1069    // ── Node IDs and pending messages ────────────────────────────────────────
1070
1071    #[test]
1072    fn test_node_ids_returns_all() {
1073        let g = three_node_group();
1074        let ids = g.node_ids();
1075        assert_eq!(ids.len(), 3);
1076        assert!(ids.contains(&"node-0".to_string()));
1077        assert!(ids.contains(&"node-1".to_string()));
1078        assert!(ids.contains(&"node-2".to_string()));
1079    }
1080
1081    #[test]
1082    fn test_pending_messages_initially_zero() {
1083        let g = three_node_group();
1084        assert_eq!(g.pending_messages(), 0);
1085    }
1086
1087    // ── Committed writes tracking ──────────────────────────────────────────────
1088
1089    #[test]
1090    fn test_committed_writes_initially_zero() {
1091        let node = TsdbRaftNode::new("n", 5);
1092        assert_eq!(node.committed_writes(), 0);
1093    }
1094
1095    #[test]
1096    fn test_propose_write_entry_on_node() {
1097        let mut g = three_node_group();
1098        g.tick_n(20);
1099        let leader_id = g.leader_id().expect("leader");
1100        let entry = WriteEntry::new(1000, "temp", 22.0);
1101        let node = g.node_mut(&leader_id).expect("node");
1102        let result = node.propose_write_entry(&entry);
1103        assert!(result.is_ok());
1104    }
1105
1106    // ── Election with randomized timeouts ──────────────────────────────────
1107
1108    #[test]
1109    fn test_randomized_timeouts_elect_fastest() {
1110        // node-0 has timeout=2 (fastest), should become leader first
1111        let mut g = ReplicationGroup::new(&["n0", "n1", "n2"], Some(&[2, 10, 15]));
1112        g.tick_n(20);
1113        assert_eq!(g.leader_count(), 1);
1114    }
1115
    #[test]
    fn test_even_timeouts_still_elect_one() {
        let mut g = ReplicationGroup::new(&["a", "b", "c"], Some(&[5, 5, 5]));
        g.tick_n(30);
        // Identical timeouts can keep splitting the vote, so a leader is NOT
        // guaranteed within 30 ticks; only the safety property (no more than
        // one leader) is asserted here — `<= 1` deliberately allows zero.
        assert!(g.leader_count() <= 1);
    }
1123
1124    // ── Leader step-down on partition ──────────────────────────────────────
1125
1126    #[test]
1127    fn test_all_partitioned_no_leader() {
1128        let mut g = three_node_group();
1129        g.tick_n(20);
1130        g.partition("node-0");
1131        g.partition("node-1");
1132        g.partition("node-2");
1133        assert_eq!(g.leader_count(), 0);
1134    }
1135
1136    #[test]
1137    fn test_heal_all_after_partition() {
1138        let mut g = three_node_group();
1139        g.tick_n(20);
1140        g.partition("node-0");
1141        g.partition("node-1");
1142        g.partition("node-2");
1143        g.heal("node-0");
1144        g.heal("node-1");
1145        g.heal("node-2");
1146        g.tick_n(30);
1147        assert!(g.leader_count() <= 1);
1148    }
1149}