net/adapter/net/redex/replication_heartbeat.rs
1//! Heartbeat tracking — `docs/plans/REDEX_DISTRIBUTED_PLAN.md` §6.
2//!
3//! Pure-logic component the [`ReplicationCoordinator`]'s eventual
4//! heartbeat loop drives. Tracks per-peer last-seen / role / tail
5//! observations from inbound [`SyncHeartbeat`] messages, exposes
6//! the "is the believed leader silent for ≥ 3 heartbeats?"
7//! predicate that triggers `transition_to(Candidate,
8//! MissedHeartbeats)`, and surfaces per-peer lag for the leader-
9//! side `dataforts_replication_lag_seconds{role=replica}` metric.
10//!
//! Time is passed in by the caller (not from a system clock) so
//! tests can advance time deterministically without `tokio::time`
//! plumbing. The eventual tokio interval-driven loop calls the
//! `now`-taking queries such as [`HeartbeatTracker::is_leader_silent`]
//! with `Instant::now()` each tick.
15//!
16//! The state machine in `replication_state.rs` enforces "which
17//! signal drives which transition" — this module is purely the
18//! signal generator: when the leader has been silent long enough,
19//! the coordinator's tick reads
20//! [`HeartbeatTracker::is_leader_silent`] and routes through
21//! `transition_to(Candidate, MissedHeartbeats)`.
22
23use std::collections::BTreeMap;
24use std::time::{Duration, Instant};
25
26use super::replication::ReplicaRole;
27use crate::adapter::net::behavior::placement::NodeId;
28
/// Number of consecutive heartbeat intervals a leader may stay quiet
/// before it is considered silent. Plan §6 picks 3: "3 missed
/// heartbeats prevents election thrash under transient packet
/// loss."
pub const DEFAULT_MISS_THRESHOLD: u8 = 3;
33
34/// Per-peer state cell. Captures the most recent
35/// [`SyncHeartbeat`](super::replication::SyncHeartbeat) observation.
36/// Public field shape so consumers can build leader-side lag
37/// metrics directly.
38#[derive(Debug, Clone, Copy, PartialEq, Eq)]
39pub struct PeerState {
40 /// Most recent inbound heartbeat timestamp from this peer.
41 pub last_seen: Instant,
42 /// Role the peer claimed in its most recent heartbeat.
43 pub role: ReplicaRole,
44 /// `tail_seq` the peer claimed in its most recent heartbeat.
45 /// Leader-side: lag-from-this-replica = our_tail - peer_tail.
46 /// Replica-side: lag-from-leader = leader_tail - our_tail (the
47 /// inverse).
48 pub tail_seq: u64,
49}
50
51/// Tracker over inbound heartbeats. The coordinator's eventual
52/// heartbeat loop drives a single one of these per replicated
53/// channel.
54///
55/// Not Send + Sync by default — the coordinator wraps the
56/// tracker in a `parking_lot::Mutex` so its tokio task can
57/// take exclusive access during a tick. Single-threaded by
58/// design; the criticism that a `RwLock<HashMap>` allows
59/// concurrent reads is irrelevant here — the heartbeat loop
60/// is the sole reader / writer.
61pub struct HeartbeatTracker {
62 /// Configured heartbeat cadence in milliseconds. Used as the
63 /// unit of "miss" computation: silence ≥ miss_threshold ×
64 /// heartbeat_ms triggers a Candidate transition.
65 heartbeat_ms: u64,
66 /// Consecutive-miss threshold. Default
67 /// [`DEFAULT_MISS_THRESHOLD`].
68 miss_threshold: u8,
69 /// Per-peer most-recent heartbeat observation.
70 peers: BTreeMap<NodeId, PeerState>,
71 /// The peer this tracker believes is the current leader, if
72 /// any. Set by the most recent heartbeat with `role ==
73 /// Leader`; cleared by [`Self::clear_believed_leader`] (the
74 /// coordinator clears it on `Replica → Candidate` so the next
75 /// election cycle starts clean).
76 believed_leader: Option<NodeId>,
77}
78
79impl HeartbeatTracker {
80 /// Construct a tracker for a channel configured with
81 /// `heartbeat_ms` cadence. Uses
82 /// [`DEFAULT_MISS_THRESHOLD`] = 3.
83 pub fn new(heartbeat_ms: u64) -> Self {
84 Self::with_miss_threshold(heartbeat_ms, DEFAULT_MISS_THRESHOLD)
85 }
86
87 /// Explicit-threshold constructor — pin the miss count for
88 /// tighter-SLA workloads or DST scenarios. Threshold of `0`
89 /// is clamped to `1` so a heartbeat tracker is never in a
90 /// "permanently silent" state (with `miss_threshold = 0`,
91 /// even a fresh heartbeat would be "miss enough" to trigger).
92 pub fn with_miss_threshold(heartbeat_ms: u64, miss_threshold: u8) -> Self {
93 Self {
94 heartbeat_ms,
95 miss_threshold: miss_threshold.max(1),
96 peers: BTreeMap::new(),
97 believed_leader: None,
98 }
99 }
100
101 /// Configured heartbeat cadence.
102 pub fn heartbeat_ms(&self) -> u64 {
103 self.heartbeat_ms
104 }
105
106 /// Configured miss threshold.
107 pub fn miss_threshold(&self) -> u8 {
108 self.miss_threshold
109 }
110
111 /// Record an inbound heartbeat from `peer`. Updates the
112 /// peer's `last_seen` / `role` / `tail_seq` and — if `role ==
113 /// Leader` — promotes `peer` to the believed leader (even if
114 /// a different peer was previously believed-leader; the most
115 /// recent `Leader`-roled heartbeat wins).
116 pub fn record_heartbeat(
117 &mut self,
118 peer: NodeId,
119 role: ReplicaRole,
120 tail_seq: u64,
121 now: Instant,
122 ) {
123 self.peers.insert(
124 peer,
125 PeerState {
126 last_seen: now,
127 role,
128 tail_seq,
129 },
130 );
131 if role == ReplicaRole::Leader {
132 // Tiebreak must match the dual-leader convergence rule in
133 // `replication_runtime.rs::on_inbound`: a Leader claim
134 // beats the current believed leader when its `(tail_seq,
135 // -node_id)` is strictly larger — i.e. higher tail wins,
136 // and on a tail tie the numerically smaller `node_id`
137 // wins.
138 //
139 // The two sites used to disagree. Runtime used
140 // `(higher tail, lower id)`; heartbeat used `lower id
141 // only, sticky`. A local Leader L1 (high tail, high id)
142 // and a peer Leader L2 (low tail, low id) heartbeating
143 // each other would:
144 // - L1 stays Leader (runtime tiebreak: higher tail wins),
145 // - L1 records L2 as believed_leader (heartbeat tiebreak:
146 // lower id wins).
147 // L1's replica-side gates (`leader_belief != Some(from)`)
148 // then trusted L2's SyncResponses while L1 itself kept
149 // emitting Leader heartbeats. Aligning the rules closes
150 // that split-brain window.
151 //
152 // Stickiness is preserved in the form "current wins ties
153 // below the strict-beat threshold," so two peers with
154 // identical `(tail, id)` claims don't flap. That isn't
155 // weaker than the prior lex-only sticky variant — a
156 // higher-id claimant only displaces when it brings a
157 // strictly newer tail, exactly the condition under which
158 // we want the replica to follow the more-current peer.
159 match self.believed_leader {
160 None => self.believed_leader = Some(peer),
161 Some(existing) if existing == peer => {
162 // Re-affirmation of the same leader — no change.
163 }
164 Some(existing) => {
165 let existing_tail = self.peers.get(&existing).map(|p| p.tail_seq).unwrap_or(0);
166 let peer_beats =
167 tail_seq > existing_tail || (tail_seq == existing_tail && peer < existing);
168 if peer_beats {
169 self.believed_leader = Some(peer);
170 }
171 }
172 }
173 }
174 }
175
176 /// True iff the believed leader has been silent past the
177 /// miss-threshold window — i.e. `(now - leader.last_seen) >=
178 /// miss_threshold × heartbeat_ms`.
179 ///
180 /// Returns `false` when:
181 /// - No believed leader is known (a fresh tracker, or just
182 /// after [`Self::clear_believed_leader`]).
183 /// - The believed leader's last heartbeat is fresh enough.
184 ///
185 /// Caller drives this on every coordinator tick.
186 pub fn is_leader_silent(&self, now: Instant) -> bool {
187 let Some(leader_id) = self.believed_leader else {
188 return false;
189 };
190 let Some(leader) = self.peers.get(&leader_id) else {
191 // Believed leader was set but the peer entry was
192 // dropped (e.g. via `drop_peer`). Treat as silent so
193 // the coordinator runs an election from a clean
194 // slate.
195 return true;
196 };
197 let threshold =
198 Duration::from_millis(self.heartbeat_ms.saturating_mul(self.miss_threshold as u64));
199 now.saturating_duration_since(leader.last_seen) >= threshold
200 }
201
202 /// Current believed leader. `None` if no heartbeat with
203 /// `role == Leader` has been observed (or
204 /// [`Self::clear_believed_leader`] was called).
205 pub fn believed_leader(&self) -> Option<NodeId> {
206 self.believed_leader
207 }
208
209 /// Clear the believed-leader cell. The coordinator calls
210 /// this on every `Replica → Candidate` transition so the
211 /// next election cycle starts clean; a stale believed leader
212 /// would let [`Self::is_leader_silent`] return false even
213 /// after the local node decided to run an election.
214 pub fn clear_believed_leader(&mut self) {
215 self.believed_leader = None;
216 }
217
218 /// Drop a peer from the tracker — disconnect / withdraw /
219 /// channel close. If the dropped peer was the believed
220 /// leader, clears that too so the coordinator's next tick
221 /// can re-observe leadership cleanly.
222 pub fn drop_peer(&mut self, peer: NodeId) {
223 self.peers.remove(&peer);
224 if self.believed_leader == Some(peer) {
225 self.believed_leader = None;
226 }
227 }
228
229 /// Read this peer's most recent observation, if any.
230 pub fn peer_state(&self, peer: NodeId) -> Option<PeerState> {
231 self.peers.get(&peer).copied()
232 }
233
234 /// Lag = `now - peer.last_seen` for the given peer.
235 /// `None` if the peer is unknown.
236 pub fn peer_lag(&self, peer: NodeId, now: Instant) -> Option<Duration> {
237 self.peers
238 .get(&peer)
239 .map(|p| now.saturating_duration_since(p.last_seen))
240 }
241
242 /// Set of peers considered alive in the local view —
243 /// last-seen within the miss-threshold window. Sorted by
244 /// `NodeId` for stable iteration.
245 ///
246 /// Consumed by the [`elect`](super::replication_election::elect)
247 /// selection function from `replication_election.rs` to filter
248 /// the replica set down to the healthy subset.
249 pub fn healthy_peers(&self, now: Instant) -> Vec<NodeId> {
250 let threshold =
251 Duration::from_millis(self.heartbeat_ms.saturating_mul(self.miss_threshold as u64));
252 self.peers
253 .iter()
254 .filter(|(_, state)| now.saturating_duration_since(state.last_seen) < threshold)
255 .map(|(id, _)| *id)
256 .collect()
257 }
258
259 /// Snapshot every peer's `(NodeId, tail_seq)` pair. Useful
260 /// for the leader-side lag metric: the leader's own tail
261 /// minus each replica's reported tail = that replica's
262 /// observable lag.
263 pub fn peer_tail_seqs(&self) -> Vec<(NodeId, u64)> {
264 self.peers
265 .iter()
266 .map(|(id, state)| (*id, state.tail_seq))
267 .collect()
268 }
269
270 /// Number of peers currently tracked.
271 pub fn peer_count(&self) -> usize {
272 self.peers.len()
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::*;

    /// Arbitrary base instant; tests only ever use offsets from it.
    fn t0() -> Instant {
        Instant::now()
    }

    /// `base` advanced by `ms` milliseconds.
    fn at(base: Instant, ms: u64) -> Instant {
        base + Duration::from_millis(ms)
    }

    #[test]
    fn new_tracker_has_no_peers_or_leader() {
        let t = HeartbeatTracker::new(500);
        assert_eq!(t.peer_count(), 0);
        assert!(t.believed_leader().is_none());
        assert!(!t.is_leader_silent(t0()));
        assert_eq!(t.heartbeat_ms(), 500);
        assert_eq!(t.miss_threshold(), DEFAULT_MISS_THRESHOLD);
    }

    #[test]
    fn miss_threshold_zero_clamped_to_one() {
        let t = HeartbeatTracker::with_miss_threshold(100, 0);
        assert_eq!(t.miss_threshold(), 1);
    }

    #[test]
    fn record_heartbeat_tracks_peer_state() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);

        assert_eq!(t.peer_count(), 1);
        assert_eq!(t.believed_leader(), Some(0x42));
        let p = t.peer_state(0x42).unwrap();
        assert_eq!(p.role, ReplicaRole::Leader);
        assert_eq!(p.tail_seq, 100);
        assert_eq!(p.last_seen, base);
    }

    #[test]
    fn leader_tiebreak_prefers_higher_tail_then_lower_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        // First Leader heartbeat establishes believed_leader.
        t.record_heartbeat(0x43, ReplicaRole::Leader, 200, base);
        assert_eq!(t.believed_leader(), Some(0x43));
        // A second peer claims Leader with LOWER tail. Even though
        // its node_id is lex-smaller, the higher-tail peer keeps
        // believed-leader — matching the runtime's dual-leader
        // convergence rule. Without alignment, a peer with stale
        // tail could displace a leader holding fresher data.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, at(base, 100));
        assert_eq!(
            t.believed_leader(),
            Some(0x43),
            "higher-tail Leader should keep believed-leader against a lower-id claimant with lower tail",
        );
        // The same peer 0x42 now claims Leader with a STRICTLY
        // HIGHER tail than the current believed leader — it
        // displaces 0x43.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 300, at(base, 200));
        assert_eq!(
            t.believed_leader(),
            Some(0x42),
            "strictly higher tail wins the tiebreak",
        );
        // Tail tie: lex-smaller id wins. 0x41 with the same tail
        // (300) as the current 0x42 displaces.
        t.record_heartbeat(0x41, ReplicaRole::Leader, 300, at(base, 300));
        assert_eq!(
            t.believed_leader(),
            Some(0x41),
            "on a tail tie the lex-smaller id wins",
        );
    }

    /// Stickiness half of the tiebreak: a claimant that merely
    /// ties on tail and has a larger id does not strictly beat the
    /// current believed leader, so the belief must not flap.
    #[test]
    fn equal_tail_higher_id_claimant_does_not_displace() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x41, ReplicaRole::Leader, 300, base);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 300, at(base, 100));
        assert_eq!(
            t.believed_leader(),
            Some(0x41),
            "on a tail tie the current (lower-id) believed leader keeps the slot",
        );
        // The claimant's own state is still recorded.
        assert_eq!(t.peer_state(0x42).unwrap().role, ReplicaRole::Leader);
    }

    /// Regression: the heartbeat tiebreak and the runtime's
    /// dual-leader convergence rule must agree. When they don't, a
    /// local Leader can simultaneously (a) stay Leader because it
    /// wins the runtime rule (`higher tail`) and (b) believe a peer
    /// is the leader because the heartbeat rule picked the peer
    /// (`lower id, sticky`). The local node's replica-side gates
    /// then trust the peer's SyncResponses while it also keeps
    /// emitting Leader heartbeats — a split-brain window.
    ///
    /// This test pins the alignment from the heartbeat side: a
    /// peer that LOSES the runtime tiebreak (lower tail) must NOT
    /// be recorded as the believed leader on the local node.
    #[test]
    fn heartbeat_tiebreak_aligns_with_runtime_convergence_rule() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);

        // Local node is implicitly "Leader" (the heartbeat tracker
        // tracks peers, not self). Simulate L1 (peer 0xAA, high
        // tail) claiming Leader. We expect believed_leader == L1.
        t.record_heartbeat(0xAA, ReplicaRole::Leader, 500, base);
        assert_eq!(t.believed_leader(), Some(0xAA));

        // Now L2 (peer 0x11, low id, LOWER tail) claims Leader. The
        // runtime would say L1 wins (higher tail) and ask L2 to
        // concede. The heartbeat tracker must agree — believed
        // leader stays L1, NOT L2.
        t.record_heartbeat(0x11, ReplicaRole::Leader, 100, at(base, 50));
        assert_eq!(
            t.believed_leader(),
            Some(0xAA),
            "lower-tail Leader claimant must NOT win the heartbeat tiebreak; \
             pre-fix the lex-only rule made L2 win here and the local node \
             ended up treating L2's SyncResponses as authoritative while \
             still emitting Leader heartbeats itself — split brain",
        );
    }

    #[test]
    fn replica_role_heartbeat_does_not_change_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // Replica heartbeat from another peer — believed leader
        // stays the original.
        t.record_heartbeat(0x99, ReplicaRole::Replica, 95, at(base, 50));
        assert_eq!(t.believed_leader(), Some(0x42));
        // But the replica's state is recorded.
        assert_eq!(t.peer_state(0x99).unwrap().role, ReplicaRole::Replica);
    }

    #[test]
    fn leader_not_silent_within_window() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // 500 ms elapsed = 1 heartbeat window, well below the
        // 3 × 500 ms silence window — not silent.
        assert!(!t.is_leader_silent(at(base, 500)));
        // 1 ms before 3 × 500 ms still considered alive.
        assert!(!t.is_leader_silent(at(base, 1499)));
    }

    #[test]
    fn leader_silent_at_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // Exactly 3 × 500 ms = 1500 ms — silent.
        assert!(t.is_leader_silent(at(base, 1500)));
    }

    #[test]
    fn leader_silent_past_threshold() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        assert!(t.is_leader_silent(at(base, 5000)));
    }

    #[test]
    fn fresh_leader_heartbeat_resets_silence() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        // The leader has gone silent.
        assert!(t.is_leader_silent(at(base, 1500)));
        // A fresh heartbeat from the leader resets the window.
        t.record_heartbeat(0x42, ReplicaRole::Leader, 105, at(base, 1500));
        // 100ms after the fresh heartbeat — not silent.
        assert!(!t.is_leader_silent(at(base, 1600)));
    }

    #[test]
    fn dropped_believed_leader_is_cleared_and_not_silent() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.drop_peer(0x42);
        // After drop_peer, believed_leader was the dropped peer,
        // so it's cleared — fall back to "no believed leader =
        // not silent."
        assert!(!t.is_leader_silent(at(base, 100)));
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn clear_believed_leader_does_not_drop_peer_entry() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 100, base);
        t.clear_believed_leader();
        assert!(t.believed_leader().is_none());
        // The peer's state is still there — only the "believe
        // them to be leader" cell was cleared.
        assert!(t.peer_state(0x42).is_some());
    }

    #[test]
    fn peer_lag_returns_elapsed() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x42, ReplicaRole::Replica, 100, base);
        let lag = t.peer_lag(0x42, at(base, 750)).unwrap();
        assert_eq!(lag, Duration::from_millis(750));
    }

    #[test]
    fn peer_lag_unknown_returns_none() {
        let t = HeartbeatTracker::new(500);
        assert!(t.peer_lag(0x42, t0()).is_none());
    }

    #[test]
    fn healthy_peers_filters_stale_entries() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 100, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 100, at(base, 200));
        t.record_heartbeat(0x3, ReplicaRole::Replica, 100, at(base, 400));
        // At t=1500ms (3 × 500), peer 1's heartbeat (at 0ms) is
        // stale; 2 (at 200ms) and 3 (at 400ms) are still fresh
        // (just barely for peer 2: 1500-200=1300 < 1500).
        let healthy = t.healthy_peers(at(base, 1500));
        assert_eq!(healthy, vec![0x2, 0x3]);
    }

    #[test]
    fn healthy_peers_sorted_by_node_id() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        // Insert out of order; output should be ascending.
        t.record_heartbeat(0x30, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x10, ReplicaRole::Replica, 0, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
        let healthy = t.healthy_peers(at(base, 100));
        assert_eq!(healthy, vec![0x10, 0x20, 0x30]);
    }

    #[test]
    fn peer_tail_seqs_snapshot() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x10, ReplicaRole::Leader, 1000, base);
        t.record_heartbeat(0x20, ReplicaRole::Replica, 950, base);
        t.record_heartbeat(0x30, ReplicaRole::Replica, 980, base);
        // Sort defensively: the snapshot's ordering is not part of
        // what this test pins.
        let mut tails = t.peer_tail_seqs();
        tails.sort_by_key(|(id, _)| *id);
        assert_eq!(tails, vec![(0x10, 1000), (0x20, 950), (0x30, 980)]);
    }

    #[test]
    fn drop_peer_removes_and_decrements_count() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        assert_eq!(t.peer_count(), 2);
        t.drop_peer(0x1);
        assert_eq!(t.peer_count(), 1);
        assert!(t.peer_state(0x1).is_none());
        assert!(t.peer_state(0x2).is_some());
        // Believed leader cleared because it was the dropped peer.
        assert!(t.believed_leader().is_none());
    }

    #[test]
    fn drop_non_leader_peer_preserves_believed_leader() {
        let base = t0();
        let mut t = HeartbeatTracker::new(500);
        t.record_heartbeat(0x1, ReplicaRole::Leader, 0, base);
        t.record_heartbeat(0x2, ReplicaRole::Replica, 0, base);
        t.drop_peer(0x2);
        assert_eq!(t.believed_leader(), Some(0x1));
    }

    #[test]
    fn miss_threshold_one_triggers_after_one_window() {
        let base = t0();
        let mut t = HeartbeatTracker::with_miss_threshold(500, 1);
        t.record_heartbeat(0x42, ReplicaRole::Leader, 0, base);
        assert!(!t.is_leader_silent(at(base, 499)));
        assert!(t.is_leader_silent(at(base, 500)));
    }

    #[test]
    fn no_believed_leader_never_silent_regardless_of_time() {
        let base = t0();
        let t = HeartbeatTracker::new(500);
        // No heartbeats observed; not silent at any future time.
        assert!(!t.is_leader_silent(at(base, 60_000)));
    }
}
567}