Skip to main content

melin_server_runtime/
response.rs

1//! io_uring-based response stage — routes matching output to connections via
2//! `IORING_OP_SEND`.
3//!
4//! Replaces the blocking `write(2)` + `BufWriter` flush path with batched
5//! io_uring sends. Instead of N `write(2)` syscalls (one per dirty connection
6//! on flush), we submit N SEND SQEs in a single `io_uring_enter` call.
7//!
8//! Consumes the matching stage's output SPSC in batches and holds each batch until the journal / replica durability cursors (per the configured policy) cover it.
9//! Runs on a dedicated OS thread.
10
11use std::collections::{HashMap, HashSet};
12use std::os::unix::io::RawFd;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::mpsc;
16use std::time::{Duration, Instant};
17
18use io_uring::{IoUring, opcode, types};
19use tracing::{debug, error};
20
21use melin_pipeline::ring;
22
23use crate::durability_policy::{CursorView, DurabilityMode, EvalStatus, Policy};
24use crate::replication::ReplicationMetrics;
25use melin_app::Application;
26use melin_transport_core::pipeline::{OutputPayload, OutputSlot, StageUtilization};
27#[cfg(feature = "latency-trace")]
28use melin_transport_core::trace;
29
30use melin_wire_protocol::control::TransportResponse;
31use melin_wire_protocol::control_codec;
32
/// Maximum number of output slots consumed from the SPSC per batch.
const MAX_BATCH: usize = 1024;

/// Maximum encoded response size. PositionSnapshot is the largest variant
/// at up to 330 bytes (length(4) + tag(1) + account(4) + count(1) +
/// 16*(currency(4)+free(8)+reserved(8))). 512 bytes covers all variants;
/// the bound is enforced at compile time below.
const MAX_RESPONSE_BUF: usize = 512;

/// io_uring submission queue depth for sends. Must be ≥ max concurrent
/// connections to avoid SQ overflow when all connections are dirty.
/// Power of 2 for io_uring alignment (enforced at compile time below).
/// 4096 supports 1024+ client benchmarks where all connections flush
/// simultaneously.
const RING_SIZE: u32 = 4096;

/// Maximum accumulated send buffer per connection (64 KiB). If a client
/// falls behind and the buffer exceeds this, the connection is dropped.
/// 64 KiB holds at least 128 maximum-size (`MAX_RESPONSE_BUF`) frames and
/// several hundred typical ones — well beyond any reasonable lag.
const MAX_SEND_BUF: usize = 64 * 1024;

// Compile-time guards for the invariants documented on the constants
// above. A future edit that breaks one fails the build instead of
// truncating frames or dropping every connection at runtime.
const _: () = {
    // length(4) + tag(1) + account(4) + count(1) + 16 * (currency(4) + free(8) + reserved(8))
    assert!(
        MAX_RESPONSE_BUF >= 4 + 1 + 4 + 1 + 16 * (4 + 8 + 8),
        "MAX_RESPONSE_BUF must hold the largest PositionSnapshot frame"
    );
    assert!(
        RING_SIZE.is_power_of_two(),
        "RING_SIZE must be a power of two"
    );
    // A single maximal frame must fit under the per-connection cap,
    // otherwise any large response would trip the slow-client drop.
    assert!(
        MAX_SEND_BUF >= MAX_RESPONSE_BUF,
        "MAX_SEND_BUF must hold at least one maximal response frame"
    );
};
51
52pub use crate::ControlEvent;
53
54/// Encoder type alias: response encoder bound to the application's
55/// `Report` / `QueryResponse` types. Hides the long
56/// `dyn ResponseEncoder<Report = ..., Query = ...>` at call sites.
57pub type ResponseEncoderArc<A> = Arc<
58    dyn melin_app::encoder::ResponseEncoder<
59            Report = <A as Application>::Report,
60            Query = <A as Application>::QueryResponse,
61        >,
62>;
63
64/// Configuration and shared state for the response stage.
65pub struct Response<A: Application> {
66    /// Highest wire seq durably persisted on the primary's journal.
67    /// In the same sequence space as `OutputSlot.wire_seq` and the
68    /// replica metrics (`metrics.in_memory_sequence` /
69    /// `metrics.acked_sequence`), so the durability gate can compare
70    /// these values numerically and the comparison is meaningful
71    /// regardless of `starting_sequence` (fresh vs recovered primary).
72    /// Updated by the journal stage after every fsync batch via
73    /// `set_last_seq_publisher`.
74    pub journal_persisted_wire_seq: Arc<AtomicU64>,
75    /// Operator-selected durability mode, published through a shared
76    /// [`AtomicU8`] so the admin `DURABILITY` command can swap it at
77    /// runtime without restarting the node. The response stage reads
78    /// this once per gate iteration with a relaxed load (cheaper than a
79    /// `Mutex` or refcounted `Arc<Policy>` snapshot) and rebuilds its
80    /// local [`Policy`] when the byte changes. See
81    /// [`crate::durability_policy::DurabilityMode::as_u8`] for the
82    /// encoding.
83    pub durability_mode: Arc<std::sync::atomic::AtomicU8>,
84    /// Per-slot replica cursors. `None` for standalone deployments
85    /// (no replication wiring) — the policy then evaluates against the
86    /// primary alone.
87    pub replication_metrics: Option<Arc<ReplicationMetrics>>,
88    /// Per-slot replica active flags. Only "true" slots are included in
89    /// the cursor view fed to `Policy::evaluate`, so disconnected slots
90    /// don't pollute the view with stale zero cursors. When the
91    /// resulting view is too small to satisfy a clause, the policy
92    /// reports degraded and the gate stalls.
93    /// Mirrors `replication_metrics` — `None` in standalone.
94    pub replica_active: Option<[Arc<AtomicBool>; 2]>,
95    pub heartbeat_interval: Option<Duration>,
96    pub busy_spin: bool,
97    pub utilization: Arc<StageUtilization>,
98    /// Wire encoder for application-shaped payloads. Constructed
99    /// once at boot (`Arc::new(ExchangeResponseEncoder)`) and shared
100    /// with the DPDK response stage.
101    pub encoder: ResponseEncoderArc<A>,
102}
103
/// Send-side state for one client connection on the batched io_uring
/// egress path.
struct ConnectionEntry {
    /// Raw socket descriptor targeted by the SEND SQEs. It is not owned
    /// here; `_owner` is what keeps it open.
    fd: RawFd,
    /// Type-erased owner of the socket's write half. It is never read and
    /// is held only so `fd` stays valid for the lifetime of this entry.
    _owner: Box<dyn Send>,
    /// Outbound bytes queued since the last flush: complete wire frames
    /// (length prefix followed by payload), appended back to back.
    ///
    /// SEND SQEs point at this Vec's heap allocation (`as_ptr()`). That
    /// allocation does not move when the `HashMap` relocates the entry,
    /// but the Vec must not be reallocated while such a send is in flight.
    send_buf: Vec<u8>,
    /// Time of the most recent send to this connection, compared against
    /// the heartbeat interval to decide when a heartbeat is due.
    last_send: Instant,
}
118
119/// Run the io_uring response stage loop. Blocks the calling thread until shutdown.
120///
121/// Consumes from the output SPSC, waits for durability confirmation, and
122/// sends responses via io_uring SEND.
123///
124/// Durability gating: every gate iteration reads the journal cursor
125/// (primary persisted) plus per-slot replica cursors (in-memory and
126/// persisted) from `replication_metrics` and feeds them through the
127/// configured [`Policy`]. See [`evaluate_durability`].
128pub fn run<A: Application>(
129    mut consumer: ring::Consumer<OutputSlot<A::Report, A::QueryResponse>>,
130    control_rx: mpsc::Receiver<ControlEvent>,
131    config: Response<A>,
132    shutdown: &AtomicBool,
133) {
134    let Response {
135        journal_persisted_wire_seq,
136        durability_mode,
137        replication_metrics,
138        replica_active,
139        heartbeat_interval,
140        busy_spin,
141        utilization,
142        encoder,
143    } = config;
144    // Resolve the starting mode from the shared atomic and derive the
145    // local Policy. The atomic is the single source of truth across the
146    // process lifetime; the response thread keeps a thread-local copy
147    // for cheap per-iteration use and rebuilds it when an admin
148    // `DURABILITY` command swaps the atomic. Initialise as Hybrid (the
149    // default mode) if the atomic ever holds a corrupted byte — better
150    // than panicking on a degraded process and matches the default
151    // operators see at boot.
152    let mut active_mode =
153        DurabilityMode::from_u8(durability_mode.load(std::sync::atomic::Ordering::Relaxed))
154            .unwrap_or_else(|| {
155                tracing::error!(
156                    "durability_mode atomic held a corrupted byte at startup; defaulting to hybrid"
157                );
158                DurabilityMode::Hybrid
159            });
160    let mut policy = active_mode.to_policy();
161    let mut ring =
162        IoUring::new(RING_SIZE).expect("failed to create io_uring instance for response stage");
163
164    // Connection table: maps connection IDs to their state.
165    // HashMap for O(1) lookup. Pre-sized for a reasonable number of concurrent clients.
166    let mut connections: HashMap<u64, ConnectionEntry> = HashMap::with_capacity(256);
167
168    let mut batch = [OutputSlot::<A::Report, A::QueryResponse>::default(); MAX_BATCH];
169    let mut encode_buf = [0u8; MAX_RESPONSE_BUF];
170
171    // Cached durability position to avoid atomic reads on every slot.
172    // Initialised below from the policy's startup evaluation; updated
173    // via `evaluate_durability` on every gate iteration.
174    let mut cached_durable_pos: u64;
175
176    // Degradation logger. Tracks transitions, suppresses sub-second
177    // flap noise, and drives the `/healthz` `policy_degraded` gauge.
178    // See `DegradationLogger` for the full state machine. Initialised
179    // below from the policy's startup evaluation so an unsatisfiable
180    // policy (e.g. a primary that just lost both replicas while
181    // running `hybrid` or `durably-replicated`) is visible immediately
182    // on `/healthz` and in the journal.
183    let startup_now = Instant::now();
184    let mut last_policy_check = startup_now;
185    /// Re-emit interval for the "still degraded" reminder.
186    const DEGRADED_LOG_INTERVAL: Duration = Duration::from_secs(5);
187    /// Cadence at which the idle path re-evaluates the policy. Bounds
188    /// the lag between a connection-state change and the `/healthz`
189    /// gauge / warn-log reflecting it. Cheap (a handful of atomic
190    /// loads + the policy evaluator) at this rate.
191    const POLICY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
192
193    // Initial evaluation so the cached durable position and the
194    // `/healthz` gauge reflect the cluster's startup shape before
195    // the first batch arrives.
196    let mut degraded_logger;
197    {
198        let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
199        let metrics_ref = replication_metrics.as_deref();
200        let active_ref = replica_active.as_ref();
201        let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
202        cached_durable_pos = status.durable_pos;
203        utilization
204            .policy_degraded
205            .store(status.degraded, Ordering::Relaxed);
206        degraded_logger = if status.degraded {
207            DegradationLogger::new_starting_degraded(startup_now, &policy)
208        } else {
209            DegradationLogger::new(startup_now)
210        };
211    }
212
213    // Stage histograms registered with the global registry — see
214    // `melin_transport_core::trace`. The four breakdown stages
215    // (journal-wait, replica-wait, encode, egress) feed the bench's
216    // tick-to-trade decomposition; spsc/dispatch/server-e2e are kept
217    // alongside as overall sanity checks.
218    #[cfg(feature = "latency-trace")]
219    let mut spsc_rec =
220        trace::register_stage("response: SPSC wakeup (matching publish → response consume)");
221    #[cfg(feature = "latency-trace")]
222    let mut dispatch_rec = trace::register_stage("response: dispatch (consume → socket write)");
223    #[cfg(feature = "latency-trace")]
224    let mut server_e2e_rec = trace::register_stage("server e2e (reader recv → response flush)");
225    // Tick-to-trade breakdown: per-slot wait observed for each
226    // durability path (recorded only when the gate actually held us
227    // up — cache-hit paths skip to avoid inflating the metric with
228    // crossings that happened before we noticed). Encode is wall-time
229    // around `encode_transport_response`. Egress wraps a `flush_sends`
230    // call (one sample per io_uring flush, batching many slots).
231    // Gated on `tick-to-trade`, not `latency-trace`, because these
232    // stages roughly double the hot-path mutex traffic vs the lighter
233    // 4-stage mode.
234    #[cfg(feature = "tick-to-trade")]
235    let mut journal_wait_rec =
236        trace::register_stage("response: journal-wait (match_complete → journal cursor crossed)");
237    #[cfg(feature = "tick-to-trade")]
238    let mut replica_wait_rec = trace::register_stage(
239        "response: replica-wait (match_complete → replication cursor crossed)",
240    );
241    #[cfg(feature = "tick-to-trade")]
242    let mut encode_rec = trace::register_stage("response: encode (per-kind wire encoding)");
243    #[cfg(feature = "tick-to-trade")]
244    let mut egress_rec = trace::register_stage("response: egress (flush_sends elapsed)");
245
246    // Track connections with buffered (unflushed) writes across batches.
247    let mut dirty_connections: HashSet<u64> = HashSet::new();
248
249    // Connections to remove after flush (send errors).
250    let mut to_remove: Vec<u64> = Vec::new();
251
252    // Pre-allocated CQE collection buffer. Must collect CQEs before
253    // processing because the CQ borrow must end before mutating connections.
254    // Pre-sized to RING_SIZE to avoid per-iteration heap allocation.
255    let mut cqes: Vec<(u64, i32)> = Vec::with_capacity(RING_SIZE as usize);
256
257    // Pre-encode the heartbeat response frame once. Full wire frame
258    // (length prefix + tag) for direct append to send_buf.
259    let heartbeat_wire_frame = {
260        let mut buf = [0u8; 8];
261        let written =
262            control_codec::encode_transport_response(&TransportResponse::Heartbeat, &mut buf)
263                .expect("heartbeat encodes");
264        buf[..written].to_vec()
265    };
266
267    // Coarse timestamp for heartbeat scan — avoids Instant::now() on every spin.
268    let mut last_heartbeat_scan = Instant::now();
269
270    // Adaptive spin: spin first (fast wakeup), yield after threshold.
271    let mut idle_spins: u32 = 0;
272
273    let mut busy_count: u64 = 0;
274    let mut idle_count: u64 = 0;
275
276    loop {
277        // Observe runtime mode swaps from the admin `DURABILITY`
278        // command. Relaxed load (single writer is the admin handler,
279        // single reader is this thread). When the byte changes,
280        // rebuild the local Policy and reset the cached durable
281        // position so the next gate evaluation starts from a clean
282        // slate under the new shape; log the transition for the audit
283        // trail. An unknown byte is treated as memory corruption: we
284        // log and keep the prior mode rather than silently downgrading.
285        let observed_byte = durability_mode.load(Ordering::Relaxed);
286        if observed_byte != active_mode.as_u8() {
287            match DurabilityMode::from_u8(observed_byte) {
288                Some(next) => {
289                    tracing::info!(
290                        prev = active_mode.as_str(),
291                        next = next.as_str(),
292                        "durability mode swapped at runtime"
293                    );
294                    active_mode = next;
295                    policy = active_mode.to_policy();
296                    // The fresh policy may evaluate degraded/undegraded
297                    // differently against the same cluster shape; let
298                    // the next gate evaluation re-derive.
299                    cached_durable_pos = 0;
300                    // Re-seed the degradation logger so a transition
301                    // out of (or into) degraded under the new policy
302                    // surfaces immediately rather than waiting for the
303                    // sustained-state hold to roll over.
304                    degraded_logger = DegradationLogger::new(Instant::now());
305                }
306                None => {
307                    tracing::error!(
308                        byte = observed_byte,
309                        "durability_mode atomic held a corrupted byte; retaining prior mode"
310                    );
311                }
312            }
313        }
314
315        if shutdown.load(Ordering::Relaxed) {
316            // Best-effort flush before shutdown.
317            if !dirty_connections.is_empty() {
318                flush_sends(
319                    &mut ring,
320                    &mut connections,
321                    &dirty_connections,
322                    &mut to_remove,
323                    &mut cqes,
324                );
325                dirty_connections.clear();
326            }
327            utilization.busy.store(busy_count, Ordering::Relaxed);
328            utilization.idle.store(idle_count, Ordering::Relaxed);
329            #[cfg(feature = "pipeline-stats")]
330            print_utilization("response", busy_count, idle_count);
331            return;
332        }
333
334        // Poll control channel (non-blocking) for connect/disconnect.
335        while let Ok(event) = control_rx.try_recv() {
336            match event {
337                ControlEvent::Connected {
338                    connection_id,
339                    fd,
340                    writer,
341                } => {
342                    // The writer keeps the fd alive — store it as the owner.
343                    let owner: Box<dyn Send> = Box::new(writer);
344                    connections.insert(
345                        connection_id,
346                        ConnectionEntry {
347                            fd,
348                            _owner: owner,
349                            send_buf: Vec::with_capacity(4096),
350                            last_send: Instant::now(),
351                        },
352                    );
353                }
354                ControlEvent::Disconnected { connection_id } => {
355                    connections.remove(&connection_id);
356                    dirty_connections.remove(&connection_id);
357                }
358            }
359        }
360
361        // Consume output slots from matching stage.
362        let count = consumer.consume_batch(&mut batch, MAX_BATCH);
363        if count == 0 {
364            // SPSC is empty — flush all dirty connections via io_uring.
365            // This is the response-data egress path; heartbeat flushes
366            // below aren't sampled because they're admin traffic, not
367            // on the client RTT path.
368            if !dirty_connections.is_empty() {
369                #[cfg(feature = "tick-to-trade")]
370                let egress_start = trace::mono_trace_ns();
371                flush_sends(
372                    &mut ring,
373                    &mut connections,
374                    &dirty_connections,
375                    &mut to_remove,
376                    &mut cqes,
377                );
378                #[cfg(feature = "tick-to-trade")]
379                egress_rec.record_elapsed(egress_start, trace::mono_trace_ns());
380                for conn_id in to_remove.drain(..) {
381                    connections.remove(&conn_id);
382                }
383                dirty_connections.clear();
384            }
385
386            // Send heartbeats to idle connections. Only checked during
387            // idle periods (SPSC empty) to avoid overhead on the hot path.
388            if let Some(interval) = heartbeat_interval {
389                let now = Instant::now();
390                // Coarse gate: only scan at most once per second.
391                if now.duration_since(last_heartbeat_scan) >= Duration::from_secs(1) {
392                    last_heartbeat_scan = now;
393                    for (&conn_id, entry) in connections.iter_mut() {
394                        if now.duration_since(entry.last_send) >= interval {
395                            entry.send_buf.extend_from_slice(&heartbeat_wire_frame);
396                            dirty_connections.insert(conn_id);
397                            entry.last_send = now;
398                        }
399                    }
400                    // Flush the heartbeat sends immediately.
401                    if !dirty_connections.is_empty() {
402                        flush_sends(
403                            &mut ring,
404                            &mut connections,
405                            &dirty_connections,
406                            &mut to_remove,
407                            &mut cqes,
408                        );
409                        for conn_id in to_remove.drain(..) {
410                            connections.remove(&conn_id);
411                        }
412                        dirty_connections.clear();
413                    }
414                }
415            }
416
417            // Re-evaluate the durability policy on a slow timer so the
418            // `policy_degraded` flag and the periodic warn track the
419            // cluster's real state even on idle / quiet venues. The
420            // gate-open block also calls `update_degraded_state` after
421            // each consumed batch; this is the equivalent for the
422            // no-batch path.
423            {
424                let now_ts = Instant::now();
425                if now_ts.duration_since(last_policy_check) >= POLICY_CHECK_INTERVAL {
426                    last_policy_check = now_ts;
427                    let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
428                    let metrics_ref = replication_metrics.as_deref();
429                    let active_ref = replica_active.as_ref();
430                    let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
431                    degraded_logger.tick(
432                        &policy,
433                        &utilization,
434                        status.degraded,
435                        now_ts,
436                        DEGRADED_LOG_INTERVAL,
437                    );
438                    // Cache the position so the next batch's gate sees a
439                    // fresh value rather than spinning from a stale cache.
440                    cached_durable_pos = status.durable_pos;
441                }
442            }
443
444            idle_count += 1;
445            if idle_count.is_multiple_of(1024) {
446                utilization.busy.store(busy_count, Ordering::Relaxed);
447                utilization.idle.store(idle_count, Ordering::Relaxed);
448            }
449            if busy_spin || idle_spins < 1000 {
450                idle_spins = idle_spins.wrapping_add(1);
451                std::hint::spin_loop();
452            } else {
453                std::thread::yield_now();
454            }
455            continue;
456        }
457        idle_spins = 0;
458        busy_count += 1;
459
460        #[cfg(feature = "latency-trace")]
461        let consume_ts = trace::mono_trace_ns();
462
463        // Wait for durability confirmation before sending responses.
464        //
465        // Each iteration: read the primary journal cursor + per-slot
466        // replica cursors (both in-memory and persisted), build a
467        // `CursorView`, and evaluate the configured policy. Spin until
468        // the durable position catches up to the batch's max input_seq.
469        //
470        // Per-slot journal-wait / replica-wait tracker. See
471        // `GateCrossTracker` for the rationale (only records cursors
472        // that were actually on the critical path). Attribution uses
473        // `repl_min` = min of connected-replica persisted cursors so
474        // operators see "which subsystem to optimize" the same way as
475        // before the policy refactor.
476        #[cfg(feature = "tick-to-trade")]
477        let mut gate_tracker;
478        {
479            // Gate on `wire_seq`, not `input_seq`. `input_seq` is in
480            // local-consumer space (the matching cursor on the input
481            // ring, starts at 0 in this process) while replica metrics
482            // and the primary's `journal_persisted_wire_seq` live in
483            // wire-seq space (allocated by the journal stage starting
484            // at `starting_sequence`). A `needed` derived from
485            // `input_seq` and compared against wire-seq cursors only
486            // works when `starting_sequence == 1`; a recovered primary
487            // (or any process whose journal already has prior content
488            // pushing `starting_sequence` above 1) would silently open
489            // the gate ahead of the replica's actual replicated state.
490            //
491            // Every cursor in the policy view (`journal_persisted_wire_seq`,
492            // `metrics.in_memory_sequence`, `metrics.acked_sequence`)
493            // carries "highest wire seq known to be in that state on
494            // node X". A batch's `needed` is therefore the *exact*
495            // wire seq the gate must see — not `+1` — for the latest
496            // event in the batch to be considered durable. The legacy
497            // `+1` was load-bearing only because `input_seq` was off
498            // by `starting_sequence - 1` from wire seq; with the
499            // wire-seq stamp it would over-shoot by one event and
500            // make the gate stall an extra round-trip per response.
501            let needed = batch[..count]
502                .iter()
503                .map(|s| s.wire_seq)
504                .max()
505                .expect("non-empty batch");
506            #[cfg(feature = "tick-to-trade")]
507            {
508                gate_tracker = GateCrossTracker::new(needed);
509            }
510            // Durability-gate carve-out for halt-state output. Slots
511            // tagged `durability_bypass = true` at emission carry no
512            // engine state worth replicating before delivery — see
513            // `OutputSlot::durability_bypass` for the correctness
514            // argument. When every slot in the batch carries the flag
515            // the gate is skipped entirely, so clients receive the halt
516            // reason immediately rather than blocking on a structurally
517            // unsatisfiable policy (e.g. `Hybrid` with all replicas
518            // disconnected, which would otherwise stall the gate until
519            // peers return). If even one normal slot is present, gate
520            // the whole batch as usual — the bypass slots ride along
521            // behind the gated one, which is safe (no ordering
522            // inversion vs. a strict-gate world).
523            let needs_gate = batch[..count].iter().any(|s| !s.durability_bypass);
524            if needs_gate && cached_durable_pos < needed {
525                loop {
526                    // Inside the gate-wait spin loop, also observe a
527                    // mode swap. Without this, a batch whose gate
528                    // becomes structurally unsatisfiable (e.g. all
529                    // replicas die while a non-bypass slot is in
530                    // flight under `Hybrid`) would wedge the response
531                    // stage forever, even if an operator sends the
532                    // remediating `DURABILITY local` — the outer loop
533                    // observation never gets a chance to run. The
534                    // relaxed load is ~1 cycle on x86; cheaper than
535                    // the `spin_loop` hint below.
536                    let observed_byte = durability_mode.load(Ordering::Relaxed);
537                    if observed_byte != active_mode.as_u8()
538                        && let Some(next) = DurabilityMode::from_u8(observed_byte)
539                    {
540                        tracing::info!(
541                            prev = active_mode.as_str(),
542                            next = next.as_str(),
543                            "durability mode swapped during gate wait"
544                        );
545                        active_mode = next;
546                        policy = active_mode.to_policy();
547                        degraded_logger = DegradationLogger::new(Instant::now());
548                    }
549
550                    let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
551                    let metrics_ref = replication_metrics.as_deref();
552                    let active_ref = replica_active.as_ref();
553                    let repl_min = connected_persisted_min(metrics_ref, active_ref);
554
555                    #[cfg(feature = "tick-to-trade")]
556                    gate_tracker.observe(journal_pos, repl_min, trace::mono_trace_ns());
557
558                    let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
559                    cached_durable_pos = status.durable_pos;
560                    utilization
561                        .policy_degraded
562                        .store(status.degraded, Ordering::Relaxed);
563
564                    if cached_durable_pos >= needed {
565                        // Attribution: which subsystem was slowest at
566                        // the moment the gate opened. Relaxed is fine —
567                        // health reads are infrequent.
568                        if journal_pos <= repl_min {
569                            utilization.gate_journal.fetch_add(1, Ordering::Relaxed);
570                        } else {
571                            utilization.gate_replication.fetch_add(1, Ordering::Relaxed);
572                        }
573                        break;
574                    }
575                    std::hint::spin_loop();
576                }
577            }
578        }
579
580        let batch_now = Instant::now();
581
582        // Log degradation transitions / re-emit the reminder. Same
583        // logger the idle path uses; transitions are gated on a
584        // sustained-state hold so sub-second flap doesn't spam.
585        let degraded_now = utilization.policy_degraded.load(Ordering::Relaxed);
586        degraded_logger.tick(
587            &policy,
588            &utilization,
589            degraded_now,
590            batch_now,
591            DEGRADED_LOG_INTERVAL,
592        );
593        // Bump the idle-path's check timestamp so we don't double-
594        // tick the logger when traffic stops.
595        last_policy_check = batch_now;
596
597        for slot in &batch[..count] {
598            #[cfg(feature = "latency-trace")]
599            spsc_rec.record_elapsed(slot.match_complete_ts, consume_ts);
600
601            // Per-slot durability-gate breakdown. Recorded only when
602            // the gate actually held us up (the tracker captured a
603            // cross). Note: the cross timestamp is for the *batch's*
604            // `needed` — for slots earlier in the batch, the cursor
605            // may have crossed their individual `input_seq+1` earlier,
606            // so this systematically overestimates wait for non-last
607            // slots by up to the batch's matching span. Acceptable
608            // noise for the operator-facing breakdown; documented in
609            // `docs/benchmarking.md`.
610            #[cfg(feature = "tick-to-trade")]
611            if let Some(ts) = gate_tracker.journal_crossed() {
612                journal_wait_rec.record_elapsed(slot.match_complete_ts, ts);
613            }
614            #[cfg(feature = "tick-to-trade")]
615            if let Some(ts) = gate_tracker.replica_crossed() {
616                replica_wait_rec.record_elapsed(slot.match_complete_ts, ts);
617            }
618
619            // Each slot expands to at most two wire frames: the
620            // application payload (Report / Query / EngineError) and
621            // an optional trailing `BatchEnd` when
622            // `is_last_in_request` is set. Application-shaped
623            // payloads go through the encoder; transport-shaped
624            // frames (EngineError, BatchEnd) are encoded by the
625            // runtime directly.
626            if let Some(entry) = connections.get_mut(&slot.connection_id) {
627                // Frame 1: application payload (if any). BatchEnd
628                // payloads carry no body — the terminator below
629                // handles them via `is_last_in_request`.
630                let payload_result: Option<Result<usize, &'static str>> = match slot.payload {
631                    OutputPayload::Report(ref report) => {
632                        Some(encoder.encode_report(report, &mut encode_buf))
633                    }
634                    OutputPayload::QueryResponse(ref q) => {
635                        Some(encoder.encode_query(q, &mut encode_buf))
636                    }
637                    OutputPayload::EngineError => Some(
638                        control_codec::encode_transport_response(
639                            &TransportResponse::EngineError,
640                            &mut encode_buf,
641                        )
642                        .map_err(|_| "encode error"),
643                    ),
644                    OutputPayload::BatchEnd => None,
645                };
646
647                let payload_handled = if let Some(result) = payload_result {
648                    #[cfg(feature = "tick-to-trade")]
649                    let encode_start = trace::mono_trace_ns();
650                    let outcome = append_frame(
651                        result,
652                        slot.connection_id,
653                        entry,
654                        &encode_buf,
655                        batch_now,
656                        &mut dirty_connections,
657                        &mut to_remove,
658                    );
659                    #[cfg(feature = "tick-to-trade")]
660                    encode_rec.record_elapsed(encode_start, trace::mono_trace_ns());
661                    outcome
662                } else {
663                    AppendOutcome::Continue
664                };
665
666                // Frame 2: BatchEnd terminator. Skipped if the payload
667                // append dropped the connection.
668                if matches!(payload_handled, AppendOutcome::Continue) && slot.is_last_in_request {
669                    let result = control_codec::encode_transport_response(
670                        &TransportResponse::BatchEnd,
671                        &mut encode_buf,
672                    )
673                    .map_err(|_| "encode error");
674                    let outcome = append_frame(
675                        result,
676                        slot.connection_id,
677                        entry,
678                        &encode_buf,
679                        batch_now,
680                        &mut dirty_connections,
681                        &mut to_remove,
682                    );
683                    // Record server-side end-to-end: reader recv ->
684                    // response flush. Only the BatchEnd frame carries
685                    // this measurement; tracked here after the append
686                    // so a dropped connection doesn't skew the metric.
687                    #[cfg(feature = "latency-trace")]
688                    if matches!(outcome, AppendOutcome::Continue) {
689                        server_e2e_rec.record_elapsed(slot.recv_ts, trace::mono_trace_ns());
690                    }
691                    let _ = outcome;
692                }
693            }
694        }
695
696        // Remove connections that exceeded the send buffer limit.
697        for conn_id in to_remove.drain(..) {
698            connections.remove(&conn_id);
699            dirty_connections.remove(&conn_id);
700        }
701
702        #[cfg(feature = "latency-trace")]
703        dispatch_rec.record_elapsed(consume_ts, trace::mono_trace_ns());
704    }
705}
706
/// Outcome of appending a single encoded frame to a connection's send
/// buffer via [`append_frame`].
///
/// - `Continue`: the caller may proceed to the next frame for this slot.
///   This is also what an *encode failure* reports: the error is logged,
///   the frame is skipped, and the connection stays open.
/// - `ConnectionDropped`: appending the frame would have pushed the
///   connection's send buffer past `MAX_SEND_BUF`. The connection id has
///   been pushed onto `to_remove`, and no further frames should be
///   appended for this slot.
///
/// NOTE(review): because an encode failure reports `Continue`, the
/// caller still appends a trailing `BatchEnd` after a payload frame that
/// failed to encode. Confirm that this is the intended client-facing
/// behavior.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum AppendOutcome {
    /// The frame was appended, or skipped after an encode error.
    Continue,
    /// The send-buffer limit was exceeded; the connection is queued for removal.
    ConnectionDropped,
}
719
720/// Copy an encoded frame into the connection's send buffer with
721/// overflow checking. Splits the responsibilities the inline encode
722/// loop used to have: the caller passes in the encode result (so it
723/// can come from the `ResponseEncoder` trait for application
724/// payloads, or `encode_transport_response` for transport-shaped
725/// frames), and this helper handles size accounting + dirty
726/// tracking uniformly.
727#[allow(clippy::too_many_arguments)]
728fn append_frame(
729    result: Result<usize, &'static str>,
730    connection_id: u64,
731    entry: &mut ConnectionEntry,
732    encode_buf: &[u8],
733    batch_now: Instant,
734    dirty_connections: &mut HashSet<u64>,
735    to_remove: &mut Vec<u64>,
736) -> AppendOutcome {
737    let written = match result {
738        Ok(n) => n,
739        Err(reason) => {
740            tracing::error!(connection_id, reason, "encode error");
741            return AppendOutcome::Continue;
742        }
743    };
744
745    // Drop slow clients whose send buffer has grown too large. This
746    // prevents unbounded memory growth from a single laggy connection
747    // causing allocator pressure and tail latency spikes.
748    if entry.send_buf.len() + written > MAX_SEND_BUF {
749        debug!(
750            connection_id,
751            send_buf_len = entry.send_buf.len(),
752            "send buffer exceeded limit, dropping connection"
753        );
754        to_remove.push(connection_id);
755        return AppendOutcome::ConnectionDropped;
756    }
757
758    // Append the full wire frame to the connection's send buffer.
759    // The encoder writes [length(4) | payload], which is the complete
760    // wire format — no extra framing needed.
761    entry.send_buf.extend_from_slice(&encode_buf[..written]);
762    entry.last_send = batch_now;
763    dirty_connections.insert(connection_id);
764    AppendOutcome::Continue
765}
766
767/// Each dirty connection's accumulated send buffer is sent in a single SEND
768/// operation. Partial sends are retried until all bytes are delivered.
769/// Failed connections are collected in `to_remove` for the caller to clean up.
770fn flush_sends(
771    ring: &mut IoUring,
772    connections: &mut HashMap<u64, ConnectionEntry>,
773    dirty: &HashSet<u64>,
774    to_remove: &mut Vec<u64>,
775    cqes: &mut Vec<(u64, i32)>,
776) {
777    // Submit SEND SQEs for all dirty connections.
778    let mut pending: usize = 0;
779    for &conn_id in dirty {
780        if let Some(entry) = connections.get(&conn_id) {
781            if entry.send_buf.is_empty() {
782                continue;
783            }
784            let sqe = opcode::Send::new(
785                types::Fd(entry.fd),
786                entry.send_buf.as_ptr(),
787                entry.send_buf.len() as u32,
788            )
789            .build()
790            .user_data(conn_id);
791
792            unsafe {
793                ring.submission()
794                    .push(&sqe)
795                    .expect("io_uring SQ full — increase RING_SIZE");
796            }
797            pending += 1;
798        }
799    }
800
801    if pending == 0 {
802        return;
803    }
804
805    // Submit and wait for all completions.
806    if let Err(e) = ring.submit_and_wait(pending) {
807        error!(error = %e, "io_uring submit_and_wait failed in response stage");
808        return;
809    }
810
811    // Drain completions into pre-allocated buffer. Must collect to
812    // release CQ borrow before mutating connections.
813    cqes.clear();
814    cqes.extend(ring.completion().map(|cqe| (cqe.user_data(), cqe.result())));
815
816    for &(conn_id, result) in cqes.iter() {
817        if result < 0 {
818            debug!(
819                connection_id = conn_id,
820                error = result,
821                "send error, dropping connection"
822            );
823            to_remove.push(conn_id);
824            continue;
825        }
826
827        let sent = result as usize;
828        if let Some(entry) = connections.get_mut(&conn_id) {
829            if sent >= entry.send_buf.len() {
830                entry.send_buf.clear();
831            } else {
832                // Partial send — drain sent bytes, retry remainder.
833                // Rare for small response frames over TCP/UDS but must
834                // be handled for correctness (e.g., send buffer pressure).
835                entry.send_buf.drain(..sent);
836                retry_send(ring, entry, conn_id, to_remove);
837            }
838        }
839    }
840}
841
842/// Retry sending remaining bytes after a partial send. Loops until the
843/// entire buffer is delivered or an error occurs.
844fn retry_send(
845    ring: &mut IoUring,
846    entry: &mut ConnectionEntry,
847    conn_id: u64,
848    to_remove: &mut Vec<u64>,
849) {
850    while !entry.send_buf.is_empty() {
851        let sqe = opcode::Send::new(
852            types::Fd(entry.fd),
853            entry.send_buf.as_ptr(),
854            entry.send_buf.len() as u32,
855        )
856        .build()
857        .user_data(conn_id);
858
859        unsafe {
860            ring.submission()
861                .push(&sqe)
862                .expect("io_uring SQ full during send retry");
863        }
864
865        if let Err(e) = ring.submit_and_wait(1) {
866            debug!(connection_id = conn_id, error = %e, "send retry failed");
867            to_remove.push(conn_id);
868            return;
869        }
870
871        if let Some(cqe) = ring.completion().next() {
872            let result = cqe.result();
873            if result <= 0 {
874                debug!(
875                    connection_id = conn_id,
876                    error = result,
877                    "send retry error, dropping connection"
878                );
879                to_remove.push(conn_id);
880                return;
881            }
882            let sent = result as usize;
883            if sent >= entry.send_buf.len() {
884                entry.send_buf.clear();
885            } else {
886                entry.send_buf.drain(..sent);
887            }
888        }
889    }
890}
891
892/// Evaluate the durability policy against the live cursor state.
893///
894/// Builds a `CursorView` containing the primary plus every *currently
895/// connected* replica slot and returns the highest sequence at which
896/// the policy is satisfied. The primary's in-memory cursor is modeled
897/// as `u64::MAX` because the response stage only gates events that have
898/// already been processed by the matching engine — those are trivially
899/// in-memory on the primary by construction.
900///
901/// Disconnected slots are *omitted from the view* rather than included
902/// with zero cursors. The view's `len()` reflects how many nodes are
903/// actually available; if it's too small to satisfy a clause, the
904/// policy reports degraded and the gate stalls.
905#[inline]
906pub(crate) fn evaluate_durability(
907    policy: &Policy,
908    journal_pos: u64,
909    metrics: Option<&ReplicationMetrics>,
910    replica_active: Option<&[Arc<AtomicBool>; 2]>,
911) -> EvalStatus {
912    // Primary + up to 2 replica slots = 3 nodes max.
913    let mut nodes: [[u64; 2]; 3] = [[0, 0]; 3];
914    nodes[0] = [u64::MAX, journal_pos];
915    let mut len = 1;
916    if let (Some(m), Some(active)) = (metrics, replica_active) {
917        for (i, slot_active) in active.iter().enumerate() {
918            // Skip inactive slots up-front.
919            if !slot_active.load(Ordering::Acquire) {
920                continue;
921            }
922            let in_mem = m.in_memory_sequence[i].load(Ordering::Acquire);
923            let persisted = m.acked_sequence[i].load(Ordering::Acquire);
924            nodes[len] = [in_mem, persisted];
925            len += 1;
926        }
927    }
928    policy.evaluate_with_status(&CursorView::new(&nodes[..len]))
929}
930
931/// Hold-time before a state transition is committed to the log.
932/// Suppresses log spam when a replica flaps faster than this — only
933/// transitions that hold for at least this long emit warn/info
934/// entries. The `/healthz` gauge updates immediately regardless,
935/// so dashboards and alerts still see real-time state.
936const DEGRADED_FLAP_HOLD: Duration = Duration::from_secs(1);
937
938/// Tracks degradation state across calls and emits warn/info logs
939/// with sustained-state gating + a periodic heartbeat re-emit.
940///
941/// The hot path calls [`Self::tick`] every gate iteration / idle
942/// poll with the current `degraded` value and the wall clock. The
943/// logger handles:
944///
945/// - Updating the `policy_degraded` health gauge immediately.
946/// - Suppressing log lines for transitions that don't hold for at
947///   least [`DEGRADED_FLAP_HOLD`] — a replica flapping at sub-second
948///   cadence produces no log noise, only a quietly-updating gauge.
949/// - Emitting a warn at the moment a sustained degraded state
950///   crosses the hold threshold, plus a periodic re-emit every
951///   `heartbeat_interval` while it persists.
952/// - Emitting an info when a sustained healthy state crosses the
953///   hold threshold (the cluster is back to its target shape and
954///   stayed there long enough that we trust the recovery).
955pub(crate) struct DegradationLogger {
956    /// Last value passed to `tick`; what we'd log about if it stayed
957    /// at this value past the hold threshold.
958    pending_state: bool,
959    /// When `pending_state` first appeared. Reset on every flip.
960    pending_since: Instant,
961    /// Whether the current pending state has been logged yet. Only
962    /// the *first* log per sustained streak crosses; subsequent
963    /// re-emits while degraded are heartbeat warns.
964    pending_logged: bool,
965    /// When the last warn fired. Drives the periodic re-emit while
966    /// degraded.
967    last_log: Option<Instant>,
968}
969
970impl DegradationLogger {
971    pub(crate) fn new(now: Instant) -> Self {
972        Self {
973            pending_state: false,
974            pending_since: now,
975            pending_logged: true, // healthy is the assumed initial state; nothing to log
976            last_log: None,
977        }
978    }
979
980    /// Use when the policy is known to start in a degraded state
981    /// (e.g. a primary in `hybrid` mode with no replica yet
982    /// connected). Logs a startup warn immediately and treats the
983    /// state as already-logged so the next tick doesn't re-emit.
984    pub(crate) fn new_starting_degraded(now: Instant, policy: &Policy) -> Self {
985        tracing::warn!(
986            policy = %policy,
987            "durability policy starts in degraded mode — fewer connected nodes than the target count"
988        );
989        Self {
990            pending_state: true,
991            pending_since: now,
992            pending_logged: true,
993            last_log: Some(now),
994        }
995    }
996
997    /// Update the gauge + emit transition/heartbeat logs as needed.
998    /// Cheap on the hot path: one atomic store, a few branches, one
999    /// `Instant::duration_since`.
1000    pub(crate) fn tick(
1001        &mut self,
1002        policy: &Policy,
1003        utilization: &StageUtilization,
1004        degraded_now: bool,
1005        now: Instant,
1006        heartbeat_interval: Duration,
1007    ) {
1008        utilization
1009            .policy_degraded
1010            .store(degraded_now, Ordering::Relaxed);
1011
1012        if degraded_now != self.pending_state {
1013            // State changed — start a new hold window. Don't log
1014            // until / unless this new state stays long enough.
1015            self.pending_state = degraded_now;
1016            self.pending_since = now;
1017            self.pending_logged = false;
1018            return;
1019        }
1020
1021        // State held. If we haven't yet logged this streak's onset,
1022        // and it's been pending for at least the flap-hold time,
1023        // emit the transition message and mark logged.
1024        if !self.pending_logged && now.duration_since(self.pending_since) >= DEGRADED_FLAP_HOLD {
1025            if degraded_now {
1026                tracing::warn!(
1027                    policy = %policy,
1028                    "durability policy operating in degraded mode — fewer connected nodes than the target count, gate clamped to surviving cluster"
1029                );
1030            } else {
1031                tracing::info!(
1032                    policy = %policy,
1033                    "durability policy returned to target shape"
1034                );
1035            }
1036            self.pending_logged = true;
1037            self.last_log = Some(now);
1038            return;
1039        }
1040
1041        // Heartbeat re-emit while a degraded state persists.
1042        if degraded_now
1043            && self.pending_logged
1044            && self
1045                .last_log
1046                .is_none_or(|t| now.duration_since(t) >= heartbeat_interval)
1047        {
1048            tracing::warn!(
1049                policy = %policy,
1050                "durability policy still degraded — fewer connected nodes than the target count"
1051            );
1052            self.last_log = Some(now);
1053        }
1054    }
1055}
1056
1057/// Minimum persisted cursor across currently-connected replica slots.
1058/// Used for gate-bottleneck attribution and the journal-wait /
1059/// replica-wait histograms — *not* for durability decisions, which go
1060/// through [`evaluate_durability`].
1061///
1062/// Returns `u64::MAX` when no replica is connected, which makes
1063/// attribution always credit the journal — correct, because in
1064/// standalone mode the journal is the only path.
1065#[inline]
1066pub(crate) fn connected_persisted_min(
1067    metrics: Option<&ReplicationMetrics>,
1068    replica_active: Option<&[Arc<AtomicBool>; 2]>,
1069) -> u64 {
1070    let (Some(m), Some(active)) = (metrics, replica_active) else {
1071        return u64::MAX;
1072    };
1073    let mut min = u64::MAX;
1074    for (i, slot_active) in active.iter().enumerate() {
1075        if !slot_active.load(Ordering::Acquire) {
1076            continue;
1077        }
1078        let v = m.acked_sequence[i].load(Ordering::Acquire);
1079        if v < min {
1080            min = v;
1081        }
1082    }
1083    min
1084}
1085
1086/// Tracks per-cursor "first observed transition from below to >= needed"
1087/// inside the durability gate loop, to drive the journal-wait /
1088/// replica-wait histograms in the bench's tick-to-trade decomposition.
1089///
1090/// A sample is recorded only for cursors that were strictly below
1091/// `needed` at the loop's first observation. Cursors already past at
1092/// entry were not on the critical path for this batch, so attributing
1093/// "wait time" to them would inflate the metric with cursor-poll
1094/// observation timestamps that have nothing to do with how long the
1095/// stage actually held us up.
1096///
1097/// `now_ns` is taken as a parameter rather than read internally so
1098/// tests can supply deterministic timestamps. The caller's hot path
1099/// reads `trace::mono_trace_ns()` once per gate iteration and feeds it in.
1100#[cfg(feature = "tick-to-trade")]
1101pub(crate) struct GateCrossTracker {
1102    needed: u64,
1103    journal_crossed_ts: Option<trace::MonoTraceInstant>,
1104    replica_crossed_ts: Option<trace::MonoTraceInstant>,
1105    journal_was_below: bool,
1106    replica_was_below: bool,
1107    first: bool,
1108}
1109
1110#[cfg(feature = "tick-to-trade")]
1111impl GateCrossTracker {
1112    pub(crate) fn new(needed: u64) -> Self {
1113        Self {
1114            needed,
1115            journal_crossed_ts: None,
1116            replica_crossed_ts: None,
1117            journal_was_below: false,
1118            replica_was_below: false,
1119            first: true,
1120        }
1121    }
1122
1123    pub(crate) fn observe(
1124        &mut self,
1125        journal_pos: u64,
1126        repl_min: u64,
1127        now_ns: trace::MonoTraceInstant,
1128    ) {
1129        if self.first {
1130            self.journal_was_below = journal_pos < self.needed;
1131            self.replica_was_below = repl_min < self.needed;
1132            self.first = false;
1133        }
1134        if self.journal_was_below && self.journal_crossed_ts.is_none() && journal_pos >= self.needed
1135        {
1136            self.journal_crossed_ts = Some(now_ns);
1137        }
1138        if self.replica_was_below && self.replica_crossed_ts.is_none() && repl_min >= self.needed {
1139            self.replica_crossed_ts = Some(now_ns);
1140        }
1141    }
1142
1143    pub(crate) fn journal_crossed(&self) -> Option<trace::MonoTraceInstant> {
1144        self.journal_crossed_ts
1145    }
1146
1147    pub(crate) fn replica_crossed(&self) -> Option<trace::MonoTraceInstant> {
1148        self.replica_crossed_ts
1149    }
1150}
1151
1152/// Print busy/idle utilization for a pipeline stage on shutdown.
1153#[cfg(feature = "pipeline-stats")]
1154fn print_utilization(stage: &str, busy: u64, idle: u64) {
1155    let total = busy + idle;
1156    if total == 0 {
1157        tracing::info!(stage, "no iterations recorded");
1158        return;
1159    }
1160    let pct = (busy as f64 / total as f64) * 100.0;
1161    tracing::info!(
1162        stage,
1163        pct_busy = format_args!("{pct:.2}%"),
1164        busy,
1165        idle,
1166        total,
1167        "pipeline utilization",
1168    );
1169}
1170
1171#[cfg(test)]
1172mod tests {
1173    #[cfg(feature = "tick-to-trade")]
1174    use super::GateCrossTracker;
1175    use super::{DegradationLogger, connected_persisted_min, evaluate_durability};
1176    use crate::durability_policy::{Clause, Level, Policy};
1177    use crate::replication::ReplicationMetrics;
1178
1179    /// Build a [`Policy`] from a mini DSL: one or more
1180    /// `"<level>>=<count>"` clauses joined with `&&`. Test-only
1181    /// ergonomics — production builds policies via
1182    /// [`DurabilityMode::to_policy`].
1183    fn parse(s: &str) -> Result<Policy, String> {
1184        let mut clauses = Vec::new();
1185        for raw in s.split("&&") {
1186            let token = raw.trim();
1187            let (lvl, rhs) = token
1188                .split_once(">=")
1189                .ok_or_else(|| format!("clause `{token}` missing `>=`"))?;
1190            let level = match lvl.trim() {
1191                "persisted" => Level::Persisted,
1192                "in_memory" => Level::InMemory,
1193                other => return Err(format!("unknown level `{other}`")),
1194            };
1195            let count: u8 = rhs.trim().parse().map_err(|e| format!("bad count: {e}"))?;
1196            clauses.push(Clause { count, level });
1197        }
1198        Policy::new(clauses).map_err(|e| e.to_string())
1199    }
1200    use melin_transport_core::pipeline::StageUtilization;
1201    use std::sync::Arc;
1202    use std::sync::atomic::{AtomicBool, Ordering};
1203    use std::time::{Duration, Instant};
1204
1205    /// Build a `ReplicationMetrics` with both slots populated. Tests
1206    /// that need to simulate a disconnected slot use [`flags`] to mark
1207    /// it inactive — its cursors are then ignored regardless of value.
1208    fn metrics(slot0: (u64, u64), slot1: (u64, u64)) -> Arc<ReplicationMetrics> {
1209        let m = Arc::new(ReplicationMetrics::default());
1210        m.in_memory_sequence[0].store(slot0.0, Ordering::Relaxed);
1211        m.acked_sequence[0].store(slot0.1, Ordering::Relaxed);
1212        m.in_memory_sequence[1].store(slot1.0, Ordering::Relaxed);
1213        m.acked_sequence[1].store(slot1.1, Ordering::Relaxed);
1214        m
1215    }
1216
1217    /// Build a `[active; 2]` flags array.
1218    fn flags(slot0_active: bool, slot1_active: bool) -> [Arc<AtomicBool>; 2] {
1219        [
1220            Arc::new(AtomicBool::new(slot0_active)),
1221            Arc::new(AtomicBool::new(slot1_active)),
1222        ]
1223    }
1224
1225    /// Both replicas active — the common healthy-cluster case.
1226    fn both_active() -> [Arc<AtomicBool>; 2] {
1227        flags(true, true)
1228    }
1229
1230    // --- Standalone (no replicas wired) ---
1231
1232    #[test]
1233    fn standalone_persisted_one_gates_on_journal() {
1234        // No metrics → only the primary is in the view. `persisted>=1`
1235        // is satisfied by the primary alone at journal_pos.
1236        let p = parse("persisted>=1").unwrap();
1237        assert_eq!(evaluate_durability(&p, 500, None, None).durable_pos, 500);
1238    }
1239
1240    #[test]
1241    fn standalone_strict_persisted_two_never_opens() {
1242        // `persisted>=2` on a standalone primary stays at 0: the
1243        // operator asked for two copies and there is only one. The
1244        // policy surfaces as degraded so the operator sees the gate
1245        // is stalled because the cluster can't meet the policy.
1246        let p = parse("persisted>=2").unwrap();
1247        let r = evaluate_durability(&p, 500, None, None);
1248        assert_eq!(r.durable_pos, 0);
1249        assert!(
1250            r.degraded,
1251            "policy structurally unsatisfiable on this shape → degraded",
1252        );
1253    }
1254
1255    // --- 2 replicas connected ---
1256
1257    #[test]
1258    fn quorum_both_replicas_ahead_of_journal() {
1259        // Both replicas persisted past journal. `persisted>=2` returns
1260        // the 2nd-largest persisted across {primary, slot0, slot1}.
1261        let p = parse("persisted>=2").unwrap();
1262        let m = metrics((100, 100), (120, 120));
1263        let a = both_active();
1264        assert_eq!(
1265            evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1266            100
1267        );
1268    }
1269
1270    #[test]
1271    fn quorum_journal_ahead_of_both_replicas() {
1272        // Journal at 500, replicas at 100/120. 2nd-largest persisted = 120.
1273        let p = parse("persisted>=2").unwrap();
1274        let m = metrics((100, 100), (120, 120));
1275        let a = both_active();
1276        assert_eq!(
1277            evaluate_durability(&p, 500, Some(&m), Some(&a)).durable_pos,
1278            120
1279        );
1280    }
1281
1282    #[test]
1283    fn quorum_journal_between_slow_and_fast_replica() {
1284        // {primary=150, slot0_persisted=50, slot1_persisted=200}.
1285        // 2nd-largest = 150 (primary itself).
1286        let p = parse("persisted>=2").unwrap();
1287        let m = metrics((50, 50), (200, 200));
1288        let a = both_active();
1289        assert_eq!(
1290            evaluate_durability(&p, 150, Some(&m), Some(&a)).durable_pos,
1291            150
1292        );
1293    }
1294
1295    // --- Single replica connected ---
1296
1297    #[test]
1298    fn single_replica_strict_persisted_two_requires_both_survivors() {
1299        // Slot 0 connected, slot 1 disconnected. View = {primary, slot0}.
1300        // Strict `persisted>=2`: 2nd-largest of the 2-row view =
1301        // min(primary, slot0). Strictly stronger than legacy auto-
1302        // degrade-to-1-node in the same shape.
1303        let p = parse("persisted>=2").unwrap();
1304        let m = metrics((100, 100), (999, 999)); // slot 1 cursors ignored
1305        let a = flags(true, false);
1306        assert_eq!(
1307            evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1308            50
1309        );
1310        assert_eq!(
1311            evaluate_durability(&p, 200, Some(&m), Some(&a)).durable_pos,
1312            100
1313        );
1314    }
1315
1316    #[test]
1317    fn single_replica_persisted_two_requires_both_survivors() {
1318        // 2-node view (primary + surviving replica). `persisted>=2` is
1319        // satisfiable; the gate opens at the slower of the two and the
1320        // policy is not degraded.
1321        let p = parse("persisted>=2").unwrap();
1322        let m = metrics((100, 100), (999, 999));
1323        let a = flags(true, false);
1324        let r = evaluate_durability(&p, 50, Some(&m), Some(&a));
1325        assert_eq!(r.durable_pos, 50);
1326        assert!(!r.degraded);
1327    }
1328
1329    #[test]
1330    fn both_replicas_disconnected_strict_stalls() {
1331        // View has only the primary. Strict `persisted>=2` cannot be
1332        // satisfied — operator opted out of degrade.
1333        let p = parse("persisted>=2").unwrap();
1334        let m = metrics((999, 999), (999, 999));
1335        let a = flags(false, false);
1336        assert_eq!(
1337            evaluate_durability(&p, 500, Some(&m), Some(&a)).durable_pos,
1338            0
1339        );
1340    }
1341
1342    #[test]
1343    fn both_replicas_disconnected_strict_stalls_and_flags_degraded() {
1344        // With `persisted>=2` and both replicas down, the cursor view
1345        // collapses to {primary}: the clause's count (=2) exceeds the
1346        // view size, so the gate stays at 0 and the policy flags
1347        // degraded. Note the matching stage's separate halt at
1348        // `replicas_connected==0` rejects new orders before they reach
1349        // the gate; this verifies the gate semantics in isolation.
1350        let p = parse("persisted>=2").unwrap();
1351        let m = metrics((999, 999), (999, 999));
1352        let a = flags(false, false);
1353        let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1354        assert_eq!(r.durable_pos, 0);
1355        assert!(r.degraded);
1356    }
1357
1358    // --- Mixed-level policies ---
1359
1360    #[test]
1361    fn persisted_one_and_in_memory_two() {
1362        // "Leader persists, plus one other node has it in memory" —
1363        // the cheap-but-non-zero durability target. Slot 0 has it in
1364        // memory, slot 1 disconnected.
1365        let p = parse("persisted>=1 && in_memory>=2").unwrap();
1366        // primary persisted=50, slot0 in_mem=80 / persisted=20.
1367        // persisted>=1: max(50, 20, 0) = 50.
1368        // in_memory>=2: primary in_mem=u64::MAX (always), slot0_eff=max(80, 20)=80,
1369        //               slot1=0. 2nd-largest = 80.
1370        // min(50, 80) = 50.
1371        let m = metrics((80, 20), (999, 999));
1372        let a = flags(true, false);
1373        assert_eq!(
1374            evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1375            50
1376        );
1377    }
1378
1379    // --- Edge: journal at 0 ---
1380
1381    #[test]
1382    fn journal_at_zero_with_replicas_persisted_one() {
1383        // Journal hasn't fsynced anything; both replicas have. With
1384        // `persisted>=1` the gate opens at the fastest replica.
1385        let p = parse("persisted>=1").unwrap();
1386        let m = metrics((100, 100), (200, 200));
1387        let a = both_active();
1388        assert_eq!(
1389            evaluate_durability(&p, 0, Some(&m), Some(&a)).durable_pos,
1390            200
1391        );
1392    }
1393
1394    // --- connected_persisted_min — used for gate-bottleneck attribution ---
1395
1396    #[test]
1397    fn attribution_min_skips_disconnected_slots() {
1398        // Slot 1 disconnected via active flag.
1399        let m = metrics((150, 100), (999, 999));
1400        let a = flags(true, false);
1401        assert_eq!(connected_persisted_min(Some(&m), Some(&a)), 100);
1402    }
1403
1404    #[test]
1405    fn attribution_min_returns_max_when_standalone() {
1406        // No metrics wired → u64::MAX, which makes attribution always
1407        // credit the journal. Correct for a standalone deployment.
1408        assert_eq!(connected_persisted_min(None, None), u64::MAX);
1409    }
1410
1411    /// Fresh-cluster catch-up: a replica that handshakes at sequence
1412    /// 0 (the legitimate genesis case, not a stale-flag race) must be
1413    /// included in the cursor view with its zero cursors so the policy
1414    /// behaves the same way it would for a 1-replica deployment that
1415    /// has just produced its first batch. The disconnect-race
1416    /// mitigations (B1 seed-on-connect + B2 reorder) keep this from
1417    /// being conflated with the stale-flag-paired-with-zero-cursor
1418    /// case under normal cluster lifecycles.
1419    #[test]
1420    fn fresh_cluster_zero_cursors_included_in_view() {
1421        let p = parse("persisted>=2").unwrap();
1422        // Both replicas just handshook at seq 0, primary also at 0
1423        // (fresh cluster, no events yet). View = 3 nodes; the clause's
1424        // count (=2) is met by the view size, so the policy is not
1425        // degraded and the gate sits at the 2nd-largest persisted = 0.
1426        let m = metrics((0, 0), (0, 0));
1427        let a = both_active();
1428        let r = evaluate_durability(&p, 0, Some(&m), Some(&a));
1429        assert_eq!(r.durable_pos, 0);
1430        assert!(
1431            !r.degraded,
1432            "all 3 nodes present, view meets clause target — should not flag degraded"
1433        );
1434    }
1435
1436    #[test]
1437    fn attribution_min_takes_smaller_when_both_connected() {
1438        let m = metrics((150, 100), (180, 80));
1439        let a = both_active();
1440        assert_eq!(connected_persisted_min(Some(&m), Some(&a)), 80);
1441    }
1442
1443    // -- Race-window regression tests --
1444    //
1445    // The replication senders fix two memory-ordering issues at the
1446    // active-flag transition points:
1447    //
1448    //   B1 (`a84540a`): seed `metrics.{acked,in_memory}_sequence[i]`
1449    //   to `handshake.last_sequence` BEFORE setting active_flag=true
1450    //   on reconnect. Without this, the gate would observe (active=
1451    //   true, cursor=0) for ~1 RTT after a replica catch-up completed,
1452    //   freezing the gate on a degrade-friendly clause.
1453    //
1454    //   B2 (`8888732`): zero `metrics.{acked,in_memory}_sequence[i]`
1455    //   BEFORE setting active_flag=false on disconnect. Without this,
1456    //   a weak-memory reader could observe (active=true, cursor=0)
1457    //   for one iteration during the disconnect window.
1458    //
1459    // Both fixes are in the senders, but the gate's *behaviour* under
1460    // the race-window inputs is tested here. The intent is to lock in
1461    // the invariant: even under a hypothetical (active=true,cursor=0)
1462    // observation, the gate must not produce a spuriously-open answer
1463    // that would cause a client to be told "your event is durable"
1464    // when it isn't. Stalling-briefly is safe; opening-spuriously is
1465    // not.
1466
1467    #[test]
1468    fn race_b1_post_seed_gate_doesnt_freeze_on_reconnect() {
1469        // Post-B1-fix state: replica reconnected, cursors seeded to
1470        // `handshake.last_sequence` (480) before active flipped to
1471        // true. Primary kept moving and is at 500. The gate's view
1472        // is now [primary=500, slot=480]; the durable position dips
1473        // from 500 (primary alone, degraded) to 480 (both nodes).
1474        //
1475        // The dip is correct, not a bug: once a 2nd node is
1476        // connected, durability is bounded by the slower of the two.
1477        // Events 481-500 were already served as durable on primary
1478        // alone — they aren't unsent. New responses for seq>500 wait
1479        // until slot acks; we just don't freeze at 0.
1480        let p = parse("persisted>=2").unwrap();
1481        let m = metrics((480, 480), (999, 999));
1482        let a = flags(true, false);
1483        let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1484        assert_eq!(
1485            r.durable_pos, 480,
1486            "post-seed reconnect should produce a coherent gate position equal to the slower node, not freeze at 0"
1487        );
1488    }
1489
1490    #[test]
1491    fn race_b1_pre_seed_freeze_is_what_the_fix_avoids() {
1492        // Pre-B1-fix state: cursors at 0, active=true. The gate sees
1493        // [primary=500, slot=[0,0]] and 2nd-largest persisted = 0.
1494        // The gate WOULD freeze at 0. This test documents the bug
1495        // the seeding fix is designed to avoid; the senders ensure
1496        // this state is never observed in production.
1497        let p = parse("persisted>=2").unwrap();
1498        let m = metrics((0, 0), (999, 999));
1499        let a = flags(true, false);
1500        let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1501        assert_eq!(
1502            r.durable_pos, 0,
1503            "the gate behaviour under (active=true, cursor=0) — if the senders ever fail to seed before flipping active, this is the freeze the operator would see"
1504        );
1505    }
1506
1507    #[test]
1508    fn race_b2_disconnect_window_doesnt_open_gate_spuriously() {
1509        // Simulates the B2 race window: a weak-memory reader observes
1510        // (active=true, cursor=0) for one iteration during the
1511        // disconnect transition. The slot legitimately has cursor=0
1512        // because the disconnect handler just zeroed the metrics.
1513        //
1514        // Critical invariant: the gate must NOT produce a higher
1515        // durable_pos than it would with the slot correctly excluded.
1516        // Specifically: with primary at 500, slot stale-zero-included,
1517        // the gate must not "see" the primary alone and open at 500
1518        // — that would let a client be told a seq is durable when
1519        // only the primary has it under a `persisted>=2` policy that
1520        // demands 2 nodes.
1521        let p = parse("persisted>=2").unwrap();
1522        let m = metrics((0, 0), (999, 999));
1523        let a = flags(true, false);
1524        let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1525        // 2nd-largest persisted across {primary=500, slot=0} = 0.
1526        // Gate stalls. ✓
1527        assert_eq!(r.durable_pos, 0);
1528
1529        // Post-disconnect (active=false): view shrinks to {primary}.
1530        // `persisted>=2` is structurally unsatisfiable on a 1-node
1531        // view, so the gate stays at 0 AND surfaces degraded. The
1532        // matching stage's `replicas_connected==0` halt is what stops
1533        // accepting new orders; the gate side's job is just to keep
1534        // the existing in-flight orders stalled and the alert lit.
1535        let a_disconnected = flags(false, false);
1536        let r_after = evaluate_durability(&p, 500, Some(&m), Some(&a_disconnected));
1537        assert_eq!(r_after.durable_pos, 0);
1538        assert!(
1539            r_after.degraded,
1540            "post-disconnect view of size 1 cannot meet persisted>=2 → degraded"
1541        );
1542    }
1543
1544    #[test]
1545    fn race_invariant_zero_cursor_never_opens_gate_above_slower_node() {
1546        // Property under both B1 and B2 race windows: for any slot
1547        // observed at cursor=0 with active=true, the gate cannot
1548        // produce a durable_pos that exceeds what an honest 2-node
1549        // evaluation would give. Spot-check a handful of primary
1550        // positions to lock the invariant.
1551        let p = parse("persisted>=2").unwrap();
1552        let m = metrics((0, 0), (999, 999));
1553        let a = flags(true, false);
1554        for primary_pos in [0, 1, 100, 500, 1_000_000_000_u64] {
1555            let r = evaluate_durability(&p, primary_pos, Some(&m), Some(&a));
1556            // 2nd-largest of {primary_pos, 0} = 0 for any primary > 0.
1557            // For primary_pos = 0, also 0. So always 0.
1558            assert_eq!(
1559                r.durable_pos, 0,
1560                "race-window observation must not open the gate above 0 for any primary position (got {} for primary_pos={primary_pos})",
1561                r.durable_pos
1562            );
1563        }
1564    }
1565
1566    // ------------------------------------------------------------------
1567    // GateCrossTracker — per-cursor "first transition from below to
1568    // crossed" inside the gate loop, used by the journal-wait /
1569    // replica-wait histograms.
1570    // ------------------------------------------------------------------
1571
1572    #[cfg(feature = "tick-to-trade")]
1573    #[test]
1574    fn gate_cross_tracker_records_journal_when_strictly_below() {
1575        // Journal starts at 5 (< 10), repl_min already at 100.
1576        // Journal crosses on the second observation. Replica was already
1577        // past at entry, so no replica sample.
1578        let mut t = GateCrossTracker::new(10);
1579        t.observe(5, 100, 1_000);
1580        t.observe(15, 100, 2_000);
1581        assert_eq!(t.journal_crossed(), Some(2_000));
1582        assert_eq!(t.replica_crossed(), None);
1583    }
1584
1585    #[cfg(feature = "tick-to-trade")]
1586    #[test]
1587    fn gate_cross_tracker_records_replica_when_strictly_below() {
1588        // Mirror image: journal already past, replica below at entry.
1589        let mut t = GateCrossTracker::new(10);
1590        t.observe(50, 5, 1_000);
1591        t.observe(50, 12, 2_000);
1592        assert_eq!(t.journal_crossed(), None);
1593        assert_eq!(t.replica_crossed(), Some(2_000));
1594    }
1595
1596    #[cfg(feature = "tick-to-trade")]
1597    #[test]
1598    fn gate_cross_tracker_records_both_when_both_below() {
1599        // Both below at entry, both cross independently.
1600        let mut t = GateCrossTracker::new(100);
1601        t.observe(50, 60, 1_000); // both below
1602        t.observe(105, 60, 2_000); // journal crosses
1603        t.observe(105, 110, 3_000); // replica crosses
1604        assert_eq!(t.journal_crossed(), Some(2_000));
1605        assert_eq!(t.replica_crossed(), Some(3_000));
1606    }
1607
1608    #[cfg(feature = "tick-to-trade")]
1609    #[test]
1610    fn gate_cross_tracker_skips_cursor_already_past_at_entry() {
1611        // Both cursors already >= needed at first observation —
1612        // neither was on the critical path. No samples.
1613        let mut t = GateCrossTracker::new(10);
1614        t.observe(50, 100, 1_000);
1615        // Even later observations don't backfill: was_below is sticky.
1616        t.observe(60, 110, 2_000);
1617        assert_eq!(t.journal_crossed(), None);
1618        assert_eq!(t.replica_crossed(), None);
1619    }
1620
1621    #[cfg(feature = "tick-to-trade")]
1622    #[test]
1623    fn gate_cross_tracker_first_observation_only_for_cross_decision() {
1624        // A cursor that goes back below `needed` after first iteration
1625        // (impossible in practice — cursors are monotonic — but we
1626        // verify the first-iteration snapshot is what gates the
1627        // sample). Journal: 50 < 10 false → was_below=false → no sample.
1628        let mut t = GateCrossTracker::new(10);
1629        t.observe(50, 5, 1_000); // journal already past, replica below
1630        t.observe(20, 12, 2_000); // both >= needed now
1631        // Journal: was_below=false at entry → still no sample.
1632        assert_eq!(t.journal_crossed(), None);
1633        // Replica: was_below=true at entry, crosses on iter 2 → sample.
1634        assert_eq!(t.replica_crossed(), Some(2_000));
1635    }
1636
1637    #[cfg(feature = "tick-to-trade")]
1638    #[test]
1639    fn gate_cross_tracker_holds_first_cross_only() {
1640        // Once a cross is recorded, later observations don't
1641        // overwrite — the metric is "when did it first cross", not
1642        // "when was it last below".
1643        let mut t = GateCrossTracker::new(10);
1644        t.observe(5, 100, 1_000);
1645        t.observe(15, 100, 2_000); // first cross
1646        t.observe(25, 100, 3_000); // would otherwise re-record
1647        assert_eq!(t.journal_crossed(), Some(2_000));
1648    }
1649
1650    // -- DegradationLogger flap-suppression --
1651    //
1652    // The logger gates transition logs on a sustained-state hold so a
1653    // replica flapping at sub-second cadence doesn't spam the journal.
1654    // These tests don't observe the logs themselves (tracing is
1655    // process-global and brittle to capture in unit tests); they
1656    // assert the underlying state machine via the `policy_degraded`
1657    // gauge, which the logger updates on every tick regardless of
1658    // log emission.
1659
1660    fn logger_test_policy() -> crate::durability_policy::Policy {
1661        parse("persisted>=2").unwrap()
1662    }
1663
1664    /// Tick the logger N times at `step` intervals, alternating
1665    /// `degraded` per call. Returns the gauge value after the last
1666    /// tick — useful for asserting that flap cycles don't leak the
1667    /// AtomicBool into a wrong terminal state.
1668    fn drive_logger(
1669        logger: &mut DegradationLogger,
1670        utilization: &StageUtilization,
1671        policy: &crate::durability_policy::Policy,
1672        start: Instant,
1673        states: &[bool],
1674        step: Duration,
1675    ) -> bool {
1676        for (i, &state) in states.iter().enumerate() {
1677            let now = start + step.checked_mul(i as u32).unwrap_or(Duration::ZERO);
1678            logger.tick(policy, utilization, state, now, Duration::from_secs(5));
1679        }
1680        utilization.policy_degraded.load(Ordering::Relaxed)
1681    }
1682
1683    #[test]
1684    fn logger_gauge_tracks_state_immediately() {
1685        // The /healthz gauge reflects the *latest* state on every
1686        // tick — this is what dashboards / alerts read. Sustained-
1687        // state gating only affects the warn/info log emission.
1688        let p = logger_test_policy();
1689        let utilization = StageUtilization::new();
1690        let now = Instant::now();
1691        let mut logger = DegradationLogger::new(now);
1692
1693        logger.tick(&p, &utilization, true, now, Duration::from_secs(5));
1694        assert!(utilization.policy_degraded.load(Ordering::Relaxed));
1695
1696        logger.tick(
1697            &p,
1698            &utilization,
1699            false,
1700            now + Duration::from_millis(50),
1701            Duration::from_secs(5),
1702        );
1703        assert!(!utilization.policy_degraded.load(Ordering::Relaxed));
1704    }
1705
1706    #[test]
1707    fn logger_starting_degraded_marks_initial_state_logged() {
1708        // `new_starting_degraded` emits the startup warn and treats
1709        // the state as already-logged so the next tick at the same
1710        // cluster shape doesn't re-emit instantly. The gauge starts
1711        // at 1.
1712        let p = logger_test_policy();
1713        let utilization = StageUtilization::new();
1714        let now = Instant::now();
1715        let mut logger = DegradationLogger::new_starting_degraded(now, &p);
1716        // The logger doesn't write the gauge from the constructor —
1717        // first tick does. Tick at the same state to settle the
1718        // gauge. No new log line should fire (state hasn't changed).
1719        logger.tick(
1720            &p,
1721            &utilization,
1722            true,
1723            now + Duration::from_millis(10),
1724            Duration::from_secs(5),
1725        );
1726        assert!(utilization.policy_degraded.load(Ordering::Relaxed));
1727    }
1728
1729    #[test]
1730    fn logger_handles_rapid_flap_without_panic() {
1731        // Drive the logger through 100 alternating flips at 100ms
1732        // each (faster than the 1s flap-hold). The state machine
1733        // must remain coherent — no panics, gauge tracks final
1734        // state, and `pending_logged` doesn't get stuck.
1735        let p = logger_test_policy();
1736        let utilization = StageUtilization::new();
1737        let now = Instant::now();
1738        let mut logger = DegradationLogger::new(now);
1739        let states: Vec<bool> = (0..100u32).map(|i| i.is_multiple_of(2)).collect();
1740        let final_state = drive_logger(
1741            &mut logger,
1742            &utilization,
1743            &p,
1744            now,
1745            &states,
1746            Duration::from_millis(100),
1747        );
1748        // 100 states starting at i=0 → final state is i=99 → odd → false.
1749        assert!(!final_state);
1750    }
1751
1752    #[test]
1753    fn logger_sustained_degraded_eventually_settles() {
1754        // After a sustained-true state, the logger should be in the
1755        // "logged the onset" mode. Drive 5 ticks of degraded=true at
1756        // 500ms intervals — total 2s, well past the 1s flap-hold.
1757        // Last tick should leave gauge=1 and the heartbeat re-emit
1758        // window primed (last_log set).
1759        let p = logger_test_policy();
1760        let utilization = StageUtilization::new();
1761        let now = Instant::now();
1762        let mut logger = DegradationLogger::new(now);
1763        let final_state = drive_logger(
1764            &mut logger,
1765            &utilization,
1766            &p,
1767            now,
1768            &[true; 5],
1769            Duration::from_millis(500),
1770        );
1771        assert!(final_state);
1772    }
1773
1774    #[test]
1775    fn logger_recovery_to_healthy_settles_gauge() {
1776        // Sustained degraded → sustained healthy. Gauge should end at 0.
1777        let p = logger_test_policy();
1778        let utilization = StageUtilization::new();
1779        let now = Instant::now();
1780        let mut logger = DegradationLogger::new_starting_degraded(now, &p);
1781        let mut states = vec![true; 5]; // 2.5s degraded
1782        states.extend(vec![false; 5]); // 2.5s healthy
1783        let final_state = drive_logger(
1784            &mut logger,
1785            &utilization,
1786            &p,
1787            now,
1788            &states,
1789            Duration::from_millis(500),
1790        );
1791        assert!(!final_state);
1792    }
1793}