Skip to main content

hyperi_rustlib/worker/engine/
driver.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/driver.rs
3// Purpose:   Unified WorkBatch engine driver (get -> process -> send -> commit)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Unified `WorkBatch` engine driver
10//!
11//! The single run loop that the four legacy loops (`run` / `run_raw` /
12//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
13//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
14//! through one block at a time:
15//!
16//! ```text
17//!   recv(max) -> WorkBatch        (recv now yields a WorkBatch natively)
18//!     -> apply_workbatch_dlq_policy  (route/discard/reject inline-DLQ entries)
19//!     -> lease_ingress_batch      (memory accounting on the block's bytes)
20//!     -> process(WorkBatch)       (transforms parse ON DEMAND via codec::parse)
21//!     -> sink(&out_batch).await   (async send of the whole block)
22//!     -> commit per CommitMode    (at-least-once, AFTER the block is sent)
23//! ```
24//!
25//! ## Why tokens live on the batch, not the record
26//!
27//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
28//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
29//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
30//! the input tokens after the whole out-batch is sent -- never `2N`, never per
31//! output record. That invariant is the data-plane core; the fan-out
32//! commit-correctness test proves it.
33//!
34//! ## Two parse modes (the hybrid)
35//!
36//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
37//!   does NOT pre-parse. A transform that needs a field calls
38//!   [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
39//!   never pay a parse.
40//!
41//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
42//!   hot pipelines. The driver pre-parses the whole block via `codec::parse`
43//!   (SIMD JSON / native MsgPack) on the worker pool and hands the process
44//!   closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
45//!   shared [`FieldInterner`](super::FieldInterner) for hot routing-field
46//!   dedup. This keeps the batch-parse + interner throughput win for apps that
47//!   opt in.
48//!
49//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
50//! in-process (non-run-loop) callers; only the four legacy run loops were
51//! removed by the 0.7b flip.
52
53use std::time::Duration;
54
55use tokio_util::sync::CancellationToken;
56
57use super::{BatchEngine, EngineError};
58use crate::transport::codec::{self, ParsedPayload};
59use crate::transport::{Record, TransportReceiver, WorkBatch};
60
61/// When the driver commits the input source acks.
62///
63/// The `commit_tokens` carried on the [`WorkBatch`] ARE the input source acks
64/// (Kafka offsets, fetch cursors, ...). This enum decides who fires them.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CommitMode {
67    /// At-least-once: after the sink returns `Ok` for the WHOLE out-batch, the
68    /// engine calls `receiver.commit(&out_batch.commit_tokens)`. A sink error
69    /// skips the commit so the block is re-delivered. This is the engine-commits
70    /// behaviour of the former mid-tier / raw run loops, lifted onto the block.
71    Auto,
72    /// The sink owns the commit -- the engine does NOT commit. The sink is
73    /// handed the block (which carries `commit_tokens`) and decides when to
74    /// acknowledge (e.g. after a downstream flush). The deferred-commit shape of
75    /// the former async run loop, lifted onto the block.
76    SinkManaged,
77}
78
79/// A pre-parsed block for the opt-in
80/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
81///
82/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
83/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
84/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
85/// for hot routing-field dedup.
86///
87/// ## Parse-failure contract
88///
89/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
90/// successfully. A record whose payload fails [`codec::parse`] is handled per
91/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
92/// same contract the legacy `process_mid_tier` honoured:
93///
94/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
95///   [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
96///   (no silent drop); the resulting [`WorkBatch`] inherits those entries and
97///   the driver routes them through the DLQ policy before commit.
98/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
99///   errors) -- a deliberate, configured drop, not a silent vanish.
100/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
101///   terminally (no commit), consistent with the ack barrier.
102///
103/// The process closure therefore always sees a clean, fully-parsed view.
104///
105/// `commit_tokens` are the INPUT source acks and are carried through unchanged
106/// regardless of how many records survived parsing -- the same fan-out-safe
107/// token decoupling as [`WorkBatch`].
108pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
109    /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
110    pub records: Vec<Record>,
111
112    /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
113    pub parsed: Vec<ParsedPayload>,
114
115    /// Input source acks for the whole block (decoupled from record count).
116    pub commit_tokens: Vec<T>,
117
118    /// Inline-DLQ entries: those carried in on the source batch PLUS any record
119    /// that failed to parse (no-silent-drop).
120    pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
121
122    /// Shared interner for hot routing-field-name dedup. The first time a field
123    /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
124    /// bump. Reused from the engine so dedup persists across blocks.
125    pub interner: &'a super::FieldInterner,
126}
127
128impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
129    /// Number of successfully-parsed records.
130    #[must_use]
131    pub fn len(&self) -> usize {
132        self.records.len()
133    }
134
135    /// Whether there are no successfully-parsed records.
136    #[must_use]
137    pub fn is_empty(&self) -> bool {
138        self.records.is_empty()
139    }
140
141    /// Intern a routing-field name through the shared interner.
142    ///
143    /// Use this to dedup the routing-key field name once per block rather than
144    /// re-allocating it per record.
145    #[must_use]
146    pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
147        self.interner.intern(name)
148    }
149}
150
151impl BatchEngine {
152    /// Unified on-demand `WorkBatch` driver -- the default data-plane loop.
153    ///
154    /// Drives one [`WorkBatch`] at a time through `recv -> filter-DLQ policy ->
155    /// ingress lease -> process -> sink -> commit`. The driver does NOT pre-parse:
156    /// `process` reads fields on demand via [`codec::parse`]. Pass-through apps
157    /// pay zero parse cost.
158    ///
159    /// - `process` runs on the loop task (cancellation-aware between awaits) and
160    ///   may fan records out or in; it MUST preserve `commit_tokens` (use
161    ///   [`WorkBatch::map_records`], which does so automatically).
162    /// - `sink` is async and receives the WHOLE out-batch by reference.
163    /// - `commit` selects [`CommitMode::Auto`] (engine commits after sink `Ok`)
164    ///   or [`CommitMode::SinkManaged`] (sink owns commit).
165    /// - `ticker` is an optional `(interval, fn)` that fires on the interval
166    ///   inside the select loop (flush timers, periodic maintenance).
167    ///
168    /// Stops cleanly when `shutdown` is cancelled.
169    ///
170    /// # Errors
171    ///
172    /// Returns [`EngineError::Transport`] if `recv` fails fatally,
173    /// [`EngineError::FilterDlqUnrouted`] if inline-DLQ entries appear under the
174    /// default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject), or
175    /// the error returned by `process`.
176    ///
177    /// A sink error (and, under [`CommitMode::Auto`], a commit error) is
178    /// TERMINAL: it stops the run loop and propagates. This is the ack barrier
179    /// for the ORDERED/cumulative source commit (Kafka "commit up to offset N"):
180    /// the failed block's tokens are NOT committed, and -- crucially -- no LATER
181    /// block is fetched and committed past them, which would silently skip the
182    /// never-sent records (data loss). On restart the source re-delivers from
183    /// the last committed watermark, preserving at-least-once. The app owns
184    /// restart/retry policy.
185    #[cfg(feature = "transport")]
186    #[allow(clippy::too_many_arguments)]
187    pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
188        &self,
189        receiver: &R,
190        shutdown: CancellationToken,
191        process: P,
192        mut sink: Sink,
193        commit: CommitMode,
194        ticker: Option<(Duration, Ticker)>,
195    ) -> Result<(), EngineError>
196    where
197        R: TransportReceiver,
198        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
199        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
200        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
201        Ticker: FnMut() -> TickerFut,
202        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
203    {
204        tracing::info!(
205            chunk_size = self.config.max_chunk_size,
206            commit = ?commit,
207            ticker = ticker.is_some(),
208            "BatchEngine (workbatch) starting"
209        );
210
211        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
212        let mut ticker_fn = ticker.map(|(_, f)| f);
213        if let Some(ref mut interval) = tick_interval {
214            interval.tick().await; // first tick fires immediately -- consume it
215        }
216
217        loop {
218            tokio::select! {
219                biased;
220
221                () = shutdown.cancelled() => {
222                    tracing::info!("BatchEngine (workbatch) shutting down");
223                    return Ok(());
224                }
225
226                _ = async {
227                    match tick_interval.as_mut() {
228                        Some(interval) => interval.tick().await,
229                        None => std::future::pending().await,
230                    }
231                } => {
232                    if let Some(ref mut f) = ticker_fn
233                        && let Err(e) = f().await
234                    {
235                        tracing::error!(error = %e, "Ticker (workbatch) failed");
236                    }
237                }
238
239                recv_result = receiver.recv(self.config.max_chunk_size) => {
240                    let work_batch = recv_result.map_err(EngineError::Transport)?;
241                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
242                        continue;
243                    };
244                    self.drive_block(receiver, batch, &process, &mut sink, commit).await?;
245                }
246            }
247        }
248    }
249
250    /// Streaming `WorkBatch` driver -- the opt-in peak-memory-bounded path.
251    ///
252    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), but each
253    /// received block is processed in consecutive byte-budget-sized SUB-BLOCKS
254    /// rather than all at once. Peak in-flight ingress memory is bounded to ONE
255    /// sub-block (`~sub_block_bytes`) instead of the whole block: the per-sub-block
256    /// ingress lease is dropped (releasing those bytes) BEFORE the next sub-block
257    /// is leased and processed.
258    ///
259    /// The source acks for the WHOLE block are committed EXACTLY ONCE, after the
260    /// FINAL sub-block's sink returns `Ok` (under [`CommitMode::Auto`]) -- so
261    /// at-least-once is preserved: a sink error on any sub-block stops the block
262    /// and skips the commit, so the WHOLE block is re-delivered. The sub-block
263    /// views carry EMPTY `commit_tokens`; the batch's tokens are committed once at
264    /// the end.
265    ///
266    /// `sub_block_bytes` is the target sum of `payload.len()` per sub-block (floor
267    /// one record, so a record larger than the target is still its own sub-block
268    /// and the loop never stalls). It is taken as a parameter so the path is
269    /// testable; Phase 3 wires the byte budget from the governor.
270    ///
271    /// Fan-out WITHIN a sub-block's `process` is fine (records grow); the source
272    /// acks are still the batch's input tokens, committed once at the end.
273    ///
274    /// # Errors
275    ///
276    /// Same as [`run_workbatch`](Self::run_workbatch).
277    #[cfg(feature = "transport")]
278    #[allow(clippy::too_many_arguments)]
279    pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
280        &self,
281        receiver: &R,
282        shutdown: CancellationToken,
283        process: P,
284        mut sink: Sink,
285        commit: CommitMode,
286        sub_block_bytes: u64,
287        ticker: Option<(Duration, Ticker)>,
288    ) -> Result<(), EngineError>
289    where
290        R: TransportReceiver,
291        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
292        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
293        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
294        Ticker: FnMut() -> TickerFut,
295        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
296    {
297        tracing::info!(
298            chunk_size = self.config.max_chunk_size,
299            commit = ?commit,
300            sub_block_bytes,
301            ticker = ticker.is_some(),
302            "BatchEngine (workbatch streaming) starting"
303        );
304
305        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
306        let mut ticker_fn = ticker.map(|(_, f)| f);
307        if let Some(ref mut interval) = tick_interval {
308            interval.tick().await; // first tick fires immediately -- consume it
309        }
310
311        loop {
312            tokio::select! {
313                biased;
314
315                () = shutdown.cancelled() => {
316                    tracing::info!("BatchEngine (workbatch streaming) shutting down");
317                    return Ok(());
318                }
319
320                _ = async {
321                    match tick_interval.as_mut() {
322                        Some(interval) => interval.tick().await,
323                        None => std::future::pending().await,
324                    }
325                } => {
326                    if let Some(ref mut f) = ticker_fn
327                        && let Err(e) = f().await
328                    {
329                        tracing::error!(error = %e, "Ticker (workbatch streaming) failed");
330                    }
331                }
332
333                recv_result = receiver.recv(self.config.max_chunk_size) => {
334                    let work_batch = recv_result.map_err(EngineError::Transport)?;
335                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
336                        continue;
337                    };
338                    self.drive_block_streaming(
339                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
340                    )
341                    .await?;
342                }
343            }
344        }
345    }
346
347    /// Governed `WorkBatch` driver -- the default-ON self-regulation run path.
348    ///
349    /// This is what a self-regulating app calls instead of choosing between
350    /// [`run_workbatch`](Self::run_workbatch) and
351    /// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
352    /// dispatches on whether the byte-budget lever is wired
353    /// ([`set_byte_budget`](BatchEngine::set_byte_budget), done by
354    /// `ServiceRuntime` when `self_regulation.enabled = true`):
355    ///
356    /// - **Governor ON** (budget wired): streams each received block in
357    ///   sub-blocks sized to the CURRENT byte budget (re-read per block), bounds
358    ///   peak in-flight memory to one sub-block, and folds each block's
359    ///   `(bytes, process_time, ingest_interval)` into the AIMD loop via
360    ///   [`observe`](crate::governor::ByteBudgetController::observe). The recv
361    ///   `max` is capped to the budget's poll-safety
362    ///   [`record_cap`](crate::governor::ByteBudgetController::record_cap).
363    ///   While pressure is LOW the budget sits at its big start value, so the
364    ///   block becomes a SINGLE sub-block -- no per-record overhead, behaviour
365    ///   matches the whole-batch loop.
366    /// - **Governor OFF** (no budget): delegates verbatim to
367    ///   [`run_workbatch`](Self::run_workbatch) -- byte-identical to
368    ///   pre-governor behaviour.
369    ///
370    /// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
371    /// SEPARATELY into the receive transport, not here -- this method is the
372    /// driver-side lever (sub-block sizing + AIMD), the gate is the
373    /// transport-side brake. The two share the same `UnifiedPressure`.
374    ///
375    /// # Errors
376    ///
377    /// Same as [`run_workbatch`](Self::run_workbatch).
378    #[cfg(all(feature = "transport", feature = "governor"))]
379    #[allow(clippy::too_many_arguments)]
380    pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
381        &self,
382        receiver: &R,
383        shutdown: CancellationToken,
384        process: P,
385        mut sink: Sink,
386        commit: CommitMode,
387        ticker: Option<(Duration, Ticker)>,
388    ) -> Result<(), EngineError>
389    where
390        R: TransportReceiver,
391        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
392        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
393        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
394        Ticker: FnMut() -> TickerFut,
395        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
396    {
397        // Governor OFF -> the original whole-batch loop, byte-for-byte.
398        let Some(budget) = self.byte_budget.clone() else {
399            return self
400                .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
401                .await;
402        };
403
404        tracing::info!(
405            chunk_size = self.config.max_chunk_size,
406            commit = ?commit,
407            ticker = ticker.is_some(),
408            start_byte_budget = budget.byte_budget(),
409            "BatchEngine (governed) starting -- self-regulation ON"
410        );
411
412        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
413        let mut ticker_fn = ticker.map(|(_, f)| f);
414        if let Some(ref mut interval) = tick_interval {
415            interval.tick().await; // first tick fires immediately -- consume it
416        }
417
418        // Track the previous block's arrival instant so we can feed the AIMD
419        // loop a real ingest inter-arrival interval.
420        let mut last_recv: Option<std::time::Instant> = None;
421
422        loop {
423            // The recv limits bound a single poll by BOTH:
424            //   - the SMALLER of the config chunk size and the budget's
425            //     poll-safety record cap (a tiny-record flood cannot blow the
426            //     count even within the byte budget), AND
427            //   - the CURRENT byte budget (re-read per block), so a single poll
428            //     never RETAINS more than ~one budget's worth of inbound payload
429            //     BEFORE the sub-block split. This is the fix for the
430            //     "byte budget does not bound RECEIVE memory" gap: without the
431            //     byte cap, `recv(max)` could build a WorkBatch (and, for the
432            //     Kafka recv-arena, allocate one arena) far larger than the
433            //     budget before any sub-block lease ran.
434            let recv_limits = crate::transport::RecvLimits {
435                max_records: self.config.max_chunk_size.min(budget.record_cap()),
436                max_bytes: budget.byte_budget(),
437            };
438
439            tokio::select! {
440                biased;
441
442                () = shutdown.cancelled() => {
443                    tracing::info!("BatchEngine (governed) shutting down");
444                    return Ok(());
445                }
446
447                _ = async {
448                    match tick_interval.as_mut() {
449                        Some(interval) => interval.tick().await,
450                        None => std::future::pending().await,
451                    }
452                } => {
453                    if let Some(ref mut f) = ticker_fn
454                        && let Err(e) = f().await
455                    {
456                        tracing::error!(error = %e, "Ticker (governed) failed");
457                    }
458                }
459
460                recv_result = receiver.recv_limited(recv_limits) => {
461                    let now = std::time::Instant::now();
462                    let ingest_interval = last_recv
463                        .map(|prev| now.saturating_duration_since(prev))
464                        .unwrap_or_default();
465                    last_recv = Some(now);
466
467                    let work_batch = recv_result.map_err(EngineError::Transport)?;
468                    let block_bytes = work_batch.total_payload_bytes() as u64;
469                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
470                        // Empty block: still fold the timing so a quiet pipeline
471                        // can grow its budget back. No bytes -> treated as slack.
472                        budget.observe(0, Duration::ZERO, ingest_interval);
473                        continue;
474                    };
475
476                    // Re-read the budget for THIS block: low pressure -> big
477                    // budget -> one sub-block (no overhead); high pressure ->
478                    // shrunk budget -> peak in-flight bounded to one sub-block.
479                    let sub_block_bytes = budget.byte_budget();
480
481                    let process_start = std::time::Instant::now();
482                    self.drive_block_streaming(
483                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
484                    )
485                    .await?;
486                    let process_time = process_start.elapsed();
487
488                    // Fold the OBSERVED actual block bytes into the AIMD loop. A
489                    // memory HARD override inside observe() shrinks immediately
490                    // regardless of rho.
491                    budget.observe(block_bytes, process_time, ingest_interval);
492
493                    // Observability: surface the current budget + pressure as
494                    // gauges so throttling is visible, not mysterious, AND the
495                    // ACTUAL received block bytes so the gap between the budget
496                    // (`self_regulation_byte_budget`) and reality (`recv_block_bytes`)
497                    // is measurable -- a persistent overshoot means the recv byte
498                    // cap is not holding. The gate edges (pause/resume) are
499                    // logged by the ObservingActuator.
500                    #[cfg(feature = "metrics")]
501                    {
502                        metrics::gauge!("self_regulation_byte_budget")
503                            .set(budget.byte_budget() as f64);
504                        metrics::gauge!("recv_block_bytes").set(block_bytes as f64);
505                        metrics::gauge!("pressure_ratio").set(budget.pressure().level());
506                    }
507                }
508            }
509        }
510    }
511
512    /// Unified pre-parsed `WorkBatch` driver -- the opt-in hot path.
513    ///
514    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), except the
515    /// driver PRE-PARSES the whole block via [`codec::parse`] (SIMD JSON / native
516    /// MsgPack) on the worker pool and hands `process_parsed` a [`ParsedBatch`]
517    /// (records + aligned parsed payloads + shared
518    /// [`FieldInterner`](super::FieldInterner)). This keeps
519    /// the batch-parse + interner throughput win for apps that opt in.
520    ///
521    /// Records that fail to parse are handled per the configured
522    /// [`ParseErrorAction`](super::ParseErrorAction) (Dlq -> dlq_entries, Skip ->
523    /// drop+counted, FailBatch -> terminal no-commit) -- see [`ParsedBatch`] for
524    /// the parse-failure contract. `process_parsed` returns the final
525    /// [`WorkBatch`] and MUST preserve the input `commit_tokens`.
526    ///
527    /// # Errors
528    ///
529    /// Same as [`run_workbatch`](Self::run_workbatch).
530    #[cfg(feature = "transport")]
531    #[allow(clippy::too_many_arguments)]
532    pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
533        &self,
534        receiver: &R,
535        shutdown: CancellationToken,
536        process_parsed: P,
537        mut sink: Sink,
538        commit: CommitMode,
539        ticker: Option<(Duration, Ticker)>,
540    ) -> Result<(), EngineError>
541    where
542        R: TransportReceiver,
543        P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
544        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
545        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
546        Ticker: FnMut() -> TickerFut,
547        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
548    {
549        tracing::info!(
550            chunk_size = self.config.max_chunk_size,
551            commit = ?commit,
552            ticker = ticker.is_some(),
553            "BatchEngine (workbatch parsed) starting"
554        );
555
556        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
557        let mut ticker_fn = ticker.map(|(_, f)| f);
558        if let Some(ref mut interval) = tick_interval {
559            interval.tick().await;
560        }
561
562        loop {
563            tokio::select! {
564                biased;
565
566                () = shutdown.cancelled() => {
567                    tracing::info!("BatchEngine (workbatch parsed) shutting down");
568                    return Ok(());
569                }
570
571                _ = async {
572                    match tick_interval.as_mut() {
573                        Some(interval) => interval.tick().await,
574                        None => std::future::pending().await,
575                    }
576                } => {
577                    if let Some(ref mut f) = ticker_fn
578                        && let Err(e) = f().await
579                    {
580                        tracing::error!(error = %e, "Ticker (workbatch parsed) failed");
581                    }
582                }
583
584                recv_result = receiver.recv(self.config.max_chunk_size) => {
585                    let recv_batch = recv_result.map_err(EngineError::Transport)?;
586                    let Some(batch) = self.ingest_workbatch(recv_batch)? else {
587                        continue;
588                    };
589                    // Wrap the parse-then-process so drive_block stays generic.
590                    // parse_block honours ParseErrorAction: FailBatch surfaces a
591                    // terminal EngineError here (no commit), Dlq carries entries
592                    // forward for the driver to route, Skip drops silently+counted.
593                    let parse = |b: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
594                        let parsed = self.parse_block(b)?;
595                        process_parsed(parsed)
596                    };
597                    self.drive_block(receiver, batch, &parse, &mut sink, commit).await?;
598                }
599            }
600        }
601    }
602
603    /// Prepare a received [`WorkBatch`] for processing: route its inline-DLQ
604    /// entries per the configured policy, then return the batch (with its
605    /// `dlq_entries` stripped) ready for the process stage. Returns `None` when
606    /// the block has no records (caller should `continue`).
607    ///
608    /// `recv` now yields a [`WorkBatch`] directly (Task 0.7b), so there is no
609    /// `RecvBatch` round-trip: the inbound-filter DLQ entries arrive on
610    /// [`WorkBatch::dlq_entries`] and are routed here via
611    /// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy)
612    /// before processing. Memory accounting is performed in
613    /// [`drive_block`](Self::drive_block).
614    #[cfg(feature = "transport")]
615    fn ingest_workbatch<T: crate::transport::CommitToken>(
616        &self,
617        batch: WorkBatch<T>,
618    ) -> Result<Option<WorkBatch<T>>, EngineError> {
619        // Route/discard/reject inline-DLQ entries per the configured policy --
620        // never silently dropped. The batch comes back with its dlq_entries
621        // consumed so the process stage sees a clean block.
622        let batch = self.apply_workbatch_dlq_policy(batch)?;
623        if batch.is_empty() {
624            return Ok(None);
625        }
626        Ok(Some(batch))
627    }
628
629    /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
630    ///
631    /// Shared by both [`run_workbatch`](Self::run_workbatch) and
632    /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
633    /// between the two is the `process` closure they pass.
634    #[cfg(feature = "transport")]
635    async fn drive_block<R, P, Sink, SinkFut>(
636        &self,
637        receiver: &R,
638        batch: WorkBatch<R::Token>,
639        process: &P,
640        sink: &mut Sink,
641        commit: CommitMode,
642    ) -> Result<(), EngineError>
643    where
644        R: TransportReceiver,
645        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
646        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
647        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
648    {
649        // Account the in-flight ingress bytes against the MemoryGuard; the lease
650        // releases on every exit path of this block (sink-error early return,
651        // commit, ?-return) via Drop.
652        #[cfg(feature = "memory")]
653        let _ingress_lease = self.lease_ingress_batch(&batch);
654
655        // process() may fan out / fan in; it preserves the input commit_tokens.
656        let mut out_batch = process(batch)?;
657
658        // Route any parse/process-generated DLQ entries the out-batch carries,
659        // through the SAME policy + route point as the inbound-filter entries
660        // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
661        // sink/commit, so a parse/process dead-letter can never vanish on the
662        // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
663        // Route sink Err) is a terminal ack-barrier error -- the commit is
664        // skipped and the whole block re-delivered, so no later ordered commit
665        // advances past these undelivered dead-letters. Silent discard is opt-in
666        // only (FilterDlqPolicy::DiscardWithMetric).
667        if !out_batch.dlq_entries.is_empty() {
668            let entries = std::mem::take(&mut out_batch.dlq_entries);
669            if let Err(e) = self.route_dlq_entries(entries) {
670                tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
671                return Err(e);
672            }
673        }
674
675        // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
676        //
677        // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
678        // TERMINAL error -- it stops the run loop. The source commit is ordered
679        // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
680        // logged and continued, the NEXT block's commit would advance the
681        // committed watermark PAST this block's never-sent offsets, silently
682        // skipping records (data loss). Stopping the loop leaves THIS block's
683        // tokens uncommitted, so the source re-delivers from the last committed
684        // watermark on restart -- no later block can commit ahead of the
685        // failure. The app owns restart/retry policy; the engine never invents
686        // a silent skip.
687        if let Err(e) = sink(&out_batch).await {
688            tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
689            return Err(e);
690        }
691
692        // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
693        // output record count. This is the at-least-once block contract.
694        match commit {
695            CommitMode::Auto => {
696                // A commit failure is ALSO a terminal ack-barrier failure: a
697                // failed ordered commit must not be followed by a later block's
698                // commit advancing the watermark past these uncommitted offsets.
699                if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
700                    tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
701                    return Err(EngineError::Transport(e));
702                }
703            }
704            CommitMode::SinkManaged => {
705                // The sink owns the commit -- the engine does not commit here.
706            }
707        }
708        Ok(())
709    }
710
711    /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
712    /// bounded to ONE sub-block, the source acks commit once after the final
713    /// sub-block.
714    ///
715    /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
716    /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
717    /// sub-block never multiplies the source acks. Each sub-block's ingress lease
718    /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
719    /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
720    /// block.
721    ///
722    /// On ANY sub-block sink error the block stops and the commit is skipped (the
723    /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
724    /// propagates out and stops the run loop, so no LATER block's ordered commit
725    /// can advance the cumulative watermark past these never-committed offsets
726    /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
727    /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
728    /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
729    /// commit failure is likewise terminal.
730    #[cfg(feature = "transport")]
731    async fn drive_block_streaming<R, P, Sink, SinkFut>(
732        &self,
733        receiver: &R,
734        batch: WorkBatch<R::Token>,
735        process: &P,
736        sink: &mut Sink,
737        commit: CommitMode,
738        sub_block_bytes: u64,
739    ) -> Result<(), EngineError>
740    where
741        R: TransportReceiver,
742        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
743        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
744        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
745    {
746        // Carry the WHOLE block's source acks aside; the sub-block views below
747        // commit EMPTY tokens. The batch's tokens commit ONCE after the final
748        // sub-block (at-least-once on the whole block). dlq_entries were already
749        // routed by ingest_workbatch, so the block here carries none.
750        let WorkBatch {
751            records,
752            commit_tokens,
753            ..
754        } = batch;
755
756        // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
757        // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
758        // pulls it -- it never pre-materialises every sub-block vector up front,
759        // so the only sub-block resident is the one currently being leased and
760        // sunk (the streaming peak-memory contract holds for the SPLIT itself,
761        // not just the lease).
762        let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
763
764        while let Some(sub_records) = sub_blocks.next_sub_block() {
765            // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
766            // path of this iteration (sink-error early return, ?-return, or the
767            // end of the loop body) via Drop -- BEFORE the next sub-block leases.
768            // Peak in-flight lease is therefore one sub-block, never the block.
769            let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
770            #[cfg(feature = "memory")]
771            let _sub_lease = self.lease_ingress_batch(&sub_block);
772
773            // process() may fan out / fan in within the sub-block; it preserves
774            // the (empty) commit_tokens of the sub-block view.
775            let mut out_sub = process(sub_block)?;
776
777            // Route any parse/process-generated DLQ entries this sub-block
778            // carries BEFORE its sink -- same single policy + route point as the
779            // whole-batch path and the inbound-filter entries. Fallible: a route
780            // failure is terminal (ack barrier) so the commit for the WHOLE block
781            // is skipped and it is re-delivered -- a dead-letter is never lost on
782            // the path to a source commit.
783            if !out_sub.dlq_entries.is_empty() {
784                let entries = std::mem::take(&mut out_sub.dlq_entries);
785                if let Err(e) = self.route_dlq_entries(entries) {
786                    tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
787                    return Err(e);
788                }
789            }
790
791            // Sink this sub-block. A sink error stops the block and skips the
792            // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
793            // propagate so the run loop stops -- a later block's ordered commit
794            // must never advance the cumulative watermark past this block's
795            // uncommitted offsets.
796            if let Err(e) = sink(&out_sub).await {
797                tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
798                return Err(e);
799            }
800            // _sub_lease drops here -> bytes released before the next sub-block.
801        }
802
803        // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
804        match commit {
805            CommitMode::Auto => {
806                // Commit failure is terminal (ack barrier) -- same reasoning as
807                // the sink-error path above.
808                if let Err(e) = receiver.commit(&commit_tokens).await {
809                    tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
810                    return Err(EngineError::Transport(e));
811                }
812            }
813            CommitMode::SinkManaged => {
814                // The sink owns the commit -- the engine does not commit here.
815            }
816        }
817        Ok(())
818    }
819
820    /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
821    ///
822    /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
823    /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
824    /// sub-block shapes) ergonomic. Same splitting contract as
825    /// [`SubBlockDrain::next_sub_block`].
826    #[cfg(all(test, feature = "transport"))]
827    fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
828        let mut drain = SubBlockDrain::new(records, target_bytes);
829        let mut out = Vec::new();
830        while let Some(sub) = drain.next_sub_block() {
831            out.push(sub);
832        }
833        out
834    }
835
836    /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
837    /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
838    ///
839    /// Parses each record's payload via [`codec::parse`] on the worker pool
840    /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
841    /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
842    /// the engine's `parse_error_action` -- the SAME contract the legacy
843    /// `process_mid_tier` honoured (previously the parsed path hardcoded
844    /// route-to-DLQ, ignoring the config):
845    ///
846    /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
847    ///   appended to the batch's `dlq_entries` (no silent drop) and counted in
848    ///   errors + dlq. The driver routes those entries before commit.
849    /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
850    ///   in errors ONLY (a deliberate, configured drop -- not a silent vanish).
851    /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
852    ///   failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
853    ///   consistent with the P1 ack barrier, so the block is re-delivered rather
854    ///   than partially committed.
855    ///
856    /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
857    ///
858    /// # Errors
859    ///
860    /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
861    /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
862    #[cfg(feature = "transport")]
863    fn parse_block<T: crate::transport::CommitToken>(
864        &self,
865        batch: WorkBatch<T>,
866    ) -> Result<ParsedBatch<'_, T>, EngineError> {
867        use super::ParseErrorAction;
868        use crate::transport::PayloadFormat;
869
870        let WorkBatch {
871            records,
872            commit_tokens,
873            mut dlq_entries,
874        } = batch;
875
876        // Parse each record on the pool. The pool's map_owned applies the scaler
877        // semaphore per item, so the parse phase obeys the CPU cap exactly as the
878        // legacy parsed path does. Carry the index so failures keep their bytes.
879        let indexed: Vec<(usize, Record)> = records.into_iter().enumerate().collect();
880        let parsed_each: Vec<(usize, Record, Result<ParsedPayload, String>)> =
881            self.pool.map_owned(indexed, |(idx, record)| {
882                let format: PayloadFormat = record.metadata.format;
883                let result =
884                    codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
885                (idx, record, result)
886            });
887
888        let action = self.config.parse_error_action;
889        let mut keep_records = Vec::new();
890        let mut keep_parsed = Vec::new();
891        for (_idx, record, result) in parsed_each {
892            match result {
893                Ok(payload) => {
894                    keep_records.push(record);
895                    keep_parsed.push(payload);
896                }
897                Err(reason) => match action {
898                    ParseErrorAction::Dlq => {
899                        // No silent drop: the unparseable record's bytes go to DLQ.
900                        self.stats.incr_errors();
901                        self.stats.incr_dlq();
902                        dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
903                            payload: record.payload.to_vec(),
904                            key: record.key.clone(),
905                            reason,
906                        });
907                    }
908                    ParseErrorAction::Skip => {
909                        // Deliberate, configured drop -- counted in errors but NOT
910                        // dead-lettered. This is opt-in loss, not a silent vanish.
911                        self.stats.incr_errors();
912                    }
913                    ParseErrorAction::FailBatch => {
914                        // Terminal: the whole block fails its commit (ack barrier).
915                        self.stats.incr_errors();
916                        return Err(EngineError::ParseBatchFailed(reason));
917                    }
918                },
919            }
920        }
921
922        Ok(ParsedBatch {
923            records: keep_records,
924            parsed: keep_parsed,
925            commit_tokens,
926            dlq_entries,
927            interner: &self.interner,
928        })
929    }
930
931    /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
932    /// returning an RAII lease that releases them on drop.
933    ///
934    /// Drives the in-flight ingress accounting for the WorkBatch driver: the
935    /// lease is taken in [`drive_block`](Self::drive_block) and releases the
936    /// bytes on every block exit path via `Drop`.
937    ///
938    /// [`MemoryGuard`]: crate::memory::MemoryGuard
939    #[cfg(feature = "memory")]
940    pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
941        &self,
942        batch: &WorkBatch<T>,
943    ) -> Option<super::IngressLease<'_>> {
944        let guard = self.memory_guard.as_ref()?;
945        let bytes = batch.total_payload_bytes() as u64;
946        guard.add_bytes(bytes);
947        Some(super::IngressLease::new(guard, bytes))
948    }
949}
950
951/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
952/// of [`Record`]s at a time, so the streaming driver never pre-materialises
953/// every sub-block vector up front.
954///
955/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records (in
956/// order) from the source until the accumulated `payload.len()` would overshoot
957/// `target_bytes`, then returns that sub-block; the remaining records stay
958/// un-pulled in the source iterator. Splitting contract:
959///
960/// - records are kept in order;
961/// - FLOOR of one record per sub-block: a record whose payload alone meets or
962///   exceeds `target_bytes` is its own single-record sub-block (never stalls);
963/// - `target_bytes` of `0` is treated as a floor of one record per sub-block;
964/// - an exhausted source yields `None`.
965///
966/// The lazy shape matters: the previous `Vec<Vec<Record>>` allocated every
967/// sub-block vector before the loop processed the first one. Here, at most ONE
968/// sub-block vector is allocated at a time -- the one the loop is about to lease
969/// and sink -- so the SPLIT no longer defeats the streaming peak-memory bound.
970#[cfg(feature = "transport")]
971struct SubBlockDrain {
972    /// Source records, drained in order. `peeked` holds a record we pulled but
973    /// could not fit into the sub-block being built (it starts the next one).
974    iter: std::vec::IntoIter<Record>,
975    peeked: Option<Record>,
976    target_bytes: u64,
977}
978
979#[cfg(feature = "transport")]
980impl SubBlockDrain {
981    fn new(records: Vec<Record>, target_bytes: u64) -> Self {
982        Self {
983            iter: records.into_iter(),
984            peeked: None,
985            target_bytes,
986        }
987    }
988
989    /// Yield the next consecutive sub-block, or `None` when the source is
990    /// exhausted. Allocates exactly ONE sub-block `Vec` per call.
991    fn next_sub_block(&mut self) -> Option<Vec<Record>> {
992        // Start with the record carried over from the previous call (if any),
993        // else pull the first record of this sub-block from the source.
994        let first = self.peeked.take().or_else(|| self.iter.next())?;
995        let mut current_bytes = first.payload.len() as u64;
996        let mut current = vec![first];
997
998        // Pull more records while they fit. Floor 1: we already took one record
999        // above, so an oversized record is still its own sub-block.
1000        for record in self.iter.by_ref() {
1001            let record_bytes = record.payload.len() as u64;
1002            if current_bytes.saturating_add(record_bytes) > self.target_bytes {
1003                // Does not fit -- carry it to the next sub-block and stop here.
1004                self.peeked = Some(record);
1005                break;
1006            }
1007            current_bytes = current_bytes.saturating_add(record_bytes);
1008            current.push(record);
1009        }
1010        Some(current)
1011    }
1012}
1013
1014#[cfg(all(test, feature = "transport-memory"))]
1015mod tests {
1016    use super::*;
1017    use crate::transport::memory::{MemoryConfig, MemoryTransport};
1018    use crate::transport::{CommitToken, PayloadFormat, RecordMeta};
1019    use crate::worker::engine::BatchProcessingConfig;
1020    use bytes::Bytes;
1021    use std::sync::Arc;
1022    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1023
1024    fn default_engine() -> BatchEngine {
1025        BatchEngine::new(BatchProcessingConfig::default())
1026    }
1027
1028    fn mem_transport(timeout_ms: u64) -> MemoryTransport {
1029        MemoryTransport::new(&MemoryConfig {
1030            recv_timeout_ms: timeout_ms,
1031            ..Default::default()
1032        })
1033        .expect("memory transport with valid config must construct")
1034    }
1035
1036    /// Cancel `shutdown` after `ms` to stop the run loop cleanly.
1037    fn cancel_after(shutdown: CancellationToken, ms: u64) {
1038        tokio::spawn(async move {
1039            tokio::time::sleep(Duration::from_millis(ms)).await;
1040            shutdown.cancel();
1041        });
1042    }
1043
1044    /// Clone one record into `factor` copies (1->N fan-out).
1045    fn fan_out(records: Vec<Record>, factor: usize) -> Vec<Record> {
1046        let mut out = Vec::with_capacity(records.len() * factor);
1047        for r in records {
1048            for _ in 0..factor {
1049                out.push(r.clone());
1050            }
1051        }
1052        out
1053    }
1054
1055    /// THE proving test: N source records, each with a distinct ack; a process
1056    /// that fans 1->2; assert all 2N records hit the sink AND commit acked
1057    /// EXACTLY N source tokens (committed_sequence advanced by the source acks,
1058    /// not the doubled output count).
1059    #[tokio::test]
1060    async fn fan_out_commits_source_tokens_not_output_count() {
1061        let n = 5usize;
1062        let transport = mem_transport(50);
1063        for i in 0..n {
1064            transport
1065                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
1066                .await
1067                .unwrap();
1068        }
1069
1070        let engine = default_engine();
1071        let shutdown = CancellationToken::new();
1072        cancel_after(shutdown.clone(), 200);
1073
1074        let sink_records = Arc::new(AtomicUsize::new(0));
1075        let sink_tokens = Arc::new(AtomicUsize::new(0));
1076        let sr = Arc::clone(&sink_records);
1077        let st = Arc::clone(&sink_tokens);
1078
1079        engine
1080            .run_workbatch(
1081                &transport,
1082                shutdown,
1083                |batch| Ok(batch.map_records(|recs| fan_out(recs, 2))),
1084                |out: &WorkBatch<_>| {
1085                    let sr = Arc::clone(&sr);
1086                    let st = Arc::clone(&st);
1087                    let records = out.records.len();
1088                    let tokens = out.commit_tokens.len();
1089                    async move {
1090                        sr.fetch_add(records, Ordering::Relaxed);
1091                        st.fetch_add(tokens, Ordering::Relaxed);
1092                        Ok(())
1093                    }
1094                },
1095                CommitMode::Auto,
1096                None::<(
1097                    Duration,
1098                    fn() -> std::future::Ready<Result<(), EngineError>>,
1099                )>,
1100            )
1101            .await
1102            .unwrap();
1103
1104        // (a) all 2N records reached the sink.
1105        assert_eq!(
1106            sink_records.load(Ordering::Relaxed),
1107            2 * n,
1108            "all 2N records sunk"
1109        );
1110        // (b) the out-batch carried exactly N source tokens (fan-out did not
1111        // multiply the acks).
1112        assert_eq!(
1113            sink_tokens.load(Ordering::Relaxed),
1114            n,
1115            "N source tokens carried"
1116        );
1117        // (b cont.) commit acked exactly the N source tokens: MemoryToken seq is
1118        // 0..N, so committed_sequence (a fetch_max) lands on N-1.
1119        assert_eq!(
1120            transport.committed_sequence(),
1121            (n - 1) as u64,
1122            "commit advanced to the highest of the N source acks, not the 2N output count"
1123        );
1124    }
1125
1126    /// On a sink error the commit must NOT fire (the block is re-delivered) AND
1127    /// the run loop stops -- the sink error is a TERMINAL ack-barrier error.
1128    #[tokio::test]
1129    async fn sink_error_does_not_commit() {
1130        let transport = mem_transport(50);
1131        transport
1132            .inject(None, br#"{"id":1}"#.to_vec())
1133            .await
1134            .unwrap();
1135
1136        let engine = default_engine();
1137        let shutdown = CancellationToken::new();
1138        cancel_after(shutdown.clone(), 200);
1139
1140        let result = engine
1141            .run_workbatch(
1142                &transport,
1143                shutdown,
1144                |batch| Ok(batch),
1145                |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
1146                CommitMode::Auto,
1147                None::<(
1148                    Duration,
1149                    fn() -> std::future::Ready<Result<(), EngineError>>,
1150                )>,
1151            )
1152            .await;
1153        assert!(
1154            matches!(result, Err(EngineError::Sink(_))),
1155            "sink error is terminal: the run returns the sink error, got {result:?}"
1156        );
1157
1158        // committed_sequence is a fetch_max seeded at 0 and the only injected
1159        // message had seq 0; a commit would still leave it at 0, so to PROVE the
1160        // commit did not fire we inject a higher-seq message that, if committed,
1161        // would advance the sequence past 0. Re-run with seq 1..=2.
1162        let transport = mem_transport(50);
1163        transport
1164            .inject(None, br#"{"a":1}"#.to_vec())
1165            .await
1166            .unwrap(); // seq 0
1167        transport
1168            .inject(None, br#"{"b":2}"#.to_vec())
1169            .await
1170            .unwrap(); // seq 1
1171        // drain seq 0 first so the failing block carries seq 1.
1172        let _ = transport.recv(1).await.unwrap();
1173        let shutdown = CancellationToken::new();
1174        cancel_after(shutdown.clone(), 200);
1175        let result = engine
1176            .run_workbatch(
1177                &transport,
1178                shutdown,
1179                |batch| Ok(batch),
1180                |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
1181                CommitMode::Auto,
1182                None::<(
1183                    Duration,
1184                    fn() -> std::future::Ready<Result<(), EngineError>>,
1185                )>,
1186            )
1187            .await;
1188        assert!(result.is_err(), "sink error is terminal");
1189        assert_eq!(
1190            transport.committed_sequence(),
1191            0,
1192            "sink error must skip commit -- sequence stays at its initial 0"
1193        );
1194    }
1195
1196    /// `CommitMode::Auto` commits after a successful sink.
1197    #[tokio::test]
1198    async fn auto_commits_after_sink_ok() {
1199        let transport = mem_transport(50);
1200        for i in 0..3u64 {
1201            transport
1202                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
1203                .await
1204                .unwrap();
1205        }
1206
1207        let engine = default_engine();
1208        let shutdown = CancellationToken::new();
1209        cancel_after(shutdown.clone(), 200);
1210
1211        engine
1212            .run_workbatch(
1213                &transport,
1214                shutdown,
1215                |batch| Ok(batch),
1216                |_out: &WorkBatch<_>| async { Ok(()) },
1217                CommitMode::Auto,
1218                None::<(
1219                    Duration,
1220                    fn() -> std::future::Ready<Result<(), EngineError>>,
1221                )>,
1222            )
1223            .await
1224            .unwrap();
1225
1226        // Three messages seq 0..=2 -> committed sequence is 2.
1227        assert_eq!(transport.committed_sequence(), 2);
1228    }
1229
1230    /// `CommitMode::SinkManaged` leaves the commit to the sink -- the engine
1231    /// does not commit.
1232    #[tokio::test]
1233    async fn sink_managed_does_not_commit_in_engine() {
1234        let transport = mem_transport(50);
1235        transport
1236            .inject(None, br#"{"a":1}"#.to_vec())
1237            .await
1238            .unwrap(); // seq 0
1239        transport
1240            .inject(None, br#"{"b":2}"#.to_vec())
1241            .await
1242            .unwrap(); // seq 1
1243        // Drain seq 0 so the block carries seq 1 -- a commit would push the
1244        // sequence past its initial 0.
1245        let _ = transport.recv(1).await.unwrap();
1246
1247        let engine = default_engine();
1248        let shutdown = CancellationToken::new();
1249        cancel_after(shutdown.clone(), 200);
1250
1251        engine
1252            .run_workbatch(
1253                &transport,
1254                shutdown,
1255                |batch| Ok(batch),
1256                // Sink does NOT commit here -- it could, but we prove the engine
1257                // does not commit on its behalf.
1258                |_out: &WorkBatch<_>| async { Ok(()) },
1259                CommitMode::SinkManaged,
1260                None::<(
1261                    Duration,
1262                    fn() -> std::future::Ready<Result<(), EngineError>>,
1263                )>,
1264            )
1265            .await
1266            .unwrap();
1267
1268        assert_eq!(
1269            transport.committed_sequence(),
1270            0,
1271            "SinkManaged: engine must not commit -- sequence stays at initial 0"
1272        );
1273    }
1274
1275    /// The ticker fires on its interval; shutdown stops the loop cleanly.
1276    #[tokio::test]
1277    async fn ticker_fires_and_shutdown_stops_loop() {
1278        let transport = mem_transport(50);
1279        let engine = default_engine();
1280        let shutdown = CancellationToken::new();
1281        cancel_after(shutdown.clone(), 350);
1282
1283        let ticks = Arc::new(AtomicU64::new(0));
1284        let tc = Arc::clone(&ticks);
1285
1286        let result = engine
1287            .run_workbatch(
1288                &transport,
1289                shutdown,
1290                |batch| Ok(batch),
1291                |_out: &WorkBatch<_>| async { Ok(()) },
1292                CommitMode::Auto,
1293                Some((Duration::from_millis(100), move || {
1294                    let tc = Arc::clone(&tc);
1295                    async move {
1296                        tc.fetch_add(1, Ordering::Relaxed);
1297                        Ok(())
1298                    }
1299                })),
1300            )
1301            .await;
1302
1303        assert!(result.is_ok(), "shutdown stops the loop cleanly");
1304        assert!(
1305            ticks.load(Ordering::Relaxed) >= 2,
1306            "ticker fired at least twice over 350ms at 100ms interval"
1307        );
1308    }
1309
1310    /// On-demand path: a transform that calls codec::parse reads the right field
1311    /// and can rewrite the payload, all without the driver pre-parsing.
1312    #[tokio::test]
1313    async fn on_demand_transform_reads_field_via_codec_parse() {
1314        let transport = mem_transport(50);
1315        transport
1316            .inject(None, br#"{"_table":"events","id":1}"#.to_vec())
1317            .await
1318            .unwrap();
1319
1320        let engine = default_engine();
1321        let shutdown = CancellationToken::new();
1322        cancel_after(shutdown.clone(), 200);
1323
1324        let seen_table = Arc::new(std::sync::Mutex::new(String::new()));
1325        let st = Arc::clone(&seen_table);
1326
1327        engine
1328            .run_workbatch(
1329                &transport,
1330                shutdown,
1331                move |batch| {
1332                    let st = Arc::clone(&st);
1333                    Ok(batch.map_records(move |recs| {
1334                        recs.into_iter()
1335                            .inspect(|r| {
1336                                // Parse ON DEMAND inside the transform.
1337                                let parsed = codec::parse(&r.payload, r.metadata.format)
1338                                    .expect("valid json");
1339                                if let Some(t) = parsed.field_str("_table") {
1340                                    *st.lock().unwrap() = t.to_string();
1341                                }
1342                            })
1343                            .collect()
1344                    }))
1345                },
1346                |_out: &WorkBatch<_>| async { Ok(()) },
1347                CommitMode::Auto,
1348                None::<(
1349                    Duration,
1350                    fn() -> std::future::Ready<Result<(), EngineError>>,
1351                )>,
1352            )
1353            .await
1354            .unwrap();
1355
1356        assert_eq!(*seen_table.lock().unwrap(), "events");
1357    }
1358
1359    /// Batch-parse path: the driver pre-parses; the process closure sees aligned
1360    /// parsed payloads, the interner dedups field names, and the logical result
1361    /// matches the on-demand path.
1362    #[tokio::test]
1363    async fn parsed_path_pre_parses_and_interner_dedups() {
1364        let transport = mem_transport(50);
1365        for i in 0..4 {
1366            transport
1367                .inject(
1368                    None,
1369                    format!(r#"{{"_table":"events","id":{i}}}"#).into_bytes(),
1370                )
1371                .await
1372                .unwrap();
1373        }
1374
1375        let engine = default_engine();
1376        let shutdown = CancellationToken::new();
1377        cancel_after(shutdown.clone(), 200);
1378
1379        let tables = Arc::new(AtomicUsize::new(0));
1380        let tc = Arc::clone(&tables);
1381
1382        engine
1383            .run_workbatch_parsed(
1384                &transport,
1385                shutdown,
1386                move |pb: ParsedBatch<'_, _>| {
1387                    // Records are aligned 1:1 with parsed payloads.
1388                    assert_eq!(pb.records.len(), pb.parsed.len());
1389                    // Intern the routing-field name once for the whole block.
1390                    let field = pb.intern("_table");
1391                    let mut hits = 0;
1392                    for parsed in &pb.parsed {
1393                        if parsed.field_str(&field) == Some("events") {
1394                            hits += 1;
1395                        }
1396                    }
1397                    tc.fetch_add(hits, Ordering::Relaxed);
1398                    // Re-assemble a WorkBatch preserving the source acks.
1399                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
1400                        .with_dlq_entries(pb.dlq_entries))
1401                },
1402                |_out: &WorkBatch<_>| async { Ok(()) },
1403                CommitMode::Auto,
1404                None::<(
1405                    Duration,
1406                    fn() -> std::future::Ready<Result<(), EngineError>>,
1407                )>,
1408            )
1409            .await
1410            .unwrap();
1411
1412        assert_eq!(
1413            tables.load(Ordering::Relaxed),
1414            4,
1415            "all 4 records routed on _table"
1416        );
1417        assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
1418    }
1419
1420    /// Parsed path no-silent-drop (default `ParseErrorAction::Dlq`): an
1421    /// unparseable record is routed to the out-batch DLQ entries, the process
1422    /// closure sees them, AND they reach the DLQ route point (a `Route` policy
1423    /// sink) before commit -- not dropped -- while source acks stay intact.
1424    #[tokio::test]
1425    async fn parsed_path_routes_parse_failures_to_dlq() {
1426        use crate::worker::engine::FilterDlqPolicy;
1427        let transport = mem_transport(50);
1428        transport
1429            .inject(None, br#"{"id":1}"#.to_vec())
1430            .await
1431            .unwrap(); // seq 0 ok
1432        transport
1433            .inject(None, b"not json {{{".to_vec())
1434            .await
1435            .unwrap(); // seq 1 bad
1436        transport
1437            .inject(None, br#"{"id":3}"#.to_vec())
1438            .await
1439            .unwrap(); // seq 2 ok
1440
1441        // A Route policy captures the entries that reach the DLQ route point.
1442        let routed = Arc::new(AtomicUsize::new(0));
1443        let rc = Arc::clone(&routed);
1444        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
1445            move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
1446                rc.fetch_add(entries.len(), Ordering::Relaxed);
1447                Ok(())
1448            },
1449        )));
1450        let shutdown = CancellationToken::new();
1451        cancel_after(shutdown.clone(), 200);
1452
1453        let dlq_seen = Arc::new(AtomicUsize::new(0));
1454        let kept = Arc::new(AtomicUsize::new(0));
1455        let ds = Arc::clone(&dlq_seen);
1456        let kp = Arc::clone(&kept);
1457
1458        engine
1459            .run_workbatch_parsed(
1460                &transport,
1461                shutdown,
1462                move |pb: ParsedBatch<'_, _>| {
1463                    ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
1464                    kp.fetch_add(pb.records.len(), Ordering::Relaxed);
1465                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
1466                        .with_dlq_entries(pb.dlq_entries))
1467                },
1468                |_out: &WorkBatch<_>| async { Ok(()) },
1469                CommitMode::Auto,
1470                None::<(
1471                    Duration,
1472                    fn() -> std::future::Ready<Result<(), EngineError>>,
1473                )>,
1474            )
1475            .await
1476            .unwrap();
1477
1478        assert_eq!(kept.load(Ordering::Relaxed), 2, "2 records parsed cleanly");
1479        assert_eq!(
1480            dlq_seen.load(Ordering::Relaxed),
1481            1,
1482            "1 parse failure carried to the process closure as a DLQ entry"
1483        );
1484        assert_eq!(
1485            routed.load(Ordering::Relaxed),
1486            1,
1487            "the parse-failure DLQ entry reached the DLQ route point before commit"
1488        );
1489        // All three source acks are still committed -- a parse failure does not
1490        // lose the source ack (at-least-once on the WHOLE block).
1491        assert_eq!(transport.committed_sequence(), 2);
1492    }
1493
1494    /// Memory pressure / lease accounting on a WorkBatch.
1495    #[cfg(feature = "memory")]
1496    #[tokio::test]
1497    async fn lease_ingress_batch_accounts_and_releases() {
1498        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1499
1500        let mut engine = default_engine();
1501        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1502            limit_bytes: 1024 * 1024,
1503            ..Default::default()
1504        }));
1505        engine.set_memory_guard_for_test(Arc::clone(&guard));
1506
1507        let payloads: Vec<Record> = (0..4)
1508            .map(|i| Record {
1509                payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
1510                key: None,
1511                headers: vec![],
1512                metadata: RecordMeta {
1513                    timestamp_ms: None,
1514                    format: PayloadFormat::Json,
1515                },
1516            })
1517            .collect();
1518        let batch = WorkBatch::<MemTok>::from_records(payloads);
1519        let expected = batch.total_payload_bytes() as u64;
1520
1521        assert_eq!(guard.current_bytes(), 0);
1522        {
1523            let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
1524            assert_eq!(guard.current_bytes(), expected, "accounted while held");
1525        }
1526        assert_eq!(guard.current_bytes(), 0, "released on drop");
1527    }
1528
1529    /// A minimal CommitToken for the memory-lease unit test (no transport recv).
1530    #[cfg(feature = "memory")]
1531    #[derive(Debug, Clone)]
1532    struct MemTok;
1533    #[cfg(feature = "memory")]
1534    impl std::fmt::Display for MemTok {
1535        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536            f.write_str("memtok")
1537        }
1538    }
1539    #[cfg(feature = "memory")]
1540    impl CommitToken for MemTok {}
1541
1542    // ---- Remediation Phase 1: ordered-commit ack barrier -----------------
1543    //
1544    // Kafka (and MemoryTransport) commit is CUMULATIVE: `commit up to offset N`
1545    // advances a watermark via fetch_max. So if a block carrying token 0 fails
1546    // its sink/commit, a LATER block carrying token 1 must NEVER be committed --
1547    // doing so advances the watermark past token 0's never-sent records, which
1548    // silently skips them (data loss, at-least-once violated). These tests pin
1549    // the ack barrier: the committed watermark never advances past the last
1550    // successfully-sunk-and-committed block.
1551
1552    /// A real ORDERED receiver test double (real `Record`/`WorkBatch`/`MemoryToken`
1553    /// types, no internal-code mock). It hands out ONE record per `recv` with
1554    /// MONOTONIC tokens (seq 0, 1, 2, ...) and a CUMULATIVE commit -- the
1555    /// committed watermark is `fetch_max` of the committed tokens, exactly like
1556    /// a Kafka offset commit. This isolates the ordered-commit semantics from
1557    /// MemoryTransport's channel batching (which would coalesce all pending
1558    /// messages into a single block).
1559    struct OrderedReceiver {
1560        /// Next seq to deliver; one record per recv until exhausted.
1561        next_seq: Arc<AtomicU64>,
1562        /// How many records to deliver before recv blocks (pending) forever.
1563        total: u64,
1564        /// Cumulative committed watermark (highest committed seq + 1, or 0 if
1565        /// nothing committed). `u64::MAX` sentinel means "no commit yet".
1566        committed_hwm: Arc<AtomicU64>,
1567        /// Count of commit calls (to prove a later block's commit did not fire).
1568        commit_calls: Arc<AtomicUsize>,
1569        /// If set, `commit` returns an error (broker commit failure) for any
1570        /// block whose highest token seq equals this value.
1571        fail_commit_on_seq: Option<u64>,
1572    }
1573
1574    impl OrderedReceiver {
1575        fn new(total: u64) -> Self {
1576            Self {
1577                next_seq: Arc::new(AtomicU64::new(0)),
1578                total,
1579                committed_hwm: Arc::new(AtomicU64::new(u64::MAX)),
1580                commit_calls: Arc::new(AtomicUsize::new(0)),
1581                fail_commit_on_seq: None,
1582            }
1583        }
1584    }
1585
1586    impl crate::transport::TransportBase for OrderedReceiver {
1587        fn close(
1588            &self,
1589        ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
1590        {
1591            std::future::ready(Ok(()))
1592        }
1593        fn is_healthy(&self) -> bool {
1594            true
1595        }
1596        fn name(&self) -> &'static str {
1597            "ordered-test"
1598        }
1599    }
1600
1601    impl TransportReceiver for OrderedReceiver {
1602        type Token = crate::transport::memory::MemoryToken;
1603
1604        fn recv(
1605            &self,
1606            _max: usize,
1607        ) -> impl std::future::Future<
1608            Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
1609        > + Send {
1610            let next_seq = Arc::clone(&self.next_seq);
1611            let total = self.total;
1612            async move {
1613                let seq = next_seq.fetch_add(1, Ordering::Relaxed);
1614                if seq >= total {
1615                    // Exhausted: block forever so the loop only exits on shutdown
1616                    // (mirrors a quiet broker -- never an error/EOF).
1617                    next_seq.fetch_sub(1, Ordering::Relaxed);
1618                    std::future::pending::<()>().await;
1619                }
1620                let record = Record {
1621                    payload: Bytes::from(format!(r#"{{"seq":{seq}}}"#)),
1622                    key: None,
1623                    headers: vec![],
1624                    metadata: RecordMeta {
1625                        timestamp_ms: None,
1626                        format: PayloadFormat::Json,
1627                    },
1628                };
1629                Ok(WorkBatch::new(
1630                    vec![record],
1631                    vec![crate::transport::memory::MemoryToken { seq }],
1632                ))
1633            }
1634        }
1635
1636        async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
1637            self.commit_calls.fetch_add(1, Ordering::Relaxed);
1638            let Some(max_seq) = tokens.iter().map(|t| t.seq).max() else {
1639                return Ok(());
1640            };
1641            if self.fail_commit_on_seq == Some(max_seq) {
1642                return Err(crate::transport::TransportError::Commit(format!(
1643                    "broker commit failed for seq {max_seq}"
1644                )));
1645            }
1646            // Cumulative: watermark = max(current, this block's highest seq).
1647            self.committed_hwm.fetch_max(max_seq, Ordering::Relaxed);
1648            Ok(())
1649        }
1650    }
1651
1652    /// THE ack-barrier bug test (sink failure). Token 0's block fails at the
1653    /// sink; token 1's block would succeed. With an ORDERED/cumulative commit,
1654    /// the engine must NEVER commit token 1 (which would advance the watermark
1655    /// past the never-sent token 0). Assert: the committed watermark never
1656    /// advances past the last successfully-sunk block -- i.e. NOTHING is
1657    /// committed, and the run STOPS (terminal) rather than draining token 1.
1658    #[tokio::test]
1659    async fn sink_error_blocks_later_ordered_commits() {
1660        let receiver = OrderedReceiver::new(3);
1661        let committed = Arc::clone(&receiver.committed_hwm);
1662        let commit_calls = Arc::clone(&receiver.commit_calls);
1663
1664        let engine = default_engine();
1665        let shutdown = CancellationToken::new();
1666        // Safety net: if the loop wrongly continued, shutdown stops it so the
1667        // test cannot hang. The assertions still catch the data-loss advance.
1668        cancel_after(shutdown.clone(), 500);
1669
1670        let sink_calls = Arc::new(AtomicUsize::new(0));
1671        let sc = Arc::clone(&sink_calls);
1672
1673        let result = engine
1674            .run_workbatch(
1675                &receiver,
1676                shutdown,
1677                |batch| Ok(batch),
1678                move |out: &WorkBatch<_>| {
1679                    let sc = Arc::clone(&sc);
1680                    // Fail the sink for the block carrying token 0.
1681                    let carries_zero = out.commit_tokens.iter().any(|t| t.seq == 0);
1682                    async move {
1683                        sc.fetch_add(1, Ordering::Relaxed);
1684                        if carries_zero {
1685                            Err(EngineError::Sink("boom on token 0".into()))
1686                        } else {
1687                            Ok(())
1688                        }
1689                    }
1690                },
1691                CommitMode::Auto,
1692                None::<(
1693                    Duration,
1694                    fn() -> std::future::Ready<Result<(), EngineError>>,
1695                )>,
1696            )
1697            .await;
1698
1699        // The ack barrier: token 0 failed, so the watermark must NOT advance
1700        // past it. NOTHING may be committed (token 1 must never commit ahead).
1701        assert_eq!(
1702            committed.load(Ordering::Relaxed),
1703            u64::MAX,
1704            "sink error on token 0 must leave the committed watermark unmoved -- \
1705             a later token must NOT be committed past the failed offset"
1706        );
1707        assert_eq!(
1708            commit_calls.load(Ordering::Relaxed),
1709            0,
1710            "no commit may fire while token 0's block is unsent"
1711        );
1712        // The fix makes the sink error TERMINAL: the run returns Err and the
1713        // loop never advances to deliver token 1.
1714        assert!(
1715            result.is_err(),
1716            "sink failure under Auto must be a terminal engine error (ack barrier), \
1717             not a logged continue that drains later blocks"
1718        );
1719        assert_eq!(
1720            sink_calls.load(Ordering::Relaxed),
1721            1,
1722            "loop must stop at the failed block -- token 1 must not be fetched+sunk"
1723        );
1724    }
1725
1726    /// Ack-barrier on COMMIT failure. The sink succeeds but the COMMIT for
1727    /// token 0's block fails (broker commit error). The engine must treat this
1728    /// as a terminal ack-barrier failure and NOT advance to fetch+commit
1729    /// token 1 past the failed offset.
1730    #[tokio::test]
1731    async fn commit_error_blocks_later_ordered_commits() {
1732        let mut receiver = OrderedReceiver::new(3);
1733        receiver.fail_commit_on_seq = Some(0);
1734        let committed = Arc::clone(&receiver.committed_hwm);
1735
1736        let engine = default_engine();
1737        let shutdown = CancellationToken::new();
1738        cancel_after(shutdown.clone(), 500);
1739
1740        let sink_calls = Arc::new(AtomicUsize::new(0));
1741        let sc = Arc::clone(&sink_calls);
1742
1743        let result = engine
1744            .run_workbatch(
1745                &receiver,
1746                shutdown,
1747                |batch| Ok(batch),
1748                move |_out: &WorkBatch<_>| {
1749                    let sc = Arc::clone(&sc);
1750                    async move {
1751                        sc.fetch_add(1, Ordering::Relaxed);
1752                        Ok(())
1753                    }
1754                },
1755                CommitMode::Auto,
1756                None::<(
1757                    Duration,
1758                    fn() -> std::future::Ready<Result<(), EngineError>>,
1759                )>,
1760            )
1761            .await;
1762
1763        // Commit of token 0 failed -> watermark unmoved, run terminates, token 1
1764        // is never fetched/committed past the failed offset.
1765        assert_eq!(
1766            committed.load(Ordering::Relaxed),
1767            u64::MAX,
1768            "failed commit must not leave a later commit to advance past it"
1769        );
1770        assert!(
1771            result.is_err(),
1772            "commit failure must be a terminal ack-barrier error"
1773        );
1774        assert_eq!(
1775            sink_calls.load(Ordering::Relaxed),
1776            1,
1777            "loop must stop at the failed commit -- token 1 must not be processed"
1778        );
1779    }
1780
1781    /// Streaming variant of the ack barrier: a sink error on token 0's block
1782    /// (streamed in sub-blocks) must block any later ordered commit. Mid-block
1783    /// sink failure stops the block AND must not let a later block's commit
1784    /// advance the watermark past it.
1785    #[tokio::test]
1786    async fn streaming_sink_error_blocks_later_ordered_commits() {
1787        let receiver = OrderedReceiver::new(3);
1788        let committed = Arc::clone(&receiver.committed_hwm);
1789        let commit_calls = Arc::clone(&receiver.commit_calls);
1790
1791        let engine = default_engine();
1792        let shutdown = CancellationToken::new();
1793        cancel_after(shutdown.clone(), 500);
1794
1795        let sink_calls = Arc::new(AtomicUsize::new(0));
1796        let sc = Arc::clone(&sink_calls);
1797
1798        let result = engine
1799            .run_workbatch_streaming(
1800                &receiver,
1801                shutdown,
1802                |batch| Ok(batch),
1803                move |out: &WorkBatch<_>| {
1804                    let sc = Arc::clone(&sc);
1805                    // Streaming sub-block views carry EMPTY commit_tokens, so we
1806                    // identify token 0's block by its payload bytes ({"seq":0}).
1807                    let carries_zero = out
1808                        .records
1809                        .iter()
1810                        .any(|r| r.payload.as_ref() == br#"{"seq":0}"#);
1811                    async move {
1812                        sc.fetch_add(1, Ordering::Relaxed);
1813                        if carries_zero {
1814                            Err(EngineError::Sink("boom on token 0 (streaming)".into()))
1815                        } else {
1816                            Ok(())
1817                        }
1818                    }
1819                },
1820                CommitMode::Auto,
1821                64, // one record per sub-block (records are tiny)
1822                None::<(
1823                    Duration,
1824                    fn() -> std::future::Ready<Result<(), EngineError>>,
1825                )>,
1826            )
1827            .await;
1828
1829        assert_eq!(
1830            committed.load(Ordering::Relaxed),
1831            u64::MAX,
1832            "streaming sink error on token 0 must not let a later token commit ahead"
1833        );
1834        assert_eq!(
1835            commit_calls.load(Ordering::Relaxed),
1836            0,
1837            "no commit may fire while token 0's block is unsent (streaming)"
1838        );
1839        assert!(
1840            result.is_err(),
1841            "streaming sink failure under Auto must be a terminal ack-barrier error"
1842        );
1843        assert_eq!(
1844            sink_calls.load(Ordering::Relaxed),
1845            1,
1846            "streaming loop must stop at the failed block"
1847        );
1848    }
1849
1850    // ---- Task G4: per-unit streaming -------------------------------------
1851
1852    /// split_into_sub_blocks unit coverage: byte-budget splitting + floor-1.
1853    #[test]
1854    fn split_groups_by_byte_target() {
1855        // Five 10-byte records, target 25 -> sub-blocks of {2,2,1} records
1856        // (20 <= 25; adding the 3rd would be 30 > 25 -> close at 2).
1857        let records: Vec<Record> = (0..5)
1858            .map(|_| Record {
1859                payload: Bytes::from_static(b"0123456789"), // 10 bytes
1860                key: None,
1861                headers: vec![],
1862                metadata: RecordMeta {
1863                    timestamp_ms: None,
1864                    format: PayloadFormat::Json,
1865                },
1866            })
1867            .collect();
1868        let sub = BatchEngine::split_into_sub_blocks(records, 25);
1869        let lens: Vec<usize> = sub.iter().map(Vec::len).collect();
1870        assert_eq!(lens, vec![2, 2, 1], "20<=25 per block, never overshoot 25");
1871    }
1872
1873    #[test]
1874    fn split_floor_one_oversized_record() {
1875        // A record larger than the target is still its own sub-block (no stall).
1876        let records = vec![
1877            Record {
1878                payload: Bytes::from_static(b"this-payload-is-way-over-the-target"),
1879                key: None,
1880                headers: vec![],
1881                metadata: RecordMeta {
1882                    timestamp_ms: None,
1883                    format: PayloadFormat::Json,
1884                },
1885            },
1886            Record {
1887                payload: Bytes::from_static(b"small"),
1888                key: None,
1889                headers: vec![],
1890                metadata: RecordMeta {
1891                    timestamp_ms: None,
1892                    format: PayloadFormat::Json,
1893                },
1894            },
1895        ];
1896        let sub = BatchEngine::split_into_sub_blocks(records, 4);
1897        let lens: Vec<usize> = sub.iter().map(Vec::len).collect();
1898        assert_eq!(lens, vec![1, 1], "oversized record floors to one-per-block");
1899    }
1900
1901    #[test]
1902    fn split_empty_yields_no_sub_blocks() {
1903        let sub = BatchEngine::split_into_sub_blocks(Vec::new(), 100);
1904        assert!(sub.is_empty());
1905    }
1906
1907    #[test]
1908    fn split_smaller_than_target_is_one_sub_block() {
1909        let records: Vec<Record> = (0..3)
1910            .map(|_| Record {
1911                payload: Bytes::from_static(b"abc"),
1912                key: None,
1913                headers: vec![],
1914                metadata: RecordMeta {
1915                    timestamp_ms: None,
1916                    format: PayloadFormat::Json,
1917                },
1918            })
1919            .collect();
1920        let sub = BatchEngine::split_into_sub_blocks(records, 10_000);
1921        assert_eq!(sub.len(), 1, "whole batch under target -> single sub-block");
1922        assert_eq!(sub[0].len(), 3);
1923    }
1924
1925    /// THE peak-memory proving test: a batch of N records totalling B bytes,
1926    /// streamed with sub_block_bytes ~= B/4. A guard with a registered guard
1927    /// (no heap source) reports current_bytes() = the outstanding lease. The sink
1928    /// samples guard.current_bytes() on EACH call (the sub-block lease is held
1929    /// during the sink); the high-water must stay at ~one sub-block, NOT the whole
1930    /// batch B. The contrast: drive_block would peak at B.
1931    #[cfg(feature = "memory")]
1932    #[tokio::test]
1933    async fn streaming_peak_lease_bounded_to_one_sub_block() {
1934        use crate::memory::{MemoryGuard, MemoryGuardConfig};
1935
1936        // 16 records of 64 bytes each = 1024 bytes total.
1937        const RECORD_BYTES: usize = 64;
1938        const N: usize = 16;
1939        let total: u64 = (RECORD_BYTES * N) as u64; // 1024
1940        let payload = vec![b'x'; RECORD_BYTES];
1941
1942        let transport = mem_transport(50);
1943        for _ in 0..N {
1944            transport.inject(None, payload.clone()).await.unwrap();
1945        }
1946
1947        let mut engine = default_engine();
1948        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
1949            limit_bytes: 1024 * 1024,
1950            ..Default::default()
1951        }));
1952        engine.set_memory_guard_for_test(Arc::clone(&guard));
1953
1954        let shutdown = CancellationToken::new();
1955        cancel_after(shutdown.clone(), 200);
1956
1957        // Sub-block target ~= B/4 -> ~256 bytes -> 4 records per sub-block.
1958        let sub_block_bytes = total / 4; // 256
1959        let one_sub_block_bytes = sub_block_bytes; // 4 records * 64 = 256
1960
1961        // High-water of the guard's accounted bytes, sampled while the sub-block
1962        // lease is held (the sink runs inside the leased window).
1963        let high_water = Arc::new(AtomicU64::new(0));
1964        let guard_for_sink = Arc::clone(&guard);
1965        let hw = Arc::clone(&high_water);
1966
1967        engine
1968            .run_workbatch_streaming(
1969                &transport,
1970                shutdown,
1971                |batch| Ok(batch),
1972                move |_out: &WorkBatch<_>| {
1973                    let guard = Arc::clone(&guard_for_sink);
1974                    let hw = Arc::clone(&hw);
1975                    async move {
1976                        let now = guard.current_bytes();
1977                        hw.fetch_max(now, Ordering::Relaxed);
1978                        Ok(())
1979                    }
1980                },
1981                CommitMode::Auto,
1982                sub_block_bytes,
1983                None::<(
1984                    Duration,
1985                    fn() -> std::future::Ready<Result<(), EngineError>>,
1986                )>,
1987            )
1988            .await
1989            .unwrap();
1990
1991        let peak = high_water.load(Ordering::Relaxed);
1992        // Peak in-flight lease is ONE sub-block, never the whole batch.
1993        assert!(
1994            peak <= one_sub_block_bytes,
1995            "peak lease {peak} exceeded one sub-block {one_sub_block_bytes} \
1996             (a whole-batch lease would be {total})"
1997        );
1998        assert!(
1999            peak > 0 && peak < total,
2000            "peak {peak} must be a partial sub-block, strictly less than the \
2001             whole batch {total}"
2002        );
2003        // Lease fully released after the run.
2004        assert_eq!(guard.current_bytes(), 0, "all leases released after run");
2005    }
2006
2007    /// A counting receiver: delegates recv/lifecycle to an inner MemoryTransport,
2008    /// but records EACH commit call (count + the tokens + how many sink calls had
2009    /// happened by then) so the test can prove "commit fires exactly once, after
2010    /// the final sub-block, with all N source tokens".
2011    struct CountingReceiver {
2012        inner: MemoryTransport,
2013        commit_calls: Arc<AtomicUsize>,
2014        commit_token_count: Arc<AtomicUsize>,
2015        sink_calls: Arc<AtomicUsize>,
2016        sink_calls_at_commit: Arc<AtomicUsize>,
2017    }
2018
2019    impl crate::transport::TransportBase for CountingReceiver {
2020        fn close(
2021            &self,
2022        ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
2023        {
2024            self.inner.close()
2025        }
2026        fn is_healthy(&self) -> bool {
2027            self.inner.is_healthy()
2028        }
2029        fn name(&self) -> &'static str {
2030            self.inner.name()
2031        }
2032    }
2033
2034    impl TransportReceiver for CountingReceiver {
2035        type Token = <MemoryTransport as TransportReceiver>::Token;
2036
2037        fn recv(
2038            &self,
2039            max: usize,
2040        ) -> impl std::future::Future<
2041            Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2042        > + Send {
2043            self.inner.recv(max)
2044        }
2045
2046        async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
2047            self.commit_calls.fetch_add(1, Ordering::Relaxed);
2048            self.commit_token_count
2049                .fetch_add(tokens.len(), Ordering::Relaxed);
2050            self.sink_calls_at_commit
2051                .store(self.sink_calls.load(Ordering::Relaxed), Ordering::Relaxed);
2052            self.inner.commit(tokens).await
2053        }
2054    }
2055
2056    /// Commit-once-after-final: N source tokens streamed across multiple
2057    /// sub-blocks. Commit must fire EXACTLY once, AFTER the last sub-block's sink,
2058    /// carrying ALL N source tokens (at-least-once on the whole block).
2059    #[tokio::test]
2060    async fn streaming_commits_once_after_final_sub_block() {
2061        const N: usize = 12;
2062        const RECORD_BYTES: usize = 32;
2063        let payload = vec![b'y'; RECORD_BYTES];
2064
2065        let inner = mem_transport(50);
2066        for _ in 0..N {
2067            inner.inject(None, payload.clone()).await.unwrap();
2068        }
2069
2070        let commit_calls = Arc::new(AtomicUsize::new(0));
2071        let commit_token_count = Arc::new(AtomicUsize::new(0));
2072        let sink_calls = Arc::new(AtomicUsize::new(0));
2073        let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
2074        let receiver = CountingReceiver {
2075            inner,
2076            commit_calls: Arc::clone(&commit_calls),
2077            commit_token_count: Arc::clone(&commit_token_count),
2078            sink_calls: Arc::clone(&sink_calls),
2079            sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
2080        };
2081
2082        let engine = default_engine();
2083        let shutdown = CancellationToken::new();
2084        cancel_after(shutdown.clone(), 200);
2085
2086        let sc = Arc::clone(&sink_calls);
2087        // ~3 records per sub-block (96 bytes) -> 4 sub-blocks for 12 records.
2088        let sub_block_bytes = (RECORD_BYTES * 3) as u64;
2089
2090        engine
2091            .run_workbatch_streaming(
2092                &receiver,
2093                shutdown,
2094                |batch| Ok(batch),
2095                move |_out: &WorkBatch<_>| {
2096                    let sc = Arc::clone(&sc);
2097                    async move {
2098                        sc.fetch_add(1, Ordering::Relaxed);
2099                        Ok(())
2100                    }
2101                },
2102                CommitMode::Auto,
2103                sub_block_bytes,
2104                None::<(
2105                    Duration,
2106                    fn() -> std::future::Ready<Result<(), EngineError>>,
2107                )>,
2108            )
2109            .await
2110            .unwrap();
2111
2112        let total_sinks = sink_calls.load(Ordering::Relaxed);
2113        assert!(
2114            total_sinks >= 4,
2115            "expected multiple sub-block sinks, got {total_sinks}"
2116        );
2117        // Commit fired exactly ONCE.
2118        assert_eq!(commit_calls.load(Ordering::Relaxed), 1, "commit fires once");
2119        // It carried ALL N source tokens.
2120        assert_eq!(
2121            commit_token_count.load(Ordering::Relaxed),
2122            N,
2123            "commit carried all N source tokens"
2124        );
2125        // It fired AFTER the final sub-block sink (all sinks done by commit time).
2126        assert_eq!(
2127            sink_calls_at_commit.load(Ordering::Relaxed),
2128            total_sinks,
2129            "commit fired after the last sub-block sink"
2130        );
2131    }
2132
2133    /// A sink error on a MIDDLE sub-block stops the block and skips the commit
2134    /// (the whole block is re-delivered -- at-least-once).
2135    #[tokio::test]
2136    async fn streaming_mid_sub_block_sink_error_skips_commit() {
2137        const N: usize = 9;
2138        const RECORD_BYTES: usize = 32;
2139        let payload = vec![b'z'; RECORD_BYTES];
2140
2141        let inner = mem_transport(50);
2142        for _ in 0..N {
2143            inner.inject(None, payload.clone()).await.unwrap();
2144        }
2145
2146        let commit_calls = Arc::new(AtomicUsize::new(0));
2147        let commit_token_count = Arc::new(AtomicUsize::new(0));
2148        let sink_calls = Arc::new(AtomicUsize::new(0));
2149        let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
2150        let receiver = CountingReceiver {
2151            inner,
2152            commit_calls: Arc::clone(&commit_calls),
2153            commit_token_count: Arc::clone(&commit_token_count),
2154            sink_calls: Arc::clone(&sink_calls),
2155            sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
2156        };
2157
2158        let engine = default_engine();
2159        let shutdown = CancellationToken::new();
2160        cancel_after(shutdown.clone(), 200);
2161
2162        let sc = Arc::clone(&sink_calls);
2163        // ~3 records per sub-block -> 3 sub-blocks; fail on the 2nd (middle).
2164        let sub_block_bytes = (RECORD_BYTES * 3) as u64;
2165
2166        let result = engine
2167            .run_workbatch_streaming(
2168                &receiver,
2169                shutdown,
2170                |batch| Ok(batch),
2171                move |_out: &WorkBatch<_>| {
2172                    let sc = Arc::clone(&sc);
2173                    async move {
2174                        let nth = sc.fetch_add(1, Ordering::Relaxed) + 1;
2175                        if nth == 2 {
2176                            Err(EngineError::Sink("boom on middle sub-block".into()))
2177                        } else {
2178                            Ok(())
2179                        }
2180                    }
2181                },
2182                CommitMode::Auto,
2183                sub_block_bytes,
2184                None::<(
2185                    Duration,
2186                    fn() -> std::future::Ready<Result<(), EngineError>>,
2187                )>,
2188            )
2189            .await;
2190
2191        // The sink error is TERMINAL (ack barrier): the run returns the error.
2192        assert!(
2193            matches!(result, Err(EngineError::Sink(_))),
2194            "mid sub-block sink error is terminal, got {result:?}"
2195        );
2196        // The block stopped at the failing sub-block: no commit, and the 3rd
2197        // sub-block was never sunk.
2198        assert_eq!(
2199            commit_calls.load(Ordering::Relaxed),
2200            0,
2201            "mid sub-block sink error must skip commit"
2202        );
2203        assert_eq!(
2204            sink_calls.load(Ordering::Relaxed),
2205            2,
2206            "stopped after the failing 2nd sub-block (3rd never sunk)"
2207        );
2208    }
2209
2210    /// Floor case: a batch smaller than sub_block_bytes streams as ONE sub-block
2211    /// and behaves like drive_block (all records sunk once, commit once).
2212    #[tokio::test]
2213    async fn streaming_small_batch_is_single_sub_block() {
2214        let transport = mem_transport(50);
2215        for i in 0..3u64 {
2216            transport
2217                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
2218                .await
2219                .unwrap();
2220        }
2221
2222        let engine = default_engine();
2223        let shutdown = CancellationToken::new();
2224        cancel_after(shutdown.clone(), 200);
2225
2226        let sink_calls = Arc::new(AtomicUsize::new(0));
2227        let sink_records = Arc::new(AtomicUsize::new(0));
2228        let scz = Arc::clone(&sink_calls);
2229        let srz = Arc::clone(&sink_records);
2230
2231        engine
2232            .run_workbatch_streaming(
2233                &transport,
2234                shutdown,
2235                |batch| Ok(batch),
2236                move |out: &WorkBatch<_>| {
2237                    let scz = Arc::clone(&scz);
2238                    let srz = Arc::clone(&srz);
2239                    let n = out.records.len();
2240                    async move {
2241                        scz.fetch_add(1, Ordering::Relaxed);
2242                        srz.fetch_add(n, Ordering::Relaxed);
2243                        Ok(())
2244                    }
2245                },
2246                CommitMode::Auto,
2247                10_000, // target far larger than the whole batch
2248                None::<(
2249                    Duration,
2250                    fn() -> std::future::Ready<Result<(), EngineError>>,
2251                )>,
2252            )
2253            .await
2254            .unwrap();
2255
2256        assert_eq!(
2257            sink_calls.load(Ordering::Relaxed),
2258            1,
2259            "under-target batch sinks once (single sub-block)"
2260        );
2261        assert_eq!(
2262            sink_records.load(Ordering::Relaxed),
2263            3,
2264            "all 3 records sunk"
2265        );
2266        assert_eq!(
2267            transport.committed_sequence(),
2268            2,
2269            "all 3 acks committed once"
2270        );
2271    }
2272
2273    // ---- Phase 3: governed run path (default-on self-regulation) ----------
2274
2275    /// Build a real governor over a MemoryGuard and wire its byte budget into
2276    /// the engine, returning (engine, governor) so the test can inspect both.
2277    #[cfg(feature = "governor")]
2278    fn governed_engine() -> (BatchEngine, crate::governor::SelfRegulationGovernor) {
2279        use crate::memory::{MemoryGuard, MemoryGuardConfig};
2280        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
2281            limit_bytes: 1024 * 1024,
2282            ..Default::default()
2283        }));
2284        let gov = crate::governor::SelfRegulationConfig::default()
2285            .build(guard)
2286            .expect("enabled by default");
2287        let mut engine = default_engine();
2288        engine.set_byte_budget(gov.budget());
2289        (engine, gov)
2290    }
2291
2292    /// Governor ON: the governed driver streams the input end-to-end through a
2293    /// MemoryTransport, all records reach the sink, the source acks commit, and
2294    /// the AIMD budget moves (observe is folded in per block).
2295    #[cfg(feature = "governor")]
2296    #[tokio::test]
2297    async fn governed_on_streams_and_commits_via_memory_transport() {
2298        let transport = mem_transport(50);
2299        for i in 0..6u64 {
2300            transport
2301                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
2302                .await
2303                .unwrap();
2304        }
2305
2306        let (engine, _gov) = governed_engine();
2307        assert!(engine.is_self_regulated(), "budget wired -> governed path");
2308
2309        let shutdown = CancellationToken::new();
2310        cancel_after(shutdown.clone(), 200);
2311
2312        let sink_records = Arc::new(AtomicUsize::new(0));
2313        let sr = Arc::clone(&sink_records);
2314
2315        engine
2316            .run_governed(
2317                &transport,
2318                shutdown,
2319                |batch| Ok(batch),
2320                move |out: &WorkBatch<_>| {
2321                    let sr = Arc::clone(&sr);
2322                    let n = out.records.len();
2323                    async move {
2324                        sr.fetch_add(n, Ordering::Relaxed);
2325                        Ok(())
2326                    }
2327                },
2328                CommitMode::Auto,
2329                None::<(
2330                    Duration,
2331                    fn() -> std::future::Ready<Result<(), EngineError>>,
2332                )>,
2333            )
2334            .await
2335            .unwrap();
2336
2337        assert_eq!(
2338            sink_records.load(Ordering::Relaxed),
2339            6,
2340            "all records streamed to the sink under the governor"
2341        );
2342        assert_eq!(transport.committed_sequence(), 5, "all 6 acks committed");
2343    }
2344
2345    /// Governor OFF: with no byte budget wired, run_governed delegates to the
2346    /// whole-batch run_workbatch -- behaviour is unchanged (one sink call for
2347    /// the whole block, commit once).
2348    #[cfg(feature = "governor")]
2349    #[tokio::test]
2350    async fn governed_off_is_whole_batch_passthrough() {
2351        let transport = mem_transport(50);
2352        for i in 0..4u64 {
2353            transport
2354                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
2355                .await
2356                .unwrap();
2357        }
2358
2359        // No set_byte_budget -> byte_budget is None -> OFF path.
2360        let engine = default_engine();
2361        assert!(
2362            !engine.is_self_regulated(),
2363            "no budget wired -> whole-batch path"
2364        );
2365
2366        let shutdown = CancellationToken::new();
2367        cancel_after(shutdown.clone(), 200);
2368
2369        let sink_calls = Arc::new(AtomicUsize::new(0));
2370        let sink_records = Arc::new(AtomicUsize::new(0));
2371        let sc = Arc::clone(&sink_calls);
2372        let sr = Arc::clone(&sink_records);
2373
2374        engine
2375            .run_governed(
2376                &transport,
2377                shutdown,
2378                |batch| Ok(batch),
2379                move |out: &WorkBatch<_>| {
2380                    let sc = Arc::clone(&sc);
2381                    let sr = Arc::clone(&sr);
2382                    let n = out.records.len();
2383                    async move {
2384                        sc.fetch_add(1, Ordering::Relaxed);
2385                        sr.fetch_add(n, Ordering::Relaxed);
2386                        Ok(())
2387                    }
2388                },
2389                CommitMode::Auto,
2390                None::<(
2391                    Duration,
2392                    fn() -> std::future::Ready<Result<(), EngineError>>,
2393                )>,
2394            )
2395            .await
2396            .unwrap();
2397
2398        assert_eq!(
2399            sink_calls.load(Ordering::Relaxed),
2400            1,
2401            "OFF path = whole-batch: the block sinks ONCE (not per sub-block)"
2402        );
2403        assert_eq!(sink_records.load(Ordering::Relaxed), 4, "all records sunk");
2404        assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
2405    }
2406
2407    /// The shared pressure feeds an InboundGate: under high memory the gate
2408    /// holds (Admit::Hold) and the budget shrinks; low memory admits and the
2409    /// budget sits at start-big. Proves the gate + budget share one pressure.
2410    #[cfg(feature = "governor")]
2411    #[test]
2412    fn governed_gate_and_budget_share_pressure() {
2413        use crate::governor::{Admit, InboundGate, NoopActuator};
2414        use crate::memory::{MemoryGuard, MemoryGuardConfig};
2415
2416        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
2417            limit_bytes: 1000,
2418            pressure_threshold: 0.80,
2419            ..Default::default()
2420        }));
2421        let gov = crate::governor::SelfRegulationConfig::default()
2422            .build(Arc::clone(&guard))
2423            .expect("enabled");
2424
2425        let gate = InboundGate::new(gov.pressure(), Box::new(NoopActuator));
2426        let budget = gov.budget();
2427        let start = budget.byte_budget();
2428
2429        // Low memory -> gate admits, budget unchanged on a slack observe.
2430        assert_eq!(gate.evaluate(), Admit::Yes, "low pressure admits");
2431
2432        // Slam memory high -> the SAME pressure both holds the gate AND, via the
2433        // HARD override in observe(), shrinks the budget regardless of rho.
2434        guard.add_bytes(950); // 95% of limit
2435        assert_eq!(gate.evaluate(), Admit::Hold, "high pressure holds the gate");
2436        budget.observe(0, Duration::from_millis(1), Duration::from_millis(100));
2437        assert!(
2438            budget.byte_budget() < start,
2439            "high memory shrinks the shared budget (HARD override)"
2440        );
2441    }
2442
2443    // ---- Phase 4: validation ---------------------------------------------
2444
2445    /// THE send-unaffected invariant: the OUTBOUND drain (sink) is NEVER gated
2446    /// by pressure -- only the INBOUND recv side is. With a `UnifiedPressure`
2447    /// pinned HARD-HIGH so `should_hold()` is true, the SAME transport's
2448    /// `send` / `send_batch` still succeed. Gating the drain would deadlock the
2449    /// pipeline (in-flight work could never leave), so the governor must never
2450    /// touch it. MemoryTransport's send path consults no pressure governor by
2451    /// construction; this test proves that holds even when a governor that the
2452    /// inbound side WOULD obey is wired and saturated.
2453    #[cfg(feature = "governor")]
2454    #[tokio::test]
2455    async fn send_unaffected_by_pressure_pinned_high() {
2456        use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
2457        use crate::memory::{MemoryGuard, MemoryGuardConfig};
2458        use crate::transport::TransportSender;
2459
2460        // Pin a REAL HARD memory source high so the latch holds (>= pause_above).
2461        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
2462            limit_bytes: 1000,
2463            pressure_threshold: 0.80,
2464            ..Default::default()
2465        }));
2466        guard.add_bytes(950); // 95% -> HARD high
2467        let pressure = Arc::new(UnifiedPressure::new(
2468            vec![Arc::new(MemoryPressureSource::new(Arc::clone(&guard))) as Arc<dyn PressureSource>],
2469            Hysteresis::new(0.80, 0.65).expect("valid band"),
2470        ));
2471        assert!(
2472            pressure.should_hold(),
2473            "pinned-high governor must hold the INBOUND gate"
2474        );
2475
2476        // The OUTBOUND sink: send / send_batch must still succeed under hold.
2477        let transport = mem_transport(50);
2478
2479        let single = transport
2480            .send("k", Bytes::from_static(br#"{"id":1}"#))
2481            .await;
2482        assert!(
2483            single.is_ok(),
2484            "single send must succeed under pressure (sink never gated), got {single:?}"
2485        );
2486
2487        let records: Vec<Record> = (0..5)
2488            .map(|i| Record {
2489                payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
2490                key: Some(Arc::from(format!("k{i}").as_str())),
2491                headers: vec![],
2492                metadata: RecordMeta {
2493                    timestamp_ms: None,
2494                    format: PayloadFormat::Json,
2495                },
2496            })
2497            .collect();
2498        let batch_res = transport.send_batch(&records).await;
2499        assert!(
2500            batch_res.is_ok(),
2501            "send_batch must succeed under pressure (sink never gated), got {batch_res:?}"
2502        );
2503
2504        // Pressure is STILL high after the sends -- nothing about the send path
2505        // cleared or consulted it.
2506        assert!(
2507            pressure.should_hold(),
2508            "send does not touch the pressure latch"
2509        );
2510
2511        // And the sent data is intact on the wire (the drain really ran).
2512        let got = transport.recv(10).await.unwrap().records;
2513        assert_eq!(got.len(), 6, "1 single + 5 batched records all drained");
2514    }
2515
2516    /// Build a governed engine over a guard with a LOW limit, sharing ONE guard
2517    /// between the governor (pressure + budget) and the engine's ingress-lease
2518    /// accounting. Returns `(engine, governor, guard)`.
2519    #[cfg(all(feature = "governor", feature = "memory"))]
2520    fn governed_engine_low_limit(
2521        limit_bytes: u64,
2522    ) -> (
2523        BatchEngine,
2524        crate::governor::SelfRegulationGovernor,
2525        Arc<crate::memory::MemoryGuard>,
2526    ) {
2527        use crate::memory::{MemoryGuard, MemoryGuardConfig};
2528        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
2529            limit_bytes,
2530            pressure_threshold: 0.80,
2531            ..Default::default()
2532        }));
2533        // The governor's pressure + AIMD budget run off THIS guard.
2534        let gov = crate::governor::SelfRegulationConfig::default()
2535            .build(Arc::clone(&guard))
2536            .expect("enabled by default");
2537        // A SMALL recv chunk so the load arrives over many blocks: the AIMD loop
2538        // (and the memory HARD override) shrink the budget block-to-block as
2539        // pressure builds, rather than pulling the whole load in one cold-budget
2540        // block. This is the realistic streaming shape -- a real broker/source
2541        // delivers in poll-sized chunks, not one giant block.
2542        let mut engine = BatchEngine::new(BatchProcessingConfig {
2543            max_chunk_size: 16,
2544            ..Default::default()
2545        });
2546        engine.set_byte_budget(gov.budget());
2547        // The engine's ingress leases must account against the SAME guard so the
2548        // streaming peak-lease feeds back into the pressure the budget reads.
2549        engine.set_memory_guard_for_test(Arc::clone(&guard));
2550        (engine, gov, guard)
2551    }
2552
2553    /// THE operational never-OOM test (in-process logical form).
2554    ///
2555    /// Drives sustained, large load through `run_governed` over a real
2556    /// `MemoryTransport`, governor ON, with a `MemoryGuard` on a LOW limit. It
2557    /// proves the four never-OOM invariants without a cgroup harness:
2558    ///
2559    ///   1. the inbound GATE engages -- with the governor's pressure pinned by
2560    ///      sustained ingress, an `InboundGate` over the SAME pressure returns
2561    ///      `Admit::Hold` (the brake the transport would apply);
2562    ///   2. the sink/drain KEEPS RUNNING -- every record reaches the sink and
2563    ///      the source acks commit (the drain is never gated);
2564    ///   3. `MemoryGuard::current_bytes()` stays BOUNDED -- the streaming
2565    ///      peak-lease holds at most ~one shrunk sub-block in flight, well under
2566    ///      the whole-batch footprint, sampled at its high-water inside the sink;
2567    ///   4. the pipeline does NOT panic and the budget never collapses below its
2568    ///      floor (>= 1, never 0).
2569    ///
2570    /// A full OS-level cgroup OOM-kill test (a memory-limited container + a real
2571    /// broker or transport under load) is FLAGGED for a CI harness (Phase 5.5);
2572    /// see the report.
2573    #[cfg(all(feature = "governor", feature = "memory"))]
2574    #[tokio::test]
2575    async fn operational_never_oom_governed_pipeline_bounds_memory() {
2576        use crate::governor::{Admit, InboundGate, NoopActuator};
2577
2578        // LOW limit, sized so a SINGLE in-flight poll-chunk (16 x 1 KiB =
2579        // 16 KiB) sits above the 80% pressure threshold (16/18 ~= 0.89), so the
2580        // gate engages while a sub-block is leased -- yet the streaming
2581        // peak-lease keeps the in-flight footprint at one chunk, never the whole
2582        // load. This is the never-OOM shape: high pressure brakes inbound, but
2583        // memory stays bounded because only one sub-block is ever resident.
2584        const LIMIT: u64 = 18 * 1024; // 18 KiB
2585        // Records far larger than the floor; many of them -> sustained load.
2586        const RECORD_BYTES: usize = 1024; // 1 KiB each
2587        const N: usize = 256; // 256 KiB of payload total -- 14x the limit
2588        let payload = vec![b'q'; RECORD_BYTES];
2589        let total_payload: u64 = (RECORD_BYTES * N) as u64;
2590
2591        let transport = mem_transport(50);
2592        for _ in 0..N {
2593            transport.inject(None, payload.clone()).await.unwrap();
2594        }
2595
2596        let (engine, gov, guard) = governed_engine_low_limit(LIMIT);
2597        assert!(engine.is_self_regulated(), "budget wired -> governed path");
2598
2599        // The gate the transport WOULD wire in, over the governor's shared
2600        // pressure. We evaluate it from inside the sink to observe the brake.
2601        let gate = Arc::new(InboundGate::new(gov.pressure(), Box::new(NoopActuator)));
2602
2603        let shutdown = CancellationToken::new();
2604        cancel_after(shutdown.clone(), 600);
2605
2606        let sink_records = Arc::new(AtomicUsize::new(0));
2607        let high_water = Arc::new(AtomicU64::new(0));
2608        let gate_held_ever = Arc::new(std::sync::atomic::AtomicBool::new(false));
2609
2610        let sr = Arc::clone(&sink_records);
2611        let hw = Arc::clone(&high_water);
2612        let geh = Arc::clone(&gate_held_ever);
2613        let guard_for_sink = Arc::clone(&guard);
2614        let gate_for_sink = Arc::clone(&gate);
2615
2616        engine
2617            .run_governed(
2618                &transport,
2619                shutdown,
2620                |batch| Ok(batch),
2621                move |out: &WorkBatch<_>| {
2622                    let sr = Arc::clone(&sr);
2623                    let hw = Arc::clone(&hw);
2624                    let geh = Arc::clone(&geh);
2625                    let guard = Arc::clone(&guard_for_sink);
2626                    let gate = Arc::clone(&gate_for_sink);
2627                    let n = out.records.len();
2628                    async move {
2629                        // (3) sample current_bytes() while the sub-block lease is
2630                        // held -- this is the in-flight high-water.
2631                        hw.fetch_max(guard.current_bytes(), Ordering::Relaxed);
2632                        // (1) evaluate the gate over the SAME pressure: under
2633                        // sustained ingress it engages (Hold).
2634                        if gate.evaluate() == Admit::Hold {
2635                            geh.store(true, Ordering::Relaxed);
2636                        }
2637                        // (2) the drain keeps running -- count every record sunk.
2638                        sr.fetch_add(n, Ordering::Relaxed);
2639                        Ok(())
2640                    }
2641                },
2642                CommitMode::Auto,
2643                None::<(
2644                    Duration,
2645                    fn() -> std::future::Ready<Result<(), EngineError>>,
2646                )>,
2647            )
2648            .await
2649            .unwrap();
2650
2651        // (2) The drain KEPT RUNNING: every record reached the sink and the
2652        // source acks committed -- the sink is never gated.
2653        assert_eq!(
2654            sink_records.load(Ordering::Relaxed),
2655            N,
2656            "all {N} records drained through the governed sink"
2657        );
2658        assert_eq!(
2659            transport.committed_sequence(),
2660            (N - 1) as u64,
2661            "all source acks committed (drain never stalled)"
2662        );
2663
2664        // (1) The inbound gate ENGAGED at least once under the sustained load --
2665        // the brake the transport would apply did fire.
2666        assert!(
2667            gate_held_ever.load(Ordering::Relaxed),
2668            "inbound gate must engage (Admit::Hold) under sustained pressure"
2669        );
2670
2671        // (3) Peak in-flight bytes stayed BOUNDED, NOT the whole payload. The
2672        // streaming peak-lease bounds it to ~one shrunk sub-block; allow generous
2673        // headroom but it must be a small fraction of the whole-batch footprint.
2674        let peak = high_water.load(Ordering::Relaxed);
2675        assert!(
2676            peak > 0,
2677            "some bytes must be accounted while a sub-block is in flight"
2678        );
2679        assert!(
2680            peak < total_payload / 2,
2681            "peak in-flight {peak} must stay well under half the whole payload \
2682             {total_payload} (streaming peak-lease bounds it, never OOM)"
2683        );
2684
2685        // (4) Budget respected its floor (>= 1, never 0) and the run did not
2686        // panic (reaching here proves it). All leases released after the run.
2687        assert!(
2688            gov.budget().byte_budget() >= 1,
2689            "byte budget never collapses below its floor"
2690        );
2691        assert_eq!(
2692            guard.current_bytes(),
2693            0,
2694            "all ingress leases released after the run -- no leak"
2695        );
2696    }
2697
2698    // ---- Remediation Phase 2: byte-aware recv bounds RECEIVE memory -------
2699    //
2700    // The gap (Codex finding): the governed driver bounds memory by the
2701    // post-recv SUB-BLOCK lease, but `recv(max)` is RECORD-bounded only -- a
2702    // single poll can build a WorkBatch whose total bytes >> byte_budget BEFORE
2703    // any sub-block split, so the byte budget did NOT bound RECEIVE memory. The
2704    // fix routes the governed recv through `recv_limited(RecvLimits)` so the poll
2705    // is bounded by BOTH the record cap AND the byte budget.
2706
2707    /// A REAL test transport (not a mock of internal code -- a concrete
2708    /// `TransportReceiver` over owned `Record`/`WorkBatch`/`MemoryToken`) that
2709    /// makes the gap observable:
2710    ///
2711    /// - `recv(max)` is RECORD-bounded: it hands out up to `max` records in ONE
2712    ///   block regardless of their bytes -- exactly the pre-fix behaviour that
2713    ///   let a single poll retain bytes >> budget.
2714    /// - `recv_limited(limits)` is BYTE-bounded: it accumulates records until the
2715    ///   payload bytes reach `limits.max_bytes`, FLOOR one record.
2716    ///
2717    /// Every handed-out block's total payload bytes are folded into a shared
2718    /// high-water so the test can assert the bytes RETAINED at recv time.
2719    struct ByteAwareSource {
2720        /// Remaining records to hand out (front = next).
2721        remaining: std::sync::Mutex<std::collections::VecDeque<Record>>,
2722        /// High-water of the bytes handed out in any single recv/recv_limited.
2723        recv_high_water: Arc<AtomicU64>,
2724        committed: Arc<AtomicU64>,
2725    }
2726
2727    impl ByteAwareSource {
2728        fn new(records: Vec<Record>, recv_high_water: Arc<AtomicU64>) -> Self {
2729            Self {
2730                remaining: std::sync::Mutex::new(records.into_iter().collect()),
2731                recv_high_water,
2732                committed: Arc::new(AtomicU64::new(0)),
2733            }
2734        }
2735
2736        /// Pull a block (front records) bounded by an optional byte cap and a
2737        /// record cap, folding its total bytes into the high-water. Returns
2738        /// `None` when the source is exhausted (the caller then PENDS forever so
2739        /// the run loop parks until shutdown -- never a busy spin).
2740        fn pull(&self, max_records: usize, max_bytes: Option<u64>) -> Option<WorkBatch<MemTok2>> {
2741            let mut q = self.remaining.lock().unwrap();
2742            if q.is_empty() {
2743                return None;
2744            }
2745            let mut records = Vec::new();
2746            let mut bytes: u64 = 0;
2747            while records.len() < max_records {
2748                let Some(front) = q.front() else { break };
2749                let rb = front.payload.len() as u64;
2750                // Byte cap with floor-1: stop only once we already hold >= 1.
2751                if let Some(cap) = max_bytes
2752                    && !records.is_empty()
2753                    && bytes.saturating_add(rb) > cap
2754                {
2755                    break;
2756                }
2757                bytes = bytes.saturating_add(rb);
2758                records.push(q.pop_front().expect("front exists"));
2759            }
2760            self.recv_high_water.fetch_max(bytes, Ordering::Relaxed);
2761            let n = records.len() as u64;
2762            let base = self.committed.load(Ordering::Relaxed);
2763            let tokens: Vec<MemTok2> = (0..n).map(|i| MemTok2 { seq: base + i }).collect();
2764            Some(WorkBatch::new(records, tokens))
2765        }
2766    }
2767
2768    #[derive(Debug, Clone, Copy)]
2769    struct MemTok2 {
2770        seq: u64,
2771    }
2772    impl std::fmt::Display for MemTok2 {
2773        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2774            write!(f, "memtok2:{}", self.seq)
2775        }
2776    }
2777    impl crate::transport::CommitToken for MemTok2 {}
2778
2779    impl crate::transport::TransportBase for ByteAwareSource {
2780        fn close(
2781            &self,
2782        ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
2783        {
2784            std::future::ready(Ok(()))
2785        }
2786        fn is_healthy(&self) -> bool {
2787            true
2788        }
2789        fn name(&self) -> &'static str {
2790            "byte-aware-source"
2791        }
2792    }
2793
2794    impl TransportReceiver for ByteAwareSource {
2795        type Token = MemTok2;
2796
2797        fn recv(
2798            &self,
2799            max: usize,
2800        ) -> impl std::future::Future<
2801            Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2802        > + Send {
2803            // RECORD-bounded only -- ignores bytes. This is the pre-fix shape: a
2804            // single poll can retain bytes >> any budget.
2805            let pulled = self.pull(max, None);
2806            async move {
2807                match pulled {
2808                    Some(batch) => Ok(batch),
2809                    // Exhausted: park forever so the loop only exits on shutdown
2810                    // (mirrors a quiet source -- never a busy spin).
2811                    None => std::future::pending().await,
2812                }
2813            }
2814        }
2815
2816        fn recv_limited(
2817            &self,
2818            limits: crate::transport::RecvLimits,
2819        ) -> impl std::future::Future<
2820            Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2821        > + Send {
2822            // BYTE-bounded (floor one record): the fix path.
2823            let pulled = self.pull(limits.max_records, Some(limits.max_bytes));
2824            async move {
2825                match pulled {
2826                    Some(batch) => Ok(batch),
2827                    None => std::future::pending().await,
2828                }
2829            }
2830        }
2831
2832        async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
2833            if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
2834                self.committed.fetch_max(max_seq, Ordering::Relaxed);
2835            }
2836            Ok(())
2837        }
2838    }
2839
2840    /// THE reproduce/fix test: drive the GOVERNED loop over a source that could
2841    /// deliver a block whose total bytes are FAR larger than the byte budget.
2842    ///
2843    /// PRE-FIX (governed recv == `recv(record_cap)`): the source's record-bounded
2844    /// `recv` hands out the whole big block in one poll, so the bytes RETAINED at
2845    /// recv time = the whole block >> budget. The high-water assertion below
2846    /// FAILS (this is the reproduction).
2847    ///
2848    /// POST-FIX (governed recv == `recv_limited(record_cap, byte_budget)`): the
2849    /// source's byte-bounded `recv_limited` caps each poll at the budget (+ one
2850    /// record), so the retained bytes stay ~<= budget + one record.
2851    #[cfg(feature = "governor")]
2852    #[tokio::test]
2853    async fn governed_recv_is_byte_bounded_not_record_bounded() {
2854        use crate::memory::{MemoryGuard, MemoryGuardConfig};
2855
2856        // 64 records of 4 KiB each = 256 KiB total available in the source.
2857        const RECORD_BYTES: usize = 4 * 1024;
2858        const N: usize = 64;
2859        // A SMALL byte budget: 16 KiB (4 records). The record cap is large (2000
2860        // default) so the count NEVER bounds the poll -- only the byte cap can.
2861        const BUDGET: u64 = 16 * 1024;
2862
2863        let total: u64 = (RECORD_BYTES * N) as u64; // 256 KiB
2864        let payload = vec![b'b'; RECORD_BYTES];
2865        let records: Vec<Record> = (0..N)
2866            .map(|_| Record {
2867                payload: Bytes::from(payload.clone()),
2868                key: None,
2869                headers: vec![],
2870                metadata: RecordMeta {
2871                    timestamp_ms: None,
2872                    format: PayloadFormat::Json,
2873                },
2874            })
2875            .collect();
2876
2877        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
2878            limit_bytes: 1024 * 1024,
2879            ..Default::default()
2880        }));
2881        let cfg = crate::governor::ByteBudgetConfig {
2882            start_bytes: BUDGET,
2883            max_bytes: BUDGET, // pin it so the budget cannot grow past BUDGET
2884            floor_records: 1,
2885            nominal_record_bytes: RECORD_BYTES as u64,
2886            record_cap: 4096, // far above N -- count never bounds the poll
2887            ..Default::default()
2888        };
2889        let pressure = crate::governor::SelfRegulationConfig::default()
2890            .build(Arc::clone(&guard))
2891            .expect("enabled")
2892            .pressure();
2893        let budget = Arc::new(crate::governor::ByteBudgetController::new(
2894            cfg,
2895            Arc::clone(&pressure),
2896        ));
2897
2898        let recv_high_water = Arc::new(AtomicU64::new(0));
2899        let source = ByteAwareSource::new(records, Arc::clone(&recv_high_water));
2900
2901        let mut engine = BatchEngine::new(BatchProcessingConfig {
2902            // Big chunk so config never bounds the poll either -- the byte budget
2903            // is the ONLY thing that can.
2904            max_chunk_size: 4096,
2905            ..Default::default()
2906        });
2907        engine.set_byte_budget(budget);
2908
2909        let shutdown = CancellationToken::new();
2910        cancel_after(shutdown.clone(), 250);
2911
2912        engine
2913            .run_governed(
2914                &source,
2915                shutdown,
2916                |batch| Ok(batch),
2917                |_out: &WorkBatch<_>| async { Ok(()) },
2918                CommitMode::Auto,
2919                None::<(
2920                    Duration,
2921                    fn() -> std::future::Ready<Result<(), EngineError>>,
2922                )>,
2923            )
2924            .await
2925            .unwrap();
2926
2927        let peak = recv_high_water.load(Ordering::Relaxed);
2928        // The fix: a single governed recv retains at most the byte budget plus
2929        // one oversized-record floor -- NOT the whole 256 KiB block.
2930        assert!(
2931            peak <= BUDGET + RECORD_BYTES as u64,
2932            "governed recv retained {peak} bytes at recv time -- must be bounded \
2933             by the byte budget {BUDGET} (+ one record {RECORD_BYTES}), not the \
2934             whole {total}-byte block (record-bounded recv would retain all of it)"
2935        );
2936        assert!(
2937            peak > 0,
2938            "the source did hand out records (sanity: the loop ran)"
2939        );
2940    }
2941
2942    /// The sub-block drain is LAZY: it yields one sub-block at a time and does
2943    /// NOT allocate every sub-block up front. We assert incremental yield -- the
2944    /// first `next_sub_block()` returns one budget-sized sub-block while records
2945    /// for later sub-blocks remain un-pulled in the drain.
2946    #[test]
2947    fn sub_block_drain_yields_incrementally() {
2948        // 6 records of 10 bytes; target 25 -> sub-blocks {2, 2, 2}.
2949        let records: Vec<Record> = (0..6)
2950            .map(|_| Record {
2951                payload: Bytes::from_static(b"0123456789"),
2952                key: None,
2953                headers: vec![],
2954                metadata: RecordMeta {
2955                    timestamp_ms: None,
2956                    format: PayloadFormat::Json,
2957                },
2958            })
2959            .collect();
2960        let mut drain = SubBlockDrain::new(records, 25);
2961
2962        // First pull yields ONE sub-block (2 records); the remaining 4 are still
2963        // inside the drain, NOT pre-materialised into sub-block vectors.
2964        let first = drain.next_sub_block().expect("first sub-block");
2965        assert_eq!(first.len(), 2, "first sub-block is one budget's worth");
2966        // The drain still has records to give (proves it did not eagerly split).
2967        let second = drain.next_sub_block().expect("second sub-block");
2968        assert_eq!(second.len(), 2);
2969        let third = drain.next_sub_block().expect("third sub-block");
2970        assert_eq!(third.len(), 2);
2971        // Now exhausted.
2972        assert!(drain.next_sub_block().is_none(), "drain exhausted");
2973    }
2974
2975    // ---- Remediation Phase 3: DLQ + parse-error-action semantics ----------
2976    //
2977    // Two findings the parsed/process paths had:
2978    //   1. parse_block hardcoded route-to-DLQ, ignoring ParseErrorAction.
2979    //   2. out_batch.dlq_entries from process were never routed before commit
2980    //      (silent-drop path) -- only inbound-filter entries were routed.
2981    // These tests pin the fixed contract: one route point, one policy, fallible
2982    // route, parse_error_action honoured on the parsed path.
2983
2984    use crate::worker::engine::FilterDlqPolicy;
2985    use crate::worker::engine::config::ParseErrorAction;
2986
2987    /// An engine with a specific `ParseErrorAction` (default config otherwise).
2988    fn engine_with_parse_action(action: ParseErrorAction) -> BatchEngine {
2989        BatchEngine::new(BatchProcessingConfig {
2990            parse_error_action: action,
2991            ..Default::default()
2992        })
2993    }
2994
2995    /// Finding 1 -- `ParseErrorAction::Skip`: a parse failure on the parsed path
2996    /// is DROPPED silently (NO DLQ entry routed) yet the survivors are kept and
2997    /// ALL source acks commit (the block's tokens are decoupled from records).
2998    #[tokio::test]
2999    async fn parsed_parse_error_skip_drops_without_dlq_and_commits_survivors() {
3000        let transport = mem_transport(50);
3001        transport
3002            .inject(None, br#"{"id":1}"#.to_vec())
3003            .await
3004            .unwrap(); // seq 0 ok
3005        transport
3006            .inject(None, b"not json {{{".to_vec())
3007            .await
3008            .unwrap(); // seq 1 bad
3009        transport
3010            .inject(None, br#"{"id":3}"#.to_vec())
3011            .await
3012            .unwrap(); // seq 2 ok
3013
3014        // Route policy so we can PROVE no entry is routed under Skip.
3015        let routed = Arc::new(AtomicUsize::new(0));
3016        let rc = Arc::clone(&routed);
3017        let engine = engine_with_parse_action(ParseErrorAction::Skip).with_filter_dlq_policy(
3018            FilterDlqPolicy::Route(Arc::new(
3019                move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3020                    rc.fetch_add(entries.len(), Ordering::Relaxed);
3021                    Ok(())
3022                },
3023            )),
3024        );
3025        let shutdown = CancellationToken::new();
3026        cancel_after(shutdown.clone(), 200);
3027
3028        let dlq_seen = Arc::new(AtomicUsize::new(0));
3029        let kept = Arc::new(AtomicUsize::new(0));
3030        let ds = Arc::clone(&dlq_seen);
3031        let kp = Arc::clone(&kept);
3032
3033        engine
3034            .run_workbatch_parsed(
3035                &transport,
3036                shutdown,
3037                move |pb: ParsedBatch<'_, _>| {
3038                    ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
3039                    kp.fetch_add(pb.records.len(), Ordering::Relaxed);
3040                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3041                        .with_dlq_entries(pb.dlq_entries))
3042                },
3043                |_out: &WorkBatch<_>| async { Ok(()) },
3044                CommitMode::Auto,
3045                None::<(
3046                    Duration,
3047                    fn() -> std::future::Ready<Result<(), EngineError>>,
3048                )>,
3049            )
3050            .await
3051            .unwrap();
3052
3053        assert_eq!(kept.load(Ordering::Relaxed), 2, "2 survivors kept");
3054        assert_eq!(
3055            dlq_seen.load(Ordering::Relaxed),
3056            0,
3057            "Skip: parse failure produces NO DLQ entry (dropped, not dead-lettered)"
3058        );
3059        assert_eq!(
3060            routed.load(Ordering::Relaxed),
3061            0,
3062            "Skip: nothing reaches the DLQ route point"
3063        );
3064        // All three source acks committed -- survivors and the dropped record's
3065        // ack alike (at-least-once on the whole block; Skip is opt-in loss).
3066        assert_eq!(transport.committed_sequence(), 2);
3067    }
3068
3069    /// Finding 1 -- `ParseErrorAction::FailBatch`: a parse failure fails the
3070    /// WHOLE block terminally (no commit), consistent with the ack barrier. The
3071    /// run returns the terminal error and the source watermark does not advance.
3072    #[tokio::test]
3073    async fn parsed_parse_error_fail_batch_skips_commit() {
3074        // OrderedReceiver hands one record per recv with monotonic tokens and a
3075        // cumulative watermark, so we can prove the commit never fired.
3076        let receiver = OrderedReceiverBad::new();
3077        let committed = Arc::clone(&receiver.committed_hwm);
3078
3079        let engine = engine_with_parse_action(ParseErrorAction::FailBatch);
3080        let shutdown = CancellationToken::new();
3081        cancel_after(shutdown.clone(), 500);
3082
3083        let sink_calls = Arc::new(AtomicUsize::new(0));
3084        let sc = Arc::clone(&sink_calls);
3085
3086        let result = engine
3087            .run_workbatch_parsed(
3088                &receiver,
3089                shutdown,
3090                |pb: ParsedBatch<'_, _>| {
3091                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3092                        .with_dlq_entries(pb.dlq_entries))
3093                },
3094                move |_out: &WorkBatch<_>| {
3095                    let sc = Arc::clone(&sc);
3096                    async move {
3097                        sc.fetch_add(1, Ordering::Relaxed);
3098                        Ok(())
3099                    }
3100                },
3101                CommitMode::Auto,
3102                None::<(
3103                    Duration,
3104                    fn() -> std::future::Ready<Result<(), EngineError>>,
3105                )>,
3106            )
3107            .await;
3108
3109        assert!(
3110            matches!(result, Err(EngineError::ParseBatchFailed(_))),
3111            "FailBatch: a parse failure is a terminal engine error, got {result:?}"
3112        );
3113        assert_eq!(
3114            committed.load(Ordering::Relaxed),
3115            u64::MAX,
3116            "FailBatch: the whole block fails its commit -- watermark unmoved"
3117        );
3118        assert_eq!(
3119            sink_calls.load(Ordering::Relaxed),
3120            0,
3121            "FailBatch: the block never reaches the sink (parse fails first)"
3122        );
3123    }
3124
3125    /// Finding 1 -- `ParseErrorAction::Dlq`: a parse failure routes to the DLQ
3126    /// route point BEFORE commit, survivors are sunk, all source acks commit.
3127    #[tokio::test]
3128    async fn parsed_parse_error_dlq_routes_before_commit() {
3129        let transport = Arc::new(mem_transport(50));
3130        transport
3131            .inject(None, br#"{"id":1}"#.to_vec())
3132            .await
3133            .unwrap(); // seq 0 ok
3134        transport
3135            .inject(None, b"not json {{{".to_vec())
3136            .await
3137            .unwrap(); // seq 1 bad
3138        transport
3139            .inject(None, br#"{"id":3}"#.to_vec())
3140            .await
3141            .unwrap(); // seq 2 ok
3142
3143        // Sample committed_sequence at DLQ-route time to prove route precedes
3144        // commit: when the route sink fires, the commit must NOT yet have run.
3145        let routed = Arc::new(AtomicUsize::new(0));
3146        let committed_at_route = Arc::new(AtomicU64::new(u64::MAX));
3147        let rc = Arc::clone(&routed);
3148        let car = Arc::clone(&committed_at_route);
3149        let transport_for_route = Arc::clone(&transport);
3150        let engine = engine_with_parse_action(ParseErrorAction::Dlq).with_filter_dlq_policy(
3151            FilterDlqPolicy::Route(Arc::new(
3152                move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3153                    car.store(transport_for_route.committed_sequence(), Ordering::Relaxed);
3154                    rc.fetch_add(entries.len(), Ordering::Relaxed);
3155                    Ok(())
3156                },
3157            )),
3158        );
3159        let shutdown = CancellationToken::new();
3160        cancel_after(shutdown.clone(), 200);
3161
3162        engine
3163            .run_workbatch_parsed(
3164                &*transport,
3165                shutdown,
3166                |pb: ParsedBatch<'_, _>| {
3167                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3168                        .with_dlq_entries(pb.dlq_entries))
3169                },
3170                |_out: &WorkBatch<_>| async { Ok(()) },
3171                CommitMode::Auto,
3172                None::<(
3173                    Duration,
3174                    fn() -> std::future::Ready<Result<(), EngineError>>,
3175                )>,
3176            )
3177            .await
3178            .unwrap();
3179
3180        assert_eq!(
3181            routed.load(Ordering::Relaxed),
3182            1,
3183            "Dlq: the parse failure reached the DLQ route point"
3184        );
3185        // The route fired BEFORE the commit: MemoryTransport's committed_sequence
3186        // starts at 0; the block's highest seq is 2, so a commit would set it to
3187        // 2. At route time it must still be its pre-commit value (0).
3188        assert_eq!(
3189            committed_at_route.load(Ordering::Relaxed),
3190            0,
3191            "DLQ route ran BEFORE the source commit advanced the watermark"
3192        );
3193        assert_eq!(
3194            transport.committed_sequence(),
3195            2,
3196            "all 3 acks committed after"
3197        );
3198    }
3199
3200    /// Finding 2 -- the STANDARD (on-demand) `run_workbatch` path must NOT
3201    /// silently drop DLQ entries that `process` emits on the out-batch. A
3202    /// process closure that attaches a dlq_entry has it ROUTED (reaches the DLQ
3203    /// route point) before the sink-success leads to a source commit -- it does
3204    /// not depend on the sink closure remembering to carry it.
3205    #[tokio::test]
3206    async fn standard_send_batch_sink_does_not_silently_drop_dlq_entries() {
3207        let transport = mem_transport(50);
3208        transport
3209            .inject(None, br#"{"id":1}"#.to_vec())
3210            .await
3211            .unwrap();
3212        transport
3213            .inject(None, br#"{"id":2}"#.to_vec())
3214            .await
3215            .unwrap();
3216
3217        let routed = Arc::new(AtomicUsize::new(0));
3218        let rc = Arc::clone(&routed);
3219        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
3220            move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3221                rc.fetch_add(entries.len(), Ordering::Relaxed);
3222                Ok(())
3223            },
3224        )));
3225        let shutdown = CancellationToken::new();
3226        cancel_after(shutdown.clone(), 200);
3227
3228        // The SINK ignores dlq_entries entirely (the realistic app shape). The
3229        // PROCESS closure emits a dlq_entry on the out-batch. Pre-fix this entry
3230        // would vanish; post-fix the driver routes it before commit.
3231        engine
3232            .run_workbatch(
3233                &transport,
3234                shutdown,
3235                |batch| {
3236                    let dlq = vec![crate::transport::filter::FilteredDlqEntry {
3237                        payload: b"process-emitted dead-letter".to_vec(),
3238                        key: None,
3239                        reason: "process decided this record is bad".to_string(),
3240                    }];
3241                    let tokens = batch.commit_tokens;
3242                    let records = batch.records;
3243                    Ok(WorkBatch::new(records, tokens).with_dlq_entries(dlq))
3244                },
3245                |_out: &WorkBatch<_>| async { Ok(()) },
3246                CommitMode::Auto,
3247                None::<(
3248                    Duration,
3249                    fn() -> std::future::Ready<Result<(), EngineError>>,
3250                )>,
3251            )
3252            .await
3253            .unwrap();
3254
3255        assert!(
3256            routed.load(Ordering::Relaxed) >= 1,
3257            "process-emitted DLQ entry must reach the DLQ route point, not be \
3258             silently dropped on the path to commit"
3259        );
3260        // Source acks still commit -- the dead-letter routing is independent of
3261        // the source ack (at-least-once on the whole block).
3262        assert_eq!(transport.committed_sequence(), 1);
3263    }
3264
3265    /// Finding 3 -- a DLQ-route FAILURE under `Route` is a terminal ack-barrier
3266    /// error: the source commit is skipped (no later ordered commit advances
3267    /// past the undelivered dead-letters). Silent discard is opt-in only.
3268    #[tokio::test]
3269    async fn dlq_route_failure_is_terminal_and_blocks_commit() {
3270        let receiver = OrderedReceiverBad::without_parse_fail();
3271        let committed = Arc::clone(&receiver.committed_hwm);
3272
3273        // A Route sink that FAILS, simulating a DLQ transport outage.
3274        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
3275            |_e: Vec<crate::transport::filter::FilteredDlqEntry>| {
3276                Err(EngineError::Sink("dlq transport down".into()))
3277            },
3278        )));
3279        let shutdown = CancellationToken::new();
3280        cancel_after(shutdown.clone(), 500);
3281
3282        let result = engine
3283            .run_workbatch(
3284                &receiver,
3285                shutdown,
3286                |batch| {
3287                    // process emits a dlq entry; routing it will fail.
3288                    let dlq = vec![crate::transport::filter::FilteredDlqEntry {
3289                        payload: b"bad".to_vec(),
3290                        key: None,
3291                        reason: "process dlq".to_string(),
3292                    }];
3293                    Ok(WorkBatch::new(batch.records, batch.commit_tokens).with_dlq_entries(dlq))
3294                },
3295                |_out: &WorkBatch<_>| async { Ok(()) },
3296                CommitMode::Auto,
3297                None::<(
3298                    Duration,
3299                    fn() -> std::future::Ready<Result<(), EngineError>>,
3300                )>,
3301            )
3302            .await;
3303
3304        assert!(
3305            result.is_err(),
3306            "DLQ route failure must be a terminal ack-barrier error, got {result:?}"
3307        );
3308        assert_eq!(
3309            committed.load(Ordering::Relaxed),
3310            u64::MAX,
3311            "DLQ route failure must skip the commit -- watermark unmoved"
3312        );
3313    }
3314
3315    /// An ordered receiver that delivers ONE bad (unparseable) record then parks.
3316    /// Cumulative watermark via fetch_max, so a commit is observable. Used to
3317    /// prove FailBatch / DLQ-route-failure leave the watermark unmoved.
3318    struct OrderedReceiverBad {
3319        next: Arc<AtomicU64>,
3320        committed_hwm: Arc<AtomicU64>,
3321        good_payload: bool,
3322    }
3323
3324    impl OrderedReceiverBad {
3325        fn new() -> Self {
3326            Self {
3327                next: Arc::new(AtomicU64::new(0)),
3328                committed_hwm: Arc::new(AtomicU64::new(u64::MAX)),
3329                good_payload: false,
3330            }
3331        }
3332        /// Delivers a PARSEABLE record (for the DLQ-route-failure test, where the
3333        /// dead-letter comes from the process closure, not a parse failure).
3334        fn without_parse_fail() -> Self {
3335            Self {
3336                next: Arc::new(AtomicU64::new(0)),
3337                committed_hwm: Arc::new(AtomicU64::new(u64::MAX)),
3338                good_payload: true,
3339            }
3340        }
3341    }
3342
3343    impl crate::transport::TransportBase for OrderedReceiverBad {
3344        fn close(
3345            &self,
3346        ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
3347        {
3348            std::future::ready(Ok(()))
3349        }
3350        fn is_healthy(&self) -> bool {
3351            true
3352        }
3353        fn name(&self) -> &'static str {
3354            "ordered-bad-test"
3355        }
3356    }
3357
3358    impl TransportReceiver for OrderedReceiverBad {
3359        type Token = crate::transport::memory::MemoryToken;
3360
3361        fn recv(
3362            &self,
3363            _max: usize,
3364        ) -> impl std::future::Future<
3365            Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
3366        > + Send {
3367            let next = Arc::clone(&self.next);
3368            let good = self.good_payload;
3369            async move {
3370                let seq = next.fetch_add(1, Ordering::Relaxed);
3371                if seq >= 1 {
3372                    next.fetch_sub(1, Ordering::Relaxed);
3373                    std::future::pending::<()>().await;
3374                }
3375                let payload = if good {
3376                    Bytes::from_static(br#"{"ok":1}"#)
3377                } else {
3378                    Bytes::from_static(b"not json {{{")
3379                };
3380                let record = Record {
3381                    payload,
3382                    key: None,
3383                    headers: vec![],
3384                    metadata: RecordMeta {
3385                        timestamp_ms: None,
3386                        format: PayloadFormat::Json,
3387                    },
3388                };
3389                Ok(WorkBatch::new(
3390                    vec![record],
3391                    vec![crate::transport::memory::MemoryToken { seq }],
3392                ))
3393            }
3394        }
3395
3396        async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
3397            if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
3398                self.committed_hwm.fetch_max(max_seq, Ordering::Relaxed);
3399            }
3400            Ok(())
3401        }
3402    }
3403}