Skip to main content

net/adapter/net/redex/
replication_heartbeat.rs

1//! Heartbeat tracking — `docs/plans/REDEX_DISTRIBUTED_PLAN.md` §6.
2//!
3//! Pure-logic component the [`ReplicationCoordinator`]'s eventual
4//! heartbeat loop drives. Tracks per-peer last-seen / role / tail
//! observations from inbound [`SyncHeartbeat`](super::replication::SyncHeartbeat) messages, exposes
6//! the "is the believed leader silent for ≥ 3 heartbeats?"
7//! predicate that triggers `transition_to(Candidate,
8//! MissedHeartbeats)`, and surfaces per-peer lag for the leader-
9//! side `dataforts_replication_lag_seconds{role=replica}` metric.
10//!
11//! Time is passed in by the caller (not from a system clock) so
12//! tests can advance time deterministically without `tokio::time`
13//! plumbing. The eventual tokio interval-driven loop calls
14//! [`HeartbeatTracker::tick`] with `Instant::now()` each tick.
15//!
16//! The state machine in `replication_state.rs` enforces "which
17//! signal drives which transition" — this module is purely the
18//! signal generator: when the leader has been silent long enough,
19//! the coordinator's tick reads
20//! [`HeartbeatTracker::is_leader_silent`] and routes through
21//! `transition_to(Candidate, MissedHeartbeats)`.
22
23use std::collections::BTreeMap;
24use std::time::{Duration, Instant};
25
26use super::replication::ReplicaRole;
27use crate::adapter::net::behavior::placement::NodeId;
28
/// Number of consecutive heartbeat intervals a believed leader may
/// stay silent before it is considered gone (plan §6). Three misses
/// keep transient packet loss from causing election thrash.
pub const DEFAULT_MISS_THRESHOLD: u8 = 3_u8;
33
34/// Per-peer state cell. Captures the most recent
35/// [`SyncHeartbeat`](super::replication::SyncHeartbeat) observation.
36/// Public field shape so consumers can build leader-side lag
37/// metrics directly.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub struct PeerState {
40    /// Most recent inbound heartbeat timestamp from this peer.
41    pub last_seen: Instant,
42    /// Role the peer claimed in its most recent heartbeat.
43    pub role: ReplicaRole,
44    /// `tail_seq` the peer claimed in its most recent heartbeat.
45    /// Leader-side: lag-from-this-replica = our_tail - peer_tail.
46    /// Replica-side: lag-from-leader = leader_tail - our_tail (the
47    /// inverse).
48    pub tail_seq: u64,
49}
50
51/// Tracker over inbound heartbeats. The coordinator's eventual
52/// heartbeat loop drives a single one of these per replicated
53/// channel.
54///
55/// Not Send + Sync by default — the coordinator wraps the
56/// tracker in a `parking_lot::Mutex` so its tokio task can
57/// take exclusive access during a tick. Single-threaded by
58/// design; the criticism that a `RwLock<HashMap>` allows
59/// concurrent reads is irrelevant here — the heartbeat loop
60/// is the sole reader / writer.
61pub struct HeartbeatTracker {
62    /// Configured heartbeat cadence in milliseconds. Used as the
63    /// unit of "miss" computation: silence ≥ miss_threshold ×
64    /// heartbeat_ms triggers a Candidate transition.
65    heartbeat_ms: u64,
66    /// Consecutive-miss threshold. Default
67    /// [`DEFAULT_MISS_THRESHOLD`].
68    miss_threshold: u8,
69    /// Per-peer most-recent heartbeat observation.
70    peers: BTreeMap<NodeId, PeerState>,
71    /// The peer this tracker believes is the current leader, if
72    /// any. Set by the most recent heartbeat with `role ==
73    /// Leader`; cleared by [`Self::clear_believed_leader`] (the
74    /// coordinator clears it on `Replica → Candidate` so the next
75    /// election cycle starts clean).
76    believed_leader: Option<NodeId>,
77}
78
79impl HeartbeatTracker {
80    /// Construct a tracker for a channel configured with
81    /// `heartbeat_ms` cadence. Uses
82    /// [`DEFAULT_MISS_THRESHOLD`] = 3.
83    pub fn new(heartbeat_ms: u64) -> Self {
84        Self::with_miss_threshold(heartbeat_ms, DEFAULT_MISS_THRESHOLD)
85    }
86
87    /// Explicit-threshold constructor — pin the miss count for
88    /// tighter-SLA workloads or DST scenarios. Threshold of `0`
89    /// is clamped to `1` so a heartbeat tracker is never in a
90    /// "permanently silent" state (with `miss_threshold = 0`,
91    /// even a fresh heartbeat would be "miss enough" to trigger).
92    pub fn with_miss_threshold(heartbeat_ms: u64, miss_threshold: u8) -> Self {
93        Self {
94            heartbeat_ms,
95            miss_threshold: miss_threshold.max(1),
96            peers: BTreeMap::new(),
97            believed_leader: None,
98        }
99    }
100
101    /// Configured heartbeat cadence.
102    pub fn heartbeat_ms(&self) -> u64 {
103        self.heartbeat_ms
104    }
105
106    /// Configured miss threshold.
107    pub fn miss_threshold(&self) -> u8 {
108        self.miss_threshold
109    }
110
111    /// Record an inbound heartbeat from `peer`. Updates the
112    /// peer's `last_seen` / `role` / `tail_seq` and — if `role ==
113    /// Leader` — promotes `peer` to the believed leader (even if
114    /// a different peer was previously believed-leader; the most
115    /// recent `Leader`-roled heartbeat wins).
116    pub fn record_heartbeat(
117        &mut self,
118        peer: NodeId,
119        role: ReplicaRole,
120        tail_seq: u64,
121        now: Instant,
122    ) {
123        self.peers.insert(
124            peer,
125            PeerState {
126                last_seen: now,
127                role,
128                tail_seq,
129            },
130        );
131        if role == ReplicaRole::Leader {
132            // Tiebreak must match the dual-leader convergence rule in
133            // `replication_runtime.rs::on_inbound`: a Leader claim
134            // beats the current believed leader when its `(tail_seq,
135            // -node_id)` is strictly larger — i.e. higher tail wins,
136            // and on a tail tie the numerically smaller `node_id`
137            // wins.
138            //
139            // The two sites used to disagree. Runtime used
140            // `(higher tail, lower id)`; heartbeat used `lower id
141            // only, sticky`. A local Leader L1 (high tail, high id)
142            // and a peer Leader L2 (low tail, low id) heartbeating
143            // each other would:
144            //   - L1 stays Leader (runtime tiebreak: higher tail wins),
145            //   - L1 records L2 as believed_leader (heartbeat tiebreak:
146            //     lower id wins).
147            // L1's replica-side gates (`leader_belief != Some(from)`)
148            // then trusted L2's SyncResponses while L1 itself kept
149            // emitting Leader heartbeats. Aligning the rules closes
150            // that split-brain window.
151            //
152            // Stickiness is preserved in the form "current wins ties
153            // below the strict-beat threshold," so two peers with
154            // identical `(tail, id)` claims don't flap. That isn't
155            // weaker than the prior lex-only sticky variant — a
156            // higher-id claimant only displaces when it brings a
157            // strictly newer tail, exactly the condition under which
158            // we want the replica to follow the more-current peer.
159            match self.believed_leader {
160                None => self.believed_leader = Some(peer),
161                Some(existing) if existing == peer => {
162                    // Re-affirmation of the same leader — no change.
163                }
164                Some(existing) => {
165                    let existing_tail = self.peers.get(&existing).map(|p| p.tail_seq).unwrap_or(0);
166                    let peer_beats =
167                        tail_seq > existing_tail || (tail_seq == existing_tail && peer < existing);
168                    if peer_beats {
169                        self.believed_leader = Some(peer);
170                    }
171                }
172            }
173        }
174    }
175
176    /// True iff the believed leader has been silent past the
177    /// miss-threshold window — i.e. `(now - leader.last_seen) >=
178    /// miss_threshold × heartbeat_ms`.
179    ///
180    /// Returns `false` when:
181    /// - No believed leader is known (a fresh tracker, or just
182    ///   after [`Self::clear_believed_leader`]).
183    /// - The believed leader's last heartbeat is fresh enough.
184    ///
185    /// Caller drives this on every coordinator tick.
186    pub fn is_leader_silent(&self, now: Instant) -> bool {
187        let Some(leader_id) = self.believed_leader else {
188            return false;
189        };
190        let Some(leader) = self.peers.get(&leader_id) else {
191            // Believed leader was set but the peer entry was
192            // dropped (e.g. via `drop_peer`). Treat as silent so
193            // the coordinator runs an election from a clean
194            // slate.
195            return true;
196        };
197        let threshold =
198            Duration::from_millis(self.heartbeat_ms.saturating_mul(self.miss_threshold as u64));
199        now.saturating_duration_since(leader.last_seen) >= threshold
200    }
201
202    /// Current believed leader. `None` if no heartbeat with
203    /// `role == Leader` has been observed (or
204    /// [`Self::clear_believed_leader`] was called).
205    pub fn believed_leader(&self) -> Option<NodeId> {
206        self.believed_leader
207    }
208
209    /// Clear the believed-leader cell. The coordinator calls
210    /// this on every `Replica → Candidate` transition so the
211    /// next election cycle starts clean; a stale believed leader
212    /// would let [`Self::is_leader_silent`] return false even
213    /// after the local node decided to run an election.
214    pub fn clear_believed_leader(&mut self) {
215        self.believed_leader = None;
216    }
217
218    /// Drop a peer from the tracker — disconnect / withdraw /
219    /// channel close. If the dropped peer was the believed
220    /// leader, clears that too so the coordinator's next tick
221    /// can re-observe leadership cleanly.
222    pub fn drop_peer(&mut self, peer: NodeId) {
223        self.peers.remove(&peer);
224        if self.believed_leader == Some(peer) {
225            self.believed_leader = None;
226        }
227    }
228
229    /// Read this peer's most recent observation, if any.
230    pub fn peer_state(&self, peer: NodeId) -> Option<PeerState> {
231        self.peers.get(&peer).copied()
232    }
233
234    /// Lag = `now - peer.last_seen` for the given peer.
235    /// `None` if the peer is unknown.
236    pub fn peer_lag(&self, peer: NodeId, now: Instant) -> Option<Duration> {
237        self.peers
238            .get(&peer)
239            .map(|p| now.saturating_duration_since(p.last_seen))
240    }
241
242    /// Set of peers considered alive in the local view —
243    /// last-seen within the miss-threshold window. Sorted by
244    /// `NodeId` for stable iteration.
245    ///
246    /// Consumed by the [`elect`](super::replication_election::elect)
247    /// selection function from `replication_election.rs` to filter
248    /// the replica set down to the healthy subset.
249    pub fn healthy_peers(&self, now: Instant) -> Vec<NodeId> {
250        let threshold =
251            Duration::from_millis(self.heartbeat_ms.saturating_mul(self.miss_threshold as u64));
252        self.peers
253            .iter()
254            .filter(|(_, state)| now.saturating_duration_since(state.last_seen) < threshold)
255            .map(|(id, _)| *id)
256            .collect()
257    }
258
259    /// Snapshot every peer's `(NodeId, tail_seq)` pair. Useful
260    /// for the leader-side lag metric: the leader's own tail
261    /// minus each replica's reported tail = that replica's
262    /// observable lag.
263    pub fn peer_tail_seqs(&self) -> Vec<(NodeId, u64)> {
264        self.peers
265            .iter()
266            .map(|(id, state)| (*id, state.tail_seq))
267            .collect()
268    }
269
270    /// Number of peers currently tracked.
271    pub fn peer_count(&self) -> usize {
272        self.peers.len()
273    }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    fn t0() -> Instant {
        Instant::now()
    }

    /// `base` advanced by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    #[test]
    fn new_tracker_has_no_peers_or_leader() {
        let t = HeartbeatTracker::new(500);
        assert_eq!(t.peer_count(), 0);
        assert!(t.believed_leader().is_none());
        assert!(!t.is_leader_silent(t0()));
        assert_eq!(t.heartbeat_ms(), 500);
        assert_eq!(t.miss_threshold(), DEFAULT_MISS_THRESHOLD);
    }

    #[test]
    fn miss_threshold_zero_clamped_to_one() {
        let t = HeartbeatTracker::with_miss_threshold(100, 0);
        assert_eq!(t.miss_threshold(), 1);
    }

    #[test]
    fn record_heartbeat_tracks_peer_state() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);

        assert_eq!(t.peer_count(), 1);
        assert_eq!(t.believed_leader(), Some(0x42));
        let p = t.peer_state(0x42).unwrap();
        assert_eq!(p.role, ReplicaRole::Leader);
        assert_eq!(p.tail_seq, 100);
        assert_eq!(p.last_seen, base);
    }

    #[test]
    fn leader_tiebreak_prefers_higher_tail_then_lower_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        // First Leader heartbeat establishes believed_leader.
        t.record_heartbeat(0x43, ReplicaRole::Leader, 200, base);
        assert_eq!(t.believed_leader(), Some(0x43));
        // A second peer claims Leader with a LOWER tail. Even though
        // its node_id is smaller, the higher-tail peer keeps
        // believed-leader, matching the runtime's dual-leader
        // convergence rule. Otherwise a peer with a stale tail could
        // displace a leader holding fresher data.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, at(base, 100));
        assert_eq!(
            t.believed_leader(),
            Some(0x43),
            "higher-tail Leader should keep believed-leader against a lower-id claimant with lower tail",
        );
        // The same peer 0x42 now claims Leader with a STRICTLY
        // HIGHER tail than the current believed leader, so it
        // displaces 0x43.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 300, at(base, 200));
        assert_eq!(
            t.believed_leader(),
            Some(0x42),
            "strictly higher tail wins the tiebreak",
        );
        // Tail tie: the smaller id wins. 0x41 with the same tail
        // (300) as the current 0x42 displaces it.
        t.record_heartbeat(0x41, ReplicaRole::Leader, 300, at(base, 300));
        assert_eq!(
            t.believed_leader(),
            Some(0x41),
            "on a tail tie the lex-smaller id wins",
        );
    }

    /// Regression: the heartbeat tiebreak and the runtime's
    /// dual-leader convergence rule must agree. When they don't, a
    /// local Leader can simultaneously (a) stay Leader because it
    /// wins the runtime rule (`higher tail`) and (b) believe a peer
    /// is the leader because the heartbeat rule picked the peer
    /// (`lower id, sticky`). The local node's replica-side gates
    /// then trust the peer's SyncResponses while it also keeps
    /// emitting Leader heartbeats, which is a split-brain window.
    ///
    /// This test pins the alignment from the heartbeat side: a
    /// peer that LOSES the runtime tiebreak (lower tail) must NOT
    /// be recorded as the believed leader on the local node.
    #[test]
    fn heartbeat_tiebreak_aligns_with_runtime_convergence_rule() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);

        // The tracker tracks peers, not self. Simulate L1 (peer 0xAA,
        // high tail) claiming Leader. We expect believed_leader == L1.
        t.record_heartbeat(0xAA, ReplicaRole::Leader, 500, base);
        assert_eq!(t.believed_leader(), Some(0xAA));

        // Now L2 (peer 0x11, low id, LOWER tail) claims Leader. The
        // runtime would say L1 wins (higher tail) and ask L2 to
        // concede. The heartbeat tracker must agree: believed
        // leader stays L1, NOT L2.
        t.record_heartbeat(0x11, ReplicaRole::Leader, 100, at(base, 50));
        assert_eq!(
            t.believed_leader(),
            Some(0xAA),
            "lower-tail Leader claimant must NOT win the heartbeat tiebreak; \
             pre-fix the lex-only rule made L2 win here and the local node \
             ended up treating L2's SyncResponses as authoritative while \
             still emitting Leader heartbeats itself — split brain",
        );
    }

    #[test]
    fn reaffirming_same_leader_with_lower_tail_keeps_belief() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // The believed leader re-affirms with a lower tail. Its own
        // claim is never compared against itself, so the belief stays.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 50, at(base, 100));
        assert_eq!(t.believed_leader(), Some(0x42));
        assert_eq!(t.peer_state(0x42).unwrap().tail_seq, 50);
    }

    #[test]
    fn replica_role_heartbeat_does_not_change_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // Replica heartbeat from another peer: believed leader
        // stays the original.
        t.record_heartbeat(0x99, ReplicaRole::Replica, 95, at(base, 50));
        assert_eq!(t.believed_leader(), Some(0x42));
        // But the replica's state is recorded.
        assert_eq!(t.peer_state(0x99).unwrap().role, ReplicaRole::Replica);
    }

    #[test]
    fn leader_not_silent_within_window() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // 500 ms elapsed = one heartbeat interval, well inside the
        // 3 × 500 ms = 1500 ms silence window.
        assert!(!t.is_leader_silent(at(base, 500)));
        // 1 ms before the window closes: still considered alive.
        assert!(!t.is_leader_silent(at(base, 1499)));
    }

    #[test]
    fn leader_silent_at_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // Exactly 3 × 500 ms = 1500 ms: silent.
        assert!(t.is_leader_silent(at(base, 1500)));
    }

    #[test]
    fn leader_silent_past_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(t.is_leader_silent(at(base, 5000)));
    }

    #[test]
    fn custom_miss_threshold_scales_silence_window() {
        let base = t0();
        let mut t = HeartbeatTracker::with_miss_threshold(100, 5);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 0, base);
        // Window is 5 × 100 ms = 500 ms.
        assert!(!t.is_leader_silent(at(base, 499)));
        assert!(t.is_leader_silent(at(base, 500)));
    }

    #[test]
    fn fresh_leader_heartbeat_resets_silence() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // Let the full window lapse: the leader is now silent.
        assert!(t.is_leader_silent(at(base, 1500)));
        // A fresh heartbeat from the leader resets the window.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 105, at(base, 1500));
        // 100 ms after the fresh heartbeat: not silent.
        assert!(!t.is_leader_silent(at(base, 1600)));
    }

    /// `drop_peer` on the believed leader clears the belief, so
    /// `is_leader_silent` falls back to "no believed leader = not
    /// silent" rather than reporting silence for the missing peer.
    #[test]
    fn dropping_believed_leader_clears_belief_instead_of_reporting_silence() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.drop_peer(0x42);
        assert!(!t.is_leader_silent(at(base, 100)));
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn clear_believed_leader_does_not_drop_peer_entry() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.clear_believed_leader();
        assert!(t.believed_leader().is_none());
        // The peer's state is still there; only the "believe
        // them to be leader" cell was cleared.
        assert!(t.peer_state(0x42).is_some());
    }

    #[test]
    fn peer_lag_returns_elapsed() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Replica, 100, base);
        let lag = t.peer_lag(0x42, at(base, 750)).unwrap();
        assert_eq!(lag, Duration::from_millis(750));
    }

    #[test]
    fn peer_lag_unknown_returns_none() {
        let t = HeartbeatTracker::new(500);
        assert!(t.peer_lag(0x42, t0()).is_none());
    }

    #[test]
    fn healthy_peers_filters_stale_entries() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 100, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 100, at(base, 200));
        t.record_heartbeat(0x3, ReplicaRole::Replica, 100, at(base, 400));
        // At t = 1500 ms (3 × 500), peer 1's heartbeat (at 0 ms) is
        // stale. Peers 2 (at 200 ms) and 3 (at 400 ms) are still fresh:
        // 1500 - 200 = 1300 < 1500.
        let healthy = t.healthy_peers(at(base, 1500));
        assert_eq!(healthy, vec![0x2, 0x3]);
    }

    #[test]
    fn healthy_peers_excludes_peer_at_exact_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x7, ReplicaRole::Replica, 0, base);
        // Healthy is the strict complement of "silent": healthy at
        // 1499 ms, excluded at exactly 1500 ms.
        assert_eq!(t.healthy_peers(at(base, 1499)), vec![0x7]);
        assert!(t.healthy_peers(at(base, 1500)).is_empty());
    }

    #[test]
    fn healthy_peers_sorted_by_node_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        // Insert out of order; output should be ascending.
        t.record_heartbeat(0x30, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x10, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
        let healthy = t.healthy_peers(at(base, 100));
        assert_eq!(healthy, vec![0x10, 0x20, 0x30]);
    }

    #[test]
    fn peer_tail_seqs_snapshot() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x10, ReplicaRole::Leader, 1000, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 950, base);
        t.record_heartbeat(0x30, ReplicaRole::Replica, 980, base);
        let mut tails = t.peer_tail_seqs();
        tails.sort_by_key(|(id, _)| *id);
        assert_eq!(tails, vec![(0x10, 1000), (0x20, 950), (0x30, 980)]);
    }

    #[test]
    fn peer_tail_seqs_sorted_by_node_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x30, ReplicaRole::Replica, 3, base);
        t.record_heartbeat(0x10, ReplicaRole::Replica, 1, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 2, base);
        // No explicit sort: the snapshot is already in NodeId order.
        assert_eq!(t.peer_tail_seqs(), vec![(0x10, 1), (0x20, 2), (0x30, 3)]);
    }

    #[test]
    fn drop_peer_removes_and_decrements_count() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        assert_eq!(t.peer_count(), 2);
        t.drop_peer(0x1);
        assert_eq!(t.peer_count(), 1);
        assert!(t.peer_state(0x1).is_none());
        assert!(t.peer_state(0x2).is_some());
        // Believed leader cleared because it was the dropped peer.
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn drop_non_leader_peer_preserves_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        t.drop_peer(0x2);
        assert_eq!(t.believed_leader(), Some(0x1));
    }

    #[test]
    fn miss_threshold_one_triggers_after_one_window() {
        let base = t0();
        let mut t = HeartbeatTracker::with_miss_threshold(500, 1);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 0, base);
        assert!(!t.is_leader_silent(at(base, 499)));
        assert!(t.is_leader_silent(at(base, 500)));
    }

    #[test]
    fn no_believed_leader_never_silent_regardless_of_time() {
        let base = t0();
        let t = HeartbeatTracker::new(500);
        // No heartbeats observed; not silent at any future time.
        assert!(!t.is_leader_silent(at(base, 60_000)));
    }
}