melin_server_runtime/response.rs
1//! io_uring-based response stage — routes matching output to connections via
2//! `IORING_OP_SEND`.
3//!
4//! Replaces the blocking `write(2)` + `BufWriter` flush path with batched
5//! io_uring sends. Instead of N `write(2)` syscalls (one per dirty connection
6//! on flush), we submit N SEND SQEs in a single `io_uring_enter` call.
7//!
8//! Same SPSC consumption and journal cursor gating as `response.rs`.
9//! Runs on a dedicated OS thread.
10
11use std::collections::{HashMap, HashSet};
12use std::os::unix::io::RawFd;
13use std::sync::Arc;
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::mpsc;
16use std::time::{Duration, Instant};
17
18use io_uring::{IoUring, opcode, types};
19use tracing::{debug, error};
20
21use melin_pipeline::ring;
22
23use crate::durability_policy::{CursorView, DurabilityMode, EvalStatus, Policy};
24use crate::replication::ReplicationMetrics;
25use melin_app::Application;
26use melin_transport_core::pipeline::{OutputPayload, OutputSlot, StageUtilization};
27#[cfg(feature = "latency-trace")]
28use melin_transport_core::trace;
29
30use melin_wire_protocol::control::TransportResponse;
31use melin_wire_protocol::control_codec;
32
/// Maximum number of output slots consumed per batch. Also sizes the
/// stack-allocated batch array in `run`.
const MAX_BATCH: usize = 1024;

/// Maximum encoded response size; sizes the scratch buffer each frame is
/// encoded into before being appended to a connection's send buffer.
/// PositionSnapshot is the largest variant at up to 330 bytes
/// (length(4) + tag(1) + account(4) + count(1) +
/// 16*(currency(4)+free(8)+reserved(8))). 512 bytes covers all variants.
const MAX_RESPONSE_BUF: usize = 512;

/// io_uring submission queue depth for sends. Must be ≥ max concurrent
/// connections to avoid SQ overflow when all connections are dirty:
/// `flush_sends` queues one SEND per dirty connection and panics
/// ("io_uring SQ full") if the queue cannot take another entry.
/// Power of 2 for io_uring alignment. 4096 supports 1024+ client
/// benchmarks where all connections flush simultaneously.
const RING_SIZE: u32 = 4096;

/// Maximum accumulated send buffer per connection (64 KiB). If a client
/// falls behind and appending the next response frame would exceed this,
/// the connection is dropped (see `append_frame`). Heartbeat frames are
/// appended without this check.
/// 64 KiB holds ~500 response frames — well beyond any reasonable lag.
const MAX_SEND_BUF: usize = 64 * 1024;
51
52pub use crate::ControlEvent;
53
/// Encoder type alias: a shared (`Arc`) trait object implementing
/// `ResponseEncoder`, bound to the application's `Report` /
/// `QueryResponse` associated types. Hides the long
/// `dyn ResponseEncoder<Report = ..., Query = ...>` at call sites.
pub type ResponseEncoderArc<A> = Arc<
    dyn melin_app::encoder::ResponseEncoder<
            Report = <A as Application>::Report,
            Query = <A as Application>::QueryResponse,
        >,
>;
63
/// Configuration and shared state for the response stage.
///
/// Consumed by value by [`run`], which destructures it once at startup.
pub struct Response<A: Application> {
    /// Highest wire seq durably persisted on the primary's journal.
    /// In the same sequence space as `OutputSlot.wire_seq` and the
    /// replica metrics (`metrics.in_memory_sequence` /
    /// `metrics.acked_sequence`), so the durability gate can compare
    /// these values numerically and the comparison is meaningful
    /// regardless of `starting_sequence` (fresh vs recovered primary).
    /// Updated by the journal stage after every fsync batch via
    /// `set_last_seq_publisher`.
    pub journal_persisted_wire_seq: Arc<AtomicU64>,
    /// Operator-selected durability mode, published through a shared
    /// [`AtomicU8`] so the admin `DURABILITY` command can swap it at
    /// runtime without restarting the node. The response stage reads
    /// this once per gate iteration with a relaxed load (cheaper than a
    /// `Mutex` or refcounted `Arc<Policy>` snapshot) and rebuilds its
    /// local [`Policy`] when the byte changes. See
    /// [`crate::durability_policy::DurabilityMode::as_u8`] for the
    /// encoding.
    pub durability_mode: Arc<std::sync::atomic::AtomicU8>,
    /// Per-slot replica cursors. `None` for standalone deployments
    /// (no replication wiring) — the policy then evaluates against the
    /// primary alone.
    pub replication_metrics: Option<Arc<ReplicationMetrics>>,
    /// Per-slot replica active flags. Only "true" slots are included in
    /// the cursor view fed to `Policy::evaluate`, so disconnected slots
    /// don't pollute the view with stale zero cursors. When the
    /// resulting view is too small to satisfy a clause, the policy
    /// reports degraded and the gate stalls.
    /// Mirrors `replication_metrics` — `None` in standalone.
    pub replica_active: Option<[Arc<AtomicBool>; 2]>,
    /// Heartbeat period. `None` disables heartbeats. When set, a
    /// connection that has not been sent anything for at least this long
    /// receives a heartbeat frame; the scan only runs while the output
    /// ring is empty and at most once per second.
    pub heartbeat_interval: Option<Duration>,
    /// When `true` the idle path always spins (`spin_loop`) and never
    /// yields. When `false` it spins for the first 1000 idle iterations
    /// and then calls `yield_now` on each further idle iteration.
    pub busy_spin: bool,
    /// Shared counters the stage publishes into: busy / idle iteration
    /// counts, the `policy_degraded` flag, and the gate attribution
    /// counters (`gate_journal` / `gate_replication`).
    pub utilization: Arc<StageUtilization>,
    /// Wire encoder for application-shaped payloads. Constructed
    /// once at boot (`Arc::new(ExchangeResponseEncoder)`) and shared
    /// with the DPDK response stage.
    pub encoder: ResponseEncoderArc<A>,
}
103
/// Per-connection state for batched io_uring sends.
struct ConnectionEntry {
    /// Raw socket descriptor the SEND SQEs target. Kept open by `_owner`.
    fd: RawFd,
    /// Owns the write half of the socket to keep the fd alive.
    /// Never read — it exists only so the socket is closed when the
    /// entry is dropped.
    _owner: Box<dyn Send>,
    /// Accumulates encoded response frames between flushes.
    /// The full wire frame (length prefix + payload) is appended here.
    /// Vec's internal data pointer is heap-stable, so io_uring SEND SQEs
    /// referencing `as_ptr()` remain valid even if the HashMap relocates
    /// this struct — as long as we don't reallocate the Vec during in-flight sends.
    /// In practice that means: no appends, clears, or drops of this
    /// buffer between queueing a SEND and reaping its completion.
    send_buf: Vec<u8>,
    /// Last time data was queued for this connection (response frame or
    /// heartbeat). Used for heartbeat scheduling.
    last_send: Instant,
}
118
119/// Run the io_uring response stage loop. Blocks the calling thread until shutdown.
120///
121/// Consumes from the output SPSC, waits for durability confirmation, and
122/// sends responses via io_uring SEND.
123///
124/// Durability gating: every gate iteration reads the journal cursor
125/// (primary persisted) plus per-slot replica cursors (in-memory and
126/// persisted) from `replication_metrics` and feeds them through the
127/// configured [`Policy`]. See [`evaluate_durability`].
128pub fn run<A: Application>(
129 mut consumer: ring::Consumer<OutputSlot<A::Report, A::QueryResponse>>,
130 control_rx: mpsc::Receiver<ControlEvent>,
131 config: Response<A>,
132 shutdown: &AtomicBool,
133) {
134 let Response {
135 journal_persisted_wire_seq,
136 durability_mode,
137 replication_metrics,
138 replica_active,
139 heartbeat_interval,
140 busy_spin,
141 utilization,
142 encoder,
143 } = config;
144 // Resolve the starting mode from the shared atomic and derive the
145 // local Policy. The atomic is the single source of truth across the
146 // process lifetime; the response thread keeps a thread-local copy
147 // for cheap per-iteration use and rebuilds it when an admin
148 // `DURABILITY` command swaps the atomic. Initialise as Hybrid (the
149 // default mode) if the atomic ever holds a corrupted byte — better
150 // than panicking on a degraded process and matches the default
151 // operators see at boot.
152 let mut active_mode =
153 DurabilityMode::from_u8(durability_mode.load(std::sync::atomic::Ordering::Relaxed))
154 .unwrap_or_else(|| {
155 tracing::error!(
156 "durability_mode atomic held a corrupted byte at startup; defaulting to hybrid"
157 );
158 DurabilityMode::Hybrid
159 });
160 let mut policy = active_mode.to_policy();
161 let mut ring =
162 IoUring::new(RING_SIZE).expect("failed to create io_uring instance for response stage");
163
164 // Connection table: maps connection IDs to their state.
165 // HashMap for O(1) lookup. Pre-sized for a reasonable number of concurrent clients.
166 let mut connections: HashMap<u64, ConnectionEntry> = HashMap::with_capacity(256);
167
168 let mut batch = [OutputSlot::<A::Report, A::QueryResponse>::default(); MAX_BATCH];
169 let mut encode_buf = [0u8; MAX_RESPONSE_BUF];
170
171 // Cached durability position to avoid atomic reads on every slot.
172 // Initialised below from the policy's startup evaluation; updated
173 // via `evaluate_durability` on every gate iteration.
174 let mut cached_durable_pos: u64;
175
176 // Degradation logger. Tracks transitions, suppresses sub-second
177 // flap noise, and drives the `/healthz` `policy_degraded` gauge.
178 // See `DegradationLogger` for the full state machine. Initialised
179 // below from the policy's startup evaluation so an unsatisfiable
180 // policy (e.g. a primary that just lost both replicas while
181 // running `hybrid` or `durably-replicated`) is visible immediately
182 // on `/healthz` and in the journal.
183 let startup_now = Instant::now();
184 let mut last_policy_check = startup_now;
185 /// Re-emit interval for the "still degraded" reminder.
186 const DEGRADED_LOG_INTERVAL: Duration = Duration::from_secs(5);
187 /// Cadence at which the idle path re-evaluates the policy. Bounds
188 /// the lag between a connection-state change and the `/healthz`
189 /// gauge / warn-log reflecting it. Cheap (a handful of atomic
190 /// loads + the policy evaluator) at this rate.
191 const POLICY_CHECK_INTERVAL: Duration = Duration::from_secs(1);
192
193 // Initial evaluation so the cached durable position and the
194 // `/healthz` gauge reflect the cluster's startup shape before
195 // the first batch arrives.
196 let mut degraded_logger;
197 {
198 let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
199 let metrics_ref = replication_metrics.as_deref();
200 let active_ref = replica_active.as_ref();
201 let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
202 cached_durable_pos = status.durable_pos;
203 utilization
204 .policy_degraded
205 .store(status.degraded, Ordering::Relaxed);
206 degraded_logger = if status.degraded {
207 DegradationLogger::new_starting_degraded(startup_now, &policy)
208 } else {
209 DegradationLogger::new(startup_now)
210 };
211 }
212
213 // Stage histograms registered with the global registry — see
214 // `melin_transport_core::trace`. The four breakdown stages
215 // (journal-wait, replica-wait, encode, egress) feed the bench's
216 // tick-to-trade decomposition; spsc/dispatch/server-e2e are kept
217 // alongside as overall sanity checks.
218 #[cfg(feature = "latency-trace")]
219 let mut spsc_rec =
220 trace::register_stage("response: SPSC wakeup (matching publish → response consume)");
221 #[cfg(feature = "latency-trace")]
222 let mut dispatch_rec = trace::register_stage("response: dispatch (consume → socket write)");
223 #[cfg(feature = "latency-trace")]
224 let mut server_e2e_rec = trace::register_stage("server e2e (reader recv → response flush)");
225 // Tick-to-trade breakdown: per-slot wait observed for each
226 // durability path (recorded only when the gate actually held us
227 // up — cache-hit paths skip to avoid inflating the metric with
228 // crossings that happened before we noticed). Encode is wall-time
229 // around `encode_transport_response`. Egress wraps a `flush_sends`
230 // call (one sample per io_uring flush, batching many slots).
231 // Gated on `tick-to-trade`, not `latency-trace`, because these
232 // stages roughly double the hot-path mutex traffic vs the lighter
233 // 4-stage mode.
234 #[cfg(feature = "tick-to-trade")]
235 let mut journal_wait_rec =
236 trace::register_stage("response: journal-wait (match_complete → journal cursor crossed)");
237 #[cfg(feature = "tick-to-trade")]
238 let mut replica_wait_rec = trace::register_stage(
239 "response: replica-wait (match_complete → replication cursor crossed)",
240 );
241 #[cfg(feature = "tick-to-trade")]
242 let mut encode_rec = trace::register_stage("response: encode (per-kind wire encoding)");
243 #[cfg(feature = "tick-to-trade")]
244 let mut egress_rec = trace::register_stage("response: egress (flush_sends elapsed)");
245
246 // Track connections with buffered (unflushed) writes across batches.
247 let mut dirty_connections: HashSet<u64> = HashSet::new();
248
249 // Connections to remove after flush (send errors).
250 let mut to_remove: Vec<u64> = Vec::new();
251
252 // Pre-allocated CQE collection buffer. Must collect CQEs before
253 // processing because the CQ borrow must end before mutating connections.
254 // Pre-sized to RING_SIZE to avoid per-iteration heap allocation.
255 let mut cqes: Vec<(u64, i32)> = Vec::with_capacity(RING_SIZE as usize);
256
257 // Pre-encode the heartbeat response frame once. Full wire frame
258 // (length prefix + tag) for direct append to send_buf.
259 let heartbeat_wire_frame = {
260 let mut buf = [0u8; 8];
261 let written =
262 control_codec::encode_transport_response(&TransportResponse::Heartbeat, &mut buf)
263 .expect("heartbeat encodes");
264 buf[..written].to_vec()
265 };
266
267 // Coarse timestamp for heartbeat scan — avoids Instant::now() on every spin.
268 let mut last_heartbeat_scan = Instant::now();
269
270 // Adaptive spin: spin first (fast wakeup), yield after threshold.
271 let mut idle_spins: u32 = 0;
272
273 let mut busy_count: u64 = 0;
274 let mut idle_count: u64 = 0;
275
276 loop {
277 // Observe runtime mode swaps from the admin `DURABILITY`
278 // command. Relaxed load (single writer is the admin handler,
279 // single reader is this thread). When the byte changes,
280 // rebuild the local Policy and reset the cached durable
281 // position so the next gate evaluation starts from a clean
282 // slate under the new shape; log the transition for the audit
283 // trail. An unknown byte is treated as memory corruption: we
284 // log and keep the prior mode rather than silently downgrading.
285 let observed_byte = durability_mode.load(Ordering::Relaxed);
286 if observed_byte != active_mode.as_u8() {
287 match DurabilityMode::from_u8(observed_byte) {
288 Some(next) => {
289 tracing::info!(
290 prev = active_mode.as_str(),
291 next = next.as_str(),
292 "durability mode swapped at runtime"
293 );
294 active_mode = next;
295 policy = active_mode.to_policy();
296 // The fresh policy may evaluate degraded/undegraded
297 // differently against the same cluster shape; let
298 // the next gate evaluation re-derive.
299 cached_durable_pos = 0;
300 // Re-seed the degradation logger so a transition
301 // out of (or into) degraded under the new policy
302 // surfaces immediately rather than waiting for the
303 // sustained-state hold to roll over.
304 degraded_logger = DegradationLogger::new(Instant::now());
305 }
306 None => {
307 tracing::error!(
308 byte = observed_byte,
309 "durability_mode atomic held a corrupted byte; retaining prior mode"
310 );
311 }
312 }
313 }
314
315 if shutdown.load(Ordering::Relaxed) {
316 // Best-effort flush before shutdown.
317 if !dirty_connections.is_empty() {
318 flush_sends(
319 &mut ring,
320 &mut connections,
321 &dirty_connections,
322 &mut to_remove,
323 &mut cqes,
324 );
325 dirty_connections.clear();
326 }
327 utilization.busy.store(busy_count, Ordering::Relaxed);
328 utilization.idle.store(idle_count, Ordering::Relaxed);
329 #[cfg(feature = "pipeline-stats")]
330 print_utilization("response", busy_count, idle_count);
331 return;
332 }
333
334 // Poll control channel (non-blocking) for connect/disconnect.
335 while let Ok(event) = control_rx.try_recv() {
336 match event {
337 ControlEvent::Connected {
338 connection_id,
339 fd,
340 writer,
341 } => {
342 // The writer keeps the fd alive — store it as the owner.
343 let owner: Box<dyn Send> = Box::new(writer);
344 connections.insert(
345 connection_id,
346 ConnectionEntry {
347 fd,
348 _owner: owner,
349 send_buf: Vec::with_capacity(4096),
350 last_send: Instant::now(),
351 },
352 );
353 }
354 ControlEvent::Disconnected { connection_id } => {
355 connections.remove(&connection_id);
356 dirty_connections.remove(&connection_id);
357 }
358 }
359 }
360
361 // Consume output slots from matching stage.
362 let count = consumer.consume_batch(&mut batch, MAX_BATCH);
363 if count == 0 {
364 // SPSC is empty — flush all dirty connections via io_uring.
365 // This is the response-data egress path; heartbeat flushes
366 // below aren't sampled because they're admin traffic, not
367 // on the client RTT path.
368 if !dirty_connections.is_empty() {
369 #[cfg(feature = "tick-to-trade")]
370 let egress_start = trace::mono_trace_ns();
371 flush_sends(
372 &mut ring,
373 &mut connections,
374 &dirty_connections,
375 &mut to_remove,
376 &mut cqes,
377 );
378 #[cfg(feature = "tick-to-trade")]
379 egress_rec.record_elapsed(egress_start, trace::mono_trace_ns());
380 for conn_id in to_remove.drain(..) {
381 connections.remove(&conn_id);
382 }
383 dirty_connections.clear();
384 }
385
386 // Send heartbeats to idle connections. Only checked during
387 // idle periods (SPSC empty) to avoid overhead on the hot path.
388 if let Some(interval) = heartbeat_interval {
389 let now = Instant::now();
390 // Coarse gate: only scan at most once per second.
391 if now.duration_since(last_heartbeat_scan) >= Duration::from_secs(1) {
392 last_heartbeat_scan = now;
393 for (&conn_id, entry) in connections.iter_mut() {
394 if now.duration_since(entry.last_send) >= interval {
395 entry.send_buf.extend_from_slice(&heartbeat_wire_frame);
396 dirty_connections.insert(conn_id);
397 entry.last_send = now;
398 }
399 }
400 // Flush the heartbeat sends immediately.
401 if !dirty_connections.is_empty() {
402 flush_sends(
403 &mut ring,
404 &mut connections,
405 &dirty_connections,
406 &mut to_remove,
407 &mut cqes,
408 );
409 for conn_id in to_remove.drain(..) {
410 connections.remove(&conn_id);
411 }
412 dirty_connections.clear();
413 }
414 }
415 }
416
417 // Re-evaluate the durability policy on a slow timer so the
418 // `policy_degraded` flag and the periodic warn track the
419 // cluster's real state even on idle / quiet venues. The
420 // gate-open block also calls `update_degraded_state` after
421 // each consumed batch; this is the equivalent for the
422 // no-batch path.
423 {
424 let now_ts = Instant::now();
425 if now_ts.duration_since(last_policy_check) >= POLICY_CHECK_INTERVAL {
426 last_policy_check = now_ts;
427 let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
428 let metrics_ref = replication_metrics.as_deref();
429 let active_ref = replica_active.as_ref();
430 let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
431 degraded_logger.tick(
432 &policy,
433 &utilization,
434 status.degraded,
435 now_ts,
436 DEGRADED_LOG_INTERVAL,
437 );
438 // Cache the position so the next batch's gate sees a
439 // fresh value rather than spinning from a stale cache.
440 cached_durable_pos = status.durable_pos;
441 }
442 }
443
444 idle_count += 1;
445 if idle_count.is_multiple_of(1024) {
446 utilization.busy.store(busy_count, Ordering::Relaxed);
447 utilization.idle.store(idle_count, Ordering::Relaxed);
448 }
449 if busy_spin || idle_spins < 1000 {
450 idle_spins = idle_spins.wrapping_add(1);
451 std::hint::spin_loop();
452 } else {
453 std::thread::yield_now();
454 }
455 continue;
456 }
457 idle_spins = 0;
458 busy_count += 1;
459
460 #[cfg(feature = "latency-trace")]
461 let consume_ts = trace::mono_trace_ns();
462
463 // Wait for durability confirmation before sending responses.
464 //
465 // Each iteration: read the primary journal cursor + per-slot
466 // replica cursors (both in-memory and persisted), build a
467 // `CursorView`, and evaluate the configured policy. Spin until
468 // the durable position catches up to the batch's max input_seq.
469 //
470 // Per-slot journal-wait / replica-wait tracker. See
471 // `GateCrossTracker` for the rationale (only records cursors
472 // that were actually on the critical path). Attribution uses
473 // `repl_min` = min of connected-replica persisted cursors so
474 // operators see "which subsystem to optimize" the same way as
475 // before the policy refactor.
476 #[cfg(feature = "tick-to-trade")]
477 let mut gate_tracker;
478 {
479 // Gate on `wire_seq`, not `input_seq`. `input_seq` is in
480 // local-consumer space (the matching cursor on the input
481 // ring, starts at 0 in this process) while replica metrics
482 // and the primary's `journal_persisted_wire_seq` live in
483 // wire-seq space (allocated by the journal stage starting
484 // at `starting_sequence`). A `needed` derived from
485 // `input_seq` and compared against wire-seq cursors only
486 // works when `starting_sequence == 1`; a recovered primary
487 // (or any process whose journal already has prior content
488 // pushing `starting_sequence` above 1) would silently open
489 // the gate ahead of the replica's actual replicated state.
490 //
491 // Every cursor in the policy view (`journal_persisted_wire_seq`,
492 // `metrics.in_memory_sequence`, `metrics.acked_sequence`)
493 // carries "highest wire seq known to be in that state on
494 // node X". A batch's `needed` is therefore the *exact*
495 // wire seq the gate must see — not `+1` — for the latest
496 // event in the batch to be considered durable. The legacy
497 // `+1` was load-bearing only because `input_seq` was off
498 // by `starting_sequence - 1` from wire seq; with the
499 // wire-seq stamp it would over-shoot by one event and
500 // make the gate stall an extra round-trip per response.
501 let needed = batch[..count]
502 .iter()
503 .map(|s| s.wire_seq)
504 .max()
505 .expect("non-empty batch");
506 #[cfg(feature = "tick-to-trade")]
507 {
508 gate_tracker = GateCrossTracker::new(needed);
509 }
510 // Durability-gate carve-out for halt-state output. Slots
511 // tagged `durability_bypass = true` at emission carry no
512 // engine state worth replicating before delivery — see
513 // `OutputSlot::durability_bypass` for the correctness
514 // argument. When every slot in the batch carries the flag
515 // the gate is skipped entirely, so clients receive the halt
516 // reason immediately rather than blocking on a structurally
517 // unsatisfiable policy (e.g. `Hybrid` with all replicas
518 // disconnected, which would otherwise stall the gate until
519 // peers return). If even one normal slot is present, gate
520 // the whole batch as usual — the bypass slots ride along
521 // behind the gated one, which is safe (no ordering
522 // inversion vs. a strict-gate world).
523 let needs_gate = batch[..count].iter().any(|s| !s.durability_bypass);
524 if needs_gate && cached_durable_pos < needed {
525 loop {
526 // Inside the gate-wait spin loop, also observe a
527 // mode swap. Without this, a batch whose gate
528 // becomes structurally unsatisfiable (e.g. all
529 // replicas die while a non-bypass slot is in
530 // flight under `Hybrid`) would wedge the response
531 // stage forever, even if an operator sends the
532 // remediating `DURABILITY local` — the outer loop
533 // observation never gets a chance to run. The
534 // relaxed load is ~1 cycle on x86; cheaper than
535 // the `spin_loop` hint below.
536 let observed_byte = durability_mode.load(Ordering::Relaxed);
537 if observed_byte != active_mode.as_u8()
538 && let Some(next) = DurabilityMode::from_u8(observed_byte)
539 {
540 tracing::info!(
541 prev = active_mode.as_str(),
542 next = next.as_str(),
543 "durability mode swapped during gate wait"
544 );
545 active_mode = next;
546 policy = active_mode.to_policy();
547 degraded_logger = DegradationLogger::new(Instant::now());
548 }
549
550 let journal_pos = journal_persisted_wire_seq.load(Ordering::Acquire);
551 let metrics_ref = replication_metrics.as_deref();
552 let active_ref = replica_active.as_ref();
553 let repl_min = connected_persisted_min(metrics_ref, active_ref);
554
555 #[cfg(feature = "tick-to-trade")]
556 gate_tracker.observe(journal_pos, repl_min, trace::mono_trace_ns());
557
558 let status = evaluate_durability(&policy, journal_pos, metrics_ref, active_ref);
559 cached_durable_pos = status.durable_pos;
560 utilization
561 .policy_degraded
562 .store(status.degraded, Ordering::Relaxed);
563
564 if cached_durable_pos >= needed {
565 // Attribution: which subsystem was slowest at
566 // the moment the gate opened. Relaxed is fine —
567 // health reads are infrequent.
568 if journal_pos <= repl_min {
569 utilization.gate_journal.fetch_add(1, Ordering::Relaxed);
570 } else {
571 utilization.gate_replication.fetch_add(1, Ordering::Relaxed);
572 }
573 break;
574 }
575 std::hint::spin_loop();
576 }
577 }
578 }
579
580 let batch_now = Instant::now();
581
582 // Log degradation transitions / re-emit the reminder. Same
583 // logger the idle path uses; transitions are gated on a
584 // sustained-state hold so sub-second flap doesn't spam.
585 let degraded_now = utilization.policy_degraded.load(Ordering::Relaxed);
586 degraded_logger.tick(
587 &policy,
588 &utilization,
589 degraded_now,
590 batch_now,
591 DEGRADED_LOG_INTERVAL,
592 );
593 // Bump the idle-path's check timestamp so we don't double-
594 // tick the logger when traffic stops.
595 last_policy_check = batch_now;
596
597 for slot in &batch[..count] {
598 #[cfg(feature = "latency-trace")]
599 spsc_rec.record_elapsed(slot.match_complete_ts, consume_ts);
600
601 // Per-slot durability-gate breakdown. Recorded only when
602 // the gate actually held us up (the tracker captured a
603 // cross). Note: the cross timestamp is for the *batch's*
604 // `needed` — for slots earlier in the batch, the cursor
605 // may have crossed their individual `input_seq+1` earlier,
606 // so this systematically overestimates wait for non-last
607 // slots by up to the batch's matching span. Acceptable
608 // noise for the operator-facing breakdown; documented in
609 // `docs/benchmarking.md`.
610 #[cfg(feature = "tick-to-trade")]
611 if let Some(ts) = gate_tracker.journal_crossed() {
612 journal_wait_rec.record_elapsed(slot.match_complete_ts, ts);
613 }
614 #[cfg(feature = "tick-to-trade")]
615 if let Some(ts) = gate_tracker.replica_crossed() {
616 replica_wait_rec.record_elapsed(slot.match_complete_ts, ts);
617 }
618
619 // Each slot expands to at most two wire frames: the
620 // application payload (Report / Query / EngineError) and
621 // an optional trailing `BatchEnd` when
622 // `is_last_in_request` is set. Application-shaped
623 // payloads go through the encoder; transport-shaped
624 // frames (EngineError, BatchEnd) are encoded by the
625 // runtime directly.
626 if let Some(entry) = connections.get_mut(&slot.connection_id) {
627 // Frame 1: application payload (if any). BatchEnd
628 // payloads carry no body — the terminator below
629 // handles them via `is_last_in_request`.
630 let payload_result: Option<Result<usize, &'static str>> = match slot.payload {
631 OutputPayload::Report(ref report) => {
632 Some(encoder.encode_report(report, &mut encode_buf))
633 }
634 OutputPayload::QueryResponse(ref q) => {
635 Some(encoder.encode_query(q, &mut encode_buf))
636 }
637 OutputPayload::EngineError => Some(
638 control_codec::encode_transport_response(
639 &TransportResponse::EngineError,
640 &mut encode_buf,
641 )
642 .map_err(|_| "encode error"),
643 ),
644 OutputPayload::BatchEnd => None,
645 };
646
647 let payload_handled = if let Some(result) = payload_result {
648 #[cfg(feature = "tick-to-trade")]
649 let encode_start = trace::mono_trace_ns();
650 let outcome = append_frame(
651 result,
652 slot.connection_id,
653 entry,
654 &encode_buf,
655 batch_now,
656 &mut dirty_connections,
657 &mut to_remove,
658 );
659 #[cfg(feature = "tick-to-trade")]
660 encode_rec.record_elapsed(encode_start, trace::mono_trace_ns());
661 outcome
662 } else {
663 AppendOutcome::Continue
664 };
665
666 // Frame 2: BatchEnd terminator. Skipped if the payload
667 // append dropped the connection.
668 if matches!(payload_handled, AppendOutcome::Continue) && slot.is_last_in_request {
669 let result = control_codec::encode_transport_response(
670 &TransportResponse::BatchEnd,
671 &mut encode_buf,
672 )
673 .map_err(|_| "encode error");
674 let outcome = append_frame(
675 result,
676 slot.connection_id,
677 entry,
678 &encode_buf,
679 batch_now,
680 &mut dirty_connections,
681 &mut to_remove,
682 );
683 // Record server-side end-to-end: reader recv ->
684 // response flush. Only the BatchEnd frame carries
685 // this measurement; tracked here after the append
686 // so a dropped connection doesn't skew the metric.
687 #[cfg(feature = "latency-trace")]
688 if matches!(outcome, AppendOutcome::Continue) {
689 server_e2e_rec.record_elapsed(slot.recv_ts, trace::mono_trace_ns());
690 }
691 let _ = outcome;
692 }
693 }
694 }
695
696 // Remove connections that exceeded the send buffer limit.
697 for conn_id in to_remove.drain(..) {
698 connections.remove(&conn_id);
699 dirty_connections.remove(&conn_id);
700 }
701
702 #[cfg(feature = "latency-trace")]
703 dispatch_rec.record_elapsed(consume_ts, trace::mono_trace_ns());
704 }
705}
706
/// Outcome of a single per-frame append.
///
/// `Continue` means the caller may proceed to the next frame for this
/// slot. `append_frame` also returns it after logging an encode error,
/// in which case nothing was appended for that frame.
///
/// `ConnectionDropped` means appending the frame would have pushed the
/// connection's send buffer past its limit, and the connection has been
/// queued for removal — no further frames should be appended for this
/// slot.
#[derive(Clone, Copy)]
enum AppendOutcome {
    Continue,
    ConnectionDropped,
}
719
720/// Copy an encoded frame into the connection's send buffer with
721/// overflow checking. Splits the responsibilities the inline encode
722/// loop used to have: the caller passes in the encode result (so it
723/// can come from the `ResponseEncoder` trait for application
724/// payloads, or `encode_transport_response` for transport-shaped
725/// frames), and this helper handles size accounting + dirty
726/// tracking uniformly.
727#[allow(clippy::too_many_arguments)]
728fn append_frame(
729 result: Result<usize, &'static str>,
730 connection_id: u64,
731 entry: &mut ConnectionEntry,
732 encode_buf: &[u8],
733 batch_now: Instant,
734 dirty_connections: &mut HashSet<u64>,
735 to_remove: &mut Vec<u64>,
736) -> AppendOutcome {
737 let written = match result {
738 Ok(n) => n,
739 Err(reason) => {
740 tracing::error!(connection_id, reason, "encode error");
741 return AppendOutcome::Continue;
742 }
743 };
744
745 // Drop slow clients whose send buffer has grown too large. This
746 // prevents unbounded memory growth from a single laggy connection
747 // causing allocator pressure and tail latency spikes.
748 if entry.send_buf.len() + written > MAX_SEND_BUF {
749 debug!(
750 connection_id,
751 send_buf_len = entry.send_buf.len(),
752 "send buffer exceeded limit, dropping connection"
753 );
754 to_remove.push(connection_id);
755 return AppendOutcome::ConnectionDropped;
756 }
757
758 // Append the full wire frame to the connection's send buffer.
759 // The encoder writes [length(4) | payload], which is the complete
760 // wire format — no extra framing needed.
761 entry.send_buf.extend_from_slice(&encode_buf[..written]);
762 entry.last_send = batch_now;
763 dirty_connections.insert(connection_id);
764 AppendOutcome::Continue
765}
766
767/// Each dirty connection's accumulated send buffer is sent in a single SEND
768/// operation. Partial sends are retried until all bytes are delivered.
769/// Failed connections are collected in `to_remove` for the caller to clean up.
770fn flush_sends(
771 ring: &mut IoUring,
772 connections: &mut HashMap<u64, ConnectionEntry>,
773 dirty: &HashSet<u64>,
774 to_remove: &mut Vec<u64>,
775 cqes: &mut Vec<(u64, i32)>,
776) {
777 // Submit SEND SQEs for all dirty connections.
778 let mut pending: usize = 0;
779 for &conn_id in dirty {
780 if let Some(entry) = connections.get(&conn_id) {
781 if entry.send_buf.is_empty() {
782 continue;
783 }
784 let sqe = opcode::Send::new(
785 types::Fd(entry.fd),
786 entry.send_buf.as_ptr(),
787 entry.send_buf.len() as u32,
788 )
789 .build()
790 .user_data(conn_id);
791
792 unsafe {
793 ring.submission()
794 .push(&sqe)
795 .expect("io_uring SQ full — increase RING_SIZE");
796 }
797 pending += 1;
798 }
799 }
800
801 if pending == 0 {
802 return;
803 }
804
805 // Submit and wait for all completions.
806 if let Err(e) = ring.submit_and_wait(pending) {
807 error!(error = %e, "io_uring submit_and_wait failed in response stage");
808 return;
809 }
810
811 // Drain completions into pre-allocated buffer. Must collect to
812 // release CQ borrow before mutating connections.
813 cqes.clear();
814 cqes.extend(ring.completion().map(|cqe| (cqe.user_data(), cqe.result())));
815
816 for &(conn_id, result) in cqes.iter() {
817 if result < 0 {
818 debug!(
819 connection_id = conn_id,
820 error = result,
821 "send error, dropping connection"
822 );
823 to_remove.push(conn_id);
824 continue;
825 }
826
827 let sent = result as usize;
828 if let Some(entry) = connections.get_mut(&conn_id) {
829 if sent >= entry.send_buf.len() {
830 entry.send_buf.clear();
831 } else {
832 // Partial send — drain sent bytes, retry remainder.
833 // Rare for small response frames over TCP/UDS but must
834 // be handled for correctness (e.g., send buffer pressure).
835 entry.send_buf.drain(..sent);
836 retry_send(ring, entry, conn_id, to_remove);
837 }
838 }
839 }
840}
841
842/// Retry sending remaining bytes after a partial send. Loops until the
843/// entire buffer is delivered or an error occurs.
844fn retry_send(
845 ring: &mut IoUring,
846 entry: &mut ConnectionEntry,
847 conn_id: u64,
848 to_remove: &mut Vec<u64>,
849) {
850 while !entry.send_buf.is_empty() {
851 let sqe = opcode::Send::new(
852 types::Fd(entry.fd),
853 entry.send_buf.as_ptr(),
854 entry.send_buf.len() as u32,
855 )
856 .build()
857 .user_data(conn_id);
858
859 unsafe {
860 ring.submission()
861 .push(&sqe)
862 .expect("io_uring SQ full during send retry");
863 }
864
865 if let Err(e) = ring.submit_and_wait(1) {
866 debug!(connection_id = conn_id, error = %e, "send retry failed");
867 to_remove.push(conn_id);
868 return;
869 }
870
871 if let Some(cqe) = ring.completion().next() {
872 let result = cqe.result();
873 if result <= 0 {
874 debug!(
875 connection_id = conn_id,
876 error = result,
877 "send retry error, dropping connection"
878 );
879 to_remove.push(conn_id);
880 return;
881 }
882 let sent = result as usize;
883 if sent >= entry.send_buf.len() {
884 entry.send_buf.clear();
885 } else {
886 entry.send_buf.drain(..sent);
887 }
888 }
889 }
890}
891
892/// Evaluate the durability policy against the live cursor state.
893///
894/// Builds a `CursorView` containing the primary plus every *currently
895/// connected* replica slot and returns the highest sequence at which
896/// the policy is satisfied. The primary's in-memory cursor is modeled
897/// as `u64::MAX` because the response stage only gates events that have
898/// already been processed by the matching engine — those are trivially
899/// in-memory on the primary by construction.
900///
901/// Disconnected slots are *omitted from the view* rather than included
902/// with zero cursors. The view's `len()` reflects how many nodes are
903/// actually available; if it's too small to satisfy a clause, the
904/// policy reports degraded and the gate stalls.
905#[inline]
906pub(crate) fn evaluate_durability(
907 policy: &Policy,
908 journal_pos: u64,
909 metrics: Option<&ReplicationMetrics>,
910 replica_active: Option<&[Arc<AtomicBool>; 2]>,
911) -> EvalStatus {
912 // Primary + up to 2 replica slots = 3 nodes max.
913 let mut nodes: [[u64; 2]; 3] = [[0, 0]; 3];
914 nodes[0] = [u64::MAX, journal_pos];
915 let mut len = 1;
916 if let (Some(m), Some(active)) = (metrics, replica_active) {
917 for (i, slot_active) in active.iter().enumerate() {
918 // Skip inactive slots up-front.
919 if !slot_active.load(Ordering::Acquire) {
920 continue;
921 }
922 let in_mem = m.in_memory_sequence[i].load(Ordering::Acquire);
923 let persisted = m.acked_sequence[i].load(Ordering::Acquire);
924 nodes[len] = [in_mem, persisted];
925 len += 1;
926 }
927 }
928 policy.evaluate_with_status(&CursorView::new(&nodes[..len]))
929}
930
/// Minimum time a degraded/healthy state must hold before its transition
/// is written to the log.
///
/// Suppresses log spam when a replica flaps faster than this — only
/// transitions that hold for at least this long emit warn/info
/// entries. The `/healthz` gauge updates immediately regardless,
/// so dashboards and alerts still see real-time state.
const DEGRADED_FLAP_HOLD: Duration = Duration::from_secs(1);
937
/// Tracks degradation state across calls and emits warn/info logs
/// with sustained-state gating + a periodic heartbeat re-emit.
///
/// The hot path calls [`Self::tick`] every gate iteration / idle
/// poll with the current `degraded` value and the current (monotonic)
/// `Instant`. The logger handles:
///
/// - Updating the `policy_degraded` health gauge immediately.
/// - Suppressing log lines for transitions that don't hold for at
///   least [`DEGRADED_FLAP_HOLD`] — a replica flapping at sub-second
///   cadence produces no log noise, only a quietly-updating gauge.
/// - Emitting a warn at the moment a sustained degraded state
///   crosses the hold threshold, plus a periodic re-emit every
///   `heartbeat_interval` while it persists.
/// - Emitting an info when a sustained healthy state crosses the
///   hold threshold (the cluster is back to its target shape and
///   stayed there long enough that we trust the recovery).
pub(crate) struct DegradationLogger {
    /// Last value passed to `tick`; what we'd log about if it stayed
    /// at this value past the hold threshold.
    pending_state: bool,
    /// When `pending_state` first appeared. Reset on every flip.
    pending_since: Instant,
    /// Whether the onset of the current streak has been logged yet.
    /// Cleared on every flip and set once the transition message is
    /// emitted; heartbeat warns only fire while this is `true`.
    pending_logged: bool,
    /// When the most recent log line was emitted — a transition
    /// message (warn *or* info) or a heartbeat warn. Drives the
    /// periodic re-emit while degraded. `None` until the first log.
    last_log: Option<Instant>,
}
969
970impl DegradationLogger {
971 pub(crate) fn new(now: Instant) -> Self {
972 Self {
973 pending_state: false,
974 pending_since: now,
975 pending_logged: true, // healthy is the assumed initial state; nothing to log
976 last_log: None,
977 }
978 }
979
980 /// Use when the policy is known to start in a degraded state
981 /// (e.g. a primary in `hybrid` mode with no replica yet
982 /// connected). Logs a startup warn immediately and treats the
983 /// state as already-logged so the next tick doesn't re-emit.
984 pub(crate) fn new_starting_degraded(now: Instant, policy: &Policy) -> Self {
985 tracing::warn!(
986 policy = %policy,
987 "durability policy starts in degraded mode — fewer connected nodes than the target count"
988 );
989 Self {
990 pending_state: true,
991 pending_since: now,
992 pending_logged: true,
993 last_log: Some(now),
994 }
995 }
996
997 /// Update the gauge + emit transition/heartbeat logs as needed.
998 /// Cheap on the hot path: one atomic store, a few branches, one
999 /// `Instant::duration_since`.
1000 pub(crate) fn tick(
1001 &mut self,
1002 policy: &Policy,
1003 utilization: &StageUtilization,
1004 degraded_now: bool,
1005 now: Instant,
1006 heartbeat_interval: Duration,
1007 ) {
1008 utilization
1009 .policy_degraded
1010 .store(degraded_now, Ordering::Relaxed);
1011
1012 if degraded_now != self.pending_state {
1013 // State changed — start a new hold window. Don't log
1014 // until / unless this new state stays long enough.
1015 self.pending_state = degraded_now;
1016 self.pending_since = now;
1017 self.pending_logged = false;
1018 return;
1019 }
1020
1021 // State held. If we haven't yet logged this streak's onset,
1022 // and it's been pending for at least the flap-hold time,
1023 // emit the transition message and mark logged.
1024 if !self.pending_logged && now.duration_since(self.pending_since) >= DEGRADED_FLAP_HOLD {
1025 if degraded_now {
1026 tracing::warn!(
1027 policy = %policy,
1028 "durability policy operating in degraded mode — fewer connected nodes than the target count, gate clamped to surviving cluster"
1029 );
1030 } else {
1031 tracing::info!(
1032 policy = %policy,
1033 "durability policy returned to target shape"
1034 );
1035 }
1036 self.pending_logged = true;
1037 self.last_log = Some(now);
1038 return;
1039 }
1040
1041 // Heartbeat re-emit while a degraded state persists.
1042 if degraded_now
1043 && self.pending_logged
1044 && self
1045 .last_log
1046 .is_none_or(|t| now.duration_since(t) >= heartbeat_interval)
1047 {
1048 tracing::warn!(
1049 policy = %policy,
1050 "durability policy still degraded — fewer connected nodes than the target count"
1051 );
1052 self.last_log = Some(now);
1053 }
1054 }
1055}
1056
1057/// Minimum persisted cursor across currently-connected replica slots.
1058/// Used for gate-bottleneck attribution and the journal-wait /
1059/// replica-wait histograms — *not* for durability decisions, which go
1060/// through [`evaluate_durability`].
1061///
1062/// Returns `u64::MAX` when no replica is connected, which makes
1063/// attribution always credit the journal — correct, because in
1064/// standalone mode the journal is the only path.
1065#[inline]
1066pub(crate) fn connected_persisted_min(
1067 metrics: Option<&ReplicationMetrics>,
1068 replica_active: Option<&[Arc<AtomicBool>; 2]>,
1069) -> u64 {
1070 let (Some(m), Some(active)) = (metrics, replica_active) else {
1071 return u64::MAX;
1072 };
1073 let mut min = u64::MAX;
1074 for (i, slot_active) in active.iter().enumerate() {
1075 if !slot_active.load(Ordering::Acquire) {
1076 continue;
1077 }
1078 let v = m.acked_sequence[i].load(Ordering::Acquire);
1079 if v < min {
1080 min = v;
1081 }
1082 }
1083 min
1084}
1085
/// Tracks per-cursor "first observed transition from below to >= needed"
/// inside the durability gate loop, to drive the journal-wait /
/// replica-wait histograms in the bench's tick-to-trade decomposition.
///
/// A sample is recorded only for cursors that were strictly below
/// `needed` at the loop's first observation. Cursors already past at
/// entry were not on the critical path for this batch, so attributing
/// "wait time" to them would inflate the metric with cursor-poll
/// observation timestamps that have nothing to do with how long the
/// stage actually held us up.
///
/// `now_ns` is taken as a parameter rather than read internally so
/// tests can supply deterministic timestamps. The caller's hot path
/// reads `trace::mono_trace_ns()` once per gate iteration and feeds it in.
#[cfg(feature = "tick-to-trade")]
pub(crate) struct GateCrossTracker {
    /// Sequence both cursors are compared against.
    needed: u64,
    /// Timestamp of the first observation at which the journal cursor
    /// was `>= needed`, recorded only if it started below.
    journal_crossed_ts: Option<trace::MonoTraceInstant>,
    /// Same as `journal_crossed_ts`, for the replica minimum cursor.
    replica_crossed_ts: Option<trace::MonoTraceInstant>,
    /// Whether the journal cursor was `< needed` at the first observation.
    journal_was_below: bool,
    /// Whether the replica minimum was `< needed` at the first observation.
    replica_was_below: bool,
    /// `true` until the first call to `observe` has taken its snapshot.
    first: bool,
}
1109
#[cfg(feature = "tick-to-trade")]
impl GateCrossTracker {
    /// Create a tracker for a gate that opens once the cursors reach
    /// `needed`. No crossing is recorded until `observe` is called.
    pub(crate) fn new(needed: u64) -> Self {
        Self {
            first: true,
            journal_was_below: false,
            replica_was_below: false,
            journal_crossed_ts: None,
            replica_crossed_ts: None,
            needed,
        }
    }

    /// Feed one observation of both cursors, taken at `now_ns`.
    ///
    /// The first call snapshots which cursors are below `needed`. For a
    /// cursor that started below, the timestamp of the first
    /// observation at which it is `>= needed` is stored; later
    /// observations never overwrite it.
    pub(crate) fn observe(
        &mut self,
        journal_pos: u64,
        repl_min: u64,
        now_ns: trace::MonoTraceInstant,
    ) {
        let journal_reached = journal_pos >= self.needed;
        let replica_reached = repl_min >= self.needed;

        // Take the entry snapshot exactly once.
        if std::mem::replace(&mut self.first, false) {
            self.journal_was_below = !journal_reached;
            self.replica_was_below = !replica_reached;
        }

        if self.journal_was_below && journal_reached && self.journal_crossed_ts.is_none() {
            self.journal_crossed_ts = Some(now_ns);
        }
        if self.replica_was_below && replica_reached && self.replica_crossed_ts.is_none() {
            self.replica_crossed_ts = Some(now_ns);
        }
    }

    /// Timestamp at which the journal cursor was first seen crossing, if
    /// it started below and has crossed.
    pub(crate) fn journal_crossed(&self) -> Option<trace::MonoTraceInstant> {
        self.journal_crossed_ts
    }

    /// Timestamp at which the replica minimum was first seen crossing,
    /// if it started below and has crossed.
    pub(crate) fn replica_crossed(&self) -> Option<trace::MonoTraceInstant> {
        self.replica_crossed_ts
    }
}
1151
/// Log the busy/idle split of a pipeline stage on shutdown.
///
/// `busy` and `idle` are iteration counts. When both are zero a
/// "no iterations recorded" line is logged instead of a percentage.
#[cfg(feature = "pipeline-stats")]
fn print_utilization(stage: &str, busy: u64, idle: u64) {
    let total = busy + idle;
    if total != 0 {
        let busy_ratio = busy as f64 / total as f64;
        let pct = busy_ratio * 100.0;
        tracing::info!(
            stage,
            pct_busy = format_args!("{pct:.2}%"),
            busy,
            idle,
            total,
            "pipeline utilization",
        );
    } else {
        tracing::info!(stage, "no iterations recorded");
    }
}
1170
1171#[cfg(test)]
1172mod tests {
1173 #[cfg(feature = "tick-to-trade")]
1174 use super::GateCrossTracker;
1175 use super::{DegradationLogger, connected_persisted_min, evaluate_durability};
1176 use crate::durability_policy::{Clause, Level, Policy};
1177 use crate::replication::ReplicationMetrics;
1178
1179 /// Build a [`Policy`] from a mini DSL: one or more
1180 /// `"<level>>=<count>"` clauses joined with `&&`. Test-only
1181 /// ergonomics — production builds policies via
1182 /// [`DurabilityMode::to_policy`].
1183 fn parse(s: &str) -> Result<Policy, String> {
1184 let mut clauses = Vec::new();
1185 for raw in s.split("&&") {
1186 let token = raw.trim();
1187 let (lvl, rhs) = token
1188 .split_once(">=")
1189 .ok_or_else(|| format!("clause `{token}` missing `>=`"))?;
1190 let level = match lvl.trim() {
1191 "persisted" => Level::Persisted,
1192 "in_memory" => Level::InMemory,
1193 other => return Err(format!("unknown level `{other}`")),
1194 };
1195 let count: u8 = rhs.trim().parse().map_err(|e| format!("bad count: {e}"))?;
1196 clauses.push(Clause { count, level });
1197 }
1198 Policy::new(clauses).map_err(|e| e.to_string())
1199 }
1200 use melin_transport_core::pipeline::StageUtilization;
1201 use std::sync::Arc;
1202 use std::sync::atomic::{AtomicBool, Ordering};
1203 use std::time::{Duration, Instant};
1204
1205 /// Build a `ReplicationMetrics` with both slots populated. Tests
1206 /// that need to simulate a disconnected slot use [`flags`] to mark
1207 /// it inactive — its cursors are then ignored regardless of value.
1208 fn metrics(slot0: (u64, u64), slot1: (u64, u64)) -> Arc<ReplicationMetrics> {
1209 let m = Arc::new(ReplicationMetrics::default());
1210 m.in_memory_sequence[0].store(slot0.0, Ordering::Relaxed);
1211 m.acked_sequence[0].store(slot0.1, Ordering::Relaxed);
1212 m.in_memory_sequence[1].store(slot1.0, Ordering::Relaxed);
1213 m.acked_sequence[1].store(slot1.1, Ordering::Relaxed);
1214 m
1215 }
1216
1217 /// Build a `[active; 2]` flags array.
1218 fn flags(slot0_active: bool, slot1_active: bool) -> [Arc<AtomicBool>; 2] {
1219 [
1220 Arc::new(AtomicBool::new(slot0_active)),
1221 Arc::new(AtomicBool::new(slot1_active)),
1222 ]
1223 }
1224
1225 /// Both replicas active — the common healthy-cluster case.
1226 fn both_active() -> [Arc<AtomicBool>; 2] {
1227 flags(true, true)
1228 }
1229
1230 // --- Standalone (no replicas wired) ---
1231
1232 #[test]
1233 fn standalone_persisted_one_gates_on_journal() {
1234 // No metrics → only the primary is in the view. `persisted>=1`
1235 // is satisfied by the primary alone at journal_pos.
1236 let p = parse("persisted>=1").unwrap();
1237 assert_eq!(evaluate_durability(&p, 500, None, None).durable_pos, 500);
1238 }
1239
1240 #[test]
1241 fn standalone_strict_persisted_two_never_opens() {
1242 // `persisted>=2` on a standalone primary stays at 0: the
1243 // operator asked for two copies and there is only one. The
1244 // policy surfaces as degraded so the operator sees the gate
1245 // is stalled because the cluster can't meet the policy.
1246 let p = parse("persisted>=2").unwrap();
1247 let r = evaluate_durability(&p, 500, None, None);
1248 assert_eq!(r.durable_pos, 0);
1249 assert!(
1250 r.degraded,
1251 "policy structurally unsatisfiable on this shape → degraded",
1252 );
1253 }
1254
1255 // --- 2 replicas connected ---
1256
1257 #[test]
1258 fn quorum_both_replicas_ahead_of_journal() {
1259 // Both replicas persisted past journal. `persisted>=2` returns
1260 // the 2nd-largest persisted across {primary, slot0, slot1}.
1261 let p = parse("persisted>=2").unwrap();
1262 let m = metrics((100, 100), (120, 120));
1263 let a = both_active();
1264 assert_eq!(
1265 evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1266 100
1267 );
1268 }
1269
1270 #[test]
1271 fn quorum_journal_ahead_of_both_replicas() {
1272 // Journal at 500, replicas at 100/120. 2nd-largest persisted = 120.
1273 let p = parse("persisted>=2").unwrap();
1274 let m = metrics((100, 100), (120, 120));
1275 let a = both_active();
1276 assert_eq!(
1277 evaluate_durability(&p, 500, Some(&m), Some(&a)).durable_pos,
1278 120
1279 );
1280 }
1281
1282 #[test]
1283 fn quorum_journal_between_slow_and_fast_replica() {
1284 // {primary=150, slot0_persisted=50, slot1_persisted=200}.
1285 // 2nd-largest = 150 (primary itself).
1286 let p = parse("persisted>=2").unwrap();
1287 let m = metrics((50, 50), (200, 200));
1288 let a = both_active();
1289 assert_eq!(
1290 evaluate_durability(&p, 150, Some(&m), Some(&a)).durable_pos,
1291 150
1292 );
1293 }
1294
1295 // --- Single replica connected ---
1296
1297 #[test]
1298 fn single_replica_strict_persisted_two_requires_both_survivors() {
1299 // Slot 0 connected, slot 1 disconnected. View = {primary, slot0}.
1300 // Strict `persisted>=2`: 2nd-largest of the 2-row view =
1301 // min(primary, slot0). Strictly stronger than legacy auto-
1302 // degrade-to-1-node in the same shape.
1303 let p = parse("persisted>=2").unwrap();
1304 let m = metrics((100, 100), (999, 999)); // slot 1 cursors ignored
1305 let a = flags(true, false);
1306 assert_eq!(
1307 evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1308 50
1309 );
1310 assert_eq!(
1311 evaluate_durability(&p, 200, Some(&m), Some(&a)).durable_pos,
1312 100
1313 );
1314 }
1315
1316 #[test]
1317 fn single_replica_persisted_two_requires_both_survivors() {
1318 // 2-node view (primary + surviving replica). `persisted>=2` is
1319 // satisfiable; the gate opens at the slower of the two and the
1320 // policy is not degraded.
1321 let p = parse("persisted>=2").unwrap();
1322 let m = metrics((100, 100), (999, 999));
1323 let a = flags(true, false);
1324 let r = evaluate_durability(&p, 50, Some(&m), Some(&a));
1325 assert_eq!(r.durable_pos, 50);
1326 assert!(!r.degraded);
1327 }
1328
1329 #[test]
1330 fn both_replicas_disconnected_strict_stalls() {
1331 // View has only the primary. Strict `persisted>=2` cannot be
1332 // satisfied — operator opted out of degrade.
1333 let p = parse("persisted>=2").unwrap();
1334 let m = metrics((999, 999), (999, 999));
1335 let a = flags(false, false);
1336 assert_eq!(
1337 evaluate_durability(&p, 500, Some(&m), Some(&a)).durable_pos,
1338 0
1339 );
1340 }
1341
1342 #[test]
1343 fn both_replicas_disconnected_strict_stalls_and_flags_degraded() {
1344 // With `persisted>=2` and both replicas down, the cursor view
1345 // collapses to {primary}: the clause's count (=2) exceeds the
1346 // view size, so the gate stays at 0 and the policy flags
1347 // degraded. Note the matching stage's separate halt at
1348 // `replicas_connected==0` rejects new orders before they reach
1349 // the gate; this verifies the gate semantics in isolation.
1350 let p = parse("persisted>=2").unwrap();
1351 let m = metrics((999, 999), (999, 999));
1352 let a = flags(false, false);
1353 let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1354 assert_eq!(r.durable_pos, 0);
1355 assert!(r.degraded);
1356 }
1357
1358 // --- Mixed-level policies ---
1359
1360 #[test]
1361 fn persisted_one_and_in_memory_two() {
1362 // "Leader persists, plus one other node has it in memory" —
1363 // the cheap-but-non-zero durability target. Slot 0 has it in
1364 // memory, slot 1 disconnected.
1365 let p = parse("persisted>=1 && in_memory>=2").unwrap();
1366 // primary persisted=50, slot0 in_mem=80 / persisted=20.
1367 // persisted>=1: max(50, 20, 0) = 50.
1368 // in_memory>=2: primary in_mem=u64::MAX (always), slot0_eff=max(80, 20)=80,
1369 // slot1=0. 2nd-largest = 80.
1370 // min(50, 80) = 50.
1371 let m = metrics((80, 20), (999, 999));
1372 let a = flags(true, false);
1373 assert_eq!(
1374 evaluate_durability(&p, 50, Some(&m), Some(&a)).durable_pos,
1375 50
1376 );
1377 }
1378
1379 // --- Edge: journal at 0 ---
1380
1381 #[test]
1382 fn journal_at_zero_with_replicas_persisted_one() {
1383 // Journal hasn't fsynced anything; both replicas have. With
1384 // `persisted>=1` the gate opens at the fastest replica.
1385 let p = parse("persisted>=1").unwrap();
1386 let m = metrics((100, 100), (200, 200));
1387 let a = both_active();
1388 assert_eq!(
1389 evaluate_durability(&p, 0, Some(&m), Some(&a)).durable_pos,
1390 200
1391 );
1392 }
1393
1394 // --- connected_persisted_min — used for gate-bottleneck attribution ---
1395
1396 #[test]
1397 fn attribution_min_skips_disconnected_slots() {
1398 // Slot 1 disconnected via active flag.
1399 let m = metrics((150, 100), (999, 999));
1400 let a = flags(true, false);
1401 assert_eq!(connected_persisted_min(Some(&m), Some(&a)), 100);
1402 }
1403
1404 #[test]
1405 fn attribution_min_returns_max_when_standalone() {
1406 // No metrics wired → u64::MAX, which makes attribution always
1407 // credit the journal. Correct for a standalone deployment.
1408 assert_eq!(connected_persisted_min(None, None), u64::MAX);
1409 }
1410
1411 /// Fresh-cluster catch-up: a replica that handshakes at sequence
1412 /// 0 (the legitimate genesis case, not a stale-flag race) must be
1413 /// included in the cursor view with its zero cursors so the policy
1414 /// behaves the same way it would for a 1-replica deployment that
1415 /// has just produced its first batch. The disconnect-race
1416 /// mitigations (B1 seed-on-connect + B2 reorder) keep this from
1417 /// being conflated with the stale-flag-paired-with-zero-cursor
1418 /// case under normal cluster lifecycles.
1419 #[test]
1420 fn fresh_cluster_zero_cursors_included_in_view() {
1421 let p = parse("persisted>=2").unwrap();
1422 // Both replicas just handshook at seq 0, primary also at 0
1423 // (fresh cluster, no events yet). View = 3 nodes; the clause's
1424 // count (=2) is met by the view size, so the policy is not
1425 // degraded and the gate sits at the 2nd-largest persisted = 0.
1426 let m = metrics((0, 0), (0, 0));
1427 let a = both_active();
1428 let r = evaluate_durability(&p, 0, Some(&m), Some(&a));
1429 assert_eq!(r.durable_pos, 0);
1430 assert!(
1431 !r.degraded,
1432 "all 3 nodes present, view meets clause target — should not flag degraded"
1433 );
1434 }
1435
1436 #[test]
1437 fn attribution_min_takes_smaller_when_both_connected() {
1438 let m = metrics((150, 100), (180, 80));
1439 let a = both_active();
1440 assert_eq!(connected_persisted_min(Some(&m), Some(&a)), 80);
1441 }
1442
1443 // -- Race-window regression tests --
1444 //
1445 // The replication senders fix two memory-ordering issues at the
1446 // active-flag transition points:
1447 //
1448 // B1 (`a84540a`): seed `metrics.{acked,in_memory}_sequence[i]`
1449 // to `handshake.last_sequence` BEFORE setting active_flag=true
1450 // on reconnect. Without this, the gate would observe (active=
1451 // true, cursor=0) for ~1 RTT after a replica catch-up completed,
1452 // freezing the gate on a degrade-friendly clause.
1453 //
1454 // B2 (`8888732`): zero `metrics.{acked,in_memory}_sequence[i]`
1455 // BEFORE setting active_flag=false on disconnect. Without this,
1456 // a weak-memory reader could observe (active=true, cursor=0)
1457 // for one iteration during the disconnect window.
1458 //
1459 // Both fixes are in the senders, but the gate's *behaviour* under
1460 // the race-window inputs is tested here. The intent is to lock in
1461 // the invariant: even under a hypothetical (active=true,cursor=0)
1462 // observation, the gate must not produce a spuriously-open answer
1463 // that would cause a client to be told "your event is durable"
1464 // when it isn't. Stalling-briefly is safe; opening-spuriously is
1465 // not.
1466
1467 #[test]
1468 fn race_b1_post_seed_gate_doesnt_freeze_on_reconnect() {
1469 // Post-B1-fix state: replica reconnected, cursors seeded to
1470 // `handshake.last_sequence` (480) before active flipped to
1471 // true. Primary kept moving and is at 500. The gate's view
1472 // is now [primary=500, slot=480]; the durable position dips
1473 // from 500 (primary alone, degraded) to 480 (both nodes).
1474 //
1475 // The dip is correct, not a bug: once a 2nd node is
1476 // connected, durability is bounded by the slower of the two.
1477 // Events 481-500 were already served as durable on primary
1478 // alone — they aren't unsent. New responses for seq>500 wait
1479 // until slot acks; we just don't freeze at 0.
1480 let p = parse("persisted>=2").unwrap();
1481 let m = metrics((480, 480), (999, 999));
1482 let a = flags(true, false);
1483 let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1484 assert_eq!(
1485 r.durable_pos, 480,
1486 "post-seed reconnect should produce a coherent gate position equal to the slower node, not freeze at 0"
1487 );
1488 }
1489
1490 #[test]
1491 fn race_b1_pre_seed_freeze_is_what_the_fix_avoids() {
1492 // Pre-B1-fix state: cursors at 0, active=true. The gate sees
1493 // [primary=500, slot=[0,0]] and 2nd-largest persisted = 0.
1494 // The gate WOULD freeze at 0. This test documents the bug
1495 // the seeding fix is designed to avoid; the senders ensure
1496 // this state is never observed in production.
1497 let p = parse("persisted>=2").unwrap();
1498 let m = metrics((0, 0), (999, 999));
1499 let a = flags(true, false);
1500 let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1501 assert_eq!(
1502 r.durable_pos, 0,
1503 "the gate behaviour under (active=true, cursor=0) — if the senders ever fail to seed before flipping active, this is the freeze the operator would see"
1504 );
1505 }
1506
1507 #[test]
1508 fn race_b2_disconnect_window_doesnt_open_gate_spuriously() {
1509 // Simulates the B2 race window: a weak-memory reader observes
1510 // (active=true, cursor=0) for one iteration during the
1511 // disconnect transition. The slot legitimately has cursor=0
1512 // because the disconnect handler just zeroed the metrics.
1513 //
1514 // Critical invariant: the gate must NOT produce a higher
1515 // durable_pos than it would with the slot correctly excluded.
1516 // Specifically: with primary at 500, slot stale-zero-included,
1517 // the gate must not "see" the primary alone and open at 500
1518 // — that would let a client be told a seq is durable when
1519 // only the primary has it under a `persisted>=2` policy that
1520 // demands 2 nodes.
1521 let p = parse("persisted>=2").unwrap();
1522 let m = metrics((0, 0), (999, 999));
1523 let a = flags(true, false);
1524 let r = evaluate_durability(&p, 500, Some(&m), Some(&a));
1525 // 2nd-largest persisted across {primary=500, slot=0} = 0.
1526 // Gate stalls. ✓
1527 assert_eq!(r.durable_pos, 0);
1528
1529 // Post-disconnect (active=false): view shrinks to {primary}.
1530 // `persisted>=2` is structurally unsatisfiable on a 1-node
1531 // view, so the gate stays at 0 AND surfaces degraded. The
1532 // matching stage's `replicas_connected==0` halt is what stops
1533 // accepting new orders; the gate side's job is just to keep
1534 // the existing in-flight orders stalled and the alert lit.
1535 let a_disconnected = flags(false, false);
1536 let r_after = evaluate_durability(&p, 500, Some(&m), Some(&a_disconnected));
1537 assert_eq!(r_after.durable_pos, 0);
1538 assert!(
1539 r_after.degraded,
1540 "post-disconnect view of size 1 cannot meet persisted>=2 → degraded"
1541 );
1542 }
1543
1544 #[test]
1545 fn race_invariant_zero_cursor_never_opens_gate_above_slower_node() {
1546 // Property under both B1 and B2 race windows: for any slot
1547 // observed at cursor=0 with active=true, the gate cannot
1548 // produce a durable_pos that exceeds what an honest 2-node
1549 // evaluation would give. Spot-check a handful of primary
1550 // positions to lock the invariant.
1551 let p = parse("persisted>=2").unwrap();
1552 let m = metrics((0, 0), (999, 999));
1553 let a = flags(true, false);
1554 for primary_pos in [0, 1, 100, 500, 1_000_000_000_u64] {
1555 let r = evaluate_durability(&p, primary_pos, Some(&m), Some(&a));
1556 // 2nd-largest of {primary_pos, 0} = 0 for any primary > 0.
1557 // For primary_pos = 0, also 0. So always 0.
1558 assert_eq!(
1559 r.durable_pos, 0,
1560 "race-window observation must not open the gate above 0 for any primary position (got {} for primary_pos={primary_pos})",
1561 r.durable_pos
1562 );
1563 }
1564 }
1565
1566 // ------------------------------------------------------------------
1567 // GateCrossTracker — per-cursor "first transition from below to
1568 // crossed" inside the gate loop, used by the journal-wait /
1569 // replica-wait histograms.
1570 // ------------------------------------------------------------------
1571
1572 #[cfg(feature = "tick-to-trade")]
1573 #[test]
1574 fn gate_cross_tracker_records_journal_when_strictly_below() {
1575 // Journal starts at 5 (< 10), repl_min already at 100.
1576 // Journal crosses on the second observation. Replica was already
1577 // past at entry, so no replica sample.
1578 let mut t = GateCrossTracker::new(10);
1579 t.observe(5, 100, 1_000);
1580 t.observe(15, 100, 2_000);
1581 assert_eq!(t.journal_crossed(), Some(2_000));
1582 assert_eq!(t.replica_crossed(), None);
1583 }
1584
1585 #[cfg(feature = "tick-to-trade")]
1586 #[test]
1587 fn gate_cross_tracker_records_replica_when_strictly_below() {
1588 // Mirror image: journal already past, replica below at entry.
1589 let mut t = GateCrossTracker::new(10);
1590 t.observe(50, 5, 1_000);
1591 t.observe(50, 12, 2_000);
1592 assert_eq!(t.journal_crossed(), None);
1593 assert_eq!(t.replica_crossed(), Some(2_000));
1594 }
1595
1596 #[cfg(feature = "tick-to-trade")]
1597 #[test]
1598 fn gate_cross_tracker_records_both_when_both_below() {
1599 // Both below at entry, both cross independently.
1600 let mut t = GateCrossTracker::new(100);
1601 t.observe(50, 60, 1_000); // both below
1602 t.observe(105, 60, 2_000); // journal crosses
1603 t.observe(105, 110, 3_000); // replica crosses
1604 assert_eq!(t.journal_crossed(), Some(2_000));
1605 assert_eq!(t.replica_crossed(), Some(3_000));
1606 }
1607
1608 #[cfg(feature = "tick-to-trade")]
1609 #[test]
1610 fn gate_cross_tracker_skips_cursor_already_past_at_entry() {
1611 // Both cursors already >= needed at first observation —
1612 // neither was on the critical path. No samples.
1613 let mut t = GateCrossTracker::new(10);
1614 t.observe(50, 100, 1_000);
1615 // Even later observations don't backfill: was_below is sticky.
1616 t.observe(60, 110, 2_000);
1617 assert_eq!(t.journal_crossed(), None);
1618 assert_eq!(t.replica_crossed(), None);
1619 }
1620
1621 #[cfg(feature = "tick-to-trade")]
1622 #[test]
1623 fn gate_cross_tracker_first_observation_only_for_cross_decision() {
1624 // A cursor that goes back below `needed` after first iteration
1625 // (impossible in practice — cursors are monotonic — but we
1626 // verify the first-iteration snapshot is what gates the
1627 // sample). Journal: 50 < 10 false → was_below=false → no sample.
1628 let mut t = GateCrossTracker::new(10);
1629 t.observe(50, 5, 1_000); // journal already past, replica below
1630 t.observe(20, 12, 2_000); // both >= needed now
1631 // Journal: was_below=false at entry → still no sample.
1632 assert_eq!(t.journal_crossed(), None);
1633 // Replica: was_below=true at entry, crosses on iter 2 → sample.
1634 assert_eq!(t.replica_crossed(), Some(2_000));
1635 }
1636
1637 #[cfg(feature = "tick-to-trade")]
1638 #[test]
1639 fn gate_cross_tracker_holds_first_cross_only() {
1640 // Once a cross is recorded, later observations don't
1641 // overwrite — the metric is "when did it first cross", not
1642 // "when was it last below".
1643 let mut t = GateCrossTracker::new(10);
1644 t.observe(5, 100, 1_000);
1645 t.observe(15, 100, 2_000); // first cross
1646 t.observe(25, 100, 3_000); // would otherwise re-record
1647 assert_eq!(t.journal_crossed(), Some(2_000));
1648 }
1649
1650 // -- DegradationLogger flap-suppression --
1651 //
1652 // The logger gates transition logs on a sustained-state hold so a
1653 // replica flapping at sub-second cadence doesn't spam the journal.
1654 // These tests don't observe the logs themselves (tracing is
1655 // process-global and brittle to capture in unit tests); they
1656 // assert the underlying state machine via the `policy_degraded`
1657 // gauge, which the logger updates on every tick regardless of
1658 // log emission.
1659
1660 fn logger_test_policy() -> crate::durability_policy::Policy {
1661 parse("persisted>=2").unwrap()
1662 }
1663
1664 /// Tick the logger N times at `step` intervals, alternating
1665 /// `degraded` per call. Returns the gauge value after the last
1666 /// tick — useful for asserting that flap cycles don't leak the
1667 /// AtomicBool into a wrong terminal state.
1668 fn drive_logger(
1669 logger: &mut DegradationLogger,
1670 utilization: &StageUtilization,
1671 policy: &crate::durability_policy::Policy,
1672 start: Instant,
1673 states: &[bool],
1674 step: Duration,
1675 ) -> bool {
1676 for (i, &state) in states.iter().enumerate() {
1677 let now = start + step.checked_mul(i as u32).unwrap_or(Duration::ZERO);
1678 logger.tick(policy, utilization, state, now, Duration::from_secs(5));
1679 }
1680 utilization.policy_degraded.load(Ordering::Relaxed)
1681 }
1682
#[test]
fn logger_gauge_tracks_state_immediately() {
    // The `policy_degraded` gauge (what /healthz, dashboards and alerts
    // read) must mirror the most recent state on every single tick.
    // Sustained-state gating only applies to the warn/info log lines.
    let policy = logger_test_policy();
    let utilization = StageUtilization::new();
    let start = Instant::now();
    let mut logger = DegradationLogger::new(start);
    let five_secs = Duration::from_secs(5);

    // Degraded tick → gauge set.
    logger.tick(&policy, &utilization, true, start, five_secs);
    let gauge_after_degraded = utilization.policy_degraded.load(Ordering::Relaxed);
    assert!(gauge_after_degraded);

    // Healthy tick only 50ms later → gauge cleared straight away.
    let shortly_after = start + Duration::from_millis(50);
    logger.tick(&policy, &utilization, false, shortly_after, five_secs);
    let gauge_after_healthy = utilization.policy_degraded.load(Ordering::Relaxed);
    assert!(!gauge_after_healthy);
}
1705
#[test]
fn logger_starting_degraded_marks_initial_state_logged() {
    // `new_starting_degraded` is expected to emit the startup warn and to
    // treat the degraded state as already logged, so a tick at the same
    // cluster shape does not immediately re-emit.
    let policy = logger_test_policy();
    let utilization = StageUtilization::new();
    let start = Instant::now();
    let mut logger = DegradationLogger::new_starting_degraded(start, &policy);

    // The constructor does not write the gauge; the first tick does. Tick
    // once in the same (degraded) state to settle it. No new log line
    // should fire because the state has not changed.
    let first_tick_at = start + Duration::from_millis(10);
    let five_secs = Duration::from_secs(5);
    logger.tick(&policy, &utilization, true, first_tick_at, five_secs);

    let gauge = utilization.policy_degraded.load(Ordering::Relaxed);
    assert!(gauge);
}
1728
#[test]
fn logger_handles_rapid_flap_without_panic() {
    // 100 alternating flips spaced 100ms apart — faster than the logger's
    // 1s flap-hold. The state machine has to stay coherent throughout: no
    // panic, the gauge follows the final state, and `pending_logged` does
    // not get stuck.
    let policy = logger_test_policy();
    let utilization = StageUtilization::new();
    let start = Instant::now();
    let mut logger = DegradationLogger::new(start);

    // Even indices are degraded, odd indices are healthy.
    let flips: Vec<bool> = (0..100u32).map(|i| i.is_multiple_of(2)).collect();
    let flip_interval = Duration::from_millis(100);

    let gauge = drive_logger(
        &mut logger,
        &utilization,
        &policy,
        start,
        &flips,
        flip_interval,
    );

    // The last of the 100 states is index 99, which is odd → healthy.
    assert!(!gauge);
}
1751
#[test]
fn logger_sustained_degraded_eventually_settles() {
    // Five degraded ticks at 500ms spacing cover 2s, comfortably beyond
    // the 1s flap-hold, so the logger should have reached its "onset
    // logged" mode with the heartbeat re-emit window primed (last_log
    // set). Only the gauge is observable here: it must end at 1.
    let policy = logger_test_policy();
    let utilization = StageUtilization::new();
    let start = Instant::now();
    let mut logger = DegradationLogger::new(start);

    let all_degraded = [true; 5];
    let spacing = Duration::from_millis(500);

    let gauge = drive_logger(
        &mut logger,
        &utilization,
        &policy,
        start,
        &all_degraded,
        spacing,
    );
    assert!(gauge);
}
1773
#[test]
fn logger_recovery_to_healthy_settles_gauge() {
    // A sustained degraded phase followed by a sustained healthy phase
    // must leave the gauge at 0.
    let policy = logger_test_policy();
    let utilization = StageUtilization::new();
    let start = Instant::now();
    let mut logger = DegradationLogger::new_starting_degraded(start, &policy);

    // Five degraded ticks, then five healthy ticks, 500ms apart.
    let timeline: Vec<bool> = [vec![true; 5], vec![false; 5]].concat();
    let spacing = Duration::from_millis(500);

    let gauge = drive_logger(
        &mut logger,
        &utilization,
        &policy,
        start,
        &timeline,
        spacing,
    );
    assert!(!gauge);
}
1792 }
1793}