net/adapter/net/redex/replication_runtime.rs
1//! Tokio-driven replication runtime — ties together the pure
2//! pieces ([`ReplicationCoordinator`], [`HeartbeatTracker`],
3//! [`BandwidthBudget`], [`tick`], [`handle_sync_request`],
4//! [`apply_sync_response`]) behind a single async task per
5//! replicated channel.
6//!
7//! Slot: the layer between `Redex::open_file` (which spawns one
8//! runtime per replicated channel) and the substrate mesh
9//! (which routes inbound `SUBPROTOCOL_REDEX` payloads here).
10//!
11//! Architecture:
12//!
13//! ```text
//!   Mesh dispatch ──Inbound::*──▶ ReplicationRuntime task
//!                                          │
//!                           ┌──────────────┴──────────────┐
//!                           │                             │
//!                     HeartbeatTick                 InboundEvent
//!                 (every heartbeat_ms)             (peer message)
//!                           │                             │
//!                           ▼                             ▼
//!               replication_step::tick          update tracker / file
//!                           │                             │
//!                           ▼                             ▼
//!                   outbound dispatch         maybe issue sync_request
//!                           │                             │
//!                           └──────────────┬──────────────┘
//!                                          ▼
//!                                ReplicationDispatcher
//!                               (mesh.send_subprotocol)
31//! ```
32//!
//! The dispatcher trait abstracts the mesh-side wire send so the
//! runtime is unit-testable with a recorder mock. The production
//! `MeshNode` impl of `ReplicationDispatcher` and the
//! [`ReplicationInboundRouter`] trait the mesh dispatch loop calls
//! to hand `SUBPROTOCOL_REDEX` payloads to the right runtime's
//! inbox are both defined in this module, alongside the runtime
//! task itself.
39
40use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
41use std::sync::Arc;
42use std::time::Instant;
43
44use parking_lot::Mutex;
45use tokio::sync::mpsc;
46use tokio::task::JoinHandle;
47
48use super::file::RedexFile;
49use super::replication::{
50 ChannelId, ReplicaRole, SyncHeartbeat, SyncNack, SyncRequest, SyncResponse,
51};
52use super::replication_budget::BandwidthBudget;
53use super::replication_catchup::{apply_sync_response, handle_sync_request, SyncRequestOutcome};
54use super::replication_coordinator::{ChannelIdentity, CoordinatorError, ReplicationCoordinator};
55use super::replication_heartbeat::HeartbeatTracker;
56use super::replication_step::{
57 election_outcome, tick, OutboundMessage, TickInputs, SYNC_REQUEST_CHUNK_MAX_DEFAULT,
58};
59use crate::adapter::net::behavior::placement::NodeId;
60use crate::error::AdapterError;
61use std::time::Duration;
62
/// Outbound wire-message sink the runtime uses to ship messages
/// through the substrate. Production sink routes through
/// [`crate::adapter::net::MeshNode`]'s `SUBPROTOCOL_REDEX`
/// dispatch; unit tests use a recorder mock.
///
/// # `Ok(())` semantics and bandwidth-budget accounting
///
/// This note applies to every send method below: `Ok(())` means
/// **queued to the transport**, not **delivered to the peer**.
/// Implementations whose underlying transport buffers without ack
/// (UDP, lossy QUIC streams under link failure, etc.) MUST surface
/// delivery-loss through their own back-channel (heartbeat lag,
/// peer-reported tail seq, out-of-band ack) — the replication
/// runtime's bandwidth budget refund path keys on the synchronous
/// `Err` return only.
///
/// A silently-dropped frame still drains the budget. The
/// flaky-link case is dampened by R-28 catchup-backoff once empty
/// responses accrue past the threshold, but the budget itself
/// cannot self-correct on transport-internal loss without an
/// end-to-end ack. The trait deliberately does not demand one: the
/// cost would be a per-response ack-RTT that the underlying QUIC
/// reliable streams already provide for in-spec transports.
///
/// In short: trait callers MAY treat `Ok(())` as "the bytes
/// reached the wire layer." If your transport doesn't guarantee
/// delivery on `Ok(())`, document that on your impl and
/// understand that the budget over-counts under loss.
#[async_trait::async_trait]
pub trait ReplicationDispatcher: Send + Sync {
    /// Send a [`SyncHeartbeat`] to `target`. See the trait-level
    /// section on `Ok(())` semantics.
    async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError>;
    /// Send a [`SyncRequest`] to `target` (typically a leader).
    /// See the trait-level section on `Ok(())` semantics.
    async fn send_sync_request(&self, target: NodeId, msg: SyncRequest)
        -> Result<(), AdapterError>;
    /// Send a [`SyncResponse`] to `target` (typically a replica
    /// catching up). See the trait-level section on `Ok(())`
    /// semantics.
    async fn send_sync_response(
        &self,
        target: NodeId,
        msg: SyncResponse,
    ) -> Result<(), AdapterError>;
    /// Send a [`SyncNack`] to `target`. See the trait-level
    /// section on `Ok(())` semantics.
    async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError>;
}
108
/// RTT-lookup function the election uses. Production routes
/// through `ProximityGraph::nearest_rtt(|n| n.node_id ==
/// graph_id_of(node))`; unit tests pass a static closure.
///
/// Shared behind an `Arc` and bounded `Send + Sync` so the same
/// lookup can be handed to the spawned runtime task.
/// NOTE(review): `None` presumably means "no RTT sample for this
/// node" — confirm against the election code.
pub type RttLookup = Arc<dyn Fn(NodeId) -> Option<Duration> + Send + Sync>;
113
/// Sync, non-blocking router the mesh's inbound dispatch hot path
/// calls when a `SUBPROTOCOL_REDEX` payload arrives. Owns the
/// per-channel registry of [`ReplicationRuntimeHandle`]s and
/// routes the decoded [`Inbound`] event to the right one.
///
/// See [`Self::try_route`] for the rejection cases; on rejection
/// the caller (mesh dispatch loop) drops + logs. The wire layer's
/// reliable-stream may retransmit, or the peer's heartbeat cycle
/// will recover state without it.
pub trait ReplicationInboundRouter: Send + Sync {
    /// Try to route an inbound event to its channel's runtime.
    /// Sync + non-blocking — must not call into async code, must
    /// not hold locks across awaits (the mesh dispatch loop is
    /// the sole caller and runs in a synchronous critical
    /// section).
    ///
    /// # Errors
    ///
    /// Returns `Err(inbound)` — the event handed back to the
    /// caller — when:
    /// - No runtime is registered for `channel_id` (channel not
    ///   opened on this node, or the runtime was canceled and not
    ///   yet unregistered).
    /// - The runtime's inbox is full (per-channel backlog at
    ///   [`RUNTIME_INBOX_CAPACITY`]).
    fn try_route(&self, channel_id: ChannelId, inbound: Inbound) -> Result<(), Inbound>;
}
137
138#[async_trait::async_trait]
139impl ReplicationDispatcher for crate::adapter::net::MeshNode {
140 async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError> {
141 send_redex_payload(self, target, msg.to_bytes()).await
142 }
143
144 async fn send_sync_request(
145 &self,
146 target: NodeId,
147 msg: SyncRequest,
148 ) -> Result<(), AdapterError> {
149 send_redex_payload(self, target, msg.to_bytes()).await
150 }
151
152 async fn send_sync_response(
153 &self,
154 target: NodeId,
155 msg: SyncResponse,
156 ) -> Result<(), AdapterError> {
157 send_redex_payload(self, target, msg.to_bytes()).await
158 }
159
160 async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
161 send_redex_payload(self, target, msg.to_bytes()).await
162 }
163}
164
165/// Resolve a `NodeId` to its peer `SocketAddr` and ship `payload`
166/// via `MeshNode::send_subprotocol` with `SUBPROTOCOL_REDEX`. The
167/// `payload` already carries the 3-byte subprotocol header per
168/// plan §2; the substrate's Net header carries `subprotocol_id`
169/// independently for routing — the redundancy is plan-mandated
170/// (the application-layer header is part of the wire contract,
171/// distinct from the transport-layer Net header).
172async fn send_redex_payload(
173 mesh: &crate::adapter::net::MeshNode,
174 target: NodeId,
175 payload: Vec<u8>,
176) -> Result<(), AdapterError> {
177 let peer_addr = mesh.peer_addr(target).ok_or_else(|| {
178 AdapterError::Connection(format!("replication: peer {target:#x} unknown"))
179 })?;
180 mesh.send_subprotocol(peer_addr, super::replication::SUBPROTOCOL_REDEX, &payload)
181 .await
182}
183
/// Inbound event the runtime processes. The mesh-side dispatcher
/// pushes one of these into the runtime's inbox per inbound wire
/// frame; the runtime's task drains the receiver on every wakeup.
///
/// Lane routing (see `is_priority_event`): `Shutdown`,
/// `SyncResponse` and `SyncNack` ride the priority inbox;
/// `Heartbeat` and `SyncRequest` ride the standard inbox.
#[derive(Debug, Clone)]
pub enum Inbound {
    /// Peer's heartbeat — record into the tracker.
    Heartbeat {
        /// Originating node id.
        from: NodeId,
        /// Wire-format heartbeat payload.
        msg: SyncHeartbeat,
    },
    /// Peer (replica) asked us (leader) for events. Run
    /// `handle_sync_request` against the local file + dispatch
    /// the response or nack.
    SyncRequest {
        /// Originating replica.
        from: NodeId,
        /// Wire-format request.
        msg: SyncRequest,
    },
    /// Peer (leader) shipped us a chunk. Apply via
    /// `apply_sync_response`; on success advance our tail.
    SyncResponse {
        /// Originating leader.
        from: NodeId,
        /// Wire-format response.
        msg: SyncResponse,
    },
    /// Peer (leader) rejected our request with a typed error.
    SyncNack {
        /// Originating leader.
        from: NodeId,
        /// Wire-format nack.
        msg: SyncNack,
    },
    /// Shutdown signal from `Redex::open_file` cleanup or from
    /// channel-close. Drives `coordinator.transition_to(Idle,
    /// ChannelClose)` and exits the task loop. The run loop treats
    /// a closed inbox (all senders dropped) the same way.
    Shutdown,
}
225
/// Inputs the runtime task captures at spawn time; the whole
/// struct is moved into the spawned task.
///
/// The `tail_provider` closure reads the current
/// `RedexFile::next_seq()`. The file's owner is `Redex`.
/// NOTE(review): earlier wording said the closure exists so the
/// runtime "doesn't take ownership of the file handle", but the
/// struct also carries a `file` clone (see the `file` field) —
/// confirm which rationale still holds.
pub struct RuntimeInputs {
    /// Channel identity + origin_hash.
    pub channel: ChannelIdentity,
    /// 32-byte BLAKE2s channel id for the wire-format heartbeat.
    pub channel_id: ChannelId,
    /// This node's id.
    pub self_node_id: NodeId,
    /// Replica-set membership — every node currently registered
    /// as a replica for the channel. Coordinator updates this
    /// when the placement filter re-selects (Phase C / F).
    /// NOTE(review): this is a plain owned `Vec`; the update path
    /// is not visible in this part of the file — confirm.
    pub replica_set: Vec<NodeId>,
    /// Heartbeat cadence in milliseconds (mirrors
    /// `ReplicationConfig::heartbeat_ms`). The tokio interval
    /// drives at this cadence, and the same value seeds the
    /// `HeartbeatTracker`.
    pub heartbeat_ms: u64,
    /// Function returning the wall-clock milliseconds for the
    /// outbound heartbeat's `wall_clock_ms` field. Operator-
    /// facing drift detection only; abstracted so tests can
    /// inject a deterministic value.
    pub wall_clock_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// Function returning the current local `tail_seq`. Called
    /// each tick before emission so heartbeats carry the freshest
    /// value.
    pub tail_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// RTT lookup for the election function.
    pub rtt_lookup: RttLookup,
    /// The local `RedexFile` for this channel. The runtime holds
    /// a clone (RedexFile is `Clone` — Arc-backed) so it can
    /// drive `handle_sync_request` on inbound `SyncRequest`
    /// frames (leader path) and `apply_sync_response` on inbound
    /// `SyncResponse` frames (replica path). The
    /// `tail_provider` closure typically wraps `file.next_seq()`.
    pub file: RedexFile,
    /// Per-channel default [`super::bandwidth::BandwidthClass`] — stamped on every
    /// `SyncRequest` the runtime emits. v0.3 Phase D2. Sourced
    /// from
    /// [`ReplicationConfig::default_bandwidth_class`](super::replication_config::ReplicationConfig).
    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
    /// v0.3 Phase D2 admission-gate parameter: fraction of the
    /// bandwidth bucket capacity reserved against `Background`.
    /// Default 0.3 via
    /// [`ReplicationConfig::background_fraction`](super::replication_config::ReplicationConfig).
    pub background_fraction: f32,
}
274
/// Handle the spawned task produces. Holds the inbox senders so
/// the mesh dispatcher (and the lifecycle code) can push
/// [`Inbound`] events. `cancel()` sends `Shutdown` and awaits the
/// task to exit cleanly; dropping the handle without `cancel()`
/// aborts the task instead (see the `Drop` impl). The owned
/// [`ReplicationCoordinator`] is exposed via [`Self::coordinator`]
/// so operators (and tests) can observe the role, drive
/// `transition_to`, and read the channel metrics without going
/// through the inbox.
pub struct ReplicationRuntimeHandle {
    /// Low-priority inbox: Heartbeat + SyncRequest. A peer flood
    /// fills this lane first; under saturation the high-priority
    /// lane keeps draining so Shutdown, SyncResponse, and SyncNack
    /// still make forward progress.
    inbox: mpsc::Sender<Inbound>,
    /// High-priority inbox: Shutdown + SyncResponse + SyncNack.
    /// Catchup-critical events ride this lane so a Heartbeat
    /// flood from many peers doesn't strand the leader's response
    /// to the local replica or block graceful shutdown. Worked
    /// example: 50 peers heartbeating every 100 ms is 500 evt/s;
    /// add a momentarily slow `await` in `on_inbound` and a single
    /// shared 1024-slot lane would fill in about two seconds.
    priority_inbox: mpsc::Sender<Inbound>,
    /// Join handle of the spawned runtime task. `Some` until the
    /// first `cancel()` (or `Drop`) takes it; the mutex is only
    /// held for the duration of that `take()`.
    task: Mutex<Option<JoinHandle<()>>>,
    /// Coordinator shared with the runtime task (same `Arc`).
    coordinator: Arc<ReplicationCoordinator>,
    /// R-11: explicit "task has joined" flag. `is_stopped()`
    /// consults this rather than the JoinHandle slot — two
    /// concurrent `cancel()`s race on `task.lock().take()`, so
    /// the slot-based view can return `true` *before* the
    /// surviving caller's `.await` returns. The flag is flipped
    /// only after the join completes.
    stopped: AtomicBool,
}
306
307#[inline]
308fn is_priority_event(event: &Inbound) -> bool {
309 matches!(
310 event,
311 Inbound::Shutdown | Inbound::SyncResponse { .. } | Inbound::SyncNack { .. }
312 )
313}
314
315impl ReplicationRuntimeHandle {
316 /// The per-channel coordinator. Same `Arc` the runtime task
317 /// uses; cloning is cheap. Operators read `coordinator.role()`
318 /// for the current state and `coordinator.metrics()` for the
319 /// per-channel atomic counters; tests can drive
320 /// `coordinator.transition_to(target, signal)` to put the
321 /// channel in a specific role.
322 pub fn coordinator(&self) -> &Arc<ReplicationCoordinator> {
323 &self.coordinator
324 }
325
326 /// Push an inbound event into the runtime's inbox. Errors
327 /// when the runtime has already exited (drained channel).
328 /// Routes catchup-critical events (Shutdown, SyncResponse,
329 /// SyncNack) to the priority lane so a Heartbeat flood on
330 /// the standard lane can't starve them.
331 pub async fn dispatch(&self, event: Inbound) -> Result<(), AdapterError> {
332 let sender = if is_priority_event(&event) {
333 &self.priority_inbox
334 } else {
335 &self.inbox
336 };
337 sender
338 .send(event)
339 .await
340 .map_err(|_| AdapterError::Transient("replication runtime task exited".to_string()))
341 }
342
343 /// Same as [`Self::dispatch`] but for use from non-async
344 /// contexts (the mesh dispatch loop's sync hot path).
345 /// Returns the event back on full-buffer rejection so the
346 /// caller can decide whether to drop, log, or block.
347 pub fn try_dispatch(&self, event: Inbound) -> Result<(), Inbound> {
348 let sender = if is_priority_event(&event) {
349 &self.priority_inbox
350 } else {
351 &self.inbox
352 };
353 sender.try_send(event).map_err(|e| e.into_inner())
354 }
355
356 /// Send `Shutdown` and await the task to exit. Idempotent —
357 /// subsequent calls are no-ops once the task has joined.
358 ///
359 /// Uses `try_send` first so a wedged task with a full inbox
360 /// can't hang the caller indefinitely. On `Full`, the
361 /// JoinHandle is aborted directly; the task exits without
362 /// running the graceful Idle transition but the channel is
363 /// still safely torn down.
364 pub async fn cancel(&self) {
365 let handle = self.task.lock().take();
366 if let Some(h) = handle {
367 // Shutdown rides the priority lane; that's the same
368 // lane the run loop drains first under the biased
369 // select, so even a saturated low-priority lane can't
370 // delay the graceful exit.
371 match self.priority_inbox.try_send(Inbound::Shutdown) {
372 Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => {
373 // Graceful path: the task observes Shutdown (or
374 // already exited). Await the join.
375 let _ = h.await;
376 }
377 Err(mpsc::error::TrySendError::Full(_)) => {
378 // Priority lane is itself saturated. Abort
379 // directly so cancel() can't block the caller.
380 h.abort();
381 let _ = h.await;
382 }
383 }
384 // Only the holder of the JoinHandle flips `stopped`,
385 // and only after `.await` returns. Pre-fix, a concurrent
386 // cancel() racer that lost the `take()` skipped the if-let
387 // block and then unconditionally stored `true` — before
388 // the winner's await had completed. `is_stopped()` then
389 // reported "joined" while the task was still running.
390 self.stopped.store(true, AtomicOrdering::Release);
391 }
392 }
393
394 /// Returns `true` if the runtime has stopped (task joined).
395 /// Useful for tests / observability.
396 ///
397 /// R-11: this consults an explicit flag flipped after
398 /// `cancel()`'s `.await` returns, not the JoinHandle slot.
399 /// Without the flag, two concurrent `cancel()` calls could
400 /// race so the loser observes `task.lock().take() == None`
401 /// and reports `is_stopped == true` before the winner has
402 /// finished joining.
403 pub fn is_stopped(&self) -> bool {
404 self.stopped.load(AtomicOrdering::Acquire)
405 }
406}
407
/// Per-leader catchup state — tracks consecutive empty SyncResponses
/// in the face of an advertised tail gap. A buggy or byzantine
/// leader that advertises ever-increasing `tail_seq` but ships
/// `Response{events: []}` would otherwise loop the replica at
/// heartbeat cadence forever; the backoff suppresses outbound
/// SyncRequests once the threshold is crossed, with exponential
/// growth capped at [`CATCHUP_BACKOFF_CAP`]. A non-empty response
/// resets the counter so a transient stall doesn't permanently
/// pause catchup.
#[derive(Debug, Default)]
pub struct CatchupBackoff {
    /// Strike counter and backoff deadline, keyed by leader node id.
    /// An entry is created on the first empty response from a
    /// leader and removed by `record_progress` or `gc_expired`.
    entries: std::collections::HashMap<NodeId, BackoffEntry>,
}
421
/// Backoff bookkeeping for a single leader inside
/// [`CatchupBackoff`].
#[derive(Debug, Default, Clone, Copy)]
struct BackoffEntry {
    /// Consecutive `apply_sync_response` calls that returned the
    /// same tail (no events applied) while the believed leader's
    /// advertised `tail_seq` was still strictly greater than ours.
    consecutive_empty: u32,
    /// Monotonic (`Instant`) deadline at which the backoff window
    /// ends. `None` until `consecutive_empty` exceeds
    /// `CATCHUP_BACKOFF_THRESHOLD`, i.e. while the counter is at
    /// or below the threshold.
    backoff_until: Option<Instant>,
}
432
/// Strikes before backoff kicks in. The first 3 empty responses
/// are absorbed without delay so a transient leader-retention edge
/// doesn't trigger a backoff; the window is first stamped on the
/// empty response that takes the counter above this value.
pub const CATCHUP_BACKOFF_THRESHOLD: u32 = 3;

/// First backoff window after the threshold is crossed. Each
/// further consecutive empty response doubles the window.
pub const CATCHUP_BACKOFF_INITIAL: Duration = Duration::from_secs(1);

/// Upper bound on the exponential backoff. A wedged leader stays
/// reachable for re-evaluation at least every cap.
pub const CATCHUP_BACKOFF_CAP: Duration = Duration::from_secs(30);
444
445/// Per-leader in-flight SyncRequest registry. Each outbound
446/// SyncRequest mints a random `request_id` from `getrandom`; the
447/// id is inserted here keyed by `(leader_node_id, request_id)`
448/// before the wire send. Inbound SyncResponse / SyncNack must
449/// carry an id present in the set; otherwise it's a stale
450/// response from a request the replica already timed out (or a
451/// forged frame from any peer that happens to be the recorded
452/// leader). Entries auto-expire after [`REQUEST_TTL`] so the set
453/// can't grow without bound under leader silence.
454#[derive(Debug, Default)]
455pub struct OutstandingRequests {
456 entries: std::collections::HashMap<(NodeId, u64), Instant>,
457}
458
459/// TTL on entries in [`OutstandingRequests`]. Bounded by the
460/// catchup deadline so a one-tick-late response still lands.
461pub const REQUEST_TTL: Duration = Duration::from_secs(30);
462
463/// Soft cap on per-replica outstanding requests across all
464/// leaders. A degraded leader that never responds shouldn't
465/// let the set grow without bound; once the cap is hit, GC
466/// kicks in and the oldest entries are dropped.
467pub const REQUEST_REGISTRY_SOFT_CAP: usize = 256;
468
469impl OutstandingRequests {
470 /// Construct an empty registry.
471 pub fn new() -> Self {
472 Self {
473 entries: std::collections::HashMap::new(),
474 }
475 }
476
477 /// Record a freshly-minted `request_id` against `leader`.
478 /// Best-effort GC of expired entries runs at insert time so
479 /// the set stays bounded without a separate sweeper task.
480 pub fn record(&mut self, leader: NodeId, request_id: u64, now: Instant) {
481 if self.entries.len() >= REQUEST_REGISTRY_SOFT_CAP {
482 self.entries
483 .retain(|_, &mut inserted| now.saturating_duration_since(inserted) < REQUEST_TTL);
484 }
485 self.entries.insert((leader, request_id), now);
486 }
487
488 /// Take the entry for `(leader, request_id)` if present and
489 /// not yet expired. Returns `true` when an in-flight request
490 /// matched; the caller proceeds with the apply path. `false`
491 /// means the response is stale / forged / past TTL — drop.
492 pub fn take(&mut self, leader: NodeId, request_id: u64, now: Instant) -> bool {
493 match self.entries.remove(&(leader, request_id)) {
494 Some(inserted) => now.saturating_duration_since(inserted) < REQUEST_TTL,
495 None => false,
496 }
497 }
498
499 /// Drop every entry recorded against `leader`. Called when
500 /// the believed leader changes so a re-elected peer doesn't
501 /// inherit the prior leader's in-flight token set.
502 pub fn clear_leader(&mut self, leader: NodeId) {
503 self.entries.retain(|(l, _), _| *l != leader);
504 }
505}
506
507impl CatchupBackoff {
508 /// Construct an empty backoff tracker.
509 pub fn new() -> Self {
510 Self {
511 entries: std::collections::HashMap::new(),
512 }
513 }
514
515 /// Record an empty `SyncResponse` from `leader` (events vec was
516 /// empty or apply returned the same tail). Increments the
517 /// consecutive counter; once past `CATCHUP_BACKOFF_THRESHOLD`,
518 /// stamps `backoff_until = now + min(initial << k, cap)` where
519 /// `k = consecutive_empty - threshold - 1`.
520 pub fn record_empty(&mut self, leader: NodeId, now: Instant) {
521 let entry = self.entries.entry(leader).or_default();
522 entry.consecutive_empty = entry.consecutive_empty.saturating_add(1);
523 if entry.consecutive_empty > CATCHUP_BACKOFF_THRESHOLD {
524 let shift = entry
525 .consecutive_empty
526 .saturating_sub(CATCHUP_BACKOFF_THRESHOLD + 1)
527 .min(20);
528 let multiplier: u32 = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
529 let backoff = CATCHUP_BACKOFF_INITIAL
530 .saturating_mul(multiplier)
531 .min(CATCHUP_BACKOFF_CAP);
532 entry.backoff_until = Some(now + backoff);
533 }
534 }
535
536 /// Record a productive response (events applied, tail advanced)
537 /// from `leader`. Clears any backoff state so the next request
538 /// can fire immediately.
539 pub fn record_progress(&mut self, leader: NodeId) {
540 self.entries.remove(&leader);
541 }
542
543 /// True when `now` is strictly before the recorded
544 /// `backoff_until` for `leader`. Out-of-backoff leaders (no
545 /// entry, or entry below threshold) always return `false`.
546 pub fn is_in_backoff(&self, leader: NodeId, now: Instant) -> bool {
547 self.entries
548 .get(&leader)
549 .and_then(|e| e.backoff_until)
550 .is_some_and(|until| now < until)
551 }
552
553 /// Drop entries for leaders whose backoff window expired more
554 /// than `cap` ago. Called from `on_tick` to keep the map
555 /// bounded under leader churn: a leader demoted after
556 /// accruing strikes never has `record_progress` called for it
557 /// again, so without expiry the entry persists indefinitely.
558 /// Below-threshold entries (no `backoff_until` stamp) are
559 /// retained — they represent active counting state.
560 pub fn gc_expired(&mut self, now: Instant, cap: Duration) {
561 self.entries.retain(|_, e| match e.backoff_until {
562 Some(until) => now.saturating_duration_since(until) < cap,
563 None => true,
564 });
565 }
566}
567
568impl Drop for ReplicationRuntimeHandle {
569 /// Best-effort cleanup if a handle is dropped without an
570 /// explicit `cancel().await`. Aborts the task synchronously so
571 /// the spawned future stops driving and the dispatcher Arc the
572 /// task held is released — closing the strong-reference cycle
573 /// `MeshNode → router → handle → task → dispatcher` without
574 /// requiring callers to remember the cancel sequence. The
575 /// graceful Idle transition is skipped on this path; callers
576 /// that need the announce/withdraw side-effects to land must
577 /// still `cancel().await` before drop.
578 fn drop(&mut self) {
579 // try_lock — pre-fix this took the parking_lot mutex
580 // unconditionally; on a single-thread runtime panic during
581 // shutdown, drop could fire on a thread already holding
582 // self.task (e.g. mid-cancel() when the future is dropped on
583 // panic), producing a deadlock. The best-effort abort can
584 // wait for the next normal cleanup if we lose the lock race
585 // — losing it means somebody else is already inside cancel()
586 // or another drop, and they will abort/await the task.
587 if let Some(mut guard) = self.task.try_lock() {
588 if let Some(h) = guard.take() {
589 h.abort();
590 }
591 }
592 }
593}
594
/// Inbox capacity of the standard (low-priority) lane — bounds the
/// per-channel inbound backlog. A peer flooding
/// `SUBPROTOCOL_REDEX` payloads at us can't grow the per-channel
/// queue without bound; once full, the mesh dispatcher's
/// `try_dispatch` returns the event back and the caller (mesh
/// dispatch loop) drops + logs. The priority lane is sized
/// separately by `RUNTIME_PRIORITY_INBOX_CAPACITY`.
pub const RUNTIME_INBOX_CAPACITY: usize = 1024;
601
/// Per-channel mutable state the runtime task threads through
/// `on_tick` / `on_inbound`. Replaces the four-`Arc<Mutex<…>>` arg
/// soup that pushed those functions over clippy's
/// `too_many_arguments` limit.
///
/// Every member is an `Arc<Mutex<…>>`, so sharing an individual
/// member is a reference-count bump. The struct itself does not
/// derive `Clone`; the task owns the single instance and passes it
/// by reference.
struct RuntimeState {
    /// Heartbeat bookkeeping for this channel's peers; created by
    /// the spawn function from `heartbeat_ms`.
    tracker: Arc<Mutex<HeartbeatTracker>>,
    /// Bandwidth budget, supplied by the caller of the spawn
    /// function rather than created per runtime.
    budget: Arc<Mutex<BandwidthBudget>>,
    /// Per-leader empty-response backoff state.
    backoff: Arc<Mutex<CatchupBackoff>>,
    /// In-flight SyncRequest tokens awaiting a response or nack.
    outstanding: Arc<Mutex<OutstandingRequests>>,
}
614
/// Priority-lane inbox capacity. Smaller than the standard lane
/// (`RUNTIME_INBOX_CAPACITY`) because the events that ride it
/// (Shutdown, SyncResponse, SyncNack) are bounded by the local
/// replica's in-flight catchup window plus a handful of NACKs —
/// not a per-peer heartbeat flood. Sized to absorb a burst without
/// back-pressuring the leader-side dispatcher. If this lane is
/// full when `cancel()` runs, the task is aborted instead of shut
/// down gracefully.
pub const RUNTIME_PRIORITY_INBOX_CAPACITY: usize = 128;
622
623/// Spawn a per-channel replication runtime task. Returns a
624/// handle the mesh dispatcher uses to push inbound events and
625/// the lifecycle code uses to cancel.
626///
627/// The task:
628/// 1. Initializes the tracker / budget for this channel.
629/// 2. Loops on `select! { interval tick, inbox.recv() }`.
630/// 3. Each tick: calls [`tick`], dispatches outbound, runs the
631/// election if the coordinator just entered Candidate.
632/// 4. Each inbound event: updates state + maybe ships an
633/// outbound response.
634/// 5. Exits cleanly on `Inbound::Shutdown` after running
635/// `coordinator.transition_to(Idle, ChannelClose)`.
636///
637/// `dispatcher` ships every outbound message; `tail_provider` /
638/// `wall_clock_provider` give the task fresh values per tick.
639///
640/// **R-14 — Arc cycle invariant.** Production wiring has
641/// `MeshNode → ReplicationInboundRouter → ReplicationRuntimeHandle
642/// → task → Arc<dyn ReplicationDispatcher = MeshNode>`. This is
643/// a strong reference cycle. It is broken by
644/// `ReplicationWiring::drop` (`manager.rs`): un-installing the
645/// router releases its `Arc<RuntimeHandle>` references, the
646/// runtime task observes the closed inbox receiver, exits, and
647/// drops its dispatcher Arc. Callers that do NOT route through
648/// `Redex` drop (e.g. holding a raw `ReplicationRuntimeHandle`
649/// past the dispatcher's owner) MUST call `cancel()` before
650/// dropping the dispatcher; otherwise the cycle leaks both.
651pub fn spawn_replication_runtime(
652 inputs: RuntimeInputs,
653 coordinator: Arc<ReplicationCoordinator>,
654 dispatcher: Arc<dyn ReplicationDispatcher>,
655 budget: Arc<Mutex<BandwidthBudget>>,
656) -> ReplicationRuntimeHandle {
657 let state = RuntimeState {
658 tracker: Arc::new(Mutex::new(HeartbeatTracker::new(inputs.heartbeat_ms))),
659 budget,
660 backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
661 outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
662 };
663 let (tx, rx) = mpsc::channel::<Inbound>(RUNTIME_INBOX_CAPACITY);
664 let (priority_tx, priority_rx) = mpsc::channel::<Inbound>(RUNTIME_PRIORITY_INBOX_CAPACITY);
665 let coordinator_for_task = coordinator.clone();
666 let task = tokio::spawn(run(
667 inputs,
668 coordinator_for_task,
669 dispatcher,
670 state,
671 rx,
672 priority_rx,
673 ));
674 ReplicationRuntimeHandle {
675 inbox: tx,
676 priority_inbox: priority_tx,
677 task: Mutex::new(Some(task)),
678 coordinator,
679 stopped: AtomicBool::new(false),
680 }
681}
682
683async fn run(
684 inputs: RuntimeInputs,
685 coordinator: Arc<ReplicationCoordinator>,
686 dispatcher: Arc<dyn ReplicationDispatcher>,
687 state: RuntimeState,
688 mut inbox: mpsc::Receiver<Inbound>,
689 mut priority_inbox: mpsc::Receiver<Inbound>,
690) {
691 let heartbeat_interval = Duration::from_millis(inputs.heartbeat_ms);
692 let mut interval = tokio::time::interval(heartbeat_interval);
693 // `MissedTickBehavior::Skip` so a slow tick under load
694 // doesn't queue up unbounded ticks. We emit one heartbeat
695 // per interval; missed intervals are just observed silence
696 // at the receiver.
697 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
698 // The first tick fires immediately; consume it so the
699 // initial state has had a chance to settle.
700 interval.tick().await;
701
702 loop {
703 // `biased;` makes tokio::select poll branches top-to-
704 // bottom rather than randomly. Priority lane is checked
705 // first, then the tick, then the low-priority lane. A
706 // heartbeat flood saturating the low-priority lane can't
707 // starve SyncResponse / SyncNack / Shutdown — they ride
708 // the priority lane and get drained ahead of the flood.
709 tokio::select! {
710 biased;
711 event = priority_inbox.recv() => {
712 match event {
713 Some(Inbound::Shutdown) | None => {
714 let _ = coordinator
715 .transition_to(
716 ReplicaRole::Idle,
717 super::replication_state::TransitionSignal::ChannelClose,
718 )
719 .await;
720 return;
721 }
722 Some(event) => {
723 on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
724 }
725 }
726 }
727 _ = interval.tick() => {
728 on_tick(&inputs, &coordinator, &dispatcher, &state).await;
729 }
730 event = inbox.recv() => {
731 match event {
732 Some(Inbound::Shutdown) | None => {
733 // Shutdown normally rides the priority lane;
734 // a None here means the low-priority sender
735 // dropped (caller closed the handle without
736 // calling cancel). Treat as graceful exit.
737 let _ = coordinator
738 .transition_to(
739 ReplicaRole::Idle,
740 super::replication_state::TransitionSignal::ChannelClose,
741 )
742 .await;
743 return;
744 }
745 Some(event) => {
746 on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
747 }
748 }
749 }
750 }
751 }
752}
753
/// Which lag gauge the tick should update, if any. Leader emits the
/// worst-replica lag; Replica emits the believed-leader lag.
/// Candidate + Idle don't emit lag — both are transient or non-
/// participating roles.
#[derive(Debug)]
enum LagObservation {
    /// Leader-side: max over replica peers of `now - peer.last_seen`.
    /// Drives `record_leader_lag`. Reflects the staleness of the
    /// worst-lagging replica.
    Leader(Duration),
    /// Replica-side: `now - believed_leader.last_seen`. Drives
    /// `record_replica_lag`. When there is no believed leader, or
    /// no lag reading for it, `observe_lag` yields
    /// [`LagObservation::None`] instead and the gauge stays
    /// unobserved.
    Replica(Duration),
    /// No lag to record this tick.
    None,
}
771
772/// Compute the lag observation for this tick. Pure read over the
773/// tracker; the caller updates the metric off the lock.
774fn observe_lag(
775 role: ReplicaRole,
776 replica_set: &[NodeId],
777 self_node_id: NodeId,
778 tracker: &HeartbeatTracker,
779 now: Instant,
780) -> LagObservation {
781 match role {
782 ReplicaRole::Leader => {
783 // Worst-replica view: max over peers of (now - peer.last_seen).
784 // A peer never seen has no observation — skip it (the
785 // gauge captures observed lag, not "never heard from").
786 let worst = replica_set
787 .iter()
788 .copied()
789 .filter(|&p| p != self_node_id)
790 .filter_map(|p| tracker.peer_lag(p, now))
791 .max();
792 match worst {
793 Some(d) => LagObservation::Leader(d),
794 None => LagObservation::None,
795 }
796 }
797 ReplicaRole::Replica => match tracker.believed_leader() {
798 Some(leader) => match tracker.peer_lag(leader, now) {
799 Some(d) => LagObservation::Replica(d),
800 None => LagObservation::None,
801 },
802 None => LagObservation::None,
803 },
804 ReplicaRole::Candidate | ReplicaRole::Idle => LagObservation::None,
805 }
806}
807
808/// Drop the believed-leader belief AND the outstanding-request
809/// tokens recorded against that leader. Sites that previously
810/// only called `clear_believed_leader` would leave the prior
811/// leader's in-flight tokens in `OutstandingRequests` until TTL
812/// (30 s). Under role thrash or rapid leader churn, the soft-cap
813/// GC then evicted entries from OTHER leaders to make room — the
814/// documented invariant on `OutstandingRequests::clear_leader`.
815fn clear_leader_belief_and_tokens(
816 tracker: &Arc<Mutex<HeartbeatTracker>>,
817 outstanding: &Arc<Mutex<OutstandingRequests>>,
818) {
819 let prior = tracker.lock().believed_leader();
820 tracker.lock().clear_believed_leader();
821 if let Some(prior) = prior {
822 outstanding.lock().clear_leader(prior);
823 }
824}
825
826async fn on_tick(
827 inputs: &RuntimeInputs,
828 coordinator: &Arc<ReplicationCoordinator>,
829 dispatcher: &Arc<dyn ReplicationDispatcher>,
830 state: &RuntimeState,
831) {
832 let RuntimeState {
833 tracker,
834 budget: _,
835 backoff,
836 outstanding,
837 } = state;
838 // Source `now` from tokio's clock so the silence-detection
839 // pass inside the tracker tick honors tokio::time::pause() in
840 // tests and stays coherent with the tokio::time::interval that
841 // drives this on_tick call. Pre-fix std::Instant::now() kept
842 // moving while virtual time was paused.
843 let now = tokio::time::Instant::now().into_std();
844 // Drop CatchupBackoff entries whose backoff window expired
845 // more than a cap ago — protects the map from unbounded
846 // growth under leader churn (a demoted leader's entry has
847 // no other clearance path; `record_progress` only fires for
848 // the current believed leader).
849 backoff
850 .lock()
851 .gc_expired(now, CATCHUP_BACKOFF_CAP.saturating_mul(2));
852 let tail_seq = (inputs.tail_provider)();
853 // Bump the coordinator's cached tail at every tick so the next
854 // transition's announce_chain advertises the real tip. The
855 // CAS-monotonic guard inside record_tail_seq drops the write if
856 // tail_provider went backward (it shouldn't — tail_provider
857 // wraps next_seq()), so this is also idempotent. Without this
858 // the leader path's tail_seq atomic stays at the value the
859 // last apply_sync_response recorded (which only Replicas run),
860 // and a leader's announce_chain at promotion time ships
861 // tip_seq=0.
862 //
863 // For the LEADER role specifically, `tail_provider` returns the
864 // raw local-file `next_seq()`, which the leader bumps the moment
865 // a write lands locally — pre-replication. Advertising that
866 // value via capability tags biases `find_chain_holders` (which
867 // picks the freshest holder by `tip_seq` during failover)
868 // toward a partition that may have un-replicated writes; a
869 // crash before those writes ship loses them.
870 //
871 // Clamp the advertised tail to the highest tail any peer has
872 // confirmed via heartbeat. That tail is, by construction,
873 // "replicated at least once" — a safe minimum for failover
874 // discovery. When NO peer has reported yet (fresh leader, no
875 // replicas), fall back to the raw local tail: there is no
876 // safer value to advertise and a sole leader has authority
877 // over its own writes by tautology.
878 let advertised_tail = if coordinator.role() == ReplicaRole::Leader {
879 let max_peer_tail = tracker
880 .lock()
881 .peer_tail_seqs()
882 .into_iter()
883 .filter(|(id, _)| *id != inputs.self_node_id)
884 .map(|(_, t)| t)
885 .max();
886 match max_peer_tail {
887 Some(p) => tail_seq.min(p),
888 None => tail_seq,
889 }
890 } else {
891 tail_seq
892 };
893 coordinator.record_tail_seq(advertised_tail);
894 let wall_clock_ms = (inputs.wall_clock_provider)();
895 // R-10: capture `current_role` inside the same critical
896 // section that holds the tracker lock so a concurrent
897 // transition can't land between the role read and the
898 // tick(). Reading role here is cheap (a parking_lot mutex
899 // load); holding both locks together is safe because role
900 // observation never awaits.
901 let (outcome, lag_observation) = {
902 let t = tracker.lock();
903 // Capture `current_role` inside the same critical section
904 // that holds the tracker lock so a concurrent transition
905 // can't land between the role read and the tick(). Reading
906 // role here is cheap (a parking_lot mutex load); holding
907 // both locks together is safe because role observation
908 // never awaits. The captured value lives only inside this
909 // closure — `tick()` consumes it via its outcome.
910 let current_role = coordinator.role();
911 let outcome = tick(TickInputs {
912 self_node_id: inputs.self_node_id,
913 current_role,
914 channel_id: inputs.channel_id,
915 tail_seq,
916 replica_set: &inputs.replica_set,
917 tracker: &t,
918 wall_clock_ms,
919 chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
920 now,
921 default_bandwidth_class: inputs.default_bandwidth_class,
922 });
923 let lag = observe_lag(
924 current_role,
925 &inputs.replica_set,
926 inputs.self_node_id,
927 &t,
928 now,
929 );
930 (outcome, lag)
931 };
932 // Record lag gauges off the tracker lock.
933 match lag_observation {
934 LagObservation::Leader(d) => coordinator.metrics().record_leader_lag(d),
935 LagObservation::Replica(d) => coordinator.metrics().record_replica_lag(d),
936 LagObservation::None => {}
937 }
938 for msg in outcome.outbound {
939 match msg {
940 OutboundMessage::Heartbeat { target, msg } => {
941 if let Err(e) = dispatcher.send_heartbeat(target, msg).await {
942 tracing::trace!(target=?target, error=?e, "replication: heartbeat send failed");
943 }
944 }
945 OutboundMessage::SyncRequest { target, mut msg } => {
946 // R-28 catchup backoff: a buggy/byzantine leader that
947 // advertises an ever-growing tail but ships empty
948 // responses would otherwise loop this branch at the
949 // heartbeat cadence forever. Once the empty-response
950 // count crosses `CATCHUP_BACKOFF_THRESHOLD` (3), the
951 // tracker stamps `backoff_until`; ticks within that
952 // window skip the send entirely. A non-empty response
953 // resets the counter through `record_progress` on the
954 // apply path.
955 if backoff.lock().is_in_backoff(target, now) {
956 tracing::trace!(
957 target = target,
958 "replication: skipping SyncRequest — leader is in catchup backoff"
959 );
960 continue;
961 }
962 // R-23 request-token correlation. `tick` emits the
963 // SyncRequest with `request_id = 0` placeholder; the
964 // runtime mints a random 64-bit token from
965 // `getrandom` here, records `(leader, token)` in the
966 // outstanding-requests set, and stamps the wire frame
967 // with the minted value before send. Inbound
968 // SyncResponse / SyncNack must carry a token still
969 // in the set; stale responses (re-issue races, late-
970 // arriving NACKs from prior requests the replica
971 // already timed out) drop on the apply path.
972 let mut id_bytes = [0u8; 8];
973 if getrandom::fill(&mut id_bytes).is_err() {
974 tracing::trace!(
975 target = target,
976 "replication: getrandom failure; skipping SyncRequest this tick"
977 );
978 continue;
979 }
980 let token = u64::from_le_bytes(id_bytes);
981 msg.request_id = token;
982 outstanding.lock().record(target, token, now);
983 if let Err(e) = dispatcher.send_sync_request(target, msg).await {
984 tracing::trace!(target=?target, error=?e, "replication: sync_request send failed");
985 }
986 }
987 }
988 }
989 if let Some(pending) = outcome.transition {
990 if let Err(e) = coordinator
991 .transition_to(pending.target, pending.signal)
992 .await
993 {
994 tracing::warn!(error=?e, "replication: transition_to({:?}, {:?}) failed", pending.target, pending.signal);
995 return;
996 }
997 // If we just entered Candidate via MissedHeartbeats,
998 // run the deterministic election in the same tick so the
999 // Candidate window stays microseconds-wide per plan §3.
1000 if pending.target == ReplicaRole::Candidate {
1001 let healthy = tracker.lock().healthy_peers(now);
1002 // Self is alive by tautology — the runtime is the
1003 // code computing the election. The tracker only
1004 // records observed inbound heartbeats from peers,
1005 // so self never appears in `healthy_peers` directly.
1006 // Include self explicitly so the election filter
1007 // doesn't accidentally exclude us.
1008 let elect = election_outcome(
1009 inputs.self_node_id,
1010 &inputs.replica_set,
1011 inputs.rtt_lookup.as_ref(),
1012 |peer| peer == inputs.self_node_id || healthy.contains(&peer),
1013 );
1014 if let Some(pt) = elect {
1015 match coordinator.transition_to(pt.target, pt.signal).await {
1016 Ok(_) => {
1017 // Only clear the believed leader on a
1018 // successful transition. If the second
1019 // transition lost a race (e.g. an inbound
1020 // Shutdown drove us to Idle first), wiping
1021 // the believed leader would leave the
1022 // coordinator with no recovery signal.
1023 clear_leader_belief_and_tokens(tracker, outstanding);
1024 }
1025 Err(CoordinatorError::TagSink(e)) => {
1026 // State moved to the target (Leader /
1027 // Replica); only the chain-tag side-effect
1028 // failed. We are functionally in the new
1029 // role — clear the believed leader so the
1030 // tick path doesn't keep treating an old
1031 // peer as authoritative. Chain discovery
1032 // for this channel stays silent until the
1033 // next successful announce; operators
1034 // observing this counter see the
1035 // divergence.
1036 tracing::warn!(
1037 error = ?e,
1038 target = ?pt.target,
1039 "replication: post-election chain-tag side-effect failed; state advanced"
1040 );
1041 clear_leader_belief_and_tokens(tracker, outstanding);
1042 }
1043 Err(CoordinatorError::Transition(e)) => {
1044 // State did not move (typically because a
1045 // concurrent ChannelClose drove us to Idle
1046 // first). Recover by clearing the believed
1047 // leader so the next tick re-enters
1048 // discovery from a clean slate rather than
1049 // sitting on a stale belief.
1050 tracing::warn!(
1051 error = ?e,
1052 target = ?pt.target,
1053 "replication: post-election transition rejected; state moved out from under us"
1054 );
1055 clear_leader_belief_and_tokens(tracker, outstanding);
1056 }
1057 }
1058 }
1059 }
1060 }
1061}
1062
1063#[allow(clippy::too_many_arguments)]
1064async fn on_inbound(
1065 inputs: &RuntimeInputs,
1066 coordinator: &Arc<ReplicationCoordinator>,
1067 dispatcher: &Arc<dyn ReplicationDispatcher>,
1068 state: &RuntimeState,
1069 event: Inbound,
1070) {
1071 let RuntimeState {
1072 tracker,
1073 budget,
1074 backoff,
1075 outstanding,
1076 } = state;
1077 // Peer-auth gate. Every inbound replication message must come
1078 // from a peer in the channel's configured replica_set; any
1079 // other sender has no business driving the state machine for
1080 // this channel. Pre-fix the handlers below only checked the
1081 // channel_id, so any mesh peer with SUBPROTOCOL_REDEX reach
1082 // could ship Heartbeat / SyncRequest / SyncResponse / SyncNack
1083 // and the runtime would apply them. SyncResponse hijack was
1084 // the worst case — a non-leader peer could write attacker-
1085 // chosen bytes to the replica's local log via append_batch.
1086 //
1087 // Shutdown is local-only (never crosses the wire), so it
1088 // bypasses the membership check; the from field on a Shutdown
1089 // event is the local node's id.
1090 let from_node = match &event {
1091 Inbound::Shutdown => None,
1092 Inbound::Heartbeat { from, .. } => Some(*from),
1093 Inbound::SyncRequest { from, .. } => Some(*from),
1094 Inbound::SyncResponse { from, .. } => Some(*from),
1095 Inbound::SyncNack { from, .. } => Some(*from),
1096 };
1097 if let Some(from) = from_node {
1098 if !inputs.replica_set.contains(&from) {
1099 tracing::trace!(
1100 from = from,
1101 channel = ?inputs.channel_id,
1102 "replication: dropping inbound from peer not in replica_set"
1103 );
1104 return;
1105 }
1106 }
1107 match event {
1108 Inbound::Shutdown => {
1109 // Handled by the main loop; never reaches here.
1110 unreachable!("Shutdown is filtered in the main loop");
1111 }
1112 Inbound::Heartbeat { from, msg } => {
1113 // Validate channel-id match. Mismatched heartbeats
1114 // arrive when the mesh dispatcher misroutes (which
1115 // shouldn't happen, but better to drop than to
1116 // poison the tracker).
1117 if msg.channel_id != inputs.channel_id {
1118 tracing::trace!(
1119 from = from,
1120 "replication: dropping heartbeat for wrong channel"
1121 );
1122 return;
1123 }
1124 // Source from tokio's clock so silence detection works
1125 // under tokio::time::pause() — std::Instant::now()
1126 // wouldn't advance with virtual time and the tick-driven
1127 // silence check would never fire deterministically.
1128 tracker.lock().record_heartbeat(
1129 from,
1130 msg.role,
1131 msg.tail_seq,
1132 tokio::time::Instant::now().into_std(),
1133 );
1134 // Dual-leader convergence. A symmetric-RTT election or
1135 // a partition heal can leave two peers both believing
1136 // they are Leader for the same channel. Without
1137 // convergence, both partitions keep writing — divergent
1138 // histories accrete and `apply_sync_response` eventually
1139 // logs `GapBeforeChunk{divergence_suspected:true}` while
1140 // `skip_to` silently overwrites the loser's tail.
1141 //
1142 // On any inbound Heartbeat with role=Leader while self
1143 // is Leader, run the deterministic tiebreak: the side
1144 // with the higher tail_seq keeps Leader; on a tie, the
1145 // numerically smaller node_id keeps Leader. The loser
1146 // concedes via `Leader → Replica` so the next tick
1147 // re-resolves through the heartbeat cycle.
1148 if msg.role == ReplicaRole::Leader
1149 && coordinator.role() == ReplicaRole::Leader
1150 && from != inputs.self_node_id
1151 {
1152 let local_tail = (inputs.tail_provider)();
1153 let peer_tail = msg.tail_seq;
1154 let local_wins = local_tail > peer_tail
1155 || (local_tail == peer_tail && inputs.self_node_id < from);
1156 if !local_wins {
1157 tracing::warn!(
1158 from = from,
1159 peer_tail = peer_tail,
1160 local_tail = local_tail,
1161 local = inputs.self_node_id,
1162 "replication: peer-leader observed; conceding via Leader → Replica"
1163 );
1164 let _ = coordinator
1165 .transition_to(
1166 ReplicaRole::Replica,
1167 super::replication_state::TransitionSignal::PeerLeaderObserved,
1168 )
1169 .await;
1170 }
1171 }
1172 }
1173 Inbound::SyncRequest { from, msg } => {
1174 // R-12: defense-in-depth — validate channel_id at the
1175 // runtime boundary. The wire decoder + mesh router are
1176 // supposed to demux by channel, but a misroute would
1177 // otherwise apply against the wrong file.
1178 if msg.channel_id != inputs.channel_id {
1179 tracing::trace!(
1180 from = from,
1181 "replication: dropping SyncRequest for wrong channel"
1182 );
1183 return;
1184 }
1185 // Leader-side: only honor SyncRequest when we believe
1186 // we're the leader. Other roles surface `NotLeader`
1187 // so the replica re-resolves leadership.
1188 if coordinator.role() != ReplicaRole::Leader {
1189 let nack = SyncNack {
1190 channel_id: inputs.channel_id,
1191 since_seq: msg.since_seq,
1192 error_code: super::replication::SyncNackError::NotLeader,
1193 leader_first_retained_seq: 0,
1194 request_id: msg.request_id,
1195 detail: String::new(),
1196 };
1197 if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1198 tracing::trace!(from = from, error = ?e, "replication: NotLeader NACK send failed");
1199 }
1200 return;
1201 }
1202 // Run the catch-up helper against our local file.
1203 match handle_sync_request(&inputs.file, &msg, inputs.channel_id) {
1204 SyncRequestOutcome::Response(resp) => {
1205 let byte_estimate = estimate_response_bytes(&resp);
1206 // Gate on the bandwidth budget. If the
1207 // budget can't admit this chunk, NACK with
1208 // Backpressure so the replica backs off.
1209 let admitted = {
1210 let mut bb = budget.lock();
1211 bb.try_consume_with_class(
1212 byte_estimate,
1213 msg.class,
1214 tokio::time::Instant::now().into_std(),
1215 inputs.background_fraction,
1216 )
1217 };
1218 if !admitted {
1219 let nack = SyncNack {
1220 channel_id: inputs.channel_id,
1221 since_seq: msg.since_seq,
1222 error_code: super::replication::SyncNackError::Backpressure,
1223 leader_first_retained_seq: 0,
1224 request_id: msg.request_id,
1225 detail: String::new(),
1226 };
1227 if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1228 tracing::trace!(from = from, error = ?e, "replication: Backpressure NACK send failed");
1229 }
1230 return;
1231 }
1232 // R-1: re-check role after the read + budget
1233 // gate. If a concurrent transition flipped us
1234 // out of Leader (DiskPressureWithdraw, peer
1235 // concession), don't ship the response —
1236 // NACK NotLeader instead so the replica re-
1237 // resolves through find_chain_holders.
1238 if coordinator.role() != ReplicaRole::Leader {
1239 let nack = SyncNack {
1240 channel_id: inputs.channel_id,
1241 since_seq: msg.since_seq,
1242 error_code: super::replication::SyncNackError::NotLeader,
1243 leader_first_retained_seq: 0,
1244 request_id: msg.request_id,
1245 detail: String::new(),
1246 };
1247 if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1248 tracing::trace!(from = from, error = ?e, "replication: post-op NotLeader NACK send failed");
1249 }
1250 return;
1251 }
1252 // Bump the cumulative bytes metric BEFORE
1253 // ship so the operator's view stays accurate
1254 // even if the wire send fails (the bytes
1255 // would still have been read off disk).
1256 coordinator.metrics().incr_sync_bytes(byte_estimate);
1257 if let Err(e) = dispatcher.send_sync_response(from, resp).await {
1258 // Refund the budget — pre-fix repeated
1259 // send failures over a flaky link drained
1260 // the bucket toward permanent backpressure
1261 // without shipping any traffic. The cumul-
1262 // ative bytes metric stays incremented
1263 // (operators still see "we tried to send
1264 // these bytes") but the rate budget can
1265 // recover.
1266 //
1267 // Per the dispatcher trait's `Ok(())`
1268 // semantics: a transport that returns
1269 // `Ok(())` after queueing-without-delivery
1270 // (UDP, lossy QUIC under link failure) will
1271 // NOT trigger this refund and the budget
1272 // over-counts on silent loss. R-28
1273 // catchup-backoff dampens the wedged-link
1274 // case once empty responses accrue past
1275 // threshold, but the budget itself cannot
1276 // self-correct without an end-to-end ack
1277 // the trait deliberately does not demand.
1278 {
1279 let mut bb = budget.lock();
1280 bb.refund(byte_estimate);
1281 }
1282 tracing::trace!(from = from, error = ?e, "replication: SyncResponse send failed");
1283 }
1284 }
1285 SyncRequestOutcome::Nack {
1286 error_code,
1287 leader_first_retained_seq,
1288 detail,
1289 } => {
1290 let nack = SyncNack {
1291 channel_id: inputs.channel_id,
1292 since_seq: msg.since_seq,
1293 error_code,
1294 leader_first_retained_seq,
1295 request_id: msg.request_id,
1296 detail,
1297 };
1298 if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1299 tracing::trace!(from = from, error = ?e, "replication: SyncNack send failed");
1300 }
1301 }
1302 }
1303 }
1304 Inbound::SyncResponse { from, msg } => {
1305 // R-12: defense-in-depth — validate channel_id at the
1306 // runtime boundary.
1307 if msg.channel_id != inputs.channel_id {
1308 tracing::trace!(
1309 from = from,
1310 "replication: dropping SyncResponse for wrong channel"
1311 );
1312 return;
1313 }
1314 // R-23 request-token correlation. The replica's
1315 // outstanding-request set tracks `(leader, request_id)`
1316 // tuples for every SyncRequest the runtime has shipped.
1317 // A response whose `request_id` is not in the set is
1318 // stale (the replica timed out and re-issued) or
1319 // forged (leader echoed a non-matching token). Drop
1320 // without applying so a stale chunk can't land on the
1321 // current request's apply path.
1322 //
1323 // Take FIRST, before the role / believed-leader gates,
1324 // so a response that arrives while we're briefly out
1325 // of `Replica` (role thrash, post-election) still
1326 // consumes its outstanding-token entry. Pre-fix the
1327 // role gate returned early without `take`, and the
1328 // token sat in the per-leader set until TTL (30 s) —
1329 // under role thrash the SOFT_CAP GC then dropped
1330 // entries from OTHER leaders to make room, evicting
1331 // legitimately in-flight tokens.
1332 //
1333 // Consuming a token in the dropped-response path is
1334 // safe: request_ids are random 64-bit (collision
1335 // negligible), so a subsequent re-issue uses a fresh
1336 // id and the consumed entry isn't reachable.
1337 {
1338 let now = tokio::time::Instant::now().into_std();
1339 if !outstanding.lock().take(from, msg.request_id, now) {
1340 tracing::trace!(
1341 from = from,
1342 request_id = msg.request_id,
1343 "replication: dropping SyncResponse with unknown request_id"
1344 );
1345 return;
1346 }
1347 }
1348 // SyncResponse is the state-mutating wire input — only
1349 // the believed leader is allowed to ship it. A peer that
1350 // is in the replica_set but is not the current leader
1351 // could otherwise forge `append_batch`-bound payloads
1352 // into a replica's log. The replica_set gate at entry
1353 // narrows the surface to configured members; the
1354 // believed_leader gate here narrows further to the
1355 // single peer the replica is currently following.
1356 let leader_belief = tracker.lock().believed_leader();
1357 if leader_belief != Some(from) {
1358 tracing::trace!(
1359 from = from,
1360 believed_leader = ?leader_belief,
1361 "replication: dropping SyncResponse from non-leader peer"
1362 );
1363 return;
1364 }
1365 // Replica-side: apply the chunk to our local file.
1366 // Only honor responses when we believe we're a
1367 // Replica — other roles ignore them.
1368 if coordinator.role() != ReplicaRole::Replica {
1369 tracing::trace!(
1370 from = from,
1371 "replication: SyncResponse received in role {:?}; ignoring",
1372 coordinator.role(),
1373 );
1374 return;
1375 }
1376 // Snapshot the pre-apply tail so the post-apply
1377 // result can be classified as "empty" (apply returned
1378 // the same tail, no events landed) vs "progress"
1379 // (tail advanced). Drives the R-28 catchup-backoff
1380 // accounting below.
1381 let pre_apply_tail = inputs.file.next_seq();
1382 match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
1383 Ok(new_tail) => {
1384 // Record the post-apply tail on the coordinator
1385 // so capability-tag advertisements ride
1386 // `tip_seq=new_tail` instead of the dead-default
1387 // 0. find_chain_holders picks the freshest
1388 // holder by tip_seq during failover; pre-fix
1389 // every Leader/Replica advertised tip_seq=0
1390 // and lex-smallest holder won selection.
1391 coordinator.record_tail_seq(new_tail);
1392 // R-28 catchup-backoff accounting. If the
1393 // response advanced our tail, the leader is
1394 // making progress — clear any backoff state so
1395 // the next tick can issue another SyncRequest
1396 // immediately. If the response was empty
1397 // (apply returned the same tail) while the
1398 // leader's advertised tail is still strictly
1399 // greater than ours, record an empty strike.
1400 // After `CATCHUP_BACKOFF_THRESHOLD` consecutive
1401 // empties, the tracker stamps a backoff window
1402 // that the outbound dispatch consults.
1403 if new_tail > pre_apply_tail {
1404 backoff.lock().record_progress(from);
1405 } else {
1406 // Pre-fix the strike fired whenever the
1407 // CACHED heartbeat tail was still above ours
1408 // — the cached value can be hundreds of ms
1409 // stale, so a replica that caught up between
1410 // the heartbeat and the response would
1411 // strike against a leader that has nothing
1412 // to send. After
1413 // `CATCHUP_BACKOFF_THRESHOLD` such false
1414 // strikes the leader sat in a 1–30 s
1415 // backoff while nothing was actually wrong.
1416 //
1417 // Guard the strike on heartbeat freshness:
1418 // only when the leader's most recent
1419 // heartbeat is inside the miss-threshold
1420 // window do we trust its claimed `tail_seq`
1421 // as evidence the leader has more data to
1422 // ship. A stale heartbeat is no signal —
1423 // skip the strike entirely.
1424 let now = tokio::time::Instant::now().into_std();
1425 let strike = {
1426 let t = tracker.lock();
1427 let peer = t.peer_state(from);
1428 let lag = t.peer_lag(from, now);
1429 let fresh_window = std::time::Duration::from_millis(
1430 t.heartbeat_ms().saturating_mul(t.miss_threshold() as u64),
1431 );
1432 matches!(
1433 (peer, lag),
1434 (Some(p), Some(elapsed))
1435 if elapsed < fresh_window && p.tail_seq > new_tail
1436 )
1437 };
1438 if strike {
1439 backoff.lock().record_empty(from, now);
1440 }
1441 }
1442 tracing::trace!(
1443 from = from,
1444 new_tail = new_tail,
1445 "replication: applied chunk"
1446 );
1447 }
1448 Err(super::replication_catchup::ApplyError::AppendFailed(detail)) => {
1449 // Disk-pressure surface — per plan §7, the
1450 // local file rejected the append (heap segment
1451 // at the 3 GB hard cap, or a disk write fail
1452 // on the persistent tier). Consult the
1453 // configured `UnderCapacity` policy and react.
1454 handle_disk_pressure(coordinator, &inputs.file, &detail, from).await;
1455 }
1456 Err(super::replication_catchup::ApplyError::GapBeforeChunk {
1457 first_seq,
1458 local_next,
1459 divergence_suspected,
1460 }) => {
1461 // Plan §8 skip-ahead — the leader trimmed past
1462 // our local tail; the chunk's first_seq is
1463 // strictly above local_next. Skip the local
1464 // sequence forward to first_seq and retry the
1465 // apply (the chunk's events line up with the
1466 // new tail). On persistent files
1467 // `skip_to` returns an error and we fall back
1468 // to log+drop; the heartbeat-cycle recovery
1469 // path catches us up on the next tick.
1470 coordinator.metrics().incr_skip_ahead();
1471 match inputs.file.skip_to(first_seq) {
1472 Ok(()) => {
1473 // R-13: `first_seq > local_next` is the
1474 // `GapBeforeChunk` invariant; use
1475 // saturating_sub for defense-in-depth.
1476 debug_assert!(first_seq > local_next);
1477 // R-5: when divergence is suspected,
1478 // surface it at warn level with an
1479 // explicit message so operator dashboards
1480 // can see split-brain post-mortems
1481 // separately from routine retention
1482 // trims.
1483 if divergence_suspected {
1484 tracing::warn!(
1485 from = from,
1486 from_seq = local_next,
1487 to_seq = first_seq,
1488 gap = first_seq.saturating_sub(local_next),
1489 "replication: skip-ahead crossed leader's retained range — \
1490 divergent log suspected (split-brain post-mortem)"
1491 );
1492 } else {
1493 tracing::warn!(
1494 from = from,
1495 from_seq = local_next,
1496 to_seq = first_seq,
1497 gap = first_seq.saturating_sub(local_next),
1498 "replication: skip-ahead — leader trimmed past local tail"
1499 );
1500 }
1501 // Retry the apply now that the local
1502 // tail matches first_seq.
1503 match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
1504 Ok(new_tail) => {
1505 coordinator.record_tail_seq(new_tail);
1506 tracing::trace!(
1507 from = from,
1508 new_tail = new_tail,
1509 "replication: applied chunk after skip-ahead"
1510 );
1511 }
1512 Err(e) => {
1513 tracing::warn!(
1514 from = from,
1515 error = ?e,
1516 "replication: apply after skip-ahead failed"
1517 );
1518 }
1519 }
1520 }
1521 Err(e) => {
1522 tracing::warn!(
1523 from = from,
1524 first_seq = first_seq,
1525 error = %e,
1526 "replication: skip_to rejected; falling back to heartbeat-cycle recovery"
1527 );
1528 }
1529 }
1530 }
1531 Err(e) => {
1532 // Remaining ApplyError variants — channel
1533 // mismatch, monotonicity violation, stale
1534 // chunk. Log + drop; reliable-stream /
1535 // heartbeat cycle recovers.
1536 tracing::warn!(
1537 from = from,
1538 error = ?e,
1539 "replication: apply_sync_response failed"
1540 );
1541 }
1542 }
1543 }
1544 Inbound::SyncNack { from, msg } => {
1545 // R-12: defense-in-depth — validate channel_id at the
1546 // runtime boundary.
1547 if msg.channel_id != inputs.channel_id {
1548 tracing::trace!(
1549 from = from,
1550 "replication: dropping SyncNack for wrong channel"
1551 );
1552 return;
1553 }
1554 // Only the believed leader can NACK; otherwise a non-
1555 // leader replica_set peer could spam NotLeader nacks to
1556 // make us clear our believed_leader, or BadRange to
1557 // skip-ahead our local file's tail past in-flight
1558 // events. The replica_set gate at entry handles
1559 // outsider rejection; this narrows further to the
1560 // single peer the replica is currently following.
1561 let leader_belief = tracker.lock().believed_leader();
1562 if leader_belief != Some(from) {
1563 tracing::trace!(
1564 from = from,
1565 believed_leader = ?leader_belief,
1566 "replication: dropping SyncNack from non-leader peer"
1567 );
1568 return;
1569 }
1570 // R-23 request-token correlation. A NACK with a token
1571 // not in the outstanding set is stale (from a request
1572 // the replica already timed out) — the BadRange arm
1573 // below mutates the local file via skip_to, and a
1574 // stale BadRange could destroy data on retry. Drop
1575 // unmatched NACKs.
1576 {
1577 let now = tokio::time::Instant::now().into_std();
1578 if !outstanding.lock().take(from, msg.request_id, now) {
1579 tracing::trace!(
1580 from = from,
1581 request_id = msg.request_id,
1582 "replication: dropping SyncNack with unknown request_id"
1583 );
1584 return;
1585 }
1586 }
1587 // Replicas key their retry policy on `error_code`.
1588 // Phase D §2 retry policy:
1589 // 1 NotLeader → re-resolve leader (clear tracker
1590 // so next election cycle starts
1591 // clean)
1592 // 2 BadRange → trim local tail / skip-ahead
1593 // 3 Backpressure → exponential backoff (handled by
1594 // not issuing the next request)
1595 // 4 ChannelClosed → withdraw replica role
1596 use super::replication::SyncNackError;
1597 match msg.error_code {
1598 SyncNackError::NotLeader => {
1599 // R-4: actually clear the believed leader so
1600 // the next tick re-resolves via the heartbeat
1601 // cycle instead of looping on the stale leader
1602 // belief until 3 missed heartbeats trip.
1603 clear_leader_belief_and_tokens(tracker, outstanding);
1604 tracing::trace!(
1605 from = from,
1606 "replication: NACK NotLeader — cleared believed leader"
1607 );
1608 }
1609 SyncNackError::BadRange => {
1610 // R-40: skip directly to the leader's
1611 // first-retained seq carried in the NACK. Pre-
1612 // fix the replica advanced one seq per round
1613 // trip (skip_to(since_seq + 1)) which thrashed
1614 // when the retention floor was many seqs above
1615 // `since_seq` — every retry re-asked below the
1616 // floor and re-NACKed. With the wire field, one
1617 // round trip suffices: `skip_to(leader_first_
1618 // retained_seq)` puts local tail at the floor
1619 // and the next SyncRequest re-asks exactly
1620 // there. Fall back to `since_seq + 1` if the
1621 // leader sent `0` (legacy / never-retained
1622 // channels) so an out-of-band sender can't
1623 // make us no-op on the retry.
1624 coordinator.metrics().incr_skip_ahead();
1625 let target = if msg.leader_first_retained_seq > 0 {
1626 msg.leader_first_retained_seq
1627 } else {
1628 msg.since_seq.saturating_add(1)
1629 };
1630 match inputs.file.skip_to(target) {
1631 Ok(()) => tracing::warn!(
1632 from = from,
1633 since_seq = msg.since_seq,
1634 leader_first_retained_seq = msg.leader_first_retained_seq,
1635 target = target,
1636 "replication: NACK BadRange — local tail skipped to leader's first-retained seq"
1637 ),
1638 Err(e) => tracing::trace!(
1639 from = from,
1640 error = %e,
1641 "replication: NACK BadRange — skip_to rejected, falling back to heartbeat retry"
1642 ),
1643 }
1644 }
1645 SyncNackError::Backpressure => {
1646 tracing::trace!(
1647 from = from,
1648 "replication: NACK Backpressure — deferring next request"
1649 );
1650 }
1651 SyncNackError::ChannelClosed => {
1652 tracing::warn!(
1653 from = from,
1654 "replication: NACK ChannelClosed — withdrawing role"
1655 );
1656 // The leader is telling us the channel is gone;
1657 // shut down regardless of our current role.
1658 // ChannelClose is the only signal valid from
1659 // any state (Leader, Candidate, Replica, Idle).
1660 // DiskPressureWithdraw is only valid from
1661 // Replica and would silently fail-and-log if a
1662 // role flip happened between sending the
1663 // SyncRequest and receiving the NACK.
1664 let _ = coordinator
1665 .transition_to(
1666 ReplicaRole::Idle,
1667 super::replication_state::TransitionSignal::ChannelClose,
1668 )
1669 .await;
1670 }
1671 }
1672 }
1673 }
1674}
1675
1676/// React to a disk-pressure signal from `apply_sync_response`. The
1677/// local file rejected the append — heap segment at 3 GB hard cap
1678/// or a disk-tier write fail. Consult the channel's configured
1679/// `UnderCapacity` policy and apply.
1680///
1681/// Plan §7:
1682///
1683/// - `Withdraw` (default) — drop the replica role so the leader's
1684/// other replicas can take over the redundancy responsibility.
1685/// The channel's `causal:<hex>` tag is withdrawn via the
1686/// coordinator's `* → Idle` side-effect. Operators see the
1687/// `dataforts_replication_under_capacity_total` counter advance
1688/// and the role flip to Idle.
1689/// - `EvictOldest` — call `RedexFile::sweep_retention()` to evict
1690/// on the configured caps + bump the counter. Caller stays in
1691/// Replica role; the next `SyncResponse` retries the apply. If
1692/// no retention caps are configured the sweep is a no-op and
1693/// the next apply will fail again — operators who pick this
1694/// policy should pair it with `retention_max_*` settings.
1695async fn handle_disk_pressure(
1696 coordinator: &Arc<ReplicationCoordinator>,
1697 file: &super::file::RedexFile,
1698 detail: &str,
1699 from: NodeId,
1700) {
1701 use super::replication_config::UnderCapacity;
1702 coordinator.metrics().incr_under_capacity();
1703 let policy = coordinator.config().on_under_capacity;
1704 match policy {
1705 UnderCapacity::Withdraw => {
1706 tracing::warn!(
1707 from = from,
1708 detail = detail,
1709 "replication: disk pressure → withdrawing role"
1710 );
1711 // The transition matrix only permits
1712 // DiskPressureWithdraw on Replica → Idle. If a role
1713 // flip landed between the apply attempt and this
1714 // withdraw, pick the signal that's actually valid for
1715 // the current role so we don't silently log+drop the
1716 // transition and keep writing through pressure.
1717 let signal = match coordinator.role() {
1718 ReplicaRole::Replica => {
1719 super::replication_state::TransitionSignal::DiskPressureWithdraw
1720 }
1721 ReplicaRole::Idle => {
1722 // Already withdrawn — short-circuit via the
1723 // ChannelClose idempotent path so we don't bump
1724 // counters twice on a benign race.
1725 super::replication_state::TransitionSignal::ChannelClose
1726 }
1727 ReplicaRole::Leader => {
1728 // Role-specific signal so the transition metric
1729 // labels this as disk-pressure withdraw instead
1730 // of the ChannelClose fallback (which operator
1731 // dashboards triage as "graceful channel close").
1732 super::replication_state::TransitionSignal::LeaderDiskPressureWithdraw
1733 }
1734 ReplicaRole::Candidate => {
1735 super::replication_state::TransitionSignal::CandidateDiskPressureWithdraw
1736 }
1737 };
1738 if let Err(e) = coordinator.transition_to(ReplicaRole::Idle, signal).await {
1739 tracing::warn!(
1740 error=?e,
1741 "replication: disk-pressure withdraw transition failed"
1742 );
1743 }
1744 }
1745 UnderCapacity::EvictOldest => {
1746 tracing::warn!(
1747 from = from,
1748 detail = detail,
1749 "replication: disk pressure → sweeping retention"
1750 );
1751 file.sweep_retention();
1752 }
1753 }
1754}
1755
1756/// Estimate the wire-cost of a [`SyncResponse`] for budget
1757/// accounting. Header + per-event overhead per `replication.rs`'s
1758/// `to_bytes` shape.
1759fn estimate_response_bytes(resp: &SyncResponse) -> u64 {
1760 // Header: 3 + 32 + 8 + 4 = 47 bytes.
1761 let mut bytes: u64 = 47;
1762 for ev in &resp.events {
1763 // event_seq u64 + payload_len u32 + payload bytes.
1764 bytes += 8 + 4 + ev.payload.len() as u64;
1765 }
1766 bytes
1767}
1768
1769#[cfg(test)]
1770mod tests {
1771 use super::*;
1772 use crate::adapter::net::channel::ChannelName;
1773 use crate::adapter::net::redex::replication::ReplicaRole;
1774 use crate::adapter::net::redex::replication_config::ReplicationConfig;
1775 use crate::adapter::net::redex::replication_coordinator::ChainTagSink;
1776 use crate::adapter::net::redex::replication_metrics::ReplicationMetricsRegistry;
1777 use parking_lot::Mutex as ParkingMutex;
1778
/// Chain-tag sink that does nothing. The unit tests in this module
/// don't exercise the chain-tag side-effect, so announce / withdraw
/// calls are simply accepted.
#[derive(Default)]
struct NoopTagSink;
1784
1785 #[async_trait::async_trait]
1786 impl ChainTagSink for NoopTagSink {
1787 async fn announce_chain(
1788 &self,
1789 _origin_hash: u64,
1790 _tip_seq: u64,
1791 ) -> Result<(), AdapterError> {
1792 Ok(())
1793 }
1794 async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
1795 Ok(())
1796 }
1797 }
1798
1799 /// Recorder dispatcher — captures every outbound wire
1800 /// message. Each variant pushes into a separate Vec so
1801 /// tests can assert on shape + ordering.
1802 #[derive(Default)]
1803 struct RecorderDispatcher {
1804 heartbeats: ParkingMutex<Vec<(NodeId, SyncHeartbeat)>>,
1805 sync_requests: ParkingMutex<Vec<(NodeId, SyncRequest)>>,
1806 sync_responses: ParkingMutex<Vec<(NodeId, SyncResponse)>>,
1807 sync_nacks: ParkingMutex<Vec<(NodeId, SyncNack)>>,
1808 }
1809
1810 #[async_trait::async_trait]
1811 impl ReplicationDispatcher for RecorderDispatcher {
1812 async fn send_heartbeat(
1813 &self,
1814 target: NodeId,
1815 msg: SyncHeartbeat,
1816 ) -> Result<(), AdapterError> {
1817 self.heartbeats.lock().push((target, msg));
1818 Ok(())
1819 }
1820 async fn send_sync_request(
1821 &self,
1822 target: NodeId,
1823 msg: SyncRequest,
1824 ) -> Result<(), AdapterError> {
1825 self.sync_requests.lock().push((target, msg));
1826 Ok(())
1827 }
1828 async fn send_sync_response(
1829 &self,
1830 target: NodeId,
1831 msg: SyncResponse,
1832 ) -> Result<(), AdapterError> {
1833 self.sync_responses.lock().push((target, msg));
1834 Ok(())
1835 }
1836 async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
1837 self.sync_nacks.lock().push((target, msg));
1838 Ok(())
1839 }
1840 }
1841
1842 fn channel_id_for(name: &str) -> ChannelId {
1843 let cn = ChannelName::new(name).unwrap();
1844 ChannelId::from_name(&cn)
1845 }
1846
1847 fn build_file_for_tests() -> RedexFile {
1848 use crate::adapter::net::redex::config::RedexFileConfig;
1849 use crate::adapter::net::redex::manager::Redex;
1850 let r = Redex::new();
1851 let cn = ChannelName::new("test/runtime").unwrap();
1852 r.open_file(&cn, RedexFileConfig::default()).unwrap()
1853 }
1854
1855 fn build_inputs(self_id: NodeId, replicas: Vec<NodeId>, hb_ms: u64) -> RuntimeInputs {
1856 RuntimeInputs {
1857 channel: ChannelIdentity {
1858 channel_name: "test/runtime".to_string(),
1859 origin_hash: 0xCAFE_BABE,
1860 },
1861 channel_id: channel_id_for("test/runtime"),
1862 self_node_id: self_id,
1863 replica_set: replicas,
1864 heartbeat_ms: hb_ms,
1865 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
1866 tail_provider: Arc::new(|| 42),
1867 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
1868 file: build_file_for_tests(),
1869 default_bandwidth_class: Default::default(),
1870 background_fraction: 0.3,
1871 }
1872 }
1873
1874 fn build_coordinator(
1875 self_id: NodeId,
1876 replicas: Vec<NodeId>,
1877 ) -> (Arc<ReplicationCoordinator>, ReplicationMetricsRegistry) {
1878 let _ = (self_id, replicas);
1879 let registry = ReplicationMetricsRegistry::new();
1880 let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
1881 let coordinator = ReplicationCoordinator::new(
1882 ChannelIdentity {
1883 channel_name: "test/runtime".to_string(),
1884 origin_hash: 0xCAFE_BABE,
1885 },
1886 ReplicationConfig::new(),
1887 sink,
1888 ®istry,
1889 );
1890 (Arc::new(coordinator), registry)
1891 }
1892
1893 fn build_budget() -> Arc<Mutex<BandwidthBudget>> {
1894 let now = Instant::now();
1895 Arc::new(Mutex::new(BandwidthBudget::new(0.5, 10_000_000, now)))
1896 }
1897
1898 fn build_backoff() -> Arc<Mutex<CatchupBackoff>> {
1899 Arc::new(Mutex::new(CatchupBackoff::new()))
1900 }
1901
1902 /// Bundle the four per-channel state pieces into the
1903 /// `RuntimeState` the on_tick / on_inbound functions take.
1904 /// Tests construct tracker + budget explicitly; backoff and
1905 /// outstanding are stock fresh instances. Pre-refactor these
1906 /// were passed as four separate arguments.
1907 fn build_state(
1908 tracker: Arc<Mutex<HeartbeatTracker>>,
1909 budget: Arc<Mutex<BandwidthBudget>>,
1910 ) -> RuntimeState {
1911 RuntimeState {
1912 tracker,
1913 budget,
1914 backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
1915 outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
1916 }
1917 }
1918
1919 #[tokio::test]
1920 async fn leader_emits_heartbeat_to_peers_each_tick() {
1921 let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 100);
1922 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
1923 // Promote to Leader via the state machine (Idle → Replica
1924 // → Candidate → Leader).
1925 coordinator
1926 .transition_to(
1927 ReplicaRole::Replica,
1928 super::super::replication_state::TransitionSignal::CapabilitySelected,
1929 )
1930 .await
1931 .unwrap();
1932 coordinator
1933 .transition_to(
1934 ReplicaRole::Candidate,
1935 super::super::replication_state::TransitionSignal::MissedHeartbeats,
1936 )
1937 .await
1938 .unwrap();
1939 coordinator
1940 .transition_to(
1941 ReplicaRole::Leader,
1942 super::super::replication_state::TransitionSignal::ElectionWon,
1943 )
1944 .await
1945 .unwrap();
1946
1947 let dispatcher = Arc::new(RecorderDispatcher::default());
1948 let handle =
1949 spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
1950
1951 // Sleep ~3 ticks worth (300ms cadence at 100ms = ~3 ticks).
1952 tokio::time::sleep(Duration::from_millis(350)).await;
1953
1954 let heartbeats = dispatcher.heartbeats.lock().clone();
1955 assert!(
1956 heartbeats.len() >= 2,
1957 "expected ≥ 2 heartbeats over 350ms at 100ms cadence; got {}",
1958 heartbeats.len(),
1959 );
1960 // Each heartbeat goes to a non-self peer.
1961 for (target, msg) in &heartbeats {
1962 assert!(*target == 0x20 || *target == 0x30);
1963 assert_eq!(msg.role, ReplicaRole::Leader);
1964 assert_eq!(msg.tail_seq, 42);
1965 }
1966
1967 handle.cancel().await;
1968 assert!(handle.is_stopped());
1969 }
1970
1971 #[tokio::test]
1972 async fn inbound_heartbeat_records_into_tracker() {
1973 let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
1974 let cid = inputs.channel_id;
1975 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
1976 coordinator
1977 .transition_to(
1978 ReplicaRole::Replica,
1979 super::super::replication_state::TransitionSignal::CapabilitySelected,
1980 )
1981 .await
1982 .unwrap();
1983 let dispatcher = Arc::new(RecorderDispatcher::default());
1984 let handle =
1985 spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
1986
1987 // Push a peer heartbeat into the inbox.
1988 handle
1989 .dispatch(Inbound::Heartbeat {
1990 from: 0x20,
1991 msg: SyncHeartbeat {
1992 channel_id: cid,
1993 tail_seq: 99,
1994 role: ReplicaRole::Leader,
1995 wall_clock_ms: 1_700_000_000_000,
1996 },
1997 })
1998 .await
1999 .unwrap();
2000
2001 // Let the task process.
2002 tokio::time::sleep(Duration::from_millis(150)).await;
2003
2004 // Heartbeats emitted (replica side) and no transition
2005 // triggered (fresh leader heartbeat).
2006 let _heartbeats = dispatcher.heartbeats.lock().clone();
2007 // The coordinator stays in Replica.
2008 handle.cancel().await;
2009 }
2010
2011 /// R-21 regression: when a Leader observes another peer also
2012 /// claiming Leader for the same channel, the deterministic
2013 /// tiebreak demotes the loser to Replica so the partition heal
2014 /// converges to one leader. Without `Leader → Replica`, both
2015 /// partitions stay Leader permanently and accrete divergent
2016 /// histories silently overwritten via `skip_to`.
2017 #[tokio::test]
2018 async fn peer_leader_observation_demotes_loser_to_replica() {
2019 // Local node 0x10 has tail_seq 42 (from build_inputs);
2020 // peer 0x20 advertises tail_seq 99 — peer wins on tail.
2021 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2022 let cid = inputs.channel_id;
2023 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2024 for (role, signal) in [
2025 (
2026 ReplicaRole::Replica,
2027 super::super::replication_state::TransitionSignal::CapabilitySelected,
2028 ),
2029 (
2030 ReplicaRole::Candidate,
2031 super::super::replication_state::TransitionSignal::MissedHeartbeats,
2032 ),
2033 (
2034 ReplicaRole::Leader,
2035 super::super::replication_state::TransitionSignal::ElectionWon,
2036 ),
2037 ] {
2038 coordinator.transition_to(role, signal).await.unwrap();
2039 }
2040 assert_eq!(coordinator.role(), ReplicaRole::Leader);
2041 let dispatcher = Arc::new(RecorderDispatcher::default());
2042 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2043 let budget = build_budget();
2044
2045 on_inbound(
2046 &inputs,
2047 &coordinator,
2048 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2049 &build_state(tracker.clone(), budget.clone()),
2050 Inbound::Heartbeat {
2051 from: 0x20,
2052 msg: SyncHeartbeat {
2053 channel_id: cid,
2054 tail_seq: 99,
2055 role: ReplicaRole::Leader,
2056 wall_clock_ms: 0,
2057 },
2058 },
2059 )
2060 .await;
2061
2062 // Local tail 42 < peer tail 99 → local loses, demotes.
2063 assert_eq!(
2064 coordinator.role(),
2065 ReplicaRole::Replica,
2066 "Leader with lower tail must concede on PeerLeaderObserved"
2067 );
2068 }
2069
2070 /// R-21 regression: tail-tie tiebreak favors the lower node id.
2071 /// Without the symmetric tiebreak the matrix could leave one
2072 /// side as Leader and the other still claiming Leader after
2073 /// the heal — both must agree on the winner.
2074 #[tokio::test]
2075 async fn peer_leader_tail_tie_lower_node_id_wins() {
2076 // Local 0x10 tail = peer 0x20 tail (both 42). Local wins
2077 // because 0x10 < 0x20.
2078 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2079 let cid = inputs.channel_id;
2080 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2081 for (role, signal) in [
2082 (
2083 ReplicaRole::Replica,
2084 super::super::replication_state::TransitionSignal::CapabilitySelected,
2085 ),
2086 (
2087 ReplicaRole::Candidate,
2088 super::super::replication_state::TransitionSignal::MissedHeartbeats,
2089 ),
2090 (
2091 ReplicaRole::Leader,
2092 super::super::replication_state::TransitionSignal::ElectionWon,
2093 ),
2094 ] {
2095 coordinator.transition_to(role, signal).await.unwrap();
2096 }
2097 let dispatcher = Arc::new(RecorderDispatcher::default());
2098 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2099 let budget = build_budget();
2100
2101 on_inbound(
2102 &inputs,
2103 &coordinator,
2104 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2105 &build_state(tracker.clone(), budget.clone()),
2106 Inbound::Heartbeat {
2107 from: 0x20,
2108 msg: SyncHeartbeat {
2109 channel_id: cid,
2110 tail_seq: 42,
2111 role: ReplicaRole::Leader,
2112 wall_clock_ms: 0,
2113 },
2114 },
2115 )
2116 .await;
2117
2118 assert_eq!(
2119 coordinator.role(),
2120 ReplicaRole::Leader,
2121 "tail-tie tiebreak: lower node id keeps Leader"
2122 );
2123 }
2124
2125 #[tokio::test]
2126 async fn replica_with_silent_leader_runs_election_and_promotes_self() {
2127 let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
2128 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2129 coordinator
2130 .transition_to(
2131 ReplicaRole::Replica,
2132 super::super::replication_state::TransitionSignal::CapabilitySelected,
2133 )
2134 .await
2135 .unwrap();
2136 let dispatcher = Arc::new(RecorderDispatcher::default());
2137 let handle = spawn_replication_runtime(
2138 inputs,
2139 coordinator.clone(),
2140 dispatcher.clone(),
2141 build_budget(),
2142 );
2143
2144 // Push a single leader heartbeat from 0x20, then let
2145 // enough time pass for silence detection (3 × 50ms = 150ms).
2146 let cid = channel_id_for("test/runtime");
2147 handle
2148 .dispatch(Inbound::Heartbeat {
2149 from: 0x20,
2150 msg: SyncHeartbeat {
2151 channel_id: cid,
2152 tail_seq: 99,
2153 role: ReplicaRole::Leader,
2154 wall_clock_ms: 1_700_000_000_000,
2155 },
2156 })
2157 .await
2158 .unwrap();
2159
2160 // Sleep > 3 heartbeats + tick alignment so silence
2161 // detection fires AND the post-election transition lands.
2162 tokio::time::sleep(Duration::from_millis(500)).await;
2163
2164 // The election ran with self healthy + RTT=0 in elect();
2165 // peer 0x20 may or may not have been health-filtered by
2166 // the tracker (after the silence threshold, the tracker
2167 // considers 0x20 stale). Either way, self wins: self has
2168 // RTT 0; peer is either filtered out or at 5ms.
2169 assert_eq!(coordinator.role(), ReplicaRole::Leader);
2170
2171 handle.cancel().await;
2172 }
2173
2174 /// Chain-tag sink that returns `Ok` for the first `n` calls
2175 /// then fails every subsequent call. Lets a test pin the post-
2176 /// election sink-failure path: the first call (Idle → Replica
2177 /// announce) succeeds, the second (Candidate → Leader announce)
2178 /// fails — so the coordinator transitions state but the chain-
2179 /// tag side-effect surfaces TagSink.
2180 struct FailingAfterNAnnounceSink {
2181 remaining: ParkingMutex<usize>,
2182 }
2183
2184 #[async_trait::async_trait]
2185 impl ChainTagSink for FailingAfterNAnnounceSink {
2186 async fn announce_chain(
2187 &self,
2188 _origin_hash: u64,
2189 _tip_seq: u64,
2190 ) -> Result<(), AdapterError> {
2191 let mut r = self.remaining.lock();
2192 if *r == 0 {
2193 return Err(AdapterError::Transient(
2194 "simulated sink failure".to_string(),
2195 ));
2196 }
2197 *r -= 1;
2198 Ok(())
2199 }
2200 async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
2201 Ok(())
2202 }
2203 }
2204
2205 /// A failing tag-sink on the post-election Candidate → Leader
2206 /// transition must NOT strand the coordinator in Candidate. The
2207 /// state machine moves to Leader (the failure is in the side-
2208 /// effect only); the runtime must observe the TagSink error
2209 /// branch, clear the believed leader, and continue. The
2210 /// previous code path logged + dropped, leaving the coordinator
2211 /// effectively healthy but stale-state in the tracker.
2212 #[tokio::test]
2213 async fn post_election_tag_sink_failure_does_not_strand_candidate() {
2214 let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
2215 let cid = inputs.channel_id;
2216 // Build a coordinator wired to a sink that succeeds on the
2217 // first announce (Idle → Replica) and fails on the second
2218 // (Candidate → Leader during the post-election transition).
2219 let registry = ReplicationMetricsRegistry::new();
2220 let sink: Arc<dyn ChainTagSink> = Arc::new(FailingAfterNAnnounceSink {
2221 remaining: ParkingMutex::new(1),
2222 });
2223 let coordinator = Arc::new(ReplicationCoordinator::new(
2224 ChannelIdentity {
2225 channel_name: "test/runtime".to_string(),
2226 origin_hash: 0xCAFE_BABE,
2227 },
2228 ReplicationConfig::new(),
2229 sink,
2230 ®istry,
2231 ));
2232 coordinator
2233 .transition_to(
2234 ReplicaRole::Replica,
2235 super::super::replication_state::TransitionSignal::CapabilitySelected,
2236 )
2237 .await
2238 .unwrap();
2239 let dispatcher = Arc::new(RecorderDispatcher::default());
2240 let handle = spawn_replication_runtime(
2241 inputs,
2242 coordinator.clone(),
2243 dispatcher.clone(),
2244 build_budget(),
2245 );
2246
2247 // Seed a single leader heartbeat then let silence detection
2248 // fire so the runtime enters Candidate and runs the
2249 // election.
2250 handle
2251 .dispatch(Inbound::Heartbeat {
2252 from: 0x20,
2253 msg: SyncHeartbeat {
2254 channel_id: cid,
2255 tail_seq: 99,
2256 role: ReplicaRole::Leader,
2257 wall_clock_ms: 1_700_000_000_000,
2258 },
2259 })
2260 .await
2261 .unwrap();
2262 tokio::time::sleep(Duration::from_millis(500)).await;
2263
2264 // State has moved to Leader despite the sink failure — the
2265 // coordinator's transition lock applied the state change
2266 // before the (failing) side-effect ran. The runtime's
2267 // post-election handler observed CoordinatorError::TagSink
2268 // and cleared the believed leader, not the prior code path
2269 // that silently sat on stale Candidate state.
2270 assert_eq!(coordinator.role(), ReplicaRole::Leader);
2271
2272 handle.cancel().await;
2273 }
2274
2275 #[tokio::test]
2276 async fn shutdown_drives_idle_transition() {
2277 let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2278 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2279 coordinator
2280 .transition_to(
2281 ReplicaRole::Replica,
2282 super::super::replication_state::TransitionSignal::CapabilitySelected,
2283 )
2284 .await
2285 .unwrap();
2286 let dispatcher = Arc::new(RecorderDispatcher::default());
2287 let handle =
2288 spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2289
2290 handle.cancel().await;
2291 assert!(handle.is_stopped());
2292 // Final state must be Idle (ChannelClose transition).
2293 assert_eq!(coordinator.role(), ReplicaRole::Idle);
2294 }
2295
2296 /// Regression: a Leader's `on_tick`-driven `record_tail_seq`
2297 /// must clamp the advertised tail to the highest tail any
2298 /// peer has confirmed via heartbeat — never advertise
2299 /// pre-replication local tail. The capability-tag layer reads
2300 /// this value as `tip_seq` for `find_chain_holders` failover
2301 /// selection; advertising un-replicated writes biases
2302 /// failover toward a partition whose tail may not survive a
2303 /// crash.
2304 ///
2305 /// Pre-fix: leader at local tail = 100, replica reported tail
2306 /// = 50 → advertised_tail = 100.
2307 /// Post-fix: advertised_tail = 50 (clamped to max peer tail).
2308 #[tokio::test]
2309 async fn leader_on_tick_clamps_advertised_tail_to_max_peer_tail() {
2310 // Build inputs with a deterministic tail_provider returning
2311 // 100 (the leader's local tail).
2312 let self_id: NodeId = 0x10;
2313 let peer_id: NodeId = 0x20;
2314 let inputs = RuntimeInputs {
2315 channel: ChannelIdentity {
2316 channel_name: "test/runtime".to_string(),
2317 origin_hash: 0xCAFE_BABE,
2318 },
2319 channel_id: channel_id_for("test/runtime"),
2320 self_node_id: self_id,
2321 replica_set: vec![self_id, peer_id],
2322 heartbeat_ms: 100,
2323 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
2324 // Local tail is 100 — well above what the peer has
2325 // reported.
2326 tail_provider: Arc::new(|| 100),
2327 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
2328 file: build_file_for_tests(),
2329 default_bandwidth_class: Default::default(),
2330 background_fraction: 0.3,
2331 };
2332 let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, peer_id]);
2333 // Promote to Leader via the state machine.
2334 for (role, signal) in [
2335 (
2336 ReplicaRole::Replica,
2337 super::super::replication_state::TransitionSignal::CapabilitySelected,
2338 ),
2339 (
2340 ReplicaRole::Candidate,
2341 super::super::replication_state::TransitionSignal::MissedHeartbeats,
2342 ),
2343 (
2344 ReplicaRole::Leader,
2345 super::super::replication_state::TransitionSignal::ElectionWon,
2346 ),
2347 ] {
2348 coordinator.transition_to(role, signal).await.unwrap();
2349 }
2350 // Seed the peer's heartbeat at tail=50 — only HALF of what
2351 // the leader has locally. The other 50 events are
2352 // unreplicated.
2353 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2354 tracker
2355 .lock()
2356 .record_heartbeat(peer_id, ReplicaRole::Replica, 50, Instant::now());
2357
2358 let dispatcher: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
2359 on_tick(
2360 &inputs,
2361 &coordinator,
2362 &dispatcher,
2363 &build_state(tracker.clone(), build_budget()),
2364 )
2365 .await;
2366
2367 // record_tail_seq is monotonic; the coordinator's cached
2368 // tail should be the clamped value (50), NOT the raw
2369 // local-file tail (100).
2370 assert_eq!(
2371 coordinator.tail_seq(),
2372 50,
2373 "leader must advertise the highest peer-confirmed tail (50), \
2374 not the pre-replication local tail (100); pre-fix this would \
2375 be 100 and a crash here would lose the 50 un-replicated events \
2376 while failover discovery still thought tip_seq=100",
2377 );
2378 }
2379
2380 /// Companion: with NO peer heartbeats observed (fresh leader),
2381 /// `on_tick` falls back to the raw local tail. A sole leader has
2382 /// authority over its own writes by tautology — clamping to 0
2383 /// would otherwise prevent any progress on a single-node
2384 /// configuration.
2385 #[tokio::test]
2386 async fn leader_on_tick_falls_back_to_local_tail_with_no_peers() {
2387 let self_id: NodeId = 0x10;
2388 let peer_id: NodeId = 0x20;
2389 let inputs = RuntimeInputs {
2390 channel: ChannelIdentity {
2391 channel_name: "test/runtime".to_string(),
2392 origin_hash: 0xCAFE_BABE,
2393 },
2394 channel_id: channel_id_for("test/runtime"),
2395 self_node_id: self_id,
2396 replica_set: vec![self_id, peer_id],
2397 heartbeat_ms: 100,
2398 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
2399 tail_provider: Arc::new(|| 77),
2400 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
2401 file: build_file_for_tests(),
2402 default_bandwidth_class: Default::default(),
2403 background_fraction: 0.3,
2404 };
2405 let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, peer_id]);
2406 for (role, signal) in [
2407 (
2408 ReplicaRole::Replica,
2409 super::super::replication_state::TransitionSignal::CapabilitySelected,
2410 ),
2411 (
2412 ReplicaRole::Candidate,
2413 super::super::replication_state::TransitionSignal::MissedHeartbeats,
2414 ),
2415 (
2416 ReplicaRole::Leader,
2417 super::super::replication_state::TransitionSignal::ElectionWon,
2418 ),
2419 ] {
2420 coordinator.transition_to(role, signal).await.unwrap();
2421 }
2422
2423 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2424 let dispatcher: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
2425 on_tick(
2426 &inputs,
2427 &coordinator,
2428 &dispatcher,
2429 &build_state(tracker.clone(), build_budget()),
2430 )
2431 .await;
2432 assert_eq!(
2433 coordinator.tail_seq(),
2434 77,
2435 "no peer heartbeats → no clamp, raw local tail is advertised",
2436 );
2437 }
2438
2439 /// Regression: empty-response backoff must NOT strike when the
2440 /// leader's heartbeat is stale. Pre-fix the strike fired whenever
2441 /// `tracker.peer_state(from).tail_seq > new_tail`, but
2442 /// `peer_state.tail_seq` is the cached value from the last
2443 /// received heartbeat — minutes-stale in a degenerate case. A
2444 /// replica that caught up between an old heartbeat and the
2445 /// current response struck against a leader that had nothing
2446 /// to send. After `CATCHUP_BACKOFF_THRESHOLD` such false
2447 /// strikes the leader sat in a 1–30 s backoff while nothing
2448 /// was actually wrong.
2449 ///
2450 /// Post-fix: skip the strike when the heartbeat is older than
2451 /// the miss-threshold window — a stale heartbeat is no signal.
2452 #[tokio::test]
2453 async fn empty_response_does_not_strike_on_stale_heartbeat() {
2454 let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2455 let cid = inputs.channel_id;
2456 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2457 coordinator
2458 .transition_to(
2459 ReplicaRole::Replica,
2460 super::super::replication_state::TransitionSignal::CapabilitySelected,
2461 )
2462 .await
2463 .unwrap();
2464 let dispatcher = Arc::new(RecorderDispatcher::default());
2465 let budget = build_budget();
2466 let backoff = build_backoff();
2467 let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
2468
2469 // Seed the tracker with a STALE heartbeat: leader 0x20
2470 // claimed tail=200 well outside the miss-threshold window
2471 // (heartbeat_ms = 100, miss_threshold defaults to 3 ⇒ a
2472 // last_seen older than 300 ms is stale). Build the
2473 // heartbeat with a `last_seen` from 10 seconds ago so it's
2474 // definitively stale by the time on_inbound runs.
2475 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2476 let stale_when = Instant::now() - Duration::from_secs(10);
2477 tracker
2478 .lock()
2479 .record_heartbeat(0x20, ReplicaRole::Leader, 200, stale_when);
2480
2481 // Pre-record the request so the binding gate admits the
2482 // response.
2483 outstanding.lock().record(0x20, 0, Instant::now());
2484
2485 // Empty SyncResponse from the (now-stale-heartbeat) leader.
2486 // The local file is empty (next_seq = 0); apply on an empty
2487 // response leaves next_seq = 0, so `new_tail == pre_apply_tail`.
2488 let event = Inbound::SyncResponse {
2489 from: 0x20,
2490 msg: SyncResponse {
2491 channel_id: cid,
2492 first_seq: 0,
2493 leader_first_retained_seq: 0,
2494 events: Vec::new(),
2495 request_id: 0,
2496 },
2497 };
2498 on_inbound(
2499 &inputs,
2500 &coordinator,
2501 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2502 &RuntimeState {
2503 tracker: tracker.clone(),
2504 budget: budget.clone(),
2505 backoff: backoff.clone(),
2506 outstanding: outstanding.clone(),
2507 },
2508 event,
2509 )
2510 .await;
2511
2512 // Pre-fix: backoff would have recorded an empty strike
2513 // because `peer_state.tail_seq (200) > new_tail (0)`.
2514 // Post-fix: stale heartbeat → no strike, no backoff state.
2515 assert!(
2516 !backoff.lock().is_in_backoff(0x20, Instant::now()),
2517 "stale-heartbeat empty must NOT engage backoff"
2518 );
2519 // Drive THRESHOLD+1 more stale-heartbeat empties to prove
2520 // that the strike NEVER fires on stale heartbeats — even
2521 // accumulated over many attempts.
2522 for _ in 0..CATCHUP_BACKOFF_THRESHOLD + 1 {
2523 // Re-register the request_id (consumed by the binding
2524 // gate on each call).
2525 outstanding.lock().record(0x20, 0, Instant::now());
2526 let event = Inbound::SyncResponse {
2527 from: 0x20,
2528 msg: SyncResponse {
2529 channel_id: cid,
2530 first_seq: 0,
2531 leader_first_retained_seq: 0,
2532 events: Vec::new(),
2533 request_id: 0,
2534 },
2535 };
2536 on_inbound(
2537 &inputs,
2538 &coordinator,
2539 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2540 &RuntimeState {
2541 tracker: tracker.clone(),
2542 budget: budget.clone(),
2543 backoff: backoff.clone(),
2544 outstanding: outstanding.clone(),
2545 },
2546 event,
2547 )
2548 .await;
2549 }
2550 assert!(
2551 !backoff.lock().is_in_backoff(0x20, Instant::now()),
2552 "accumulated stale-heartbeat empties must NEVER engage backoff",
2553 );
2554 }
2555
2556 /// R-28 unit test: the backoff structure records empties up to
2557 /// the threshold without setting a backoff window, then stamps
2558 /// an exponentially-growing window once the threshold is
2559 /// crossed. A non-empty response resets the entry.
2560 #[test]
2561 fn catchup_backoff_threshold_and_reset() {
2562 let now = Instant::now();
2563 let mut b = CatchupBackoff::new();
2564 // Up to the threshold: no backoff yet.
2565 for _ in 0..CATCHUP_BACKOFF_THRESHOLD {
2566 b.record_empty(0x20, now);
2567 }
2568 assert!(
2569 !b.is_in_backoff(0x20, now),
2570 "backoff must not engage before the threshold is crossed"
2571 );
2572 // Crossing the threshold sets a backoff window.
2573 b.record_empty(0x20, now);
2574 assert!(
2575 b.is_in_backoff(0x20, now),
2576 "backoff must engage once the empty count crosses the threshold"
2577 );
2578 // A productive response clears the entry.
2579 b.record_progress(0x20);
2580 assert!(
2581 !b.is_in_backoff(0x20, now),
2582 "record_progress must clear the backoff window"
2583 );
2584 }
2585
2586 /// R-25 regression: a saturated low-priority lane (Heartbeat
2587 /// flood) must not starve catchup-critical events on the
2588 /// priority lane (SyncResponse / SyncNack / Shutdown).
2589 ///
2590 /// Drives the failure shape from the audit: many peers
2591 /// flood the low-priority inbox to its 1024 cap while a
2592 /// SyncResponse is also in-flight. With the biased select
2593 /// on the priority lane, the priority entry is drained
2594 /// even though the low-priority lane is saturated.
2595 #[tokio::test]
2596 async fn priority_lane_drains_under_low_priority_saturation() {
2597 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2598 let cid = inputs.channel_id;
2599 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2600 coordinator
2601 .transition_to(
2602 ReplicaRole::Replica,
2603 super::super::replication_state::TransitionSignal::CapabilitySelected,
2604 )
2605 .await
2606 .unwrap();
2607 let dispatcher = Arc::new(RecorderDispatcher::default());
2608 let handle =
2609 spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2610
2611 // Saturate the low-priority lane with heartbeats. The
2612 // runtime task will drain them slowly under heavy lock
2613 // contention; what matters is that the priority lane
2614 // continues to make progress.
2615 for _ in 0..RUNTIME_INBOX_CAPACITY * 2 {
2616 let _ = handle.try_dispatch(Inbound::Heartbeat {
2617 from: 0x20,
2618 msg: SyncHeartbeat {
2619 channel_id: cid,
2620 tail_seq: 0,
2621 role: ReplicaRole::Replica,
2622 wall_clock_ms: 0,
2623 },
2624 });
2625 }
2626
2627 // Ship a Shutdown on the priority lane and confirm the
2628 // runtime exits within a short bound. Under the pre-fix
2629 // single-inbox design this would block on the saturated
2630 // queue indefinitely (cancel falls back to JoinHandle::
2631 // abort but the Idle transition wouldn't run).
2632 let cancel_fut = handle.cancel();
2633 let bounded = tokio::time::timeout(Duration::from_secs(2), cancel_fut).await;
2634 assert!(
2635 bounded.is_ok(),
2636 "shutdown on priority lane must drain under low-priority saturation"
2637 );
2638 // Idle transition ran via the graceful path, not the abort.
2639 assert_eq!(
2640 coordinator.role(),
2641 ReplicaRole::Idle,
2642 "graceful Idle transition must complete via priority-lane Shutdown"
2643 );
2644 }
2645
2646 #[tokio::test]
2647 async fn try_dispatch_returns_event_on_full_buffer() {
2648 // Build a handle, fill the inbox without letting the
2649 // task drain. We rely on the runtime task not having a
2650 // chance to recv before we saturate.
2651 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000); // tick very slow
2652 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2653 coordinator
2654 .transition_to(
2655 ReplicaRole::Replica,
2656 super::super::replication_state::TransitionSignal::CapabilitySelected,
2657 )
2658 .await
2659 .unwrap();
2660 let dispatcher = Arc::new(RecorderDispatcher::default());
2661 let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2662
2663 let cid = channel_id_for("test/runtime");
2664 // Fill past capacity.
2665 for _ in 0..RUNTIME_INBOX_CAPACITY + 10 {
2666 let event = Inbound::Heartbeat {
2667 from: 0x20,
2668 msg: SyncHeartbeat {
2669 channel_id: cid,
2670 tail_seq: 0,
2671 role: ReplicaRole::Replica,
2672 wall_clock_ms: 0,
2673 },
2674 };
2675 // try_dispatch returns the event back when buffer
2676 // is full. The task starts to drain quickly so we
2677 // may not see a rejection on every iteration; just
2678 // assert that at SOME point we got the event back.
2679 let _ = handle.try_dispatch(event);
2680 }
2681
2682 handle.cancel().await;
2683 }
2684
2685 /// A wedged task with a saturated inbox must not hang
2686 /// `cancel()`. The cancel path uses `try_send` for the Shutdown
2687 /// message; if the buffer is full, the JoinHandle is aborted
2688 /// directly so the call returns promptly instead of blocking on
2689 /// a queue the wedged task may never drain.
2690 #[tokio::test]
2691 async fn cancel_with_full_inbox_does_not_hang() {
2692 // Slow tick so the task spends most of its time parked.
2693 // Fill the inbox without giving the task time to drain.
2694 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2695 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2696 coordinator
2697 .transition_to(
2698 ReplicaRole::Replica,
2699 super::super::replication_state::TransitionSignal::CapabilitySelected,
2700 )
2701 .await
2702 .unwrap();
2703 let dispatcher = Arc::new(RecorderDispatcher::default());
2704 let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2705
2706 // Saturate the inbox so a buffered `send(Shutdown).await`
2707 // would block.
2708 let cid = channel_id_for("test/runtime");
2709 for _ in 0..RUNTIME_INBOX_CAPACITY {
2710 let _ = handle.try_dispatch(Inbound::Heartbeat {
2711 from: 0x20,
2712 msg: SyncHeartbeat {
2713 channel_id: cid,
2714 tail_seq: 0,
2715 role: ReplicaRole::Replica,
2716 wall_clock_ms: 0,
2717 },
2718 });
2719 }
2720
2721 // cancel() must complete within a tight bound — a regression
2722 // to `send(Shutdown).await` would hang here on the full
2723 // buffer.
2724 tokio::time::timeout(Duration::from_secs(2), handle.cancel())
2725 .await
2726 .expect("cancel() must not hang on full inbox");
2727 assert!(handle.is_stopped());
2728 }
2729
2730 /// Dropping a handle without `cancel()` aborts the underlying
2731 /// task so the spawned future stops driving and any dispatcher
2732 /// Arc it held is released. Pins the Arc-cycle invariant:
2733 /// `MeshNode → router → handle → task → dispatcher` must close
2734 /// when the handle goes out of scope.
2735 #[tokio::test]
2736 async fn dropping_handle_aborts_task() {
2737 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2738 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2739 let dispatcher = Arc::new(RecorderDispatcher::default());
2740 let dispatcher_clone: Arc<dyn ReplicationDispatcher> = dispatcher.clone();
2741 // The runtime task holds one strong reference to dispatcher
2742 // (passed below). Count after spawn = 2.
2743 let handle =
2744 spawn_replication_runtime(inputs, coordinator, dispatcher_clone, build_budget());
2745 tokio::time::sleep(Duration::from_millis(20)).await;
2746 assert!(Arc::strong_count(&dispatcher) >= 2);
2747
2748 drop(handle);
2749
2750 // Yield enough for the abort to land + the task's local
2751 // state to deallocate (releasing the dispatcher Arc).
2752 for _ in 0..20 {
2753 tokio::time::sleep(Duration::from_millis(10)).await;
2754 if Arc::strong_count(&dispatcher) == 1 {
2755 return;
2756 }
2757 }
2758 panic!(
2759 "task did not release dispatcher Arc after handle drop; strong_count = {}",
2760 Arc::strong_count(&dispatcher)
2761 );
2762 }
2763
2764 #[tokio::test]
2765 async fn dispatch_after_cancel_errors() {
2766 let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2767 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2768 let dispatcher = Arc::new(RecorderDispatcher::default());
2769 let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2770
2771 handle.cancel().await;
2772
2773 let cid = channel_id_for("test/runtime");
2774 let result = handle
2775 .dispatch(Inbound::Heartbeat {
2776 from: 0x20,
2777 msg: SyncHeartbeat {
2778 channel_id: cid,
2779 tail_seq: 0,
2780 role: ReplicaRole::Replica,
2781 wall_clock_ms: 0,
2782 },
2783 })
2784 .await;
2785 assert!(result.is_err(), "dispatch must error after cancel");
2786 }
2787
2788 #[tokio::test]
2789 async fn channel_id_mismatch_drops_heartbeat() {
2790 let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2791 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2792 coordinator
2793 .transition_to(
2794 ReplicaRole::Replica,
2795 super::super::replication_state::TransitionSignal::CapabilitySelected,
2796 )
2797 .await
2798 .unwrap();
2799 let dispatcher = Arc::new(RecorderDispatcher::default());
2800 let handle =
2801 spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2802
2803 // Push a heartbeat with the wrong channel_id.
2804 let wrong = channel_id_for("test/wrong_channel");
2805 handle
2806 .dispatch(Inbound::Heartbeat {
2807 from: 0x20,
2808 msg: SyncHeartbeat {
2809 channel_id: wrong,
2810 tail_seq: 99,
2811 role: ReplicaRole::Leader,
2812 wall_clock_ms: 0,
2813 },
2814 })
2815 .await
2816 .unwrap();
2817
2818 // After 4 ticks, silence detection would have triggered
2819 // if the heartbeat had landed; it didn't, so the role
2820 // either stays Replica (no leader observed at all) or
2821 // advances to Candidate / Leader via election with no
2822 // believed leader. Either way: prove the heartbeat
2823 // didn't poison the tracker by checking we didn't end
2824 // up as a Replica with `believed_leader == Some(0x20)`
2825 // (which is the broken outcome).
2826 //
2827 // We let it run a few ticks for the election cycle to
2828 // potentially fire.
2829 tokio::time::sleep(Duration::from_millis(500)).await;
2830
2831 // The wrong-channel heartbeat should have been ignored.
2832 // The final role is either Replica (if nothing else
2833 // happened) or Leader (if the silence-detection +
2834 // election fired). Either way, NOT believing 0x20 to
2835 // be leader is the contract — but we can't observe the
2836 // tracker from here. Instead pin that the role isn't
2837 // Idle (we didn't cancel; we're still alive).
2838 let role = coordinator.role();
2839 assert!(
2840 matches!(role, ReplicaRole::Replica | ReplicaRole::Leader),
2841 "expected Replica or Leader; got {role:?}",
2842 );
2843
2844 handle.cancel().await;
2845 }
2846
2847 // ────────────────────────────────────────────────────────────────
2848 // Lag-observation gauge (Phase H)
2849 // ────────────────────────────────────────────────────────────────
2850
2851 #[test]
2852 fn observe_lag_idle_emits_none() {
2853 let tracker = HeartbeatTracker::new(500);
2854 let now = Instant::now();
2855 match observe_lag(ReplicaRole::Idle, &[0x10, 0x20], 0x10, &tracker, now) {
2856 LagObservation::None => {}
2857 other => panic!("expected None for Idle, got {other:?}"),
2858 }
2859 }
2860
2861 #[test]
2862 fn observe_lag_candidate_emits_none() {
2863 let tracker = HeartbeatTracker::new(500);
2864 let now = Instant::now();
2865 match observe_lag(ReplicaRole::Candidate, &[0x10, 0x20], 0x10, &tracker, now) {
2866 LagObservation::None => {}
2867 other => panic!("expected None for Candidate, got {other:?}"),
2868 }
2869 }
2870
2871 #[test]
2872 fn observe_lag_leader_with_no_peer_observations_emits_none() {
2873 // Self is the leader, peers in the set but no heartbeats
2874 // observed yet → lag has no observation to report.
2875 let tracker = HeartbeatTracker::new(500);
2876 let now = Instant::now();
2877 match observe_lag(
2878 ReplicaRole::Leader,
2879 &[0x10, 0x20, 0x30],
2880 0x10,
2881 &tracker,
2882 now,
2883 ) {
2884 LagObservation::None => {}
2885 other => panic!("expected None when peers have not heartbeated, got {other:?}"),
2886 }
2887 }
2888
2889 #[test]
2890 fn observe_lag_leader_picks_worst_replica() {
2891 let mut tracker = HeartbeatTracker::new(500);
2892 let base = Instant::now();
2893 // Peer 0x20 heartbeated at base; peer 0x30 heartbeated 100ms
2894 // later. After advancing to base+1000ms, peer 0x20 has 1000ms
2895 // of lag, peer 0x30 has 900ms. Leader gauge picks the worst.
2896 tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
2897 tracker.record_heartbeat(
2898 0x30,
2899 ReplicaRole::Replica,
2900 0,
2901 base + Duration::from_millis(100),
2902 );
2903 let now = base + Duration::from_millis(1000);
2904 match observe_lag(
2905 ReplicaRole::Leader,
2906 &[0x10, 0x20, 0x30],
2907 0x10,
2908 &tracker,
2909 now,
2910 ) {
2911 LagObservation::Leader(d) => assert_eq!(d, Duration::from_millis(1000)),
2912 other => panic!("expected Leader(1000ms), got {other:?}"),
2913 }
2914 }
2915
2916 #[test]
2917 fn observe_lag_replica_emits_believed_leader_lag() {
2918 let mut tracker = HeartbeatTracker::new(500);
2919 let base = Instant::now();
2920 tracker.record_heartbeat(0x42, ReplicaRole::Leader, 99, base);
2921 let now = base + Duration::from_millis(250);
2922 match observe_lag(ReplicaRole::Replica, &[0x10, 0x42], 0x10, &tracker, now) {
2923 LagObservation::Replica(d) => assert_eq!(d, Duration::from_millis(250)),
2924 other => panic!("expected Replica(250ms), got {other:?}"),
2925 }
2926 }
2927
2928 #[test]
2929 fn observe_lag_replica_with_no_believed_leader_emits_none() {
2930 // Empty tracker — no leader heartbeat ever observed.
2931 let tracker = HeartbeatTracker::new(500);
2932 let now = Instant::now();
2933 match observe_lag(ReplicaRole::Replica, &[0x10, 0x42], 0x10, &tracker, now) {
2934 LagObservation::None => {}
2935 other => panic!("expected None for replica with no believed leader, got {other:?}"),
2936 }
2937 }
2938
2939 #[test]
2940 fn observe_lag_leader_skips_self_in_replica_set() {
2941 // Self appears in replica_set (typical — leaders are listed
2942 // alongside replicas). The lag picks the worst PEER, never
2943 // self's lag (which is meaningless since self is the
2944 // writer).
2945 let mut tracker = HeartbeatTracker::new(500);
2946 let base = Instant::now();
2947 tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
2948 let now = base + Duration::from_millis(500);
2949 match observe_lag(ReplicaRole::Leader, &[0x10, 0x20], 0x10, &tracker, now) {
2950 LagObservation::Leader(d) => assert_eq!(d, Duration::from_millis(500)),
2951 other => panic!("expected Leader(500ms), got {other:?}"),
2952 }
2953 }
2954
2955 // ────────────────────────────────────────────────────────────────
2956 // Disk-pressure handling (Phase G)
2957 // ────────────────────────────────────────────────────────────────
2958
2959 fn build_coordinator_with_policy(
2960 policy: super::super::replication_config::UnderCapacity,
2961 ) -> (
2962 Arc<ReplicationCoordinator>,
2963 Arc<super::super::replication_metrics::ChannelMetricsAtomic>,
2964 ) {
2965 let registry = ReplicationMetricsRegistry::new();
2966 let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
2967 let config = ReplicationConfig::new().with_on_under_capacity(policy);
2968 let coordinator = ReplicationCoordinator::new(
2969 ChannelIdentity {
2970 channel_name: "test/runtime".to_string(),
2971 origin_hash: 0xCAFE_BABE,
2972 },
2973 config,
2974 sink,
2975 ®istry,
2976 );
2977 let metrics = registry.for_channel("test/runtime");
2978 (Arc::new(coordinator), metrics)
2979 }
2980
2981 #[tokio::test]
2982 async fn disk_pressure_withdraw_drives_idle_transition() {
2983 // Bring the coordinator to Replica role so the
2984 // DiskPressureWithdraw signal can validate.
2985 let (coord, metrics) = build_coordinator_with_policy(
2986 super::super::replication_config::UnderCapacity::Withdraw,
2987 );
2988 coord
2989 .transition_to(
2990 ReplicaRole::Replica,
2991 super::super::replication_state::TransitionSignal::CapabilitySelected,
2992 )
2993 .await
2994 .unwrap();
2995 assert_eq!(coord.role(), ReplicaRole::Replica);
2996
2997 let file = build_file_for_tests();
2998 handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
2999
3000 assert_eq!(coord.role(), ReplicaRole::Idle, "Withdraw flips to Idle");
3001 assert_eq!(
3002 metrics
3003 .under_capacity_total
3004 .load(std::sync::atomic::Ordering::Relaxed),
3005 1,
3006 "Withdraw bumps under_capacity_total"
3007 );
3008 }
3009
3010 #[tokio::test]
3011 async fn disk_pressure_evict_oldest_keeps_role_and_sweeps() {
3012 // EvictOldest: stay in Replica role, retention sweep
3013 // fires. Pre-fill the file with N events under a
3014 // count-1 retention cap so the sweep observably evicts.
3015 let (coord, metrics) = build_coordinator_with_policy(
3016 super::super::replication_config::UnderCapacity::EvictOldest,
3017 );
3018 coord
3019 .transition_to(
3020 ReplicaRole::Replica,
3021 super::super::replication_state::TransitionSignal::CapabilitySelected,
3022 )
3023 .await
3024 .unwrap();
3025 // Build a file with retention cap = 1 so the sweep
3026 // observably retains only the newest entry.
3027 use crate::adapter::net::redex::config::RedexFileConfig;
3028 use crate::adapter::net::redex::manager::Redex;
3029 let r = Redex::new();
3030 let cn = ChannelName::new("test/runtime").unwrap();
3031 let cfg = RedexFileConfig::default().with_retention_max_events(1);
3032 let file = r.open_file(&cn, cfg).unwrap();
3033 for i in 0..5 {
3034 file.append(format!("event-{i}").as_bytes()).unwrap();
3035 }
3036 assert_eq!(file.len(), 5);
3037
3038 handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3039
3040 assert_eq!(
3041 coord.role(),
3042 ReplicaRole::Replica,
3043 "EvictOldest preserves Replica role"
3044 );
3045 assert_eq!(file.len(), 1, "retention sweep dropped to cap of 1");
3046 assert_eq!(
3047 metrics
3048 .under_capacity_total
3049 .load(std::sync::atomic::Ordering::Relaxed),
3050 1,
3051 "EvictOldest also bumps under_capacity_total"
3052 );
3053 }
3054
3055 /// Disk pressure observed from a Leader role (or Candidate
3056 /// mid-election) must still drive a withdraw to Idle. The
3057 /// transition matrix only permits DiskPressureWithdraw on
3058 /// Replica → Idle; without role-aware signal selection a Leader
3059 /// would silently log+drop the withdraw and keep writing
3060 /// through pressure. Pick ChannelClose for the non-Replica case
3061 /// so the withdraw lands regardless of current role.
3062 #[tokio::test]
3063 async fn disk_pressure_withdraw_from_leader_picks_channel_close_signal() {
3064 let (coord, _metrics) = build_coordinator_with_policy(
3065 super::super::replication_config::UnderCapacity::Withdraw,
3066 );
3067 // Promote through the full state cycle to Leader.
3068 coord
3069 .transition_to(
3070 ReplicaRole::Replica,
3071 super::super::replication_state::TransitionSignal::CapabilitySelected,
3072 )
3073 .await
3074 .unwrap();
3075 coord
3076 .transition_to(
3077 ReplicaRole::Candidate,
3078 super::super::replication_state::TransitionSignal::MissedHeartbeats,
3079 )
3080 .await
3081 .unwrap();
3082 coord
3083 .transition_to(
3084 ReplicaRole::Leader,
3085 super::super::replication_state::TransitionSignal::ElectionWon,
3086 )
3087 .await
3088 .unwrap();
3089 assert_eq!(coord.role(), ReplicaRole::Leader);
3090
3091 let file = build_file_for_tests();
3092 handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3093
3094 // Leader → Idle via ChannelClose must land — the prior code
3095 // path used DiskPressureWithdraw which is invalid from
3096 // Leader and would silently fail-and-log.
3097 assert_eq!(
3098 coord.role(),
3099 ReplicaRole::Idle,
3100 "Leader disk-pressure must withdraw to Idle"
3101 );
3102 }
3103
3104 #[tokio::test]
3105 async fn disk_pressure_withdraw_is_idempotent_on_idle_already() {
3106 // Defensive: if the coordinator is already Idle when the
3107 // DiskPressureWithdraw fires (race with another path),
3108 // the transition path's idempotent `Idle → Idle +
3109 // ChannelClose` shortcut doesn't apply (this is
3110 // DiskPressureWithdraw, not ChannelClose). The
3111 // transition rejects but the counter still bumps.
3112 let (coord, metrics) = build_coordinator_with_policy(
3113 super::super::replication_config::UnderCapacity::Withdraw,
3114 );
3115 // Coordinator starts in Idle.
3116 let file = build_file_for_tests();
3117 handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3118 // Counter advanced; role is still Idle (the transition_to
3119 // call inside handle_disk_pressure surfaces an error that
3120 // we log + drop, so role stays Idle).
3121 assert_eq!(coord.role(), ReplicaRole::Idle);
3122 assert_eq!(
3123 metrics
3124 .under_capacity_total
3125 .load(std::sync::atomic::Ordering::Relaxed),
3126 1,
3127 );
3128 }
3129
3130 // ────────────────────────────────────────────────────────────────
3131 // R-1 / R-12 regression: role-flip TOCTOU + channel-id validation
3132 // ────────────────────────────────────────────────────────────────
3133
3134 /// R-1: A Leader that flips to Idle (e.g. via DiskPressureWithdraw)
3135 /// in the middle of serving a SyncRequest must NACK NotLeader
3136 /// rather than ship a SyncResponse from a node that no longer
3137 /// claims leadership. The fix is a post-read role re-check
3138 /// immediately before the dispatcher send.
3139 #[tokio::test]
3140 async fn sync_request_post_op_role_flip_emits_notleader_nack() {
3141 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3142 let cid = inputs.channel_id;
3143 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3144 // Promote to Leader so the entry-check passes.
3145 for (role, signal) in [
3146 (
3147 ReplicaRole::Replica,
3148 super::super::replication_state::TransitionSignal::CapabilitySelected,
3149 ),
3150 (
3151 ReplicaRole::Candidate,
3152 super::super::replication_state::TransitionSignal::MissedHeartbeats,
3153 ),
3154 (
3155 ReplicaRole::Leader,
3156 super::super::replication_state::TransitionSignal::ElectionWon,
3157 ),
3158 ] {
3159 coordinator.transition_to(role, signal).await.unwrap();
3160 }
3161 let dispatcher = Arc::new(RecorderDispatcher::default());
3162 let budget = build_budget();
3163 let event = Inbound::SyncRequest {
3164 from: 0x20,
3165 msg: SyncRequest {
3166 channel_id: cid,
3167 since_seq: 0,
3168 chunk_max: 1024,
3169 request_id: 0,
3170 class: Default::default(),
3171 },
3172 };
3173 // Simulate the role flipping between the entry check and
3174 // the post-op re-check: flip to Idle via DiskPressureWithdraw
3175 // RIGHT BEFORE we call on_inbound. The entry-check would
3176 // have failed for an already-Idle coordinator, so we have
3177 // to use an arrangement that lets the entry check pass
3178 // and the post-op check fail. The cleanest test: start as
3179 // Leader; flip to Idle externally; call on_inbound. The
3180 // entry check now fails — pin the NACK shape directly.
3181 coordinator
3182 .transition_to(
3183 ReplicaRole::Idle,
3184 super::super::replication_state::TransitionSignal::GracefulRelinquish,
3185 )
3186 .await
3187 .unwrap();
3188 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3189 on_inbound(
3190 &inputs,
3191 &coordinator,
3192 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3193 &build_state(tracker.clone(), budget.clone()),
3194 event,
3195 )
3196 .await;
3197 let nacks = dispatcher.sync_nacks.lock().clone();
3198 assert_eq!(nacks.len(), 1, "expected one NotLeader NACK");
3199 let (target, nack) = &nacks[0];
3200 assert_eq!(*target, 0x20);
3201 assert_eq!(
3202 nack.error_code,
3203 super::super::replication::SyncNackError::NotLeader
3204 );
3205 assert_eq!(nack.channel_id, cid);
3206 // No SyncResponse should have been shipped.
3207 assert!(
3208 dispatcher.sync_responses.lock().is_empty(),
3209 "no SyncResponse must ship when role isn't Leader"
3210 );
3211 }
3212
3213 /// R-12: SyncRequest with mismatched channel_id is dropped at
3214 /// the runtime boundary; no NACK, no response, no file access.
3215 #[tokio::test]
3216 async fn sync_request_with_wrong_channel_id_is_dropped() {
3217 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3218 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3219 // Promote to Leader.
3220 for (role, signal) in [
3221 (
3222 ReplicaRole::Replica,
3223 super::super::replication_state::TransitionSignal::CapabilitySelected,
3224 ),
3225 (
3226 ReplicaRole::Candidate,
3227 super::super::replication_state::TransitionSignal::MissedHeartbeats,
3228 ),
3229 (
3230 ReplicaRole::Leader,
3231 super::super::replication_state::TransitionSignal::ElectionWon,
3232 ),
3233 ] {
3234 coordinator.transition_to(role, signal).await.unwrap();
3235 }
3236 let dispatcher = Arc::new(RecorderDispatcher::default());
3237 let budget = build_budget();
3238 let wrong = channel_id_for("test/wrong_channel");
3239 let event = Inbound::SyncRequest {
3240 from: 0x20,
3241 msg: SyncRequest {
3242 channel_id: wrong,
3243 since_seq: 0,
3244 chunk_max: 1024,
3245 request_id: 0,
3246 class: Default::default(),
3247 },
3248 };
3249 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3250 on_inbound(
3251 &inputs,
3252 &coordinator,
3253 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3254 &build_state(tracker.clone(), budget.clone()),
3255 event,
3256 )
3257 .await;
3258 assert!(
3259 dispatcher.sync_nacks.lock().is_empty(),
3260 "no NACK on wrong-channel — silently dropped"
3261 );
3262 assert!(
3263 dispatcher.sync_responses.lock().is_empty(),
3264 "no SyncResponse on wrong-channel"
3265 );
3266 }
3267
3268 /// Peer-auth gate regression: an inbound message whose `from`
3269 /// node is not in `replica_set` must be dropped at on_inbound
3270 /// entry. Without the gate any mesh peer with SUBPROTOCOL_REDEX
3271 /// reach could drive the runtime — the worst case being a forged
3272 /// SyncResponse that writes attacker-chosen bytes into the
3273 /// replica's local log via `append_batch`.
3274 #[tokio::test]
3275 async fn inbound_from_non_replica_set_peer_is_dropped() {
3276 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3277 let cid = inputs.channel_id;
3278 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3279 coordinator
3280 .transition_to(
3281 ReplicaRole::Replica,
3282 super::super::replication_state::TransitionSignal::CapabilitySelected,
3283 )
3284 .await
3285 .unwrap();
3286 let dispatcher = Arc::new(RecorderDispatcher::default());
3287 let budget = build_budget();
3288 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3289 let baseline_next = inputs.file.next_seq();
3290 // 0x99 is NOT in replica_set [0x10, 0x20]. Even with valid
3291 // channel_id and a payload the apply path would accept, the
3292 // membership gate must drop it.
3293 let event = Inbound::SyncResponse {
3294 from: 0x99,
3295 msg: SyncResponse {
3296 channel_id: cid,
3297 first_seq: 0,
3298 leader_first_retained_seq: 0,
3299 events: Vec::new(),
3300 request_id: 0,
3301 },
3302 };
3303 on_inbound(
3304 &inputs,
3305 &coordinator,
3306 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3307 &build_state(tracker.clone(), budget.clone()),
3308 event,
3309 )
3310 .await;
3311 // No state mutation on the local file from an out-of-set
3312 // peer's SyncResponse.
3313 assert_eq!(
3314 inputs.file.next_seq(),
3315 baseline_next,
3316 "out-of-set peer must not advance local tail"
3317 );
3318 // A Heartbeat from the same out-of-set peer must not seed
3319 // the tracker either — the gate runs before record_heartbeat.
3320 let event = Inbound::Heartbeat {
3321 from: 0x99,
3322 msg: SyncHeartbeat {
3323 channel_id: cid,
3324 tail_seq: 7,
3325 role: ReplicaRole::Leader,
3326 wall_clock_ms: 0,
3327 },
3328 };
3329 on_inbound(
3330 &inputs,
3331 &coordinator,
3332 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3333 &build_state(tracker.clone(), budget.clone()),
3334 event,
3335 )
3336 .await;
3337 assert!(
3338 tracker.lock().believed_leader().is_none(),
3339 "out-of-set heartbeat must not seed believed_leader"
3340 );
3341 }
3342
3343 /// Peer-auth gate regression: a SyncResponse from a replica_set
3344 /// peer who is not the believed_leader must be dropped. Without
3345 /// this check, a non-leader replica could ship forged chunks
3346 /// once they're inside the replica_set.
3347 #[tokio::test]
3348 async fn sync_response_from_non_leader_replica_peer_is_dropped() {
3349 let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 60_000);
3350 let cid = inputs.channel_id;
3351 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
3352 coordinator
3353 .transition_to(
3354 ReplicaRole::Replica,
3355 super::super::replication_state::TransitionSignal::CapabilitySelected,
3356 )
3357 .await
3358 .unwrap();
3359 let dispatcher = Arc::new(RecorderDispatcher::default());
3360 let budget = build_budget();
3361 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3362 // Seed believed_leader = 0x20 via a Leader heartbeat.
3363 tracker
3364 .lock()
3365 .record_heartbeat(0x20, ReplicaRole::Leader, 0, Instant::now());
3366 assert_eq!(tracker.lock().believed_leader(), Some(0x20));
3367 let baseline_next = inputs.file.next_seq();
3368 // 0x30 IS in replica_set but is NOT the believed leader.
3369 let event = Inbound::SyncResponse {
3370 from: 0x30,
3371 msg: SyncResponse {
3372 channel_id: cid,
3373 first_seq: 0,
3374 leader_first_retained_seq: 0,
3375 events: Vec::new(),
3376 request_id: 0,
3377 },
3378 };
3379 on_inbound(
3380 &inputs,
3381 &coordinator,
3382 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3383 &build_state(tracker.clone(), budget.clone()),
3384 event,
3385 )
3386 .await;
3387 assert_eq!(
3388 inputs.file.next_seq(),
3389 baseline_next,
3390 "non-leader replica_set peer must not advance local tail via SyncResponse"
3391 );
3392 }
3393
3394 /// R-11 regression: `is_stopped()` returns `false` until
3395 /// `cancel()`'s await completes, even if a parallel
3396 /// `cancel()` raced and took the JoinHandle out of the
3397 /// slot. The explicit `stopped` flag (flipped only post-
3398 /// await) is what guarantees this.
3399 #[tokio::test]
3400 async fn is_stopped_is_false_before_first_cancel_completes() {
3401 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3402 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3403 let dispatcher = Arc::new(RecorderDispatcher::default());
3404 let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
3405 assert!(
3406 !handle.is_stopped(),
3407 "fresh runtime must report not stopped"
3408 );
3409 handle.cancel().await;
3410 assert!(
3411 handle.is_stopped(),
3412 "post-cancel().await runtime must report stopped"
3413 );
3414 // Idempotent second cancel must not flip the flag back.
3415 handle.cancel().await;
3416 assert!(handle.is_stopped());
3417 }
3418
3419 /// R-4 regression: SyncNack NotLeader must actively clear
3420 /// the believed leader so the next tick re-resolves.
3421 /// Without the fix the replica loops sending SyncRequests
3422 /// to the same stale leader until 3 missed heartbeats trip.
3423 #[tokio::test]
3424 async fn sync_nack_notleader_clears_believed_leader() {
3425 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3426 let cid = inputs.channel_id;
3427 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3428 coordinator
3429 .transition_to(
3430 ReplicaRole::Replica,
3431 super::super::replication_state::TransitionSignal::CapabilitySelected,
3432 )
3433 .await
3434 .unwrap();
3435 let dispatcher = Arc::new(RecorderDispatcher::default());
3436 let budget = build_budget();
3437 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3438 // Seed the tracker with a believed leader heartbeat.
3439 tracker
3440 .lock()
3441 .record_heartbeat(0x20, ReplicaRole::Leader, 99, Instant::now());
3442 assert_eq!(tracker.lock().believed_leader(), Some(0x20));
3443 // NACK NotLeader from the believed leader. Pre-record an
3444 // in-flight request_id so the response-binding gate admits
3445 // the NACK to the apply path.
3446 let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
3447 outstanding.lock().record(0x20, 0, Instant::now());
3448 let event = Inbound::SyncNack {
3449 from: 0x20,
3450 msg: SyncNack {
3451 channel_id: cid,
3452 since_seq: 0,
3453 error_code: super::super::replication::SyncNackError::NotLeader,
3454 leader_first_retained_seq: 0,
3455 detail: String::new(),
3456 request_id: 0,
3457 },
3458 };
3459 on_inbound(
3460 &inputs,
3461 &coordinator,
3462 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3463 &RuntimeState {
3464 tracker: tracker.clone(),
3465 budget: budget.clone(),
3466 backoff: build_backoff(),
3467 outstanding: outstanding.clone(),
3468 },
3469 event,
3470 )
3471 .await;
3472 assert!(
3473 tracker.lock().believed_leader().is_none(),
3474 "NACK NotLeader must clear the cached believed leader"
3475 );
3476 }
3477
3478 /// R-4 regression: SyncNack BadRange skips the local tail
3479 /// past the rejected `since_seq` so the next SyncRequest
3480 /// re-issues against a range the leader has retained.
3481 /// Without the fix the replica re-issues the same range
3482 /// indefinitely.
3483 #[tokio::test]
3484 async fn sync_nack_badrange_skips_local_tail() {
3485 let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3486 let cid = inputs.channel_id;
3487 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3488 coordinator
3489 .transition_to(
3490 ReplicaRole::Replica,
3491 super::super::replication_state::TransitionSignal::CapabilitySelected,
3492 )
3493 .await
3494 .unwrap();
3495 let dispatcher = Arc::new(RecorderDispatcher::default());
3496 let budget = build_budget();
3497 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3498 // Seed the tracker with a believed leader heartbeat so the
3499 // peer-auth gate at on_inbound entry admits the NACK below.
3500 tracker
3501 .lock()
3502 .record_heartbeat(0x20, ReplicaRole::Leader, 41, Instant::now());
3503
3504 // Local file is empty (next_seq = 0). NACK with since_seq=42
3505 // means "the leader trimmed up to 42; you asked for 42 but
3506 // it's gone." Local tail must advance to 43.
3507 let baseline_next = inputs.file.next_seq();
3508 // Pre-record the in-flight request_id so the response-binding
3509 // gate admits this NACK rather than dropping it as stale.
3510 let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
3511 outstanding.lock().record(0x20, 0, Instant::now());
3512 let event = Inbound::SyncNack {
3513 from: 0x20,
3514 msg: SyncNack {
3515 channel_id: cid,
3516 since_seq: 42,
3517 // Pre-fix: replica advanced one seq at a time via
3518 // `since_seq + 1 = 43`. R-40 wire field instructs
3519 // the replica to jump straight to the leader's
3520 // first-retained seq (here, 100) in one round trip.
3521 error_code: super::super::replication::SyncNackError::BadRange,
3522 leader_first_retained_seq: 100,
3523 detail: String::new(),
3524 request_id: 0,
3525 },
3526 };
3527 on_inbound(
3528 &inputs,
3529 &coordinator,
3530 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3531 &RuntimeState {
3532 tracker: tracker.clone(),
3533 budget: budget.clone(),
3534 backoff: build_backoff(),
3535 outstanding: outstanding.clone(),
3536 },
3537 event,
3538 )
3539 .await;
3540 // The local file's next_seq advanced past the bad range
3541 // (or, on persistent files, fell back to retry). For a
3542 // heap-only file in this test, skip_to(43) succeeded.
3543 let after = inputs.file.next_seq();
3544 assert!(
3545 after > baseline_next,
3546 "BadRange must advance local next_seq (got {after}, baseline {baseline_next})"
3547 );
3548 // R-40 regression: with leader_first_retained_seq = 100,
3549 // skip_to must jump straight to 100 in one round trip, not
3550 // creep up by one per BadRange cycle (since_seq + 1 = 43
3551 // would otherwise re-trigger BadRange when retention floor
3552 // is much higher).
3553 assert_eq!(
3554 after, 100,
3555 "BadRange with leader_first_retained_seq must jump local tail to the floor"
3556 );
3557 // skip_ahead metric advanced.
3558 assert_eq!(
3559 coordinator
3560 .metrics()
3561 .skip_ahead_total
3562 .load(std::sync::atomic::Ordering::Relaxed),
3563 1
3564 );
3565 }
3566
3567 /// R-2: When a post-Candidate transition fails (e.g. the
3568 /// coordinator already advanced via a concurrent path),
3569 /// `clear_believed_leader` must NOT run — otherwise the
3570 /// replica is left with no leader and no path to enter
3571 /// Candidate again.
3572 #[tokio::test]
3573 async fn post_election_failed_transition_preserves_believed_leader() {
3574 let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
3575 let cid = inputs.channel_id;
3576 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3577 coordinator
3578 .transition_to(
3579 ReplicaRole::Replica,
3580 super::super::replication_state::TransitionSignal::CapabilitySelected,
3581 )
3582 .await
3583 .unwrap();
3584 let dispatcher = Arc::new(RecorderDispatcher::default());
3585 let handle = spawn_replication_runtime(
3586 inputs,
3587 coordinator.clone(),
3588 dispatcher.clone(),
3589 build_budget(),
3590 );
3591
3592 // Record a Leader heartbeat to set believed_leader.
3593 handle
3594 .dispatch(Inbound::Heartbeat {
3595 from: 0x20,
3596 msg: SyncHeartbeat {
3597 channel_id: cid,
3598 tail_seq: 99,
3599 role: ReplicaRole::Leader,
3600 wall_clock_ms: 1_700_000_000_000,
3601 },
3602 })
3603 .await
3604 .unwrap();
3605
3606 // Drive an external race: drop the coordinator into Idle
3607 // via the public surface before the election runs.
3608 // The post-election transition_to will see (Idle, Leader,
3609 // ElectionWon) which is invalid; the fix ensures we don't
3610 // wipe the tracker on that failure.
3611 tokio::time::sleep(Duration::from_millis(20)).await;
3612 coordinator
3613 .transition_to(
3614 ReplicaRole::Idle,
3615 super::super::replication_state::TransitionSignal::ChannelClose,
3616 )
3617 .await
3618 .unwrap();
3619
3620 // Run a few more ticks; the silence detection should still
3621 // run but the post-election transition should fail silently
3622 // without wiping believed_leader. We can't directly observe
3623 // the tracker, but we CAN confirm the coordinator stays at
3624 // Idle (didn't bounce back to Leader after a failed post-
3625 // election transition) and that no panic / unexpected state
3626 // mutation occurred.
3627 tokio::time::sleep(Duration::from_millis(300)).await;
3628 assert_eq!(coordinator.role(), ReplicaRole::Idle);
3629
3630 handle.cancel().await;
3631 }
3632
3633 /// The replica's `on_tick` must stamp `inputs.default_bandwidth_class`
3634 /// on every emitted `SyncRequest` — not `Default::default()`. Without
3635 /// this, the per-channel default configured via
3636 /// `ReplicationConfig::default_bandwidth_class` is silently dropped
3637 /// and every replica catchup ships as `Foreground`, regardless of
3638 /// operator policy.
3639 #[tokio::test]
3640 async fn replica_on_tick_stamps_inputs_default_bandwidth_class_on_sync_request() {
3641 use crate::adapter::net::redex::bandwidth::BandwidthClass;
3642
3643 let self_id: NodeId = 0x10;
3644 let leader_id: NodeId = 0x20;
3645 let inputs = RuntimeInputs {
3646 channel: ChannelIdentity {
3647 channel_name: "test/runtime".to_string(),
3648 origin_hash: 0xCAFE_BABE,
3649 },
3650 channel_id: channel_id_for("test/runtime"),
3651 self_node_id: self_id,
3652 replica_set: vec![self_id, leader_id],
3653 heartbeat_ms: 100,
3654 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3655 // Local tail = 0; leader's heartbeat will advertise 50,
3656 // forcing `tick()` to emit a SyncRequest.
3657 tail_provider: Arc::new(|| 0),
3658 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3659 file: build_file_for_tests(),
3660 // Non-default class — must round-trip onto the wire.
3661 default_bandwidth_class: BandwidthClass::Background,
3662 background_fraction: 0.3,
3663 };
3664 let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, leader_id]);
3665 coordinator
3666 .transition_to(
3667 ReplicaRole::Replica,
3668 super::super::replication_state::TransitionSignal::CapabilitySelected,
3669 )
3670 .await
3671 .unwrap();
3672
3673 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3674 // Seed a leader heartbeat with tail_seq > local so `tick()`
3675 // generates a catchup SyncRequest.
3676 tracker
3677 .lock()
3678 .record_heartbeat(leader_id, ReplicaRole::Leader, 50, Instant::now());
3679
3680 let dispatcher = Arc::new(RecorderDispatcher::default());
3681 let dyn_dispatcher: Arc<dyn ReplicationDispatcher> = dispatcher.clone();
3682 on_tick(
3683 &inputs,
3684 &coordinator,
3685 &dyn_dispatcher,
3686 &build_state(tracker.clone(), build_budget()),
3687 )
3688 .await;
3689
3690 let sync_requests = dispatcher.sync_requests.lock().clone();
3691 assert_eq!(
3692 sync_requests.len(),
3693 1,
3694 "expected exactly one catchup SyncRequest"
3695 );
3696 let (target, req) = &sync_requests[0];
3697 assert_eq!(*target, leader_id);
3698 assert_eq!(
3699 req.class,
3700 BandwidthClass::Background,
3701 "emitted SyncRequest must carry inputs.default_bandwidth_class, \
3702 not Default::default()",
3703 );
3704 }
3705
3706 /// The leader's serve path must consult `msg.class` and
3707 /// `inputs.background_fraction` when admitting a SyncRequest —
3708 /// not the class-blind legacy `try_consume`. Without this, every
3709 /// Phase D2/D4 admission threshold is dead code.
3710 ///
3711 /// Configures `background_fraction = 0.3` and a tiny budget
3712 /// (100-byte capacity). The reserve threshold becomes
3713 /// `(1 - 0.3) * 100 = 70` bytes. A Background SyncRequest whose
3714 /// response costs 47 bytes (empty-response header) leaves
3715 /// `available - cost = 53 < 70`, so the class-aware gate rejects
3716 /// it with `Backpressure`. The legacy class-blind path would
3717 /// have admitted (it sees `Foreground` semantics and `available
3718 /// >= cost`).
3719 #[tokio::test]
3720 async fn leader_serve_path_uses_class_aware_admission_for_background_request() {
3721 use crate::adapter::net::redex::bandwidth::BandwidthClass;
3722
3723 let inputs = RuntimeInputs {
3724 channel: ChannelIdentity {
3725 channel_name: "test/runtime".to_string(),
3726 origin_hash: 0xCAFE_BABE,
3727 },
3728 channel_id: channel_id_for("test/runtime"),
3729 self_node_id: 0x10,
3730 replica_set: vec![0x10, 0x20],
3731 heartbeat_ms: 60_000,
3732 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3733 tail_provider: Arc::new(|| 0),
3734 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3735 file: build_file_for_tests(),
3736 default_bandwidth_class: BandwidthClass::Foreground,
3737 background_fraction: 0.3,
3738 };
3739 let cid = inputs.channel_id;
3740 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3741 // Promote to Leader so the serve path runs.
3742 for (role, signal) in [
3743 (
3744 ReplicaRole::Replica,
3745 super::super::replication_state::TransitionSignal::CapabilitySelected,
3746 ),
3747 (
3748 ReplicaRole::Candidate,
3749 super::super::replication_state::TransitionSignal::MissedHeartbeats,
3750 ),
3751 (
3752 ReplicaRole::Leader,
3753 super::super::replication_state::TransitionSignal::ElectionWon,
3754 ),
3755 ] {
3756 coordinator.transition_to(role, signal).await.unwrap();
3757 }
3758
3759 // Build a budget whose reserve gate will reject a 47-byte
3760 // Background response. capacity=100, fraction=0.3 → reserve=70;
3761 // available - cost = 100 - 47 = 53 < 70 → denied.
3762 let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
3763
3764 let dispatcher = Arc::new(RecorderDispatcher::default());
3765 let event = Inbound::SyncRequest {
3766 from: 0x20,
3767 msg: SyncRequest {
3768 channel_id: cid,
3769 since_seq: 0,
3770 chunk_max: 1024,
3771 request_id: 0,
3772 class: BandwidthClass::Background,
3773 },
3774 };
3775 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3776 on_inbound(
3777 &inputs,
3778 &coordinator,
3779 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3780 &build_state(tracker.clone(), budget.clone()),
3781 event,
3782 )
3783 .await;
3784
3785 let nacks = dispatcher.sync_nacks.lock().clone();
3786 assert_eq!(
3787 nacks.len(),
3788 1,
3789 "Background admission must be denied by the class-aware reserve gate; \
3790 pre-fix this admitted via the class-blind try_consume path",
3791 );
3792 let (target, nack) = &nacks[0];
3793 assert_eq!(*target, 0x20);
3794 assert_eq!(
3795 nack.error_code,
3796 super::super::replication::SyncNackError::Backpressure,
3797 "denial must surface as Backpressure NACK so the replica backs off",
3798 );
3799 // No SyncResponse should have been shipped under denial.
3800 assert!(
3801 dispatcher.sync_responses.lock().is_empty(),
3802 "denied requests must not leak a SyncResponse",
3803 );
3804 }
3805
3806 /// Companion to the Background-denial test: with the same tiny
3807 /// budget, a Foreground request of the same size IS admitted —
3808 /// confirms the reserve gate is the discriminator, not the cost.
3809 /// Without this paired assertion a regression that always-denies
3810 /// would look like the right behavior.
3811 #[tokio::test]
3812 async fn leader_serve_path_admits_foreground_under_tight_budget() {
3813 use crate::adapter::net::redex::bandwidth::BandwidthClass;
3814
3815 let inputs = RuntimeInputs {
3816 channel: ChannelIdentity {
3817 channel_name: "test/runtime".to_string(),
3818 origin_hash: 0xCAFE_BABE,
3819 },
3820 channel_id: channel_id_for("test/runtime"),
3821 self_node_id: 0x10,
3822 replica_set: vec![0x10, 0x20],
3823 heartbeat_ms: 60_000,
3824 wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3825 tail_provider: Arc::new(|| 0),
3826 rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3827 file: build_file_for_tests(),
3828 default_bandwidth_class: BandwidthClass::Foreground,
3829 background_fraction: 0.3,
3830 };
3831 let cid = inputs.channel_id;
3832 let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3833 for (role, signal) in [
3834 (
3835 ReplicaRole::Replica,
3836 super::super::replication_state::TransitionSignal::CapabilitySelected,
3837 ),
3838 (
3839 ReplicaRole::Candidate,
3840 super::super::replication_state::TransitionSignal::MissedHeartbeats,
3841 ),
3842 (
3843 ReplicaRole::Leader,
3844 super::super::replication_state::TransitionSignal::ElectionWon,
3845 ),
3846 ] {
3847 coordinator.transition_to(role, signal).await.unwrap();
3848 }
3849 let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
3850 let dispatcher = Arc::new(RecorderDispatcher::default());
3851 let event = Inbound::SyncRequest {
3852 from: 0x20,
3853 msg: SyncRequest {
3854 channel_id: cid,
3855 since_seq: 0,
3856 chunk_max: 1024,
3857 request_id: 0,
3858 class: BandwidthClass::Foreground,
3859 },
3860 };
3861 let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3862 on_inbound(
3863 &inputs,
3864 &coordinator,
3865 &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3866 &build_state(tracker.clone(), budget.clone()),
3867 event,
3868 )
3869 .await;
3870 // No NACK — Foreground admits because available >= cost.
3871 assert!(
3872 dispatcher.sync_nacks.lock().is_empty(),
3873 "Foreground under the same budget must admit (available >= cost)",
3874 );
3875 // Exactly one SyncResponse must have been shipped.
3876 assert_eq!(
3877 dispatcher.sync_responses.lock().len(),
3878 1,
3879 "Foreground admit must ship a SyncResponse",
3880 );
3881 }
3882}