Skip to main content

net/adapter/net/redex/
replication_runtime.rs

1//! Tokio-driven replication runtime — ties together the pure
2//! pieces ([`ReplicationCoordinator`], [`HeartbeatTracker`],
3//! [`BandwidthBudget`], [`tick`], [`handle_sync_request`],
4//! [`apply_sync_response`]) behind a single async task per
5//! replicated channel.
6//!
7//! Slot: the layer between `Redex::open_file` (which spawns one
8//! runtime per replicated channel) and the substrate mesh
9//! (which routes inbound `SUBPROTOCOL_REDEX` payloads here).
10//!
11//! Architecture:
12//!
13//! ```text
14//!   Mesh dispatch  ──Inbound::*──▶ ReplicationRuntime task
15//!                                      │
16//!                       ┌──────────────┴──────────────┐
17//!                       │                              │
18//!                  HeartbeatTick                   InboundEvent
19//!                  (every heartbeat_ms)            (peer message)
20//!                       │                              │
21//!                       ▼                              ▼
22//!                replication_step::tick     update tracker / file
23//!                       │                              │
24//!                       ▼                              ▼
25//!                outbound dispatch          maybe issue sync_request
26//!                       │                              │
27//!                       └──────────────┬───────────────┘
28//!                                      ▼
29//!                            ReplicationDispatcher
30//!                            (mesh.send_subprotocol)
31//! ```
32//!
33//! The dispatcher trait abstracts the mesh-side wire send so the
34//! runtime is unit-testable with a recorder mock. Production-side
35//! wiring (mesh.rs routing `SUBPROTOCOL_REDEX` payloads to the
36//! right runtime's inbox + the `MeshNode` impl of
37//! `ReplicationDispatcher`) lands in a separate slice — this
38//! commit covers the runtime task itself + the trait.
39
40use std::sync::atomic::{AtomicBool, Ordering as AtomicOrdering};
41use std::sync::Arc;
42use std::time::Instant;
43
44use parking_lot::Mutex;
45use tokio::sync::mpsc;
46use tokio::task::JoinHandle;
47
48use super::file::RedexFile;
49use super::replication::{
50    ChannelId, ReplicaRole, SyncHeartbeat, SyncNack, SyncRequest, SyncResponse,
51};
52use super::replication_budget::BandwidthBudget;
53use super::replication_catchup::{apply_sync_response, handle_sync_request, SyncRequestOutcome};
54use super::replication_coordinator::{ChannelIdentity, CoordinatorError, ReplicationCoordinator};
55use super::replication_heartbeat::HeartbeatTracker;
56use super::replication_step::{
57    election_outcome, tick, OutboundMessage, TickInputs, SYNC_REQUEST_CHUNK_MAX_DEFAULT,
58};
59use crate::adapter::net::behavior::placement::NodeId;
60use crate::error::AdapterError;
61use std::time::Duration;
62
/// Outbound wire-message sink the runtime uses to ship messages
/// through the substrate. Production sink routes through
/// [`crate::adapter::net::MeshNode`]'s `SUBPROTOCOL_REDEX`
/// dispatch; unit tests use a recorder mock.
///
/// # `Ok(())` semantics and bandwidth-budget accounting
///
/// The following applies to every send method below: `Ok(())`
/// means **queued to the transport**, not **delivered to the
/// peer**. Implementations whose underlying transport buffers
/// without ack (UDP, lossy QUIC streams under link failure, etc.)
/// MUST surface delivery-loss through their own back-channel
/// (heartbeat lag, peer-reported tail seq, out-of-band ack) — the
/// replication runtime's bandwidth budget refund path keys on the
/// synchronous `Err` return only.
///
/// A silently-dropped frame still drains the budget. The
/// flaky-link case is dampened by R-28 catchup-backoff once empty
/// responses accrue past the threshold, but the budget itself
/// cannot self-correct on transport-internal loss without an
/// end-to-end ack. The trait deliberately does not demand one:
/// the cost would be a per-response ack-RTT that the underlying
/// QUIC reliable streams already provide for in-spec transports.
///
/// In short: trait callers MAY treat `Ok(())` as "the bytes
/// reached the wire layer." If your transport doesn't guarantee
/// delivery on `Ok(())`, document that on your impl and
/// understand that the budget over-counts under loss.
#[async_trait::async_trait]
pub trait ReplicationDispatcher: Send + Sync {
    /// Send a [`SyncHeartbeat`] to `target`. See the trait-level
    /// note on `Ok(())` semantics.
    async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError>;
    /// Send a [`SyncRequest`] to `target` (typically a leader).
    /// See the trait-level note on `Ok(())` semantics.
    async fn send_sync_request(&self, target: NodeId, msg: SyncRequest)
        -> Result<(), AdapterError>;
    /// Send a [`SyncResponse`] to `target` (typically a replica
    /// catching up). See the trait-level note on `Ok(())`
    /// semantics.
    async fn send_sync_response(
        &self,
        target: NodeId,
        msg: SyncResponse,
    ) -> Result<(), AdapterError>;
    /// Send a [`SyncNack`] to `target`. See the trait-level note
    /// on `Ok(())` semantics.
    async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError>;
}
108
/// RTT-lookup function the election uses: given a [`NodeId`] it
/// yields an optional round-trip [`Duration`] (presumably `None`
/// when no measurement exists for that node — confirm against the
/// election code). Production routes through
/// `ProximityGraph::nearest_rtt(|n| n.node_id ==
/// graph_id_of(node))`; unit tests pass a static closure.
pub type RttLookup = Arc<dyn Fn(NodeId) -> Option<Duration> + Send + Sync>;
113
/// Sync, non-blocking router the mesh's inbound dispatch hot path
/// calls when a `SUBPROTOCOL_REDEX` payload arrives. Owns the
/// per-channel registry of [`ReplicationRuntimeHandle`]s and
/// routes the decoded [`Inbound`] event to the right one.
///
/// [`Self::try_route`] returns `Err(Inbound)` (handing the event
/// back to the caller) when:
/// - No runtime is registered for `channel_id` (channel not opened
///   on this node, or the runtime was canceled and not yet
///   unregistered).
/// - The runtime's inbox is full (per-channel backlog at
///   [`RUNTIME_INBOX_CAPACITY`]).
///
/// In both cases the caller (mesh dispatch loop) drops + logs;
/// the wire layer's reliable-stream may retransmit, or the
/// peer's heartbeat cycle will recover state without it.
pub trait ReplicationInboundRouter: Send + Sync {
    /// Try to route an inbound event to its channel's runtime.
    /// Sync + non-blocking — must not call into async code, must
    /// not hold locks across awaits (the mesh dispatch loop is
    /// the sole caller and runs in a synchronous critical
    /// section).
    ///
    /// On failure the undelivered event is returned in `Err` so
    /// the caller keeps ownership of it.
    fn try_route(&self, channel_id: ChannelId, inbound: Inbound) -> Result<(), Inbound>;
}
137
138#[async_trait::async_trait]
139impl ReplicationDispatcher for crate::adapter::net::MeshNode {
140    async fn send_heartbeat(&self, target: NodeId, msg: SyncHeartbeat) -> Result<(), AdapterError> {
141        send_redex_payload(self, target, msg.to_bytes()).await
142    }
143
144    async fn send_sync_request(
145        &self,
146        target: NodeId,
147        msg: SyncRequest,
148    ) -> Result<(), AdapterError> {
149        send_redex_payload(self, target, msg.to_bytes()).await
150    }
151
152    async fn send_sync_response(
153        &self,
154        target: NodeId,
155        msg: SyncResponse,
156    ) -> Result<(), AdapterError> {
157        send_redex_payload(self, target, msg.to_bytes()).await
158    }
159
160    async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
161        send_redex_payload(self, target, msg.to_bytes()).await
162    }
163}
164
165/// Resolve a `NodeId` to its peer `SocketAddr` and ship `payload`
166/// via `MeshNode::send_subprotocol` with `SUBPROTOCOL_REDEX`. The
167/// `payload` already carries the 3-byte subprotocol header per
168/// plan §2; the substrate's Net header carries `subprotocol_id`
169/// independently for routing — the redundancy is plan-mandated
170/// (the application-layer header is part of the wire contract,
171/// distinct from the transport-layer Net header).
172async fn send_redex_payload(
173    mesh: &crate::adapter::net::MeshNode,
174    target: NodeId,
175    payload: Vec<u8>,
176) -> Result<(), AdapterError> {
177    let peer_addr = mesh.peer_addr(target).ok_or_else(|| {
178        AdapterError::Connection(format!("replication: peer {target:#x} unknown"))
179    })?;
180    mesh.send_subprotocol(peer_addr, super::replication::SUBPROTOCOL_REDEX, &payload)
181        .await
182}
183
/// Inbound event the runtime processes. The mesh-side dispatcher
/// pushes one of these into the runtime's inbox per inbound wire
/// frame; the runtime's task drains the receiver on every wakeup.
///
/// Lane assignment (see `is_priority_event`): `Shutdown`,
/// `SyncResponse` and `SyncNack` ride the priority inbox;
/// `Heartbeat` and `SyncRequest` ride the standard inbox.
#[derive(Debug, Clone)]
pub enum Inbound {
    /// Peer's heartbeat — record into the tracker.
    Heartbeat {
        /// Originating node id.
        from: NodeId,
        /// Wire-format heartbeat payload.
        msg: SyncHeartbeat,
    },
    /// Peer (replica) asked us (leader) for events. Run
    /// `handle_sync_request` against the local file + dispatch
    /// the response or nack.
    SyncRequest {
        /// Originating replica.
        from: NodeId,
        /// Wire-format request.
        msg: SyncRequest,
    },
    /// Peer (leader) shipped us a chunk. Apply via
    /// `apply_sync_response`; on success advance our tail.
    SyncResponse {
        /// Originating leader.
        from: NodeId,
        /// Wire-format response.
        msg: SyncResponse,
    },
    /// Peer (leader) rejected our request with a typed error.
    SyncNack {
        /// Originating leader.
        from: NodeId,
        /// Wire-format nack.
        msg: SyncNack,
    },
    /// Shutdown signal from `Redex::open_file` cleanup or from
    /// channel-close. Drives `coordinator.transition_to(Idle,
    /// ChannelClose)` and exits the task loop. Carries no payload.
    Shutdown,
}
225
/// Inputs the runtime task captures at spawn time.
///
/// The current tail is read through the `tail_provider` closure
/// (typically wrapping `RedexFile::next_seq()`) rather than being
/// snapshotted here. The struct also carries a `file` clone used
/// for the sync request/response paths; the file's owner remains
/// `Redex`.
pub struct RuntimeInputs {
    /// Channel identity + origin_hash.
    pub channel: ChannelIdentity,
    /// 32-byte BLAKE2s channel id for the wire-format heartbeat.
    pub channel_id: ChannelId,
    /// This node's id.
    pub self_node_id: NodeId,
    /// Replica-set membership — every node currently registered
    /// as a replica for the channel. Coordinator updates this
    /// when the placement filter re-selects (Phase C / F).
    pub replica_set: Vec<NodeId>,
    /// Heartbeat cadence in milliseconds (mirrors
    /// `ReplicationConfig::heartbeat_ms`). The tokio interval
    /// drives at this cadence and the `HeartbeatTracker` is
    /// constructed from the same value. Must be non-zero:
    /// `tokio::time::interval` panics on a zero period.
    pub heartbeat_ms: u64,
    /// Function returning the wall-clock milliseconds for the
    /// outbound heartbeat's `wall_clock_ms` field. Operator-
    /// facing drift detection only; abstracted so tests can
    /// inject a deterministic value.
    pub wall_clock_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// Function returning the current local `tail_seq`. Called
    /// each tick before emission so heartbeats carry the freshest
    /// value.
    pub tail_provider: Arc<dyn Fn() -> u64 + Send + Sync>,
    /// RTT lookup for the election function.
    pub rtt_lookup: RttLookup,
    /// The local `RedexFile` for this channel. The runtime holds
    /// a clone (RedexFile is `Clone` — Arc-backed) so it can
    /// drive `handle_sync_request` on inbound `SyncRequest`
    /// frames (leader path) and `apply_sync_response` on inbound
    /// `SyncResponse` frames (replica path). The
    /// `tail_provider` closure typically wraps `file.next_seq()`.
    pub file: RedexFile,
    /// Per-channel default [`super::bandwidth::BandwidthClass`] —
    /// stamped on every `SyncRequest` the runtime emits. v0.3
    /// Phase D2. Sourced from
    /// [`ReplicationConfig::default_bandwidth_class`](super::replication_config::ReplicationConfig).
    pub default_bandwidth_class: super::bandwidth::BandwidthClass,
    /// v0.3 Phase D2 admission-gate parameter: fraction of the
    /// bandwidth bucket capacity reserved against `Background`.
    /// Default 0.3 via
    /// [`ReplicationConfig::background_fraction`](super::replication_config::ReplicationConfig).
    pub background_fraction: f32,
}
274
/// Handle the spawned task produces. Holds the inbox senders so
/// the mesh dispatcher (and the lifecycle code) can push
/// [`Inbound`] events. `cancel()` sends `Shutdown` and awaits the
/// task to exit cleanly. The owned [`ReplicationCoordinator`] is
/// exposed via [`Self::coordinator`] so operators (and tests) can
/// observe the role, drive `transition_to`, and read the channel
/// metrics without going through the inbox.
pub struct ReplicationRuntimeHandle {
    /// Low-priority inbox: Heartbeat + SyncRequest. A peer flood
    /// fills this lane first; under saturation the high-priority
    /// lane keeps draining so Shutdown, SyncResponse, and SyncNack
    /// still make forward progress.
    inbox: mpsc::Sender<Inbound>,
    /// High-priority inbox: Shutdown + SyncResponse + SyncNack.
    /// Catchup-critical events ride this lane so a Heartbeat
    /// flood from many peers doesn't strand the leader's response
    /// to the local replica or block graceful shutdown. Worked
    /// example: 50 peers heartbeating every 100 ms is 500 evt/s;
    /// add a momentarily slow `await` in `on_inbound` and a single
    /// 1024-slot lane saturates in about two seconds.
    priority_inbox: mpsc::Sender<Inbound>,
    /// Join handle of the spawned runtime task. `cancel()` and
    /// `Drop` both `take()` it out of the slot, so at most one of
    /// them ever awaits or aborts the task.
    task: Mutex<Option<JoinHandle<()>>>,
    /// Coordinator shared with the runtime task (same `Arc`).
    coordinator: Arc<ReplicationCoordinator>,
    /// R-11: explicit "task has joined" flag. `is_stopped()`
    /// consults this rather than the JoinHandle slot — two
    /// concurrent `cancel()`s race on `task.lock().take()`, so
    /// the slot-based view can return `true` *before* the
    /// surviving caller's `.await` returns. The flag is flipped
    /// only after the join completes.
    stopped: AtomicBool,
}
306
307#[inline]
308fn is_priority_event(event: &Inbound) -> bool {
309    matches!(
310        event,
311        Inbound::Shutdown | Inbound::SyncResponse { .. } | Inbound::SyncNack { .. }
312    )
313}
314
315impl ReplicationRuntimeHandle {
316    /// The per-channel coordinator. Same `Arc` the runtime task
317    /// uses; cloning is cheap. Operators read `coordinator.role()`
318    /// for the current state and `coordinator.metrics()` for the
319    /// per-channel atomic counters; tests can drive
320    /// `coordinator.transition_to(target, signal)` to put the
321    /// channel in a specific role.
322    pub fn coordinator(&self) -> &Arc<ReplicationCoordinator> {
323        &self.coordinator
324    }
325
326    /// Push an inbound event into the runtime's inbox. Errors
327    /// when the runtime has already exited (drained channel).
328    /// Routes catchup-critical events (Shutdown, SyncResponse,
329    /// SyncNack) to the priority lane so a Heartbeat flood on
330    /// the standard lane can't starve them.
331    pub async fn dispatch(&self, event: Inbound) -> Result<(), AdapterError> {
332        let sender = if is_priority_event(&event) {
333            &self.priority_inbox
334        } else {
335            &self.inbox
336        };
337        sender
338            .send(event)
339            .await
340            .map_err(|_| AdapterError::Transient("replication runtime task exited".to_string()))
341    }
342
343    /// Same as [`Self::dispatch`] but for use from non-async
344    /// contexts (the mesh dispatch loop's sync hot path).
345    /// Returns the event back on full-buffer rejection so the
346    /// caller can decide whether to drop, log, or block.
347    pub fn try_dispatch(&self, event: Inbound) -> Result<(), Inbound> {
348        let sender = if is_priority_event(&event) {
349            &self.priority_inbox
350        } else {
351            &self.inbox
352        };
353        sender.try_send(event).map_err(|e| e.into_inner())
354    }
355
356    /// Send `Shutdown` and await the task to exit. Idempotent —
357    /// subsequent calls are no-ops once the task has joined.
358    ///
359    /// Uses `try_send` first so a wedged task with a full inbox
360    /// can't hang the caller indefinitely. On `Full`, the
361    /// JoinHandle is aborted directly; the task exits without
362    /// running the graceful Idle transition but the channel is
363    /// still safely torn down.
364    pub async fn cancel(&self) {
365        let handle = self.task.lock().take();
366        if let Some(h) = handle {
367            // Shutdown rides the priority lane; that's the same
368            // lane the run loop drains first under the biased
369            // select, so even a saturated low-priority lane can't
370            // delay the graceful exit.
371            match self.priority_inbox.try_send(Inbound::Shutdown) {
372                Ok(()) | Err(mpsc::error::TrySendError::Closed(_)) => {
373                    // Graceful path: the task observes Shutdown (or
374                    // already exited). Await the join.
375                    let _ = h.await;
376                }
377                Err(mpsc::error::TrySendError::Full(_)) => {
378                    // Priority lane is itself saturated. Abort
379                    // directly so cancel() can't block the caller.
380                    h.abort();
381                    let _ = h.await;
382                }
383            }
384            // Only the holder of the JoinHandle flips `stopped`,
385            // and only after `.await` returns. Pre-fix, a concurrent
386            // cancel() racer that lost the `take()` skipped the if-let
387            // block and then unconditionally stored `true` — before
388            // the winner's await had completed. `is_stopped()` then
389            // reported "joined" while the task was still running.
390            self.stopped.store(true, AtomicOrdering::Release);
391        }
392    }
393
394    /// Returns `true` if the runtime has stopped (task joined).
395    /// Useful for tests / observability.
396    ///
397    /// R-11: this consults an explicit flag flipped after
398    /// `cancel()`'s `.await` returns, not the JoinHandle slot.
399    /// Without the flag, two concurrent `cancel()` calls could
400    /// race so the loser observes `task.lock().take() == None`
401    /// and reports `is_stopped == true` before the winner has
402    /// finished joining.
403    pub fn is_stopped(&self) -> bool {
404        self.stopped.load(AtomicOrdering::Acquire)
405    }
406}
407
/// Per-leader catchup state — tracks consecutive empty SyncResponses
/// in the face of an advertised tail gap. A buggy or byzantine
/// leader that advertises ever-increasing `tail_seq` but ships
/// `Response{events: []}` would otherwise loop the replica at
/// heartbeat cadence forever; the backoff suppresses outbound
/// SyncRequests once the threshold is crossed, with exponential
/// growth capped at [`CATCHUP_BACKOFF_CAP`]. A non-empty response
/// resets the counter so a transient stall doesn't permanently
/// pause catchup.
#[derive(Debug, Default)]
pub struct CatchupBackoff {
    /// Backoff bookkeeping keyed by the leader's node id.
    entries: std::collections::HashMap<NodeId, BackoffEntry>,
}

/// Strike counter and current backoff deadline for one leader.
#[derive(Debug, Default, Clone, Copy)]
struct BackoffEntry {
    /// Consecutive `apply_sync_response` calls that returned the
    /// same tail (no events applied) while the believed leader's
    /// advertised `tail_seq` was still strictly greater than ours.
    consecutive_empty: u32,
    /// Monotonic-clock instant (`std::time::Instant`, not
    /// wall-clock time) at which the backoff window ends. `None`
    /// while the counter has not yet exceeded
    /// `CATCHUP_BACKOFF_THRESHOLD`.
    backoff_until: Option<Instant>,
}

/// Strikes before backoff kicks in. The first 3 empty responses
/// are absorbed without delay so a transient leader-retention edge
/// doesn't trigger a backoff; the 4th stamps the first window.
pub const CATCHUP_BACKOFF_THRESHOLD: u32 = 3;

/// First backoff window after the threshold is crossed.
pub const CATCHUP_BACKOFF_INITIAL: Duration = Duration::from_secs(1);

/// Upper bound on the exponential backoff. A wedged leader stays
/// reachable for re-evaluation at least every cap.
pub const CATCHUP_BACKOFF_CAP: Duration = Duration::from_secs(30);
444
445/// Per-leader in-flight SyncRequest registry. Each outbound
446/// SyncRequest mints a random `request_id` from `getrandom`; the
447/// id is inserted here keyed by `(leader_node_id, request_id)`
448/// before the wire send. Inbound SyncResponse / SyncNack must
449/// carry an id present in the set; otherwise it's a stale
450/// response from a request the replica already timed out (or a
451/// forged frame from any peer that happens to be the recorded
452/// leader). Entries auto-expire after [`REQUEST_TTL`] so the set
453/// can't grow without bound under leader silence.
454#[derive(Debug, Default)]
455pub struct OutstandingRequests {
456    entries: std::collections::HashMap<(NodeId, u64), Instant>,
457}
458
459/// TTL on entries in [`OutstandingRequests`]. Bounded by the
460/// catchup deadline so a one-tick-late response still lands.
461pub const REQUEST_TTL: Duration = Duration::from_secs(30);
462
463/// Soft cap on per-replica outstanding requests across all
464/// leaders. A degraded leader that never responds shouldn't
465/// let the set grow without bound; once the cap is hit, GC
466/// kicks in and the oldest entries are dropped.
467pub const REQUEST_REGISTRY_SOFT_CAP: usize = 256;
468
469impl OutstandingRequests {
470    /// Construct an empty registry.
471    pub fn new() -> Self {
472        Self {
473            entries: std::collections::HashMap::new(),
474        }
475    }
476
477    /// Record a freshly-minted `request_id` against `leader`.
478    /// Best-effort GC of expired entries runs at insert time so
479    /// the set stays bounded without a separate sweeper task.
480    pub fn record(&mut self, leader: NodeId, request_id: u64, now: Instant) {
481        if self.entries.len() >= REQUEST_REGISTRY_SOFT_CAP {
482            self.entries
483                .retain(|_, &mut inserted| now.saturating_duration_since(inserted) < REQUEST_TTL);
484        }
485        self.entries.insert((leader, request_id), now);
486    }
487
488    /// Take the entry for `(leader, request_id)` if present and
489    /// not yet expired. Returns `true` when an in-flight request
490    /// matched; the caller proceeds with the apply path. `false`
491    /// means the response is stale / forged / past TTL — drop.
492    pub fn take(&mut self, leader: NodeId, request_id: u64, now: Instant) -> bool {
493        match self.entries.remove(&(leader, request_id)) {
494            Some(inserted) => now.saturating_duration_since(inserted) < REQUEST_TTL,
495            None => false,
496        }
497    }
498
499    /// Drop every entry recorded against `leader`. Called when
500    /// the believed leader changes so a re-elected peer doesn't
501    /// inherit the prior leader's in-flight token set.
502    pub fn clear_leader(&mut self, leader: NodeId) {
503        self.entries.retain(|(l, _), _| *l != leader);
504    }
505}
506
507impl CatchupBackoff {
508    /// Construct an empty backoff tracker.
509    pub fn new() -> Self {
510        Self {
511            entries: std::collections::HashMap::new(),
512        }
513    }
514
515    /// Record an empty `SyncResponse` from `leader` (events vec was
516    /// empty or apply returned the same tail). Increments the
517    /// consecutive counter; once past `CATCHUP_BACKOFF_THRESHOLD`,
518    /// stamps `backoff_until = now + min(initial << k, cap)` where
519    /// `k = consecutive_empty - threshold - 1`.
520    pub fn record_empty(&mut self, leader: NodeId, now: Instant) {
521        let entry = self.entries.entry(leader).or_default();
522        entry.consecutive_empty = entry.consecutive_empty.saturating_add(1);
523        if entry.consecutive_empty > CATCHUP_BACKOFF_THRESHOLD {
524            let shift = entry
525                .consecutive_empty
526                .saturating_sub(CATCHUP_BACKOFF_THRESHOLD + 1)
527                .min(20);
528            let multiplier: u32 = 1u32.checked_shl(shift).unwrap_or(u32::MAX);
529            let backoff = CATCHUP_BACKOFF_INITIAL
530                .saturating_mul(multiplier)
531                .min(CATCHUP_BACKOFF_CAP);
532            entry.backoff_until = Some(now + backoff);
533        }
534    }
535
536    /// Record a productive response (events applied, tail advanced)
537    /// from `leader`. Clears any backoff state so the next request
538    /// can fire immediately.
539    pub fn record_progress(&mut self, leader: NodeId) {
540        self.entries.remove(&leader);
541    }
542
543    /// True when `now` is strictly before the recorded
544    /// `backoff_until` for `leader`. Out-of-backoff leaders (no
545    /// entry, or entry below threshold) always return `false`.
546    pub fn is_in_backoff(&self, leader: NodeId, now: Instant) -> bool {
547        self.entries
548            .get(&leader)
549            .and_then(|e| e.backoff_until)
550            .is_some_and(|until| now < until)
551    }
552
553    /// Drop entries for leaders whose backoff window expired more
554    /// than `cap` ago. Called from `on_tick` to keep the map
555    /// bounded under leader churn: a leader demoted after
556    /// accruing strikes never has `record_progress` called for it
557    /// again, so without expiry the entry persists indefinitely.
558    /// Below-threshold entries (no `backoff_until` stamp) are
559    /// retained — they represent active counting state.
560    pub fn gc_expired(&mut self, now: Instant, cap: Duration) {
561        self.entries.retain(|_, e| match e.backoff_until {
562            Some(until) => now.saturating_duration_since(until) < cap,
563            None => true,
564        });
565    }
566}
567
568impl Drop for ReplicationRuntimeHandle {
569    /// Best-effort cleanup if a handle is dropped without an
570    /// explicit `cancel().await`. Aborts the task synchronously so
571    /// the spawned future stops driving and the dispatcher Arc the
572    /// task held is released — closing the strong-reference cycle
573    /// `MeshNode → router → handle → task → dispatcher` without
574    /// requiring callers to remember the cancel sequence. The
575    /// graceful Idle transition is skipped on this path; callers
576    /// that need the announce/withdraw side-effects to land must
577    /// still `cancel().await` before drop.
578    fn drop(&mut self) {
579        // try_lock — pre-fix this took the parking_lot mutex
580        // unconditionally; on a single-thread runtime panic during
581        // shutdown, drop could fire on a thread already holding
582        // self.task (e.g. mid-cancel() when the future is dropped on
583        // panic), producing a deadlock. The best-effort abort can
584        // wait for the next normal cleanup if we lose the lock race
585        // — losing it means somebody else is already inside cancel()
586        // or another drop, and they will abort/await the task.
587        if let Some(mut guard) = self.task.try_lock() {
588            if let Some(h) = guard.take() {
589                h.abort();
590            }
591        }
592    }
593}
594
/// Inbox capacity — bounds the per-channel inbound backlog on the
/// standard (low-priority) lane. A peer flooding
/// `SUBPROTOCOL_REDEX` payloads at us can't grow the per-channel
/// queue without bound; once full, the mesh dispatcher's
/// `try_dispatch` returns the event back and the caller (mesh
/// dispatch loop) drops + logs.
pub const RUNTIME_INBOX_CAPACITY: usize = 1024;

/// Per-channel mutable state the runtime task threads through
/// `on_tick` / `on_inbound`. Replaces the four-`Arc<Mutex<…>>` arg
/// soup that pushed those functions over clippy's
/// `too_many_arguments` limit. Every member is an `Arc`, so an
/// individual member can be cloned with a single atomic
/// increment; the struct itself does not derive `Clone` and is
/// handed to `on_inbound` by reference.
struct RuntimeState {
    /// Heartbeat bookkeeping, created per runtime at spawn.
    tracker: Arc<Mutex<HeartbeatTracker>>,
    /// Bandwidth budget; supplied by the caller of
    /// `spawn_replication_runtime` rather than created there.
    budget: Arc<Mutex<BandwidthBudget>>,
    /// Per-leader empty-response backoff state.
    backoff: Arc<Mutex<CatchupBackoff>>,
    /// In-flight SyncRequest ids awaiting a response or nack.
    outstanding: Arc<Mutex<OutstandingRequests>>,
}

/// Priority-lane inbox capacity. Smaller than the standard lane
/// because the events that ride it (Shutdown, SyncResponse,
/// SyncNack) are bounded by the local replica's in-flight
/// catchup window plus a handful of NACKs — not a per-peer
/// heartbeat flood. Sized to absorb a burst without back-
/// pressuring the leader-side dispatcher.
pub const RUNTIME_PRIORITY_INBOX_CAPACITY: usize = 128;
622
623/// Spawn a per-channel replication runtime task. Returns a
624/// handle the mesh dispatcher uses to push inbound events and
625/// the lifecycle code uses to cancel.
626///
627/// The task:
628/// 1. Initializes the tracker / budget for this channel.
629/// 2. Loops on `select! { interval tick, inbox.recv() }`.
630/// 3. Each tick: calls [`tick`], dispatches outbound, runs the
631///    election if the coordinator just entered Candidate.
632/// 4. Each inbound event: updates state + maybe ships an
633///    outbound response.
634/// 5. Exits cleanly on `Inbound::Shutdown` after running
635///    `coordinator.transition_to(Idle, ChannelClose)`.
636///
637/// `dispatcher` ships every outbound message; `tail_provider` /
638/// `wall_clock_provider` give the task fresh values per tick.
639///
640/// **R-14 — Arc cycle invariant.** Production wiring has
641/// `MeshNode → ReplicationInboundRouter → ReplicationRuntimeHandle
642/// → task → Arc<dyn ReplicationDispatcher = MeshNode>`. This is
643/// a strong reference cycle. It is broken by
644/// `ReplicationWiring::drop` (`manager.rs`): un-installing the
645/// router releases its `Arc<RuntimeHandle>` references, the
646/// runtime task observes the closed inbox receiver, exits, and
647/// drops its dispatcher Arc. Callers that do NOT route through
648/// `Redex` drop (e.g. holding a raw `ReplicationRuntimeHandle`
649/// past the dispatcher's owner) MUST call `cancel()` before
650/// dropping the dispatcher; otherwise the cycle leaks both.
651pub fn spawn_replication_runtime(
652    inputs: RuntimeInputs,
653    coordinator: Arc<ReplicationCoordinator>,
654    dispatcher: Arc<dyn ReplicationDispatcher>,
655    budget: Arc<Mutex<BandwidthBudget>>,
656) -> ReplicationRuntimeHandle {
657    let state = RuntimeState {
658        tracker: Arc::new(Mutex::new(HeartbeatTracker::new(inputs.heartbeat_ms))),
659        budget,
660        backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
661        outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
662    };
663    let (tx, rx) = mpsc::channel::<Inbound>(RUNTIME_INBOX_CAPACITY);
664    let (priority_tx, priority_rx) = mpsc::channel::<Inbound>(RUNTIME_PRIORITY_INBOX_CAPACITY);
665    let coordinator_for_task = coordinator.clone();
666    let task = tokio::spawn(run(
667        inputs,
668        coordinator_for_task,
669        dispatcher,
670        state,
671        rx,
672        priority_rx,
673    ));
674    ReplicationRuntimeHandle {
675        inbox: tx,
676        priority_inbox: priority_tx,
677        task: Mutex::new(Some(task)),
678        coordinator,
679        stopped: AtomicBool::new(false),
680    }
681}
682
683async fn run(
684    inputs: RuntimeInputs,
685    coordinator: Arc<ReplicationCoordinator>,
686    dispatcher: Arc<dyn ReplicationDispatcher>,
687    state: RuntimeState,
688    mut inbox: mpsc::Receiver<Inbound>,
689    mut priority_inbox: mpsc::Receiver<Inbound>,
690) {
691    let heartbeat_interval = Duration::from_millis(inputs.heartbeat_ms);
692    let mut interval = tokio::time::interval(heartbeat_interval);
693    // `MissedTickBehavior::Skip` so a slow tick under load
694    // doesn't queue up unbounded ticks. We emit one heartbeat
695    // per interval; missed intervals are just observed silence
696    // at the receiver.
697    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
698    // The first tick fires immediately; consume it so the
699    // initial state has had a chance to settle.
700    interval.tick().await;
701
702    loop {
703        // `biased;` makes tokio::select poll branches top-to-
704        // bottom rather than randomly. Priority lane is checked
705        // first, then the tick, then the low-priority lane. A
706        // heartbeat flood saturating the low-priority lane can't
707        // starve SyncResponse / SyncNack / Shutdown — they ride
708        // the priority lane and get drained ahead of the flood.
709        tokio::select! {
710            biased;
711            event = priority_inbox.recv() => {
712                match event {
713                    Some(Inbound::Shutdown) | None => {
714                        let _ = coordinator
715                            .transition_to(
716                                ReplicaRole::Idle,
717                                super::replication_state::TransitionSignal::ChannelClose,
718                            )
719                            .await;
720                        return;
721                    }
722                    Some(event) => {
723                        on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
724                    }
725                }
726            }
727            _ = interval.tick() => {
728                on_tick(&inputs, &coordinator, &dispatcher, &state).await;
729            }
730            event = inbox.recv() => {
731                match event {
732                    Some(Inbound::Shutdown) | None => {
733                        // Shutdown normally rides the priority lane;
734                        // a None here means the low-priority sender
735                        // dropped (caller closed the handle without
736                        // calling cancel). Treat as graceful exit.
737                        let _ = coordinator
738                            .transition_to(
739                                ReplicaRole::Idle,
740                                super::replication_state::TransitionSignal::ChannelClose,
741                            )
742                            .await;
743                        return;
744                    }
745                    Some(event) => {
746                        on_inbound(&inputs, &coordinator, &dispatcher, &state, event).await;
747                    }
748                }
749            }
750        }
751    }
752}
753
/// Which lag gauge the tick should update, if any. Leader emits the
/// worst-replica lag; Replica emits the believed-leader lag.
/// Candidate + Idle don't emit lag — both are transient or non-
/// participating roles.
///
/// The value is a small `Copy` type with structural equality so callers
/// and tests can compare observations directly with `==` / `assert_eq!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LagObservation {
    /// Leader-side: max over replica peers of `now - peer.last_seen`.
    /// Drives `record_leader_lag`. Reflects the staleness of the
    /// worst-lagging replica.
    Leader(Duration),
    /// Replica-side: `now - believed_leader.last_seen`. Drives
    /// `record_replica_lag`. Only produced when a lag for the believed
    /// leader is actually available; when it is not, the observation is
    /// [`LagObservation::None`] instead and the gauge stays unobserved.
    Replica(Duration),
    /// No lag to record this tick.
    None,
}
771
772/// Compute the lag observation for this tick. Pure read over the
773/// tracker; the caller updates the metric off the lock.
774fn observe_lag(
775    role: ReplicaRole,
776    replica_set: &[NodeId],
777    self_node_id: NodeId,
778    tracker: &HeartbeatTracker,
779    now: Instant,
780) -> LagObservation {
781    match role {
782        ReplicaRole::Leader => {
783            // Worst-replica view: max over peers of (now - peer.last_seen).
784            // A peer never seen has no observation — skip it (the
785            // gauge captures observed lag, not "never heard from").
786            let worst = replica_set
787                .iter()
788                .copied()
789                .filter(|&p| p != self_node_id)
790                .filter_map(|p| tracker.peer_lag(p, now))
791                .max();
792            match worst {
793                Some(d) => LagObservation::Leader(d),
794                None => LagObservation::None,
795            }
796        }
797        ReplicaRole::Replica => match tracker.believed_leader() {
798            Some(leader) => match tracker.peer_lag(leader, now) {
799                Some(d) => LagObservation::Replica(d),
800                None => LagObservation::None,
801            },
802            None => LagObservation::None,
803        },
804        ReplicaRole::Candidate | ReplicaRole::Idle => LagObservation::None,
805    }
806}
807
808/// Drop the believed-leader belief AND the outstanding-request
809/// tokens recorded against that leader. Sites that previously
810/// only called `clear_believed_leader` would leave the prior
811/// leader's in-flight tokens in `OutstandingRequests` until TTL
812/// (30 s). Under role thrash or rapid leader churn, the soft-cap
813/// GC then evicted entries from OTHER leaders to make room — the
814/// documented invariant on `OutstandingRequests::clear_leader`.
815fn clear_leader_belief_and_tokens(
816    tracker: &Arc<Mutex<HeartbeatTracker>>,
817    outstanding: &Arc<Mutex<OutstandingRequests>>,
818) {
819    let prior = tracker.lock().believed_leader();
820    tracker.lock().clear_believed_leader();
821    if let Some(prior) = prior {
822        outstanding.lock().clear_leader(prior);
823    }
824}
825
826async fn on_tick(
827    inputs: &RuntimeInputs,
828    coordinator: &Arc<ReplicationCoordinator>,
829    dispatcher: &Arc<dyn ReplicationDispatcher>,
830    state: &RuntimeState,
831) {
832    let RuntimeState {
833        tracker,
834        budget: _,
835        backoff,
836        outstanding,
837    } = state;
838    // Source `now` from tokio's clock so the silence-detection
839    // pass inside the tracker tick honors tokio::time::pause() in
840    // tests and stays coherent with the tokio::time::interval that
841    // drives this on_tick call. Pre-fix std::Instant::now() kept
842    // moving while virtual time was paused.
843    let now = tokio::time::Instant::now().into_std();
844    // Drop CatchupBackoff entries whose backoff window expired
845    // more than a cap ago — protects the map from unbounded
846    // growth under leader churn (a demoted leader's entry has
847    // no other clearance path; `record_progress` only fires for
848    // the current believed leader).
849    backoff
850        .lock()
851        .gc_expired(now, CATCHUP_BACKOFF_CAP.saturating_mul(2));
852    let tail_seq = (inputs.tail_provider)();
853    // Bump the coordinator's cached tail at every tick so the next
854    // transition's announce_chain advertises the real tip. The
855    // CAS-monotonic guard inside record_tail_seq drops the write if
856    // tail_provider went backward (it shouldn't — tail_provider
857    // wraps next_seq()), so this is also idempotent. Without this
858    // the leader path's tail_seq atomic stays at the value the
859    // last apply_sync_response recorded (which only Replicas run),
860    // and a leader's announce_chain at promotion time ships
861    // tip_seq=0.
862    //
863    // For the LEADER role specifically, `tail_provider` returns the
864    // raw local-file `next_seq()`, which the leader bumps the moment
865    // a write lands locally — pre-replication. Advertising that
866    // value via capability tags biases `find_chain_holders` (which
867    // picks the freshest holder by `tip_seq` during failover)
868    // toward a partition that may have un-replicated writes; a
869    // crash before those writes ship loses them.
870    //
871    // Clamp the advertised tail to the highest tail any peer has
872    // confirmed via heartbeat. That tail is, by construction,
873    // "replicated at least once" — a safe minimum for failover
874    // discovery. When NO peer has reported yet (fresh leader, no
875    // replicas), fall back to the raw local tail: there is no
876    // safer value to advertise and a sole leader has authority
877    // over its own writes by tautology.
878    let advertised_tail = if coordinator.role() == ReplicaRole::Leader {
879        let max_peer_tail = tracker
880            .lock()
881            .peer_tail_seqs()
882            .into_iter()
883            .filter(|(id, _)| *id != inputs.self_node_id)
884            .map(|(_, t)| t)
885            .max();
886        match max_peer_tail {
887            Some(p) => tail_seq.min(p),
888            None => tail_seq,
889        }
890    } else {
891        tail_seq
892    };
893    coordinator.record_tail_seq(advertised_tail);
894    let wall_clock_ms = (inputs.wall_clock_provider)();
895    // R-10: capture `current_role` inside the same critical
896    // section that holds the tracker lock so a concurrent
897    // transition can't land between the role read and the
898    // tick(). Reading role here is cheap (a parking_lot mutex
899    // load); holding both locks together is safe because role
900    // observation never awaits.
901    let (outcome, lag_observation) = {
902        let t = tracker.lock();
903        // Capture `current_role` inside the same critical section
904        // that holds the tracker lock so a concurrent transition
905        // can't land between the role read and the tick(). Reading
906        // role here is cheap (a parking_lot mutex load); holding
907        // both locks together is safe because role observation
908        // never awaits. The captured value lives only inside this
909        // closure — `tick()` consumes it via its outcome.
910        let current_role = coordinator.role();
911        let outcome = tick(TickInputs {
912            self_node_id: inputs.self_node_id,
913            current_role,
914            channel_id: inputs.channel_id,
915            tail_seq,
916            replica_set: &inputs.replica_set,
917            tracker: &t,
918            wall_clock_ms,
919            chunk_max_bytes: SYNC_REQUEST_CHUNK_MAX_DEFAULT,
920            now,
921            default_bandwidth_class: inputs.default_bandwidth_class,
922        });
923        let lag = observe_lag(
924            current_role,
925            &inputs.replica_set,
926            inputs.self_node_id,
927            &t,
928            now,
929        );
930        (outcome, lag)
931    };
932    // Record lag gauges off the tracker lock.
933    match lag_observation {
934        LagObservation::Leader(d) => coordinator.metrics().record_leader_lag(d),
935        LagObservation::Replica(d) => coordinator.metrics().record_replica_lag(d),
936        LagObservation::None => {}
937    }
938    for msg in outcome.outbound {
939        match msg {
940            OutboundMessage::Heartbeat { target, msg } => {
941                if let Err(e) = dispatcher.send_heartbeat(target, msg).await {
942                    tracing::trace!(target=?target, error=?e, "replication: heartbeat send failed");
943                }
944            }
945            OutboundMessage::SyncRequest { target, mut msg } => {
946                // R-28 catchup backoff: a buggy/byzantine leader that
947                // advertises an ever-growing tail but ships empty
948                // responses would otherwise loop this branch at the
949                // heartbeat cadence forever. Once the empty-response
950                // count crosses `CATCHUP_BACKOFF_THRESHOLD` (3), the
951                // tracker stamps `backoff_until`; ticks within that
952                // window skip the send entirely. A non-empty response
953                // resets the counter through `record_progress` on the
954                // apply path.
955                if backoff.lock().is_in_backoff(target, now) {
956                    tracing::trace!(
957                        target = target,
958                        "replication: skipping SyncRequest — leader is in catchup backoff"
959                    );
960                    continue;
961                }
962                // R-23 request-token correlation. `tick` emits the
963                // SyncRequest with `request_id = 0` placeholder; the
964                // runtime mints a random 64-bit token from
965                // `getrandom` here, records `(leader, token)` in the
966                // outstanding-requests set, and stamps the wire frame
967                // with the minted value before send. Inbound
968                // SyncResponse / SyncNack must carry a token still
969                // in the set; stale responses (re-issue races, late-
970                // arriving NACKs from prior requests the replica
971                // already timed out) drop on the apply path.
972                let mut id_bytes = [0u8; 8];
973                if getrandom::fill(&mut id_bytes).is_err() {
974                    tracing::trace!(
975                        target = target,
976                        "replication: getrandom failure; skipping SyncRequest this tick"
977                    );
978                    continue;
979                }
980                let token = u64::from_le_bytes(id_bytes);
981                msg.request_id = token;
982                outstanding.lock().record(target, token, now);
983                if let Err(e) = dispatcher.send_sync_request(target, msg).await {
984                    tracing::trace!(target=?target, error=?e, "replication: sync_request send failed");
985                }
986            }
987        }
988    }
989    if let Some(pending) = outcome.transition {
990        if let Err(e) = coordinator
991            .transition_to(pending.target, pending.signal)
992            .await
993        {
994            tracing::warn!(error=?e, "replication: transition_to({:?}, {:?}) failed", pending.target, pending.signal);
995            return;
996        }
997        // If we just entered Candidate via MissedHeartbeats,
998        // run the deterministic election in the same tick so the
999        // Candidate window stays microseconds-wide per plan §3.
1000        if pending.target == ReplicaRole::Candidate {
1001            let healthy = tracker.lock().healthy_peers(now);
1002            // Self is alive by tautology — the runtime is the
1003            // code computing the election. The tracker only
1004            // records observed inbound heartbeats from peers,
1005            // so self never appears in `healthy_peers` directly.
1006            // Include self explicitly so the election filter
1007            // doesn't accidentally exclude us.
1008            let elect = election_outcome(
1009                inputs.self_node_id,
1010                &inputs.replica_set,
1011                inputs.rtt_lookup.as_ref(),
1012                |peer| peer == inputs.self_node_id || healthy.contains(&peer),
1013            );
1014            if let Some(pt) = elect {
1015                match coordinator.transition_to(pt.target, pt.signal).await {
1016                    Ok(_) => {
1017                        // Only clear the believed leader on a
1018                        // successful transition. If the second
1019                        // transition lost a race (e.g. an inbound
1020                        // Shutdown drove us to Idle first), wiping
1021                        // the believed leader would leave the
1022                        // coordinator with no recovery signal.
1023                        clear_leader_belief_and_tokens(tracker, outstanding);
1024                    }
1025                    Err(CoordinatorError::TagSink(e)) => {
1026                        // State moved to the target (Leader /
1027                        // Replica); only the chain-tag side-effect
1028                        // failed. We are functionally in the new
1029                        // role — clear the believed leader so the
1030                        // tick path doesn't keep treating an old
1031                        // peer as authoritative. Chain discovery
1032                        // for this channel stays silent until the
1033                        // next successful announce; operators
1034                        // observing this counter see the
1035                        // divergence.
1036                        tracing::warn!(
1037                            error = ?e,
1038                            target = ?pt.target,
1039                            "replication: post-election chain-tag side-effect failed; state advanced"
1040                        );
1041                        clear_leader_belief_and_tokens(tracker, outstanding);
1042                    }
1043                    Err(CoordinatorError::Transition(e)) => {
1044                        // State did not move (typically because a
1045                        // concurrent ChannelClose drove us to Idle
1046                        // first). Recover by clearing the believed
1047                        // leader so the next tick re-enters
1048                        // discovery from a clean slate rather than
1049                        // sitting on a stale belief.
1050                        tracing::warn!(
1051                            error = ?e,
1052                            target = ?pt.target,
1053                            "replication: post-election transition rejected; state moved out from under us"
1054                        );
1055                        clear_leader_belief_and_tokens(tracker, outstanding);
1056                    }
1057                }
1058            }
1059        }
1060    }
1061}
1062
1063#[allow(clippy::too_many_arguments)]
1064async fn on_inbound(
1065    inputs: &RuntimeInputs,
1066    coordinator: &Arc<ReplicationCoordinator>,
1067    dispatcher: &Arc<dyn ReplicationDispatcher>,
1068    state: &RuntimeState,
1069    event: Inbound,
1070) {
1071    let RuntimeState {
1072        tracker,
1073        budget,
1074        backoff,
1075        outstanding,
1076    } = state;
1077    // Peer-auth gate. Every inbound replication message must come
1078    // from a peer in the channel's configured replica_set; any
1079    // other sender has no business driving the state machine for
1080    // this channel. Pre-fix the handlers below only checked the
1081    // channel_id, so any mesh peer with SUBPROTOCOL_REDEX reach
1082    // could ship Heartbeat / SyncRequest / SyncResponse / SyncNack
1083    // and the runtime would apply them. SyncResponse hijack was
1084    // the worst case — a non-leader peer could write attacker-
1085    // chosen bytes to the replica's local log via append_batch.
1086    //
1087    // Shutdown is local-only (never crosses the wire), so it
1088    // bypasses the membership check; the from field on a Shutdown
1089    // event is the local node's id.
1090    let from_node = match &event {
1091        Inbound::Shutdown => None,
1092        Inbound::Heartbeat { from, .. } => Some(*from),
1093        Inbound::SyncRequest { from, .. } => Some(*from),
1094        Inbound::SyncResponse { from, .. } => Some(*from),
1095        Inbound::SyncNack { from, .. } => Some(*from),
1096    };
1097    if let Some(from) = from_node {
1098        if !inputs.replica_set.contains(&from) {
1099            tracing::trace!(
1100                from = from,
1101                channel = ?inputs.channel_id,
1102                "replication: dropping inbound from peer not in replica_set"
1103            );
1104            return;
1105        }
1106    }
1107    match event {
1108        Inbound::Shutdown => {
1109            // Handled by the main loop; never reaches here.
1110            unreachable!("Shutdown is filtered in the main loop");
1111        }
1112        Inbound::Heartbeat { from, msg } => {
1113            // Validate channel-id match. Mismatched heartbeats
1114            // arrive when the mesh dispatcher misroutes (which
1115            // shouldn't happen, but better to drop than to
1116            // poison the tracker).
1117            if msg.channel_id != inputs.channel_id {
1118                tracing::trace!(
1119                    from = from,
1120                    "replication: dropping heartbeat for wrong channel"
1121                );
1122                return;
1123            }
1124            // Source from tokio's clock so silence detection works
1125            // under tokio::time::pause() — std::Instant::now()
1126            // wouldn't advance with virtual time and the tick-driven
1127            // silence check would never fire deterministically.
1128            tracker.lock().record_heartbeat(
1129                from,
1130                msg.role,
1131                msg.tail_seq,
1132                tokio::time::Instant::now().into_std(),
1133            );
1134            // Dual-leader convergence. A symmetric-RTT election or
1135            // a partition heal can leave two peers both believing
1136            // they are Leader for the same channel. Without
1137            // convergence, both partitions keep writing — divergent
1138            // histories accrete and `apply_sync_response` eventually
1139            // logs `GapBeforeChunk{divergence_suspected:true}` while
1140            // `skip_to` silently overwrites the loser's tail.
1141            //
1142            // On any inbound Heartbeat with role=Leader while self
1143            // is Leader, run the deterministic tiebreak: the side
1144            // with the higher tail_seq keeps Leader; on a tie, the
1145            // numerically smaller node_id keeps Leader. The loser
1146            // concedes via `Leader → Replica` so the next tick
1147            // re-resolves through the heartbeat cycle.
1148            if msg.role == ReplicaRole::Leader
1149                && coordinator.role() == ReplicaRole::Leader
1150                && from != inputs.self_node_id
1151            {
1152                let local_tail = (inputs.tail_provider)();
1153                let peer_tail = msg.tail_seq;
1154                let local_wins = local_tail > peer_tail
1155                    || (local_tail == peer_tail && inputs.self_node_id < from);
1156                if !local_wins {
1157                    tracing::warn!(
1158                        from = from,
1159                        peer_tail = peer_tail,
1160                        local_tail = local_tail,
1161                        local = inputs.self_node_id,
1162                        "replication: peer-leader observed; conceding via Leader → Replica"
1163                    );
1164                    let _ = coordinator
1165                        .transition_to(
1166                            ReplicaRole::Replica,
1167                            super::replication_state::TransitionSignal::PeerLeaderObserved,
1168                        )
1169                        .await;
1170                }
1171            }
1172        }
1173        Inbound::SyncRequest { from, msg } => {
1174            // R-12: defense-in-depth — validate channel_id at the
1175            // runtime boundary. The wire decoder + mesh router are
1176            // supposed to demux by channel, but a misroute would
1177            // otherwise apply against the wrong file.
1178            if msg.channel_id != inputs.channel_id {
1179                tracing::trace!(
1180                    from = from,
1181                    "replication: dropping SyncRequest for wrong channel"
1182                );
1183                return;
1184            }
1185            // Leader-side: only honor SyncRequest when we believe
1186            // we're the leader. Other roles surface `NotLeader`
1187            // so the replica re-resolves leadership.
1188            if coordinator.role() != ReplicaRole::Leader {
1189                let nack = SyncNack {
1190                    channel_id: inputs.channel_id,
1191                    since_seq: msg.since_seq,
1192                    error_code: super::replication::SyncNackError::NotLeader,
1193                    leader_first_retained_seq: 0,
1194                    request_id: msg.request_id,
1195                    detail: String::new(),
1196                };
1197                if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1198                    tracing::trace!(from = from, error = ?e, "replication: NotLeader NACK send failed");
1199                }
1200                return;
1201            }
1202            // Run the catch-up helper against our local file.
1203            match handle_sync_request(&inputs.file, &msg, inputs.channel_id) {
1204                SyncRequestOutcome::Response(resp) => {
1205                    let byte_estimate = estimate_response_bytes(&resp);
1206                    // Gate on the bandwidth budget. If the
1207                    // budget can't admit this chunk, NACK with
1208                    // Backpressure so the replica backs off.
1209                    let admitted = {
1210                        let mut bb = budget.lock();
1211                        bb.try_consume_with_class(
1212                            byte_estimate,
1213                            msg.class,
1214                            tokio::time::Instant::now().into_std(),
1215                            inputs.background_fraction,
1216                        )
1217                    };
1218                    if !admitted {
1219                        let nack = SyncNack {
1220                            channel_id: inputs.channel_id,
1221                            since_seq: msg.since_seq,
1222                            error_code: super::replication::SyncNackError::Backpressure,
1223                            leader_first_retained_seq: 0,
1224                            request_id: msg.request_id,
1225                            detail: String::new(),
1226                        };
1227                        if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1228                            tracing::trace!(from = from, error = ?e, "replication: Backpressure NACK send failed");
1229                        }
1230                        return;
1231                    }
1232                    // R-1: re-check role after the read + budget
1233                    // gate. If a concurrent transition flipped us
1234                    // out of Leader (DiskPressureWithdraw, peer
1235                    // concession), don't ship the response —
1236                    // NACK NotLeader instead so the replica re-
1237                    // resolves through find_chain_holders.
1238                    if coordinator.role() != ReplicaRole::Leader {
1239                        let nack = SyncNack {
1240                            channel_id: inputs.channel_id,
1241                            since_seq: msg.since_seq,
1242                            error_code: super::replication::SyncNackError::NotLeader,
1243                            leader_first_retained_seq: 0,
1244                            request_id: msg.request_id,
1245                            detail: String::new(),
1246                        };
1247                        if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1248                            tracing::trace!(from = from, error = ?e, "replication: post-op NotLeader NACK send failed");
1249                        }
1250                        return;
1251                    }
1252                    // Bump the cumulative bytes metric BEFORE
1253                    // ship so the operator's view stays accurate
1254                    // even if the wire send fails (the bytes
1255                    // would still have been read off disk).
1256                    coordinator.metrics().incr_sync_bytes(byte_estimate);
1257                    if let Err(e) = dispatcher.send_sync_response(from, resp).await {
1258                        // Refund the budget — pre-fix repeated
1259                        // send failures over a flaky link drained
1260                        // the bucket toward permanent backpressure
1261                        // without shipping any traffic. The cumul-
1262                        // ative bytes metric stays incremented
1263                        // (operators still see "we tried to send
1264                        // these bytes") but the rate budget can
1265                        // recover.
1266                        //
1267                        // Per the dispatcher trait's `Ok(())`
1268                        // semantics: a transport that returns
1269                        // `Ok(())` after queueing-without-delivery
1270                        // (UDP, lossy QUIC under link failure) will
1271                        // NOT trigger this refund and the budget
1272                        // over-counts on silent loss. R-28
1273                        // catchup-backoff dampens the wedged-link
1274                        // case once empty responses accrue past
1275                        // threshold, but the budget itself cannot
1276                        // self-correct without an end-to-end ack
1277                        // the trait deliberately does not demand.
1278                        {
1279                            let mut bb = budget.lock();
1280                            bb.refund(byte_estimate);
1281                        }
1282                        tracing::trace!(from = from, error = ?e, "replication: SyncResponse send failed");
1283                    }
1284                }
1285                SyncRequestOutcome::Nack {
1286                    error_code,
1287                    leader_first_retained_seq,
1288                    detail,
1289                } => {
1290                    let nack = SyncNack {
1291                        channel_id: inputs.channel_id,
1292                        since_seq: msg.since_seq,
1293                        error_code,
1294                        leader_first_retained_seq,
1295                        request_id: msg.request_id,
1296                        detail,
1297                    };
1298                    if let Err(e) = dispatcher.send_sync_nack(from, nack).await {
1299                        tracing::trace!(from = from, error = ?e, "replication: SyncNack send failed");
1300                    }
1301                }
1302            }
1303        }
1304        Inbound::SyncResponse { from, msg } => {
1305            // R-12: defense-in-depth — validate channel_id at the
1306            // runtime boundary.
1307            if msg.channel_id != inputs.channel_id {
1308                tracing::trace!(
1309                    from = from,
1310                    "replication: dropping SyncResponse for wrong channel"
1311                );
1312                return;
1313            }
1314            // R-23 request-token correlation. The replica's
1315            // outstanding-request set tracks `(leader, request_id)`
1316            // tuples for every SyncRequest the runtime has shipped.
1317            // A response whose `request_id` is not in the set is
1318            // stale (the replica timed out and re-issued) or
1319            // forged (leader echoed a non-matching token). Drop
1320            // without applying so a stale chunk can't land on the
1321            // current request's apply path.
1322            //
1323            // Take FIRST, before the role / believed-leader gates,
1324            // so a response that arrives while we're briefly out
1325            // of `Replica` (role thrash, post-election) still
1326            // consumes its outstanding-token entry. Pre-fix the
1327            // role gate returned early without `take`, and the
1328            // token sat in the per-leader set until TTL (30 s) —
1329            // under role thrash the SOFT_CAP GC then dropped
1330            // entries from OTHER leaders to make room, evicting
1331            // legitimately in-flight tokens.
1332            //
1333            // Consuming a token in the dropped-response path is
1334            // safe: request_ids are random 64-bit (collision
1335            // negligible), so a subsequent re-issue uses a fresh
1336            // id and the consumed entry isn't reachable.
1337            {
1338                let now = tokio::time::Instant::now().into_std();
1339                if !outstanding.lock().take(from, msg.request_id, now) {
1340                    tracing::trace!(
1341                        from = from,
1342                        request_id = msg.request_id,
1343                        "replication: dropping SyncResponse with unknown request_id"
1344                    );
1345                    return;
1346                }
1347            }
1348            // SyncResponse is the state-mutating wire input — only
1349            // the believed leader is allowed to ship it. A peer that
1350            // is in the replica_set but is not the current leader
1351            // could otherwise forge `append_batch`-bound payloads
1352            // into a replica's log. The replica_set gate at entry
1353            // narrows the surface to configured members; the
1354            // believed_leader gate here narrows further to the
1355            // single peer the replica is currently following.
1356            let leader_belief = tracker.lock().believed_leader();
1357            if leader_belief != Some(from) {
1358                tracing::trace!(
1359                    from = from,
1360                    believed_leader = ?leader_belief,
1361                    "replication: dropping SyncResponse from non-leader peer"
1362                );
1363                return;
1364            }
1365            // Replica-side: apply the chunk to our local file.
1366            // Only honor responses when we believe we're a
1367            // Replica — other roles ignore them.
1368            if coordinator.role() != ReplicaRole::Replica {
1369                tracing::trace!(
1370                    from = from,
1371                    "replication: SyncResponse received in role {:?}; ignoring",
1372                    coordinator.role(),
1373                );
1374                return;
1375            }
1376            // Snapshot the pre-apply tail so the post-apply
1377            // result can be classified as "empty" (apply returned
1378            // the same tail, no events landed) vs "progress"
1379            // (tail advanced). Drives the R-28 catchup-backoff
1380            // accounting below.
1381            let pre_apply_tail = inputs.file.next_seq();
1382            match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
1383                Ok(new_tail) => {
1384                    // Record the post-apply tail on the coordinator
1385                    // so capability-tag advertisements ride
1386                    // `tip_seq=new_tail` instead of the dead-default
1387                    // 0. find_chain_holders picks the freshest
1388                    // holder by tip_seq during failover; pre-fix
1389                    // every Leader/Replica advertised tip_seq=0
1390                    // and lex-smallest holder won selection.
1391                    coordinator.record_tail_seq(new_tail);
1392                    // R-28 catchup-backoff accounting. If the
1393                    // response advanced our tail, the leader is
1394                    // making progress — clear any backoff state so
1395                    // the next tick can issue another SyncRequest
1396                    // immediately. If the response was empty
1397                    // (apply returned the same tail) while the
1398                    // leader's advertised tail is still strictly
1399                    // greater than ours, record an empty strike.
1400                    // After `CATCHUP_BACKOFF_THRESHOLD` consecutive
1401                    // empties, the tracker stamps a backoff window
1402                    // that the outbound dispatch consults.
1403                    if new_tail > pre_apply_tail {
1404                        backoff.lock().record_progress(from);
1405                    } else {
1406                        // Pre-fix the strike fired whenever the
1407                        // CACHED heartbeat tail was still above ours
1408                        // — the cached value can be hundreds of ms
1409                        // stale, so a replica that caught up between
1410                        // the heartbeat and the response would
1411                        // strike against a leader that has nothing
1412                        // to send. After
1413                        // `CATCHUP_BACKOFF_THRESHOLD` such false
1414                        // strikes the leader sat in a 1–30 s
1415                        // backoff while nothing was actually wrong.
1416                        //
1417                        // Guard the strike on heartbeat freshness:
1418                        // only when the leader's most recent
1419                        // heartbeat is inside the miss-threshold
1420                        // window do we trust its claimed `tail_seq`
1421                        // as evidence the leader has more data to
1422                        // ship. A stale heartbeat is no signal —
1423                        // skip the strike entirely.
1424                        let now = tokio::time::Instant::now().into_std();
1425                        let strike = {
1426                            let t = tracker.lock();
1427                            let peer = t.peer_state(from);
1428                            let lag = t.peer_lag(from, now);
1429                            let fresh_window = std::time::Duration::from_millis(
1430                                t.heartbeat_ms().saturating_mul(t.miss_threshold() as u64),
1431                            );
1432                            matches!(
1433                                (peer, lag),
1434                                (Some(p), Some(elapsed))
1435                                    if elapsed < fresh_window && p.tail_seq > new_tail
1436                            )
1437                        };
1438                        if strike {
1439                            backoff.lock().record_empty(from, now);
1440                        }
1441                    }
1442                    tracing::trace!(
1443                        from = from,
1444                        new_tail = new_tail,
1445                        "replication: applied chunk"
1446                    );
1447                }
1448                Err(super::replication_catchup::ApplyError::AppendFailed(detail)) => {
1449                    // Disk-pressure surface — per plan §7, the
1450                    // local file rejected the append (heap segment
1451                    // at the 3 GB hard cap, or a disk write fail
1452                    // on the persistent tier). Consult the
1453                    // configured `UnderCapacity` policy and react.
1454                    handle_disk_pressure(coordinator, &inputs.file, &detail, from).await;
1455                }
1456                Err(super::replication_catchup::ApplyError::GapBeforeChunk {
1457                    first_seq,
1458                    local_next,
1459                    divergence_suspected,
1460                }) => {
1461                    // Plan §8 skip-ahead — the leader trimmed past
1462                    // our local tail; the chunk's first_seq is
1463                    // strictly above local_next. Skip the local
1464                    // sequence forward to first_seq and retry the
1465                    // apply (the chunk's events line up with the
1466                    // new tail). On persistent files
1467                    // `skip_to` returns an error and we fall back
1468                    // to log+drop; the heartbeat-cycle recovery
1469                    // path catches us up on the next tick.
1470                    coordinator.metrics().incr_skip_ahead();
1471                    match inputs.file.skip_to(first_seq) {
1472                        Ok(()) => {
1473                            // R-13: `first_seq > local_next` is the
1474                            // `GapBeforeChunk` invariant; use
1475                            // saturating_sub for defense-in-depth.
1476                            debug_assert!(first_seq > local_next);
1477                            // R-5: when divergence is suspected,
1478                            // surface it at warn level with an
1479                            // explicit message so operator dashboards
1480                            // can see split-brain post-mortems
1481                            // separately from routine retention
1482                            // trims.
1483                            if divergence_suspected {
1484                                tracing::warn!(
1485                                    from = from,
1486                                    from_seq = local_next,
1487                                    to_seq = first_seq,
1488                                    gap = first_seq.saturating_sub(local_next),
1489                                    "replication: skip-ahead crossed leader's retained range — \
1490                                     divergent log suspected (split-brain post-mortem)"
1491                                );
1492                            } else {
1493                                tracing::warn!(
1494                                    from = from,
1495                                    from_seq = local_next,
1496                                    to_seq = first_seq,
1497                                    gap = first_seq.saturating_sub(local_next),
1498                                    "replication: skip-ahead — leader trimmed past local tail"
1499                                );
1500                            }
1501                            // Retry the apply now that the local
1502                            // tail matches first_seq.
1503                            match apply_sync_response(&inputs.file, &msg, inputs.channel_id) {
1504                                Ok(new_tail) => {
1505                                    coordinator.record_tail_seq(new_tail);
1506                                    tracing::trace!(
1507                                        from = from,
1508                                        new_tail = new_tail,
1509                                        "replication: applied chunk after skip-ahead"
1510                                    );
1511                                }
1512                                Err(e) => {
1513                                    tracing::warn!(
1514                                        from = from,
1515                                        error = ?e,
1516                                        "replication: apply after skip-ahead failed"
1517                                    );
1518                                }
1519                            }
1520                        }
1521                        Err(e) => {
1522                            tracing::warn!(
1523                                from = from,
1524                                first_seq = first_seq,
1525                                error = %e,
1526                                "replication: skip_to rejected; falling back to heartbeat-cycle recovery"
1527                            );
1528                        }
1529                    }
1530                }
1531                Err(e) => {
1532                    // Remaining ApplyError variants — channel
1533                    // mismatch, monotonicity violation, stale
1534                    // chunk. Log + drop; reliable-stream /
1535                    // heartbeat cycle recovers.
1536                    tracing::warn!(
1537                        from = from,
1538                        error = ?e,
1539                        "replication: apply_sync_response failed"
1540                    );
1541                }
1542            }
1543        }
1544        Inbound::SyncNack { from, msg } => {
1545            // R-12: defense-in-depth — validate channel_id at the
1546            // runtime boundary.
1547            if msg.channel_id != inputs.channel_id {
1548                tracing::trace!(
1549                    from = from,
1550                    "replication: dropping SyncNack for wrong channel"
1551                );
1552                return;
1553            }
1554            // Only the believed leader can NACK; otherwise a non-
1555            // leader replica_set peer could spam NotLeader nacks to
1556            // make us clear our believed_leader, or BadRange to
1557            // skip-ahead our local file's tail past in-flight
1558            // events. The replica_set gate at entry handles
1559            // outsider rejection; this narrows further to the
1560            // single peer the replica is currently following.
1561            let leader_belief = tracker.lock().believed_leader();
1562            if leader_belief != Some(from) {
1563                tracing::trace!(
1564                    from = from,
1565                    believed_leader = ?leader_belief,
1566                    "replication: dropping SyncNack from non-leader peer"
1567                );
1568                return;
1569            }
1570            // R-23 request-token correlation. A NACK with a token
1571            // not in the outstanding set is stale (from a request
1572            // the replica already timed out) — the BadRange arm
1573            // below mutates the local file via skip_to, and a
1574            // stale BadRange could destroy data on retry. Drop
1575            // unmatched NACKs.
1576            {
1577                let now = tokio::time::Instant::now().into_std();
1578                if !outstanding.lock().take(from, msg.request_id, now) {
1579                    tracing::trace!(
1580                        from = from,
1581                        request_id = msg.request_id,
1582                        "replication: dropping SyncNack with unknown request_id"
1583                    );
1584                    return;
1585                }
1586            }
1587            // Replicas key their retry policy on `error_code`.
1588            // Phase D §2 retry policy:
1589            //   1 NotLeader   → re-resolve leader (clear tracker
1590            //                    so next election cycle starts
1591            //                    clean)
1592            //   2 BadRange    → trim local tail / skip-ahead
1593            //   3 Backpressure → exponential backoff (handled by
1594            //                    not issuing the next request)
1595            //   4 ChannelClosed → withdraw replica role
1596            use super::replication::SyncNackError;
1597            match msg.error_code {
1598                SyncNackError::NotLeader => {
1599                    // R-4: actually clear the believed leader so
1600                    // the next tick re-resolves via the heartbeat
1601                    // cycle instead of looping on the stale leader
1602                    // belief until 3 missed heartbeats trip.
1603                    clear_leader_belief_and_tokens(tracker, outstanding);
1604                    tracing::trace!(
1605                        from = from,
1606                        "replication: NACK NotLeader — cleared believed leader"
1607                    );
1608                }
1609                SyncNackError::BadRange => {
1610                    // R-40: skip directly to the leader's
1611                    // first-retained seq carried in the NACK. Pre-
1612                    // fix the replica advanced one seq per round
1613                    // trip (skip_to(since_seq + 1)) which thrashed
1614                    // when the retention floor was many seqs above
1615                    // `since_seq` — every retry re-asked below the
1616                    // floor and re-NACKed. With the wire field, one
1617                    // round trip suffices: `skip_to(leader_first_
1618                    // retained_seq)` puts local tail at the floor
1619                    // and the next SyncRequest re-asks exactly
1620                    // there. Fall back to `since_seq + 1` if the
1621                    // leader sent `0` (legacy / never-retained
1622                    // channels) so an out-of-band sender can't
1623                    // make us no-op on the retry.
1624                    coordinator.metrics().incr_skip_ahead();
1625                    let target = if msg.leader_first_retained_seq > 0 {
1626                        msg.leader_first_retained_seq
1627                    } else {
1628                        msg.since_seq.saturating_add(1)
1629                    };
1630                    match inputs.file.skip_to(target) {
1631                        Ok(()) => tracing::warn!(
1632                            from = from,
1633                            since_seq = msg.since_seq,
1634                            leader_first_retained_seq = msg.leader_first_retained_seq,
1635                            target = target,
1636                            "replication: NACK BadRange — local tail skipped to leader's first-retained seq"
1637                        ),
1638                        Err(e) => tracing::trace!(
1639                            from = from,
1640                            error = %e,
1641                            "replication: NACK BadRange — skip_to rejected, falling back to heartbeat retry"
1642                        ),
1643                    }
1644                }
1645                SyncNackError::Backpressure => {
1646                    tracing::trace!(
1647                        from = from,
1648                        "replication: NACK Backpressure — deferring next request"
1649                    );
1650                }
1651                SyncNackError::ChannelClosed => {
1652                    tracing::warn!(
1653                        from = from,
1654                        "replication: NACK ChannelClosed — withdrawing role"
1655                    );
1656                    // The leader is telling us the channel is gone;
1657                    // shut down regardless of our current role.
1658                    // ChannelClose is the only signal valid from
1659                    // any state (Leader, Candidate, Replica, Idle).
1660                    // DiskPressureWithdraw is only valid from
1661                    // Replica and would silently fail-and-log if a
1662                    // role flip happened between sending the
1663                    // SyncRequest and receiving the NACK.
1664                    let _ = coordinator
1665                        .transition_to(
1666                            ReplicaRole::Idle,
1667                            super::replication_state::TransitionSignal::ChannelClose,
1668                        )
1669                        .await;
1670                }
1671            }
1672        }
1673    }
1674}
1675
1676/// React to a disk-pressure signal from `apply_sync_response`. The
1677/// local file rejected the append — heap segment at 3 GB hard cap
1678/// or a disk-tier write fail. Consult the channel's configured
1679/// `UnderCapacity` policy and apply.
1680///
1681/// Plan §7:
1682///
1683/// - `Withdraw` (default) — drop the replica role so the leader's
1684///   other replicas can take over the redundancy responsibility.
1685///   The channel's `causal:<hex>` tag is withdrawn via the
1686///   coordinator's `* → Idle` side-effect. Operators see the
1687///   `dataforts_replication_under_capacity_total` counter advance
1688///   and the role flip to Idle.
1689/// - `EvictOldest` — call `RedexFile::sweep_retention()` to evict
1690///   on the configured caps + bump the counter. Caller stays in
1691///   Replica role; the next `SyncResponse` retries the apply. If
1692///   no retention caps are configured the sweep is a no-op and
1693///   the next apply will fail again — operators who pick this
1694///   policy should pair it with `retention_max_*` settings.
1695async fn handle_disk_pressure(
1696    coordinator: &Arc<ReplicationCoordinator>,
1697    file: &super::file::RedexFile,
1698    detail: &str,
1699    from: NodeId,
1700) {
1701    use super::replication_config::UnderCapacity;
1702    coordinator.metrics().incr_under_capacity();
1703    let policy = coordinator.config().on_under_capacity;
1704    match policy {
1705        UnderCapacity::Withdraw => {
1706            tracing::warn!(
1707                from = from,
1708                detail = detail,
1709                "replication: disk pressure → withdrawing role"
1710            );
1711            // The transition matrix only permits
1712            // DiskPressureWithdraw on Replica → Idle. If a role
1713            // flip landed between the apply attempt and this
1714            // withdraw, pick the signal that's actually valid for
1715            // the current role so we don't silently log+drop the
1716            // transition and keep writing through pressure.
1717            let signal = match coordinator.role() {
1718                ReplicaRole::Replica => {
1719                    super::replication_state::TransitionSignal::DiskPressureWithdraw
1720                }
1721                ReplicaRole::Idle => {
1722                    // Already withdrawn — short-circuit via the
1723                    // ChannelClose idempotent path so we don't bump
1724                    // counters twice on a benign race.
1725                    super::replication_state::TransitionSignal::ChannelClose
1726                }
1727                ReplicaRole::Leader => {
1728                    // Role-specific signal so the transition metric
1729                    // labels this as disk-pressure withdraw instead
1730                    // of the ChannelClose fallback (which operator
1731                    // dashboards triage as "graceful channel close").
1732                    super::replication_state::TransitionSignal::LeaderDiskPressureWithdraw
1733                }
1734                ReplicaRole::Candidate => {
1735                    super::replication_state::TransitionSignal::CandidateDiskPressureWithdraw
1736                }
1737            };
1738            if let Err(e) = coordinator.transition_to(ReplicaRole::Idle, signal).await {
1739                tracing::warn!(
1740                    error=?e,
1741                    "replication: disk-pressure withdraw transition failed"
1742                );
1743            }
1744        }
1745        UnderCapacity::EvictOldest => {
1746            tracing::warn!(
1747                from = from,
1748                detail = detail,
1749                "replication: disk pressure → sweeping retention"
1750            );
1751            file.sweep_retention();
1752        }
1753    }
1754}
1755
1756/// Estimate the wire-cost of a [`SyncResponse`] for budget
1757/// accounting. Header + per-event overhead per `replication.rs`'s
1758/// `to_bytes` shape.
1759fn estimate_response_bytes(resp: &SyncResponse) -> u64 {
1760    // Header: 3 + 32 + 8 + 4 = 47 bytes.
1761    let mut bytes: u64 = 47;
1762    for ev in &resp.events {
1763        // event_seq u64 + payload_len u32 + payload bytes.
1764        bytes += 8 + 4 + ev.payload.len() as u64;
1765    }
1766    bytes
1767}
1768
1769#[cfg(test)]
1770mod tests {
1771    use super::*;
1772    use crate::adapter::net::channel::ChannelName;
1773    use crate::adapter::net::redex::replication::ReplicaRole;
1774    use crate::adapter::net::redex::replication_config::ReplicationConfig;
1775    use crate::adapter::net::redex::replication_coordinator::ChainTagSink;
1776    use crate::adapter::net::redex::replication_metrics::ReplicationMetricsRegistry;
1777    use parking_lot::Mutex as ParkingMutex;
1778
1779    /// No-op chain-tag sink for unit tests; the runtime path
1780    /// doesn't exercise the chain-tag side-effect in this
1781    /// commit's scope.
1782    #[derive(Default)]
1783    struct NoopTagSink;
1784
1785    #[async_trait::async_trait]
1786    impl ChainTagSink for NoopTagSink {
1787        async fn announce_chain(
1788            &self,
1789            _origin_hash: u64,
1790            _tip_seq: u64,
1791        ) -> Result<(), AdapterError> {
1792            Ok(())
1793        }
1794        async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
1795            Ok(())
1796        }
1797    }
1798
1799    /// Recorder dispatcher — captures every outbound wire
1800    /// message. Each variant pushes into a separate Vec so
1801    /// tests can assert on shape + ordering.
1802    #[derive(Default)]
1803    struct RecorderDispatcher {
1804        heartbeats: ParkingMutex<Vec<(NodeId, SyncHeartbeat)>>,
1805        sync_requests: ParkingMutex<Vec<(NodeId, SyncRequest)>>,
1806        sync_responses: ParkingMutex<Vec<(NodeId, SyncResponse)>>,
1807        sync_nacks: ParkingMutex<Vec<(NodeId, SyncNack)>>,
1808    }
1809
1810    #[async_trait::async_trait]
1811    impl ReplicationDispatcher for RecorderDispatcher {
1812        async fn send_heartbeat(
1813            &self,
1814            target: NodeId,
1815            msg: SyncHeartbeat,
1816        ) -> Result<(), AdapterError> {
1817            self.heartbeats.lock().push((target, msg));
1818            Ok(())
1819        }
1820        async fn send_sync_request(
1821            &self,
1822            target: NodeId,
1823            msg: SyncRequest,
1824        ) -> Result<(), AdapterError> {
1825            self.sync_requests.lock().push((target, msg));
1826            Ok(())
1827        }
1828        async fn send_sync_response(
1829            &self,
1830            target: NodeId,
1831            msg: SyncResponse,
1832        ) -> Result<(), AdapterError> {
1833            self.sync_responses.lock().push((target, msg));
1834            Ok(())
1835        }
1836        async fn send_sync_nack(&self, target: NodeId, msg: SyncNack) -> Result<(), AdapterError> {
1837            self.sync_nacks.lock().push((target, msg));
1838            Ok(())
1839        }
1840    }
1841
1842    fn channel_id_for(name: &str) -> ChannelId {
1843        let cn = ChannelName::new(name).unwrap();
1844        ChannelId::from_name(&cn)
1845    }
1846
1847    fn build_file_for_tests() -> RedexFile {
1848        use crate::adapter::net::redex::config::RedexFileConfig;
1849        use crate::adapter::net::redex::manager::Redex;
1850        let r = Redex::new();
1851        let cn = ChannelName::new("test/runtime").unwrap();
1852        r.open_file(&cn, RedexFileConfig::default()).unwrap()
1853    }
1854
1855    fn build_inputs(self_id: NodeId, replicas: Vec<NodeId>, hb_ms: u64) -> RuntimeInputs {
1856        RuntimeInputs {
1857            channel: ChannelIdentity {
1858                channel_name: "test/runtime".to_string(),
1859                origin_hash: 0xCAFE_BABE,
1860            },
1861            channel_id: channel_id_for("test/runtime"),
1862            self_node_id: self_id,
1863            replica_set: replicas,
1864            heartbeat_ms: hb_ms,
1865            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
1866            tail_provider: Arc::new(|| 42),
1867            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
1868            file: build_file_for_tests(),
1869            default_bandwidth_class: Default::default(),
1870            background_fraction: 0.3,
1871        }
1872    }
1873
1874    fn build_coordinator(
1875        self_id: NodeId,
1876        replicas: Vec<NodeId>,
1877    ) -> (Arc<ReplicationCoordinator>, ReplicationMetricsRegistry) {
1878        let _ = (self_id, replicas);
1879        let registry = ReplicationMetricsRegistry::new();
1880        let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
1881        let coordinator = ReplicationCoordinator::new(
1882            ChannelIdentity {
1883                channel_name: "test/runtime".to_string(),
1884                origin_hash: 0xCAFE_BABE,
1885            },
1886            ReplicationConfig::new(),
1887            sink,
1888            &registry,
1889        );
1890        (Arc::new(coordinator), registry)
1891    }
1892
1893    fn build_budget() -> Arc<Mutex<BandwidthBudget>> {
1894        let now = Instant::now();
1895        Arc::new(Mutex::new(BandwidthBudget::new(0.5, 10_000_000, now)))
1896    }
1897
1898    fn build_backoff() -> Arc<Mutex<CatchupBackoff>> {
1899        Arc::new(Mutex::new(CatchupBackoff::new()))
1900    }
1901
1902    /// Bundle the four per-channel state pieces into the
1903    /// `RuntimeState` the on_tick / on_inbound functions take.
1904    /// Tests construct tracker + budget explicitly; backoff and
1905    /// outstanding are stock fresh instances. Pre-refactor these
1906    /// were passed as four separate arguments.
1907    fn build_state(
1908        tracker: Arc<Mutex<HeartbeatTracker>>,
1909        budget: Arc<Mutex<BandwidthBudget>>,
1910    ) -> RuntimeState {
1911        RuntimeState {
1912            tracker,
1913            budget,
1914            backoff: Arc::new(Mutex::new(CatchupBackoff::new())),
1915            outstanding: Arc::new(Mutex::new(OutstandingRequests::new())),
1916        }
1917    }
1918
1919    #[tokio::test]
1920    async fn leader_emits_heartbeat_to_peers_each_tick() {
1921        let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 100);
1922        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
1923        // Promote to Leader via the state machine (Idle → Replica
1924        // → Candidate → Leader).
1925        coordinator
1926            .transition_to(
1927                ReplicaRole::Replica,
1928                super::super::replication_state::TransitionSignal::CapabilitySelected,
1929            )
1930            .await
1931            .unwrap();
1932        coordinator
1933            .transition_to(
1934                ReplicaRole::Candidate,
1935                super::super::replication_state::TransitionSignal::MissedHeartbeats,
1936            )
1937            .await
1938            .unwrap();
1939        coordinator
1940            .transition_to(
1941                ReplicaRole::Leader,
1942                super::super::replication_state::TransitionSignal::ElectionWon,
1943            )
1944            .await
1945            .unwrap();
1946
1947        let dispatcher = Arc::new(RecorderDispatcher::default());
1948        let handle =
1949            spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
1950
1951        // Sleep ~3 ticks worth (300ms cadence at 100ms = ~3 ticks).
1952        tokio::time::sleep(Duration::from_millis(350)).await;
1953
1954        let heartbeats = dispatcher.heartbeats.lock().clone();
1955        assert!(
1956            heartbeats.len() >= 2,
1957            "expected ≥ 2 heartbeats over 350ms at 100ms cadence; got {}",
1958            heartbeats.len(),
1959        );
1960        // Each heartbeat goes to a non-self peer.
1961        for (target, msg) in &heartbeats {
1962            assert!(*target == 0x20 || *target == 0x30);
1963            assert_eq!(msg.role, ReplicaRole::Leader);
1964            assert_eq!(msg.tail_seq, 42);
1965        }
1966
1967        handle.cancel().await;
1968        assert!(handle.is_stopped());
1969    }
1970
1971    #[tokio::test]
1972    async fn inbound_heartbeat_records_into_tracker() {
1973        let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
1974        let cid = inputs.channel_id;
1975        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
1976        coordinator
1977            .transition_to(
1978                ReplicaRole::Replica,
1979                super::super::replication_state::TransitionSignal::CapabilitySelected,
1980            )
1981            .await
1982            .unwrap();
1983        let dispatcher = Arc::new(RecorderDispatcher::default());
1984        let handle =
1985            spawn_replication_runtime(inputs, coordinator, dispatcher.clone(), build_budget());
1986
1987        // Push a peer heartbeat into the inbox.
1988        handle
1989            .dispatch(Inbound::Heartbeat {
1990                from: 0x20,
1991                msg: SyncHeartbeat {
1992                    channel_id: cid,
1993                    tail_seq: 99,
1994                    role: ReplicaRole::Leader,
1995                    wall_clock_ms: 1_700_000_000_000,
1996                },
1997            })
1998            .await
1999            .unwrap();
2000
2001        // Let the task process.
2002        tokio::time::sleep(Duration::from_millis(150)).await;
2003
2004        // Heartbeats emitted (replica side) and no transition
2005        // triggered (fresh leader heartbeat).
2006        let _heartbeats = dispatcher.heartbeats.lock().clone();
2007        // The coordinator stays in Replica.
2008        handle.cancel().await;
2009    }
2010
2011    /// R-21 regression: when a Leader observes another peer also
2012    /// claiming Leader for the same channel, the deterministic
2013    /// tiebreak demotes the loser to Replica so the partition heal
2014    /// converges to one leader. Without `Leader → Replica`, both
2015    /// partitions stay Leader permanently and accrete divergent
2016    /// histories silently overwritten via `skip_to`.
2017    #[tokio::test]
2018    async fn peer_leader_observation_demotes_loser_to_replica() {
2019        // Local node 0x10 has tail_seq 42 (from build_inputs);
2020        // peer 0x20 advertises tail_seq 99 — peer wins on tail.
2021        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2022        let cid = inputs.channel_id;
2023        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2024        for (role, signal) in [
2025            (
2026                ReplicaRole::Replica,
2027                super::super::replication_state::TransitionSignal::CapabilitySelected,
2028            ),
2029            (
2030                ReplicaRole::Candidate,
2031                super::super::replication_state::TransitionSignal::MissedHeartbeats,
2032            ),
2033            (
2034                ReplicaRole::Leader,
2035                super::super::replication_state::TransitionSignal::ElectionWon,
2036            ),
2037        ] {
2038            coordinator.transition_to(role, signal).await.unwrap();
2039        }
2040        assert_eq!(coordinator.role(), ReplicaRole::Leader);
2041        let dispatcher = Arc::new(RecorderDispatcher::default());
2042        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2043        let budget = build_budget();
2044
2045        on_inbound(
2046            &inputs,
2047            &coordinator,
2048            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2049            &build_state(tracker.clone(), budget.clone()),
2050            Inbound::Heartbeat {
2051                from: 0x20,
2052                msg: SyncHeartbeat {
2053                    channel_id: cid,
2054                    tail_seq: 99,
2055                    role: ReplicaRole::Leader,
2056                    wall_clock_ms: 0,
2057                },
2058            },
2059        )
2060        .await;
2061
2062        // Local tail 42 < peer tail 99 → local loses, demotes.
2063        assert_eq!(
2064            coordinator.role(),
2065            ReplicaRole::Replica,
2066            "Leader with lower tail must concede on PeerLeaderObserved"
2067        );
2068    }
2069
2070    /// R-21 regression: tail-tie tiebreak favors the lower node id.
2071    /// Without the symmetric tiebreak the matrix could leave one
2072    /// side as Leader and the other still claiming Leader after
2073    /// the heal — both must agree on the winner.
2074    #[tokio::test]
2075    async fn peer_leader_tail_tie_lower_node_id_wins() {
2076        // Local 0x10 tail = peer 0x20 tail (both 42). Local wins
2077        // because 0x10 < 0x20.
2078        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2079        let cid = inputs.channel_id;
2080        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2081        for (role, signal) in [
2082            (
2083                ReplicaRole::Replica,
2084                super::super::replication_state::TransitionSignal::CapabilitySelected,
2085            ),
2086            (
2087                ReplicaRole::Candidate,
2088                super::super::replication_state::TransitionSignal::MissedHeartbeats,
2089            ),
2090            (
2091                ReplicaRole::Leader,
2092                super::super::replication_state::TransitionSignal::ElectionWon,
2093            ),
2094        ] {
2095            coordinator.transition_to(role, signal).await.unwrap();
2096        }
2097        let dispatcher = Arc::new(RecorderDispatcher::default());
2098        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2099        let budget = build_budget();
2100
2101        on_inbound(
2102            &inputs,
2103            &coordinator,
2104            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2105            &build_state(tracker.clone(), budget.clone()),
2106            Inbound::Heartbeat {
2107                from: 0x20,
2108                msg: SyncHeartbeat {
2109                    channel_id: cid,
2110                    tail_seq: 42,
2111                    role: ReplicaRole::Leader,
2112                    wall_clock_ms: 0,
2113                },
2114            },
2115        )
2116        .await;
2117
2118        assert_eq!(
2119            coordinator.role(),
2120            ReplicaRole::Leader,
2121            "tail-tie tiebreak: lower node id keeps Leader"
2122        );
2123    }
2124
2125    #[tokio::test]
2126    async fn replica_with_silent_leader_runs_election_and_promotes_self() {
2127        let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
2128        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2129        coordinator
2130            .transition_to(
2131                ReplicaRole::Replica,
2132                super::super::replication_state::TransitionSignal::CapabilitySelected,
2133            )
2134            .await
2135            .unwrap();
2136        let dispatcher = Arc::new(RecorderDispatcher::default());
2137        let handle = spawn_replication_runtime(
2138            inputs,
2139            coordinator.clone(),
2140            dispatcher.clone(),
2141            build_budget(),
2142        );
2143
2144        // Push a single leader heartbeat from 0x20, then let
2145        // enough time pass for silence detection (3 × 50ms = 150ms).
2146        let cid = channel_id_for("test/runtime");
2147        handle
2148            .dispatch(Inbound::Heartbeat {
2149                from: 0x20,
2150                msg: SyncHeartbeat {
2151                    channel_id: cid,
2152                    tail_seq: 99,
2153                    role: ReplicaRole::Leader,
2154                    wall_clock_ms: 1_700_000_000_000,
2155                },
2156            })
2157            .await
2158            .unwrap();
2159
2160        // Sleep > 3 heartbeats + tick alignment so silence
2161        // detection fires AND the post-election transition lands.
2162        tokio::time::sleep(Duration::from_millis(500)).await;
2163
2164        // The election ran with self healthy + RTT=0 in elect();
2165        // peer 0x20 may or may not have been health-filtered by
2166        // the tracker (after the silence threshold, the tracker
2167        // considers 0x20 stale). Either way, self wins: self has
2168        // RTT 0; peer is either filtered out or at 5ms.
2169        assert_eq!(coordinator.role(), ReplicaRole::Leader);
2170
2171        handle.cancel().await;
2172    }
2173
2174    /// Chain-tag sink that returns `Ok` for the first `n` calls
2175    /// then fails every subsequent call. Lets a test pin the post-
2176    /// election sink-failure path: the first call (Idle → Replica
2177    /// announce) succeeds, the second (Candidate → Leader announce)
2178    /// fails — so the coordinator transitions state but the chain-
2179    /// tag side-effect surfaces TagSink.
2180    struct FailingAfterNAnnounceSink {
2181        remaining: ParkingMutex<usize>,
2182    }
2183
2184    #[async_trait::async_trait]
2185    impl ChainTagSink for FailingAfterNAnnounceSink {
2186        async fn announce_chain(
2187            &self,
2188            _origin_hash: u64,
2189            _tip_seq: u64,
2190        ) -> Result<(), AdapterError> {
2191            let mut r = self.remaining.lock();
2192            if *r == 0 {
2193                return Err(AdapterError::Transient(
2194                    "simulated sink failure".to_string(),
2195                ));
2196            }
2197            *r -= 1;
2198            Ok(())
2199        }
2200        async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
2201            Ok(())
2202        }
2203    }
2204
2205    /// A failing tag-sink on the post-election Candidate → Leader
2206    /// transition must NOT strand the coordinator in Candidate. The
2207    /// state machine moves to Leader (the failure is in the side-
2208    /// effect only); the runtime must observe the TagSink error
2209    /// branch, clear the believed leader, and continue. The
2210    /// previous code path logged + dropped, leaving the coordinator
2211    /// effectively healthy but stale-state in the tracker.
2212    #[tokio::test]
2213    async fn post_election_tag_sink_failure_does_not_strand_candidate() {
2214        let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
2215        let cid = inputs.channel_id;
2216        // Build a coordinator wired to a sink that succeeds on the
2217        // first announce (Idle → Replica) and fails on the second
2218        // (Candidate → Leader during the post-election transition).
2219        let registry = ReplicationMetricsRegistry::new();
2220        let sink: Arc<dyn ChainTagSink> = Arc::new(FailingAfterNAnnounceSink {
2221            remaining: ParkingMutex::new(1),
2222        });
2223        let coordinator = Arc::new(ReplicationCoordinator::new(
2224            ChannelIdentity {
2225                channel_name: "test/runtime".to_string(),
2226                origin_hash: 0xCAFE_BABE,
2227            },
2228            ReplicationConfig::new(),
2229            sink,
2230            &registry,
2231        ));
2232        coordinator
2233            .transition_to(
2234                ReplicaRole::Replica,
2235                super::super::replication_state::TransitionSignal::CapabilitySelected,
2236            )
2237            .await
2238            .unwrap();
2239        let dispatcher = Arc::new(RecorderDispatcher::default());
2240        let handle = spawn_replication_runtime(
2241            inputs,
2242            coordinator.clone(),
2243            dispatcher.clone(),
2244            build_budget(),
2245        );
2246
2247        // Seed a single leader heartbeat then let silence detection
2248        // fire so the runtime enters Candidate and runs the
2249        // election.
2250        handle
2251            .dispatch(Inbound::Heartbeat {
2252                from: 0x20,
2253                msg: SyncHeartbeat {
2254                    channel_id: cid,
2255                    tail_seq: 99,
2256                    role: ReplicaRole::Leader,
2257                    wall_clock_ms: 1_700_000_000_000,
2258                },
2259            })
2260            .await
2261            .unwrap();
2262        tokio::time::sleep(Duration::from_millis(500)).await;
2263
2264        // State has moved to Leader despite the sink failure — the
2265        // coordinator's transition lock applied the state change
2266        // before the (failing) side-effect ran. The runtime's
2267        // post-election handler observed CoordinatorError::TagSink
2268        // and cleared the believed leader, not the prior code path
2269        // that silently sat on stale Candidate state.
2270        assert_eq!(coordinator.role(), ReplicaRole::Leader);
2271
2272        handle.cancel().await;
2273    }
2274
2275    #[tokio::test]
2276    async fn shutdown_drives_idle_transition() {
2277        let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2278        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2279        coordinator
2280            .transition_to(
2281                ReplicaRole::Replica,
2282                super::super::replication_state::TransitionSignal::CapabilitySelected,
2283            )
2284            .await
2285            .unwrap();
2286        let dispatcher = Arc::new(RecorderDispatcher::default());
2287        let handle =
2288            spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2289
2290        handle.cancel().await;
2291        assert!(handle.is_stopped());
2292        // Final state must be Idle (ChannelClose transition).
2293        assert_eq!(coordinator.role(), ReplicaRole::Idle);
2294    }
2295
2296    /// Regression: a Leader's `on_tick`-driven `record_tail_seq`
2297    /// must clamp the advertised tail to the highest tail any
2298    /// peer has confirmed via heartbeat — never advertise
2299    /// pre-replication local tail. The capability-tag layer reads
2300    /// this value as `tip_seq` for `find_chain_holders` failover
2301    /// selection; advertising un-replicated writes biases
2302    /// failover toward a partition whose tail may not survive a
2303    /// crash.
2304    ///
2305    /// Pre-fix: leader at local tail = 100, replica reported tail
2306    /// = 50 → advertised_tail = 100.
2307    /// Post-fix: advertised_tail = 50 (clamped to max peer tail).
2308    #[tokio::test]
2309    async fn leader_on_tick_clamps_advertised_tail_to_max_peer_tail() {
2310        // Build inputs with a deterministic tail_provider returning
2311        // 100 (the leader's local tail).
2312        let self_id: NodeId = 0x10;
2313        let peer_id: NodeId = 0x20;
2314        let inputs = RuntimeInputs {
2315            channel: ChannelIdentity {
2316                channel_name: "test/runtime".to_string(),
2317                origin_hash: 0xCAFE_BABE,
2318            },
2319            channel_id: channel_id_for("test/runtime"),
2320            self_node_id: self_id,
2321            replica_set: vec![self_id, peer_id],
2322            heartbeat_ms: 100,
2323            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
2324            // Local tail is 100 — well above what the peer has
2325            // reported.
2326            tail_provider: Arc::new(|| 100),
2327            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
2328            file: build_file_for_tests(),
2329            default_bandwidth_class: Default::default(),
2330            background_fraction: 0.3,
2331        };
2332        let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, peer_id]);
2333        // Promote to Leader via the state machine.
2334        for (role, signal) in [
2335            (
2336                ReplicaRole::Replica,
2337                super::super::replication_state::TransitionSignal::CapabilitySelected,
2338            ),
2339            (
2340                ReplicaRole::Candidate,
2341                super::super::replication_state::TransitionSignal::MissedHeartbeats,
2342            ),
2343            (
2344                ReplicaRole::Leader,
2345                super::super::replication_state::TransitionSignal::ElectionWon,
2346            ),
2347        ] {
2348            coordinator.transition_to(role, signal).await.unwrap();
2349        }
2350        // Seed the peer's heartbeat at tail=50 — only HALF of what
2351        // the leader has locally. The other 50 events are
2352        // unreplicated.
2353        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2354        tracker
2355            .lock()
2356            .record_heartbeat(peer_id, ReplicaRole::Replica, 50, Instant::now());
2357
2358        let dispatcher: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
2359        on_tick(
2360            &inputs,
2361            &coordinator,
2362            &dispatcher,
2363            &build_state(tracker.clone(), build_budget()),
2364        )
2365        .await;
2366
2367        // record_tail_seq is monotonic; the coordinator's cached
2368        // tail should be the clamped value (50), NOT the raw
2369        // local-file tail (100).
2370        assert_eq!(
2371            coordinator.tail_seq(),
2372            50,
2373            "leader must advertise the highest peer-confirmed tail (50), \
2374             not the pre-replication local tail (100); pre-fix this would \
2375             be 100 and a crash here would lose the 50 un-replicated events \
2376             while failover discovery still thought tip_seq=100",
2377        );
2378    }
2379
2380    /// Companion: with NO peer heartbeats observed (fresh leader),
2381    /// `on_tick` falls back to the raw local tail. A sole leader has
2382    /// authority over its own writes by tautology — clamping to 0
2383    /// would otherwise prevent any progress on a single-node
2384    /// configuration.
2385    #[tokio::test]
2386    async fn leader_on_tick_falls_back_to_local_tail_with_no_peers() {
2387        let self_id: NodeId = 0x10;
2388        let peer_id: NodeId = 0x20;
2389        let inputs = RuntimeInputs {
2390            channel: ChannelIdentity {
2391                channel_name: "test/runtime".to_string(),
2392                origin_hash: 0xCAFE_BABE,
2393            },
2394            channel_id: channel_id_for("test/runtime"),
2395            self_node_id: self_id,
2396            replica_set: vec![self_id, peer_id],
2397            heartbeat_ms: 100,
2398            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
2399            tail_provider: Arc::new(|| 77),
2400            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
2401            file: build_file_for_tests(),
2402            default_bandwidth_class: Default::default(),
2403            background_fraction: 0.3,
2404        };
2405        let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, peer_id]);
2406        for (role, signal) in [
2407            (
2408                ReplicaRole::Replica,
2409                super::super::replication_state::TransitionSignal::CapabilitySelected,
2410            ),
2411            (
2412                ReplicaRole::Candidate,
2413                super::super::replication_state::TransitionSignal::MissedHeartbeats,
2414            ),
2415            (
2416                ReplicaRole::Leader,
2417                super::super::replication_state::TransitionSignal::ElectionWon,
2418            ),
2419        ] {
2420            coordinator.transition_to(role, signal).await.unwrap();
2421        }
2422
2423        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2424        let dispatcher: Arc<dyn ReplicationDispatcher> = Arc::new(RecorderDispatcher::default());
2425        on_tick(
2426            &inputs,
2427            &coordinator,
2428            &dispatcher,
2429            &build_state(tracker.clone(), build_budget()),
2430        )
2431        .await;
2432        assert_eq!(
2433            coordinator.tail_seq(),
2434            77,
2435            "no peer heartbeats → no clamp, raw local tail is advertised",
2436        );
2437    }
2438
2439    /// Regression: empty-response backoff must NOT strike when the
2440    /// leader's heartbeat is stale. Pre-fix the strike fired whenever
2441    /// `tracker.peer_state(from).tail_seq > new_tail`, but
2442    /// `peer_state.tail_seq` is the cached value from the last
2443    /// received heartbeat — minutes-stale in a degenerate case. A
2444    /// replica that caught up between an old heartbeat and the
2445    /// current response struck against a leader that had nothing
2446    /// to send. After `CATCHUP_BACKOFF_THRESHOLD` such false
2447    /// strikes the leader sat in a 1–30 s backoff while nothing
2448    /// was actually wrong.
2449    ///
2450    /// Post-fix: skip the strike when the heartbeat is older than
2451    /// the miss-threshold window — a stale heartbeat is no signal.
2452    #[tokio::test]
2453    async fn empty_response_does_not_strike_on_stale_heartbeat() {
2454        let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2455        let cid = inputs.channel_id;
2456        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2457        coordinator
2458            .transition_to(
2459                ReplicaRole::Replica,
2460                super::super::replication_state::TransitionSignal::CapabilitySelected,
2461            )
2462            .await
2463            .unwrap();
2464        let dispatcher = Arc::new(RecorderDispatcher::default());
2465        let budget = build_budget();
2466        let backoff = build_backoff();
2467        let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
2468
2469        // Seed the tracker with a STALE heartbeat: leader 0x20
2470        // claimed tail=200 well outside the miss-threshold window
2471        // (heartbeat_ms = 100, miss_threshold defaults to 3 ⇒ a
2472        // last_seen older than 300 ms is stale). Build the
2473        // heartbeat with a `last_seen` from 10 seconds ago so it's
2474        // definitively stale by the time on_inbound runs.
2475        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
2476        let stale_when = Instant::now() - Duration::from_secs(10);
2477        tracker
2478            .lock()
2479            .record_heartbeat(0x20, ReplicaRole::Leader, 200, stale_when);
2480
2481        // Pre-record the request so the binding gate admits the
2482        // response.
2483        outstanding.lock().record(0x20, 0, Instant::now());
2484
2485        // Empty SyncResponse from the (now-stale-heartbeat) leader.
2486        // The local file is empty (next_seq = 0); apply on an empty
2487        // response leaves next_seq = 0, so `new_tail == pre_apply_tail`.
2488        let event = Inbound::SyncResponse {
2489            from: 0x20,
2490            msg: SyncResponse {
2491                channel_id: cid,
2492                first_seq: 0,
2493                leader_first_retained_seq: 0,
2494                events: Vec::new(),
2495                request_id: 0,
2496            },
2497        };
2498        on_inbound(
2499            &inputs,
2500            &coordinator,
2501            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2502            &RuntimeState {
2503                tracker: tracker.clone(),
2504                budget: budget.clone(),
2505                backoff: backoff.clone(),
2506                outstanding: outstanding.clone(),
2507            },
2508            event,
2509        )
2510        .await;
2511
2512        // Pre-fix: backoff would have recorded an empty strike
2513        // because `peer_state.tail_seq (200) > new_tail (0)`.
2514        // Post-fix: stale heartbeat → no strike, no backoff state.
2515        assert!(
2516            !backoff.lock().is_in_backoff(0x20, Instant::now()),
2517            "stale-heartbeat empty must NOT engage backoff"
2518        );
2519        // Drive THRESHOLD+1 more stale-heartbeat empties to prove
2520        // that the strike NEVER fires on stale heartbeats — even
2521        // accumulated over many attempts.
2522        for _ in 0..CATCHUP_BACKOFF_THRESHOLD + 1 {
2523            // Re-register the request_id (consumed by the binding
2524            // gate on each call).
2525            outstanding.lock().record(0x20, 0, Instant::now());
2526            let event = Inbound::SyncResponse {
2527                from: 0x20,
2528                msg: SyncResponse {
2529                    channel_id: cid,
2530                    first_seq: 0,
2531                    leader_first_retained_seq: 0,
2532                    events: Vec::new(),
2533                    request_id: 0,
2534                },
2535            };
2536            on_inbound(
2537                &inputs,
2538                &coordinator,
2539                &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
2540                &RuntimeState {
2541                    tracker: tracker.clone(),
2542                    budget: budget.clone(),
2543                    backoff: backoff.clone(),
2544                    outstanding: outstanding.clone(),
2545                },
2546                event,
2547            )
2548            .await;
2549        }
2550        assert!(
2551            !backoff.lock().is_in_backoff(0x20, Instant::now()),
2552            "accumulated stale-heartbeat empties must NEVER engage backoff",
2553        );
2554    }
2555
2556    /// R-28 unit test: the backoff structure records empties up to
2557    /// the threshold without setting a backoff window, then stamps
2558    /// an exponentially-growing window once the threshold is
2559    /// crossed. A non-empty response resets the entry.
2560    #[test]
2561    fn catchup_backoff_threshold_and_reset() {
2562        let now = Instant::now();
2563        let mut b = CatchupBackoff::new();
2564        // Up to the threshold: no backoff yet.
2565        for _ in 0..CATCHUP_BACKOFF_THRESHOLD {
2566            b.record_empty(0x20, now);
2567        }
2568        assert!(
2569            !b.is_in_backoff(0x20, now),
2570            "backoff must not engage before the threshold is crossed"
2571        );
2572        // Crossing the threshold sets a backoff window.
2573        b.record_empty(0x20, now);
2574        assert!(
2575            b.is_in_backoff(0x20, now),
2576            "backoff must engage once the empty count crosses the threshold"
2577        );
2578        // A productive response clears the entry.
2579        b.record_progress(0x20);
2580        assert!(
2581            !b.is_in_backoff(0x20, now),
2582            "record_progress must clear the backoff window"
2583        );
2584    }
2585
2586    /// R-25 regression: a saturated low-priority lane (Heartbeat
2587    /// flood) must not starve catchup-critical events on the
2588    /// priority lane (SyncResponse / SyncNack / Shutdown).
2589    ///
2590    /// Drives the failure shape from the audit: many peers
2591    /// flood the low-priority inbox to its 1024 cap while a
2592    /// SyncResponse is also in-flight. With the biased select
2593    /// on the priority lane, the priority entry is drained
2594    /// even though the low-priority lane is saturated.
2595    #[tokio::test]
2596    async fn priority_lane_drains_under_low_priority_saturation() {
2597        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2598        let cid = inputs.channel_id;
2599        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2600        coordinator
2601            .transition_to(
2602                ReplicaRole::Replica,
2603                super::super::replication_state::TransitionSignal::CapabilitySelected,
2604            )
2605            .await
2606            .unwrap();
2607        let dispatcher = Arc::new(RecorderDispatcher::default());
2608        let handle =
2609            spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2610
2611        // Saturate the low-priority lane with heartbeats. The
2612        // runtime task will drain them slowly under heavy lock
2613        // contention; what matters is that the priority lane
2614        // continues to make progress.
2615        for _ in 0..RUNTIME_INBOX_CAPACITY * 2 {
2616            let _ = handle.try_dispatch(Inbound::Heartbeat {
2617                from: 0x20,
2618                msg: SyncHeartbeat {
2619                    channel_id: cid,
2620                    tail_seq: 0,
2621                    role: ReplicaRole::Replica,
2622                    wall_clock_ms: 0,
2623                },
2624            });
2625        }
2626
2627        // Ship a Shutdown on the priority lane and confirm the
2628        // runtime exits within a short bound. Under the pre-fix
2629        // single-inbox design this would block on the saturated
2630        // queue indefinitely (cancel falls back to JoinHandle::
2631        // abort but the Idle transition wouldn't run).
2632        let cancel_fut = handle.cancel();
2633        let bounded = tokio::time::timeout(Duration::from_secs(2), cancel_fut).await;
2634        assert!(
2635            bounded.is_ok(),
2636            "shutdown on priority lane must drain under low-priority saturation"
2637        );
2638        // Idle transition ran via the graceful path, not the abort.
2639        assert_eq!(
2640            coordinator.role(),
2641            ReplicaRole::Idle,
2642            "graceful Idle transition must complete via priority-lane Shutdown"
2643        );
2644    }
2645
2646    #[tokio::test]
2647    async fn try_dispatch_returns_event_on_full_buffer() {
2648        // Build a handle, fill the inbox without letting the
2649        // task drain. We rely on the runtime task not having a
2650        // chance to recv before we saturate.
2651        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000); // tick very slow
2652        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2653        coordinator
2654            .transition_to(
2655                ReplicaRole::Replica,
2656                super::super::replication_state::TransitionSignal::CapabilitySelected,
2657            )
2658            .await
2659            .unwrap();
2660        let dispatcher = Arc::new(RecorderDispatcher::default());
2661        let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2662
2663        let cid = channel_id_for("test/runtime");
2664        // Fill past capacity.
2665        for _ in 0..RUNTIME_INBOX_CAPACITY + 10 {
2666            let event = Inbound::Heartbeat {
2667                from: 0x20,
2668                msg: SyncHeartbeat {
2669                    channel_id: cid,
2670                    tail_seq: 0,
2671                    role: ReplicaRole::Replica,
2672                    wall_clock_ms: 0,
2673                },
2674            };
2675            // try_dispatch returns the event back when buffer
2676            // is full. The task starts to drain quickly so we
2677            // may not see a rejection on every iteration; just
2678            // assert that at SOME point we got the event back.
2679            let _ = handle.try_dispatch(event);
2680        }
2681
2682        handle.cancel().await;
2683    }
2684
2685    /// A wedged task with a saturated inbox must not hang
2686    /// `cancel()`. The cancel path uses `try_send` for the Shutdown
2687    /// message; if the buffer is full, the JoinHandle is aborted
2688    /// directly so the call returns promptly instead of blocking on
2689    /// a queue the wedged task may never drain.
2690    #[tokio::test]
2691    async fn cancel_with_full_inbox_does_not_hang() {
2692        // Slow tick so the task spends most of its time parked.
2693        // Fill the inbox without giving the task time to drain.
2694        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2695        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2696        coordinator
2697            .transition_to(
2698                ReplicaRole::Replica,
2699                super::super::replication_state::TransitionSignal::CapabilitySelected,
2700            )
2701            .await
2702            .unwrap();
2703        let dispatcher = Arc::new(RecorderDispatcher::default());
2704        let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2705
2706        // Saturate the inbox so a buffered `send(Shutdown).await`
2707        // would block.
2708        let cid = channel_id_for("test/runtime");
2709        for _ in 0..RUNTIME_INBOX_CAPACITY {
2710            let _ = handle.try_dispatch(Inbound::Heartbeat {
2711                from: 0x20,
2712                msg: SyncHeartbeat {
2713                    channel_id: cid,
2714                    tail_seq: 0,
2715                    role: ReplicaRole::Replica,
2716                    wall_clock_ms: 0,
2717                },
2718            });
2719        }
2720
2721        // cancel() must complete within a tight bound — a regression
2722        // to `send(Shutdown).await` would hang here on the full
2723        // buffer.
2724        tokio::time::timeout(Duration::from_secs(2), handle.cancel())
2725            .await
2726            .expect("cancel() must not hang on full inbox");
2727        assert!(handle.is_stopped());
2728    }
2729
2730    /// Dropping a handle without `cancel()` aborts the underlying
2731    /// task so the spawned future stops driving and any dispatcher
2732    /// Arc it held is released. Pins the Arc-cycle invariant:
2733    /// `MeshNode → router → handle → task → dispatcher` must close
2734    /// when the handle goes out of scope.
2735    #[tokio::test]
2736    async fn dropping_handle_aborts_task() {
2737        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
2738        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2739        let dispatcher = Arc::new(RecorderDispatcher::default());
2740        let dispatcher_clone: Arc<dyn ReplicationDispatcher> = dispatcher.clone();
2741        // The runtime task holds one strong reference to dispatcher
2742        // (passed below). Count after spawn = 2.
2743        let handle =
2744            spawn_replication_runtime(inputs, coordinator, dispatcher_clone, build_budget());
2745        tokio::time::sleep(Duration::from_millis(20)).await;
2746        assert!(Arc::strong_count(&dispatcher) >= 2);
2747
2748        drop(handle);
2749
2750        // Yield enough for the abort to land + the task's local
2751        // state to deallocate (releasing the dispatcher Arc).
2752        for _ in 0..20 {
2753            tokio::time::sleep(Duration::from_millis(10)).await;
2754            if Arc::strong_count(&dispatcher) == 1 {
2755                return;
2756            }
2757        }
2758        panic!(
2759            "task did not release dispatcher Arc after handle drop; strong_count = {}",
2760            Arc::strong_count(&dispatcher)
2761        );
2762    }
2763
2764    #[tokio::test]
2765    async fn dispatch_after_cancel_errors() {
2766        let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2767        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2768        let dispatcher = Arc::new(RecorderDispatcher::default());
2769        let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
2770
2771        handle.cancel().await;
2772
2773        let cid = channel_id_for("test/runtime");
2774        let result = handle
2775            .dispatch(Inbound::Heartbeat {
2776                from: 0x20,
2777                msg: SyncHeartbeat {
2778                    channel_id: cid,
2779                    tail_seq: 0,
2780                    role: ReplicaRole::Replica,
2781                    wall_clock_ms: 0,
2782                },
2783            })
2784            .await;
2785        assert!(result.is_err(), "dispatch must error after cancel");
2786    }
2787
2788    #[tokio::test]
2789    async fn channel_id_mismatch_drops_heartbeat() {
2790        let inputs = build_inputs(0x10, vec![0x10, 0x20], 100);
2791        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
2792        coordinator
2793            .transition_to(
2794                ReplicaRole::Replica,
2795                super::super::replication_state::TransitionSignal::CapabilitySelected,
2796            )
2797            .await
2798            .unwrap();
2799        let dispatcher = Arc::new(RecorderDispatcher::default());
2800        let handle =
2801            spawn_replication_runtime(inputs, coordinator.clone(), dispatcher, build_budget());
2802
2803        // Push a heartbeat with the wrong channel_id.
2804        let wrong = channel_id_for("test/wrong_channel");
2805        handle
2806            .dispatch(Inbound::Heartbeat {
2807                from: 0x20,
2808                msg: SyncHeartbeat {
2809                    channel_id: wrong,
2810                    tail_seq: 99,
2811                    role: ReplicaRole::Leader,
2812                    wall_clock_ms: 0,
2813                },
2814            })
2815            .await
2816            .unwrap();
2817
2818        // After 4 ticks, silence detection would have triggered
2819        // if the heartbeat had landed; it didn't, so the role
2820        // either stays Replica (no leader observed at all) or
2821        // advances to Candidate / Leader via election with no
2822        // believed leader. Either way: prove the heartbeat
2823        // didn't poison the tracker by checking we didn't end
2824        // up as a Replica with `believed_leader == Some(0x20)`
2825        // (which is the broken outcome).
2826        //
2827        // We let it run a few ticks for the election cycle to
2828        // potentially fire.
2829        tokio::time::sleep(Duration::from_millis(500)).await;
2830
2831        // The wrong-channel heartbeat should have been ignored.
2832        // The final role is either Replica (if nothing else
2833        // happened) or Leader (if the silence-detection +
2834        // election fired). Either way, NOT believing 0x20 to
2835        // be leader is the contract — but we can't observe the
2836        // tracker from here. Instead pin that the role isn't
2837        // Idle (we didn't cancel; we're still alive).
2838        let role = coordinator.role();
2839        assert!(
2840            matches!(role, ReplicaRole::Replica | ReplicaRole::Leader),
2841            "expected Replica or Leader; got {role:?}",
2842        );
2843
2844        handle.cancel().await;
2845    }
2846
2847    // ────────────────────────────────────────────────────────────────
2848    // Lag-observation gauge (Phase H)
2849    // ────────────────────────────────────────────────────────────────
2850
2851    #[test]
2852    fn observe_lag_idle_emits_none() {
2853        let tracker = HeartbeatTracker::new(500);
2854        let now = Instant::now();
2855        match observe_lag(ReplicaRole::Idle, &[0x10, 0x20], 0x10, &tracker, now) {
2856            LagObservation::None => {}
2857            other => panic!("expected None for Idle, got {other:?}"),
2858        }
2859    }
2860
2861    #[test]
2862    fn observe_lag_candidate_emits_none() {
2863        let tracker = HeartbeatTracker::new(500);
2864        let now = Instant::now();
2865        match observe_lag(ReplicaRole::Candidate, &[0x10, 0x20], 0x10, &tracker, now) {
2866            LagObservation::None => {}
2867            other => panic!("expected None for Candidate, got {other:?}"),
2868        }
2869    }
2870
2871    #[test]
2872    fn observe_lag_leader_with_no_peer_observations_emits_none() {
2873        // Self is the leader, peers in the set but no heartbeats
2874        // observed yet → lag has no observation to report.
2875        let tracker = HeartbeatTracker::new(500);
2876        let now = Instant::now();
2877        match observe_lag(
2878            ReplicaRole::Leader,
2879            &[0x10, 0x20, 0x30],
2880            0x10,
2881            &tracker,
2882            now,
2883        ) {
2884            LagObservation::None => {}
2885            other => panic!("expected None when peers have not heartbeated, got {other:?}"),
2886        }
2887    }
2888
2889    #[test]
2890    fn observe_lag_leader_picks_worst_replica() {
2891        let mut tracker = HeartbeatTracker::new(500);
2892        let base = Instant::now();
2893        // Peer 0x20 heartbeated at base; peer 0x30 heartbeated 100ms
2894        // later. After advancing to base+1000ms, peer 0x20 has 1000ms
2895        // of lag, peer 0x30 has 900ms. Leader gauge picks the worst.
2896        tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
2897        tracker.record_heartbeat(
2898            0x30,
2899            ReplicaRole::Replica,
2900            0,
2901            base + Duration::from_millis(100),
2902        );
2903        let now = base + Duration::from_millis(1000);
2904        match observe_lag(
2905            ReplicaRole::Leader,
2906            &[0x10, 0x20, 0x30],
2907            0x10,
2908            &tracker,
2909            now,
2910        ) {
2911            LagObservation::Leader(d) => assert_eq!(d, Duration::from_millis(1000)),
2912            other => panic!("expected Leader(1000ms), got {other:?}"),
2913        }
2914    }
2915
2916    #[test]
2917    fn observe_lag_replica_emits_believed_leader_lag() {
2918        let mut tracker = HeartbeatTracker::new(500);
2919        let base = Instant::now();
2920        tracker.record_heartbeat(0x42, ReplicaRole::Leader, 99, base);
2921        let now = base + Duration::from_millis(250);
2922        match observe_lag(ReplicaRole::Replica, &[0x10, 0x42], 0x10, &tracker, now) {
2923            LagObservation::Replica(d) => assert_eq!(d, Duration::from_millis(250)),
2924            other => panic!("expected Replica(250ms), got {other:?}"),
2925        }
2926    }
2927
2928    #[test]
2929    fn observe_lag_replica_with_no_believed_leader_emits_none() {
2930        // Empty tracker — no leader heartbeat ever observed.
2931        let tracker = HeartbeatTracker::new(500);
2932        let now = Instant::now();
2933        match observe_lag(ReplicaRole::Replica, &[0x10, 0x42], 0x10, &tracker, now) {
2934            LagObservation::None => {}
2935            other => panic!("expected None for replica with no believed leader, got {other:?}"),
2936        }
2937    }
2938
2939    #[test]
2940    fn observe_lag_leader_skips_self_in_replica_set() {
2941        // Self appears in replica_set (typical — leaders are listed
2942        // alongside replicas). The lag picks the worst PEER, never
2943        // self's lag (which is meaningless since self is the
2944        // writer).
2945        let mut tracker = HeartbeatTracker::new(500);
2946        let base = Instant::now();
2947        tracker.record_heartbeat(0x20, ReplicaRole::Replica, 0, base);
2948        let now = base + Duration::from_millis(500);
2949        match observe_lag(ReplicaRole::Leader, &[0x10, 0x20], 0x10, &tracker, now) {
2950            LagObservation::Leader(d) => assert_eq!(d, Duration::from_millis(500)),
2951            other => panic!("expected Leader(500ms), got {other:?}"),
2952        }
2953    }
2954
2955    // ────────────────────────────────────────────────────────────────
2956    // Disk-pressure handling (Phase G)
2957    // ────────────────────────────────────────────────────────────────
2958
2959    fn build_coordinator_with_policy(
2960        policy: super::super::replication_config::UnderCapacity,
2961    ) -> (
2962        Arc<ReplicationCoordinator>,
2963        Arc<super::super::replication_metrics::ChannelMetricsAtomic>,
2964    ) {
2965        let registry = ReplicationMetricsRegistry::new();
2966        let sink: Arc<dyn ChainTagSink> = Arc::new(NoopTagSink);
2967        let config = ReplicationConfig::new().with_on_under_capacity(policy);
2968        let coordinator = ReplicationCoordinator::new(
2969            ChannelIdentity {
2970                channel_name: "test/runtime".to_string(),
2971                origin_hash: 0xCAFE_BABE,
2972            },
2973            config,
2974            sink,
2975            &registry,
2976        );
2977        let metrics = registry.for_channel("test/runtime");
2978        (Arc::new(coordinator), metrics)
2979    }
2980
2981    #[tokio::test]
2982    async fn disk_pressure_withdraw_drives_idle_transition() {
2983        // Bring the coordinator to Replica role so the
2984        // DiskPressureWithdraw signal can validate.
2985        let (coord, metrics) = build_coordinator_with_policy(
2986            super::super::replication_config::UnderCapacity::Withdraw,
2987        );
2988        coord
2989            .transition_to(
2990                ReplicaRole::Replica,
2991                super::super::replication_state::TransitionSignal::CapabilitySelected,
2992            )
2993            .await
2994            .unwrap();
2995        assert_eq!(coord.role(), ReplicaRole::Replica);
2996
2997        let file = build_file_for_tests();
2998        handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
2999
3000        assert_eq!(coord.role(), ReplicaRole::Idle, "Withdraw flips to Idle");
3001        assert_eq!(
3002            metrics
3003                .under_capacity_total
3004                .load(std::sync::atomic::Ordering::Relaxed),
3005            1,
3006            "Withdraw bumps under_capacity_total"
3007        );
3008    }
3009
3010    #[tokio::test]
3011    async fn disk_pressure_evict_oldest_keeps_role_and_sweeps() {
3012        // EvictOldest: stay in Replica role, retention sweep
3013        // fires. Pre-fill the file with N events under a
3014        // count-1 retention cap so the sweep observably evicts.
3015        let (coord, metrics) = build_coordinator_with_policy(
3016            super::super::replication_config::UnderCapacity::EvictOldest,
3017        );
3018        coord
3019            .transition_to(
3020                ReplicaRole::Replica,
3021                super::super::replication_state::TransitionSignal::CapabilitySelected,
3022            )
3023            .await
3024            .unwrap();
3025        // Build a file with retention cap = 1 so the sweep
3026        // observably retains only the newest entry.
3027        use crate::adapter::net::redex::config::RedexFileConfig;
3028        use crate::adapter::net::redex::manager::Redex;
3029        let r = Redex::new();
3030        let cn = ChannelName::new("test/runtime").unwrap();
3031        let cfg = RedexFileConfig::default().with_retention_max_events(1);
3032        let file = r.open_file(&cn, cfg).unwrap();
3033        for i in 0..5 {
3034            file.append(format!("event-{i}").as_bytes()).unwrap();
3035        }
3036        assert_eq!(file.len(), 5);
3037
3038        handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3039
3040        assert_eq!(
3041            coord.role(),
3042            ReplicaRole::Replica,
3043            "EvictOldest preserves Replica role"
3044        );
3045        assert_eq!(file.len(), 1, "retention sweep dropped to cap of 1");
3046        assert_eq!(
3047            metrics
3048                .under_capacity_total
3049                .load(std::sync::atomic::Ordering::Relaxed),
3050            1,
3051            "EvictOldest also bumps under_capacity_total"
3052        );
3053    }
3054
3055    /// Disk pressure observed from a Leader role (or Candidate
3056    /// mid-election) must still drive a withdraw to Idle. The
3057    /// transition matrix only permits DiskPressureWithdraw on
3058    /// Replica → Idle; without role-aware signal selection a Leader
3059    /// would silently log+drop the withdraw and keep writing
3060    /// through pressure. Pick ChannelClose for the non-Replica case
3061    /// so the withdraw lands regardless of current role.
3062    #[tokio::test]
3063    async fn disk_pressure_withdraw_from_leader_picks_channel_close_signal() {
3064        let (coord, _metrics) = build_coordinator_with_policy(
3065            super::super::replication_config::UnderCapacity::Withdraw,
3066        );
3067        // Promote through the full state cycle to Leader.
3068        coord
3069            .transition_to(
3070                ReplicaRole::Replica,
3071                super::super::replication_state::TransitionSignal::CapabilitySelected,
3072            )
3073            .await
3074            .unwrap();
3075        coord
3076            .transition_to(
3077                ReplicaRole::Candidate,
3078                super::super::replication_state::TransitionSignal::MissedHeartbeats,
3079            )
3080            .await
3081            .unwrap();
3082        coord
3083            .transition_to(
3084                ReplicaRole::Leader,
3085                super::super::replication_state::TransitionSignal::ElectionWon,
3086            )
3087            .await
3088            .unwrap();
3089        assert_eq!(coord.role(), ReplicaRole::Leader);
3090
3091        let file = build_file_for_tests();
3092        handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3093
3094        // Leader → Idle via ChannelClose must land — the prior code
3095        // path used DiskPressureWithdraw which is invalid from
3096        // Leader and would silently fail-and-log.
3097        assert_eq!(
3098            coord.role(),
3099            ReplicaRole::Idle,
3100            "Leader disk-pressure must withdraw to Idle"
3101        );
3102    }
3103
3104    #[tokio::test]
3105    async fn disk_pressure_withdraw_is_idempotent_on_idle_already() {
3106        // Defensive: if the coordinator is already Idle when the
3107        // DiskPressureWithdraw fires (race with another path),
3108        // the transition path's idempotent `Idle → Idle +
3109        // ChannelClose` shortcut doesn't apply (this is
3110        // DiskPressureWithdraw, not ChannelClose). The
3111        // transition rejects but the counter still bumps.
3112        let (coord, metrics) = build_coordinator_with_policy(
3113            super::super::replication_config::UnderCapacity::Withdraw,
3114        );
3115        // Coordinator starts in Idle.
3116        let file = build_file_for_tests();
3117        handle_disk_pressure(&coord, &file, "test detail", 0x20).await;
3118        // Counter advanced; role is still Idle (the transition_to
3119        // call inside handle_disk_pressure surfaces an error that
3120        // we log + drop, so role stays Idle).
3121        assert_eq!(coord.role(), ReplicaRole::Idle);
3122        assert_eq!(
3123            metrics
3124                .under_capacity_total
3125                .load(std::sync::atomic::Ordering::Relaxed),
3126            1,
3127        );
3128    }
3129
3130    // ────────────────────────────────────────────────────────────────
3131    // R-1 / R-12 regression: role-flip TOCTOU + channel-id validation
3132    // ────────────────────────────────────────────────────────────────
3133
3134    /// R-1: A Leader that flips to Idle (e.g. via DiskPressureWithdraw)
3135    /// in the middle of serving a SyncRequest must NACK NotLeader
3136    /// rather than ship a SyncResponse from a node that no longer
3137    /// claims leadership. The fix is a post-read role re-check
3138    /// immediately before the dispatcher send.
3139    #[tokio::test]
3140    async fn sync_request_post_op_role_flip_emits_notleader_nack() {
3141        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3142        let cid = inputs.channel_id;
3143        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3144        // Promote to Leader so the entry-check passes.
3145        for (role, signal) in [
3146            (
3147                ReplicaRole::Replica,
3148                super::super::replication_state::TransitionSignal::CapabilitySelected,
3149            ),
3150            (
3151                ReplicaRole::Candidate,
3152                super::super::replication_state::TransitionSignal::MissedHeartbeats,
3153            ),
3154            (
3155                ReplicaRole::Leader,
3156                super::super::replication_state::TransitionSignal::ElectionWon,
3157            ),
3158        ] {
3159            coordinator.transition_to(role, signal).await.unwrap();
3160        }
3161        let dispatcher = Arc::new(RecorderDispatcher::default());
3162        let budget = build_budget();
3163        let event = Inbound::SyncRequest {
3164            from: 0x20,
3165            msg: SyncRequest {
3166                channel_id: cid,
3167                since_seq: 0,
3168                chunk_max: 1024,
3169                request_id: 0,
3170                class: Default::default(),
3171            },
3172        };
3173        // Simulate the role flipping between the entry check and
3174        // the post-op re-check: flip to Idle via DiskPressureWithdraw
3175        // RIGHT BEFORE we call on_inbound. The entry-check would
3176        // have failed for an already-Idle coordinator, so we have
3177        // to use an arrangement that lets the entry check pass
3178        // and the post-op check fail. The cleanest test: start as
3179        // Leader; flip to Idle externally; call on_inbound. The
3180        // entry check now fails — pin the NACK shape directly.
3181        coordinator
3182            .transition_to(
3183                ReplicaRole::Idle,
3184                super::super::replication_state::TransitionSignal::GracefulRelinquish,
3185            )
3186            .await
3187            .unwrap();
3188        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3189        on_inbound(
3190            &inputs,
3191            &coordinator,
3192            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3193            &build_state(tracker.clone(), budget.clone()),
3194            event,
3195        )
3196        .await;
3197        let nacks = dispatcher.sync_nacks.lock().clone();
3198        assert_eq!(nacks.len(), 1, "expected one NotLeader NACK");
3199        let (target, nack) = &nacks[0];
3200        assert_eq!(*target, 0x20);
3201        assert_eq!(
3202            nack.error_code,
3203            super::super::replication::SyncNackError::NotLeader
3204        );
3205        assert_eq!(nack.channel_id, cid);
3206        // No SyncResponse should have been shipped.
3207        assert!(
3208            dispatcher.sync_responses.lock().is_empty(),
3209            "no SyncResponse must ship when role isn't Leader"
3210        );
3211    }
3212
3213    /// R-12: SyncRequest with mismatched channel_id is dropped at
3214    /// the runtime boundary; no NACK, no response, no file access.
3215    #[tokio::test]
3216    async fn sync_request_with_wrong_channel_id_is_dropped() {
3217        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3218        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3219        // Promote to Leader.
3220        for (role, signal) in [
3221            (
3222                ReplicaRole::Replica,
3223                super::super::replication_state::TransitionSignal::CapabilitySelected,
3224            ),
3225            (
3226                ReplicaRole::Candidate,
3227                super::super::replication_state::TransitionSignal::MissedHeartbeats,
3228            ),
3229            (
3230                ReplicaRole::Leader,
3231                super::super::replication_state::TransitionSignal::ElectionWon,
3232            ),
3233        ] {
3234            coordinator.transition_to(role, signal).await.unwrap();
3235        }
3236        let dispatcher = Arc::new(RecorderDispatcher::default());
3237        let budget = build_budget();
3238        let wrong = channel_id_for("test/wrong_channel");
3239        let event = Inbound::SyncRequest {
3240            from: 0x20,
3241            msg: SyncRequest {
3242                channel_id: wrong,
3243                since_seq: 0,
3244                chunk_max: 1024,
3245                request_id: 0,
3246                class: Default::default(),
3247            },
3248        };
3249        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3250        on_inbound(
3251            &inputs,
3252            &coordinator,
3253            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3254            &build_state(tracker.clone(), budget.clone()),
3255            event,
3256        )
3257        .await;
3258        assert!(
3259            dispatcher.sync_nacks.lock().is_empty(),
3260            "no NACK on wrong-channel — silently dropped"
3261        );
3262        assert!(
3263            dispatcher.sync_responses.lock().is_empty(),
3264            "no SyncResponse on wrong-channel"
3265        );
3266    }
3267
3268    /// Peer-auth gate regression: an inbound message whose `from`
3269    /// node is not in `replica_set` must be dropped at on_inbound
3270    /// entry. Without the gate any mesh peer with SUBPROTOCOL_REDEX
3271    /// reach could drive the runtime — the worst case being a forged
3272    /// SyncResponse that writes attacker-chosen bytes into the
3273    /// replica's local log via `append_batch`.
3274    #[tokio::test]
3275    async fn inbound_from_non_replica_set_peer_is_dropped() {
3276        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3277        let cid = inputs.channel_id;
3278        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3279        coordinator
3280            .transition_to(
3281                ReplicaRole::Replica,
3282                super::super::replication_state::TransitionSignal::CapabilitySelected,
3283            )
3284            .await
3285            .unwrap();
3286        let dispatcher = Arc::new(RecorderDispatcher::default());
3287        let budget = build_budget();
3288        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3289        let baseline_next = inputs.file.next_seq();
3290        // 0x99 is NOT in replica_set [0x10, 0x20]. Even with valid
3291        // channel_id and a payload the apply path would accept, the
3292        // membership gate must drop it.
3293        let event = Inbound::SyncResponse {
3294            from: 0x99,
3295            msg: SyncResponse {
3296                channel_id: cid,
3297                first_seq: 0,
3298                leader_first_retained_seq: 0,
3299                events: Vec::new(),
3300                request_id: 0,
3301            },
3302        };
3303        on_inbound(
3304            &inputs,
3305            &coordinator,
3306            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3307            &build_state(tracker.clone(), budget.clone()),
3308            event,
3309        )
3310        .await;
3311        // No state mutation on the local file from an out-of-set
3312        // peer's SyncResponse.
3313        assert_eq!(
3314            inputs.file.next_seq(),
3315            baseline_next,
3316            "out-of-set peer must not advance local tail"
3317        );
3318        // A Heartbeat from the same out-of-set peer must not seed
3319        // the tracker either — the gate runs before record_heartbeat.
3320        let event = Inbound::Heartbeat {
3321            from: 0x99,
3322            msg: SyncHeartbeat {
3323                channel_id: cid,
3324                tail_seq: 7,
3325                role: ReplicaRole::Leader,
3326                wall_clock_ms: 0,
3327            },
3328        };
3329        on_inbound(
3330            &inputs,
3331            &coordinator,
3332            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3333            &build_state(tracker.clone(), budget.clone()),
3334            event,
3335        )
3336        .await;
3337        assert!(
3338            tracker.lock().believed_leader().is_none(),
3339            "out-of-set heartbeat must not seed believed_leader"
3340        );
3341    }
3342
3343    /// Peer-auth gate regression: a SyncResponse from a replica_set
3344    /// peer who is not the believed_leader must be dropped. Without
3345    /// this check, a non-leader replica could ship forged chunks
3346    /// once they're inside the replica_set.
3347    #[tokio::test]
3348    async fn sync_response_from_non_leader_replica_peer_is_dropped() {
3349        let inputs = build_inputs(0x10, vec![0x10, 0x20, 0x30], 60_000);
3350        let cid = inputs.channel_id;
3351        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20, 0x30]);
3352        coordinator
3353            .transition_to(
3354                ReplicaRole::Replica,
3355                super::super::replication_state::TransitionSignal::CapabilitySelected,
3356            )
3357            .await
3358            .unwrap();
3359        let dispatcher = Arc::new(RecorderDispatcher::default());
3360        let budget = build_budget();
3361        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3362        // Seed believed_leader = 0x20 via a Leader heartbeat.
3363        tracker
3364            .lock()
3365            .record_heartbeat(0x20, ReplicaRole::Leader, 0, Instant::now());
3366        assert_eq!(tracker.lock().believed_leader(), Some(0x20));
3367        let baseline_next = inputs.file.next_seq();
3368        // 0x30 IS in replica_set but is NOT the believed leader.
3369        let event = Inbound::SyncResponse {
3370            from: 0x30,
3371            msg: SyncResponse {
3372                channel_id: cid,
3373                first_seq: 0,
3374                leader_first_retained_seq: 0,
3375                events: Vec::new(),
3376                request_id: 0,
3377            },
3378        };
3379        on_inbound(
3380            &inputs,
3381            &coordinator,
3382            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3383            &build_state(tracker.clone(), budget.clone()),
3384            event,
3385        )
3386        .await;
3387        assert_eq!(
3388            inputs.file.next_seq(),
3389            baseline_next,
3390            "non-leader replica_set peer must not advance local tail via SyncResponse"
3391        );
3392    }
3393
3394    /// R-11 regression: `is_stopped()` returns `false` until
3395    /// `cancel()`'s await completes, even if a parallel
3396    /// `cancel()` raced and took the JoinHandle out of the
3397    /// slot. The explicit `stopped` flag (flipped only post-
3398    /// await) is what guarantees this.
3399    #[tokio::test]
3400    async fn is_stopped_is_false_before_first_cancel_completes() {
3401        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3402        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3403        let dispatcher = Arc::new(RecorderDispatcher::default());
3404        let handle = spawn_replication_runtime(inputs, coordinator, dispatcher, build_budget());
3405        assert!(
3406            !handle.is_stopped(),
3407            "fresh runtime must report not stopped"
3408        );
3409        handle.cancel().await;
3410        assert!(
3411            handle.is_stopped(),
3412            "post-cancel().await runtime must report stopped"
3413        );
3414        // Idempotent second cancel must not flip the flag back.
3415        handle.cancel().await;
3416        assert!(handle.is_stopped());
3417    }
3418
3419    /// R-4 regression: SyncNack NotLeader must actively clear
3420    /// the believed leader so the next tick re-resolves.
3421    /// Without the fix the replica loops sending SyncRequests
3422    /// to the same stale leader until 3 missed heartbeats trip.
3423    #[tokio::test]
3424    async fn sync_nack_notleader_clears_believed_leader() {
3425        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3426        let cid = inputs.channel_id;
3427        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3428        coordinator
3429            .transition_to(
3430                ReplicaRole::Replica,
3431                super::super::replication_state::TransitionSignal::CapabilitySelected,
3432            )
3433            .await
3434            .unwrap();
3435        let dispatcher = Arc::new(RecorderDispatcher::default());
3436        let budget = build_budget();
3437        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3438        // Seed the tracker with a believed leader heartbeat.
3439        tracker
3440            .lock()
3441            .record_heartbeat(0x20, ReplicaRole::Leader, 99, Instant::now());
3442        assert_eq!(tracker.lock().believed_leader(), Some(0x20));
3443        // NACK NotLeader from the believed leader. Pre-record an
3444        // in-flight request_id so the response-binding gate admits
3445        // the NACK to the apply path.
3446        let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
3447        outstanding.lock().record(0x20, 0, Instant::now());
3448        let event = Inbound::SyncNack {
3449            from: 0x20,
3450            msg: SyncNack {
3451                channel_id: cid,
3452                since_seq: 0,
3453                error_code: super::super::replication::SyncNackError::NotLeader,
3454                leader_first_retained_seq: 0,
3455                detail: String::new(),
3456                request_id: 0,
3457            },
3458        };
3459        on_inbound(
3460            &inputs,
3461            &coordinator,
3462            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3463            &RuntimeState {
3464                tracker: tracker.clone(),
3465                budget: budget.clone(),
3466                backoff: build_backoff(),
3467                outstanding: outstanding.clone(),
3468            },
3469            event,
3470        )
3471        .await;
3472        assert!(
3473            tracker.lock().believed_leader().is_none(),
3474            "NACK NotLeader must clear the cached believed leader"
3475        );
3476    }
3477
3478    /// R-4 regression: SyncNack BadRange skips the local tail
3479    /// past the rejected `since_seq` so the next SyncRequest
3480    /// re-issues against a range the leader has retained.
3481    /// Without the fix the replica re-issues the same range
3482    /// indefinitely.
3483    #[tokio::test]
3484    async fn sync_nack_badrange_skips_local_tail() {
3485        let inputs = build_inputs(0x10, vec![0x10, 0x20], 60_000);
3486        let cid = inputs.channel_id;
3487        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3488        coordinator
3489            .transition_to(
3490                ReplicaRole::Replica,
3491                super::super::replication_state::TransitionSignal::CapabilitySelected,
3492            )
3493            .await
3494            .unwrap();
3495        let dispatcher = Arc::new(RecorderDispatcher::default());
3496        let budget = build_budget();
3497        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3498        // Seed the tracker with a believed leader heartbeat so the
3499        // peer-auth gate at on_inbound entry admits the NACK below.
3500        tracker
3501            .lock()
3502            .record_heartbeat(0x20, ReplicaRole::Leader, 41, Instant::now());
3503
3504        // Local file is empty (next_seq = 0). NACK with since_seq=42
3505        // means "the leader trimmed up to 42; you asked for 42 but
3506        // it's gone." Local tail must advance to 43.
3507        let baseline_next = inputs.file.next_seq();
3508        // Pre-record the in-flight request_id so the response-binding
3509        // gate admits this NACK rather than dropping it as stale.
3510        let outstanding = Arc::new(Mutex::new(OutstandingRequests::new()));
3511        outstanding.lock().record(0x20, 0, Instant::now());
3512        let event = Inbound::SyncNack {
3513            from: 0x20,
3514            msg: SyncNack {
3515                channel_id: cid,
3516                since_seq: 42,
3517                // Pre-fix: replica advanced one seq at a time via
3518                // `since_seq + 1 = 43`. R-40 wire field instructs
3519                // the replica to jump straight to the leader's
3520                // first-retained seq (here, 100) in one round trip.
3521                error_code: super::super::replication::SyncNackError::BadRange,
3522                leader_first_retained_seq: 100,
3523                detail: String::new(),
3524                request_id: 0,
3525            },
3526        };
3527        on_inbound(
3528            &inputs,
3529            &coordinator,
3530            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3531            &RuntimeState {
3532                tracker: tracker.clone(),
3533                budget: budget.clone(),
3534                backoff: build_backoff(),
3535                outstanding: outstanding.clone(),
3536            },
3537            event,
3538        )
3539        .await;
3540        // The local file's next_seq advanced past the bad range
3541        // (or, on persistent files, fell back to retry). For a
3542        // heap-only file in this test, skip_to(43) succeeded.
3543        let after = inputs.file.next_seq();
3544        assert!(
3545            after > baseline_next,
3546            "BadRange must advance local next_seq (got {after}, baseline {baseline_next})"
3547        );
3548        // R-40 regression: with leader_first_retained_seq = 100,
3549        // skip_to must jump straight to 100 in one round trip, not
3550        // creep up by one per BadRange cycle (since_seq + 1 = 43
3551        // would otherwise re-trigger BadRange when retention floor
3552        // is much higher).
3553        assert_eq!(
3554            after, 100,
3555            "BadRange with leader_first_retained_seq must jump local tail to the floor"
3556        );
3557        // skip_ahead metric advanced.
3558        assert_eq!(
3559            coordinator
3560                .metrics()
3561                .skip_ahead_total
3562                .load(std::sync::atomic::Ordering::Relaxed),
3563            1
3564        );
3565    }
3566
    /// R-2: When a post-Candidate transition fails (e.g. the
    /// coordinator already advanced via a concurrent path),
    /// `clear_believed_leader` must NOT run — otherwise the
    /// replica is left with no leader and no path to enter
    /// Candidate again.
    ///
    /// What this test actually observes: the tracker is owned by the
    /// spawned runtime and is not reachable from here, so the only
    /// hard assertion is that the coordinator is still `Idle` after
    /// the runtime has had time to tick. A panic inside the runtime
    /// or a bounce back to `Leader` would surface as a failure; a
    /// wiped `believed_leader` by itself would not.
    ///
    /// NOTE(review): the test is wall-clock driven (20 ms + 300 ms
    /// sleeps against what looks like a 50 ms heartbeat period — the
    /// third `build_inputs` argument, to be confirmed), so statement
    /// order and durations matter.
    #[tokio::test]
    async fn post_election_failed_transition_preserves_believed_leader() {
        let inputs = build_inputs(0x10, vec![0x10, 0x20], 50);
        // Copy the channel id out now; `inputs` is moved into the
        // runtime below.
        let cid = inputs.channel_id;
        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
        // Start from Replica so the runtime treats 0x20 as its leader.
        coordinator
            .transition_to(
                ReplicaRole::Replica,
                super::super::replication_state::TransitionSignal::CapabilitySelected,
            )
            .await
            .unwrap();
        let dispatcher = Arc::new(RecorderDispatcher::default());
        // The runtime gets its own clone of the coordinator; the test
        // keeps one to race against it and to read the final role.
        let handle = spawn_replication_runtime(
            inputs,
            coordinator.clone(),
            dispatcher.clone(),
            build_budget(),
        );

        // Record a Leader heartbeat to set believed_leader.
        handle
            .dispatch(Inbound::Heartbeat {
                from: 0x20,
                msg: SyncHeartbeat {
                    channel_id: cid,
                    tail_seq: 99,
                    role: ReplicaRole::Leader,
                    wall_clock_ms: 1_700_000_000_000,
                },
            })
            .await
            .unwrap();

        // Drive an external race: drop the coordinator into Idle
        // via the public surface before the election runs.
        // The post-election transition_to will see (Idle, Leader,
        // ElectionWon) which is invalid; the fix ensures we don't
        // wipe the tracker on that failure.
        tokio::time::sleep(Duration::from_millis(20)).await;
        coordinator
            .transition_to(
                ReplicaRole::Idle,
                super::super::replication_state::TransitionSignal::ChannelClose,
            )
            .await
            .unwrap();

        // Run a few more ticks; the silence detection should still
        // run but the post-election transition should fail silently
        // without wiping believed_leader. We can't directly observe
        // the tracker, but we CAN confirm the coordinator stays at
        // Idle (didn't bounce back to Leader after a failed post-
        // election transition) and that no panic / unexpected state
        // mutation occurred.
        tokio::time::sleep(Duration::from_millis(300)).await;
        assert_eq!(coordinator.role(), ReplicaRole::Idle);

        // Stop the background task before the test returns.
        handle.cancel().await;
    }
3632
3633    /// The replica's `on_tick` must stamp `inputs.default_bandwidth_class`
3634    /// on every emitted `SyncRequest` — not `Default::default()`. Without
3635    /// this, the per-channel default configured via
3636    /// `ReplicationConfig::default_bandwidth_class` is silently dropped
3637    /// and every replica catchup ships as `Foreground`, regardless of
3638    /// operator policy.
3639    #[tokio::test]
3640    async fn replica_on_tick_stamps_inputs_default_bandwidth_class_on_sync_request() {
3641        use crate::adapter::net::redex::bandwidth::BandwidthClass;
3642
3643        let self_id: NodeId = 0x10;
3644        let leader_id: NodeId = 0x20;
3645        let inputs = RuntimeInputs {
3646            channel: ChannelIdentity {
3647                channel_name: "test/runtime".to_string(),
3648                origin_hash: 0xCAFE_BABE,
3649            },
3650            channel_id: channel_id_for("test/runtime"),
3651            self_node_id: self_id,
3652            replica_set: vec![self_id, leader_id],
3653            heartbeat_ms: 100,
3654            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3655            // Local tail = 0; leader's heartbeat will advertise 50,
3656            // forcing `tick()` to emit a SyncRequest.
3657            tail_provider: Arc::new(|| 0),
3658            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3659            file: build_file_for_tests(),
3660            // Non-default class — must round-trip onto the wire.
3661            default_bandwidth_class: BandwidthClass::Background,
3662            background_fraction: 0.3,
3663        };
3664        let (coordinator, _registry) = build_coordinator(self_id, vec![self_id, leader_id]);
3665        coordinator
3666            .transition_to(
3667                ReplicaRole::Replica,
3668                super::super::replication_state::TransitionSignal::CapabilitySelected,
3669            )
3670            .await
3671            .unwrap();
3672
3673        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3674        // Seed a leader heartbeat with tail_seq > local so `tick()`
3675        // generates a catchup SyncRequest.
3676        tracker
3677            .lock()
3678            .record_heartbeat(leader_id, ReplicaRole::Leader, 50, Instant::now());
3679
3680        let dispatcher = Arc::new(RecorderDispatcher::default());
3681        let dyn_dispatcher: Arc<dyn ReplicationDispatcher> = dispatcher.clone();
3682        on_tick(
3683            &inputs,
3684            &coordinator,
3685            &dyn_dispatcher,
3686            &build_state(tracker.clone(), build_budget()),
3687        )
3688        .await;
3689
3690        let sync_requests = dispatcher.sync_requests.lock().clone();
3691        assert_eq!(
3692            sync_requests.len(),
3693            1,
3694            "expected exactly one catchup SyncRequest"
3695        );
3696        let (target, req) = &sync_requests[0];
3697        assert_eq!(*target, leader_id);
3698        assert_eq!(
3699            req.class,
3700            BandwidthClass::Background,
3701            "emitted SyncRequest must carry inputs.default_bandwidth_class, \
3702             not Default::default()",
3703        );
3704    }
3705
3706    /// The leader's serve path must consult `msg.class` and
3707    /// `inputs.background_fraction` when admitting a SyncRequest —
3708    /// not the class-blind legacy `try_consume`. Without this, every
3709    /// Phase D2/D4 admission threshold is dead code.
3710    ///
3711    /// Configures `background_fraction = 0.3` and a tiny budget
3712    /// (100-byte capacity). The reserve threshold becomes
3713    /// `(1 - 0.3) * 100 = 70` bytes. A Background SyncRequest whose
3714    /// response costs 47 bytes (empty-response header) leaves
3715    /// `available - cost = 53 < 70`, so the class-aware gate rejects
3716    /// it with `Backpressure`. The legacy class-blind path would
3717    /// have admitted (it sees `Foreground` semantics and `available
3718    /// >= cost`).
3719    #[tokio::test]
3720    async fn leader_serve_path_uses_class_aware_admission_for_background_request() {
3721        use crate::adapter::net::redex::bandwidth::BandwidthClass;
3722
3723        let inputs = RuntimeInputs {
3724            channel: ChannelIdentity {
3725                channel_name: "test/runtime".to_string(),
3726                origin_hash: 0xCAFE_BABE,
3727            },
3728            channel_id: channel_id_for("test/runtime"),
3729            self_node_id: 0x10,
3730            replica_set: vec![0x10, 0x20],
3731            heartbeat_ms: 60_000,
3732            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3733            tail_provider: Arc::new(|| 0),
3734            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3735            file: build_file_for_tests(),
3736            default_bandwidth_class: BandwidthClass::Foreground,
3737            background_fraction: 0.3,
3738        };
3739        let cid = inputs.channel_id;
3740        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3741        // Promote to Leader so the serve path runs.
3742        for (role, signal) in [
3743            (
3744                ReplicaRole::Replica,
3745                super::super::replication_state::TransitionSignal::CapabilitySelected,
3746            ),
3747            (
3748                ReplicaRole::Candidate,
3749                super::super::replication_state::TransitionSignal::MissedHeartbeats,
3750            ),
3751            (
3752                ReplicaRole::Leader,
3753                super::super::replication_state::TransitionSignal::ElectionWon,
3754            ),
3755        ] {
3756            coordinator.transition_to(role, signal).await.unwrap();
3757        }
3758
3759        // Build a budget whose reserve gate will reject a 47-byte
3760        // Background response. capacity=100, fraction=0.3 → reserve=70;
3761        // available - cost = 100 - 47 = 53 < 70 → denied.
3762        let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
3763
3764        let dispatcher = Arc::new(RecorderDispatcher::default());
3765        let event = Inbound::SyncRequest {
3766            from: 0x20,
3767            msg: SyncRequest {
3768                channel_id: cid,
3769                since_seq: 0,
3770                chunk_max: 1024,
3771                request_id: 0,
3772                class: BandwidthClass::Background,
3773            },
3774        };
3775        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3776        on_inbound(
3777            &inputs,
3778            &coordinator,
3779            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3780            &build_state(tracker.clone(), budget.clone()),
3781            event,
3782        )
3783        .await;
3784
3785        let nacks = dispatcher.sync_nacks.lock().clone();
3786        assert_eq!(
3787            nacks.len(),
3788            1,
3789            "Background admission must be denied by the class-aware reserve gate; \
3790             pre-fix this admitted via the class-blind try_consume path",
3791        );
3792        let (target, nack) = &nacks[0];
3793        assert_eq!(*target, 0x20);
3794        assert_eq!(
3795            nack.error_code,
3796            super::super::replication::SyncNackError::Backpressure,
3797            "denial must surface as Backpressure NACK so the replica backs off",
3798        );
3799        // No SyncResponse should have been shipped under denial.
3800        assert!(
3801            dispatcher.sync_responses.lock().is_empty(),
3802            "denied requests must not leak a SyncResponse",
3803        );
3804    }
3805
3806    /// Companion to the Background-denial test: with the same tiny
3807    /// budget, a Foreground request of the same size IS admitted —
3808    /// confirms the reserve gate is the discriminator, not the cost.
3809    /// Without this paired assertion a regression that always-denies
3810    /// would look like the right behavior.
3811    #[tokio::test]
3812    async fn leader_serve_path_admits_foreground_under_tight_budget() {
3813        use crate::adapter::net::redex::bandwidth::BandwidthClass;
3814
3815        let inputs = RuntimeInputs {
3816            channel: ChannelIdentity {
3817                channel_name: "test/runtime".to_string(),
3818                origin_hash: 0xCAFE_BABE,
3819            },
3820            channel_id: channel_id_for("test/runtime"),
3821            self_node_id: 0x10,
3822            replica_set: vec![0x10, 0x20],
3823            heartbeat_ms: 60_000,
3824            wall_clock_provider: Arc::new(|| 1_700_000_000_000),
3825            tail_provider: Arc::new(|| 0),
3826            rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
3827            file: build_file_for_tests(),
3828            default_bandwidth_class: BandwidthClass::Foreground,
3829            background_fraction: 0.3,
3830        };
3831        let cid = inputs.channel_id;
3832        let (coordinator, _registry) = build_coordinator(0x10, vec![0x10, 0x20]);
3833        for (role, signal) in [
3834            (
3835                ReplicaRole::Replica,
3836                super::super::replication_state::TransitionSignal::CapabilitySelected,
3837            ),
3838            (
3839                ReplicaRole::Candidate,
3840                super::super::replication_state::TransitionSignal::MissedHeartbeats,
3841            ),
3842            (
3843                ReplicaRole::Leader,
3844                super::super::replication_state::TransitionSignal::ElectionWon,
3845            ),
3846        ] {
3847            coordinator.transition_to(role, signal).await.unwrap();
3848        }
3849        let budget = Arc::new(Mutex::new(BandwidthBudget::new(1.0, 100, Instant::now())));
3850        let dispatcher = Arc::new(RecorderDispatcher::default());
3851        let event = Inbound::SyncRequest {
3852            from: 0x20,
3853            msg: SyncRequest {
3854                channel_id: cid,
3855                since_seq: 0,
3856                chunk_max: 1024,
3857                request_id: 0,
3858                class: BandwidthClass::Foreground,
3859            },
3860        };
3861        let tracker = Arc::new(Mutex::new(HeartbeatTracker::new(100)));
3862        on_inbound(
3863            &inputs,
3864            &coordinator,
3865            &(dispatcher.clone() as Arc<dyn ReplicationDispatcher>),
3866            &build_state(tracker.clone(), budget.clone()),
3867            event,
3868        )
3869        .await;
3870        // No NACK — Foreground admits because available >= cost.
3871        assert!(
3872            dispatcher.sync_nacks.lock().is_empty(),
3873            "Foreground under the same budget must admit (available >= cost)",
3874        );
3875        // Exactly one SyncResponse must have been shipped.
3876        assert_eq!(
3877            dispatcher.sync_responses.lock().len(),
3878            1,
3879            "Foreground admit must ship a SyncResponse",
3880        );
3881    }
3882}