// Project:   hyperi-rustlib
// File:      src/worker/engine/driver.rs
// Purpose:   Unified WorkBatch engine driver (get -> process -> send -> commit)
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! # Unified `WorkBatch` engine driver
//!
//! The single run loop that the four legacy loops (`run` / `run_raw` /
//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
//! through one block at a time:
//!
//! ```text
//!   recv(max) -> WorkBatch        (recv now yields a WorkBatch natively)
//!     -> apply_workbatch_dlq_policy  (route/discard/reject inline-DLQ entries)
//!     -> lease_ingress_batch      (memory accounting on the block's bytes)
//!     -> process(WorkBatch)       (transforms parse ON DEMAND via codec::parse)
//!     -> sink(&out_batch).await   (async send of the whole block)
//!     -> commit per CommitMode    (at-least-once, AFTER the block is sent)
//! ```
//!
//! ## Why tokens live on the batch, not the record
//!
//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
//! the input tokens after the whole out-batch is sent -- never `2N`, never per
//! output record. That invariant is the data-plane core; the fan-out
//! commit-correctness test proves it.
//!
//! ## Two parse modes (the hybrid)
//!
//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
//!   does NOT pre-parse. A transform that needs a field calls
//!   [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
//!   never pay a parse.
//!
//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
//!   hot pipelines. The driver pre-parses the whole block via `codec::parse`
//!   (SIMD JSON / native MsgPack) on the worker pool and hands the process
//!   closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
//!   shared [`FieldInterner`](super::FieldInterner) for hot routing-field
//!   dedup. This keeps the batch-parse + interner throughput win for apps that
//!   opt in.
//!
//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
//! in-process (non-run-loop) callers; only the four legacy run loops were
//! removed by the 0.7b flip.

use std::time::Duration;

use tokio_util::sync::CancellationToken;

use super::{BatchEngine, EngineError};
use crate::transport::codec::{self, ParsedPayload};
use crate::transport::{Record, TransportReceiver, WorkBatch};
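
// Illustrative sketch (not part of the driver): why commit tokens live on the
// batch rather than on each record, as the module docs explain. `SketchBatch`
// and `sketch_fan_out` are HYPOTHETICAL stand-ins, not the real `WorkBatch`
// API. The only behaviour assumed from the docs is that a record-level
// transform carries `commit_tokens` through unchanged.
#[allow(dead_code)]
struct SketchBatch {
    records: Vec<Vec<u8>>,
    commit_tokens: Vec<u64>,
}

#[allow(dead_code)]
impl SketchBatch {
    /// Replace the records and carry the input tokens through untouched.
    fn map_records(self, f: impl FnOnce(Vec<Vec<u8>>) -> Vec<Vec<u8>>) -> Self {
        Self {
            records: f(self.records),
            commit_tokens: self.commit_tokens,
        }
    }
}

/// Fan every record out to two copies: N records in, 2N out, same tokens.
#[allow(dead_code)]
fn sketch_fan_out(batch: SketchBatch) -> SketchBatch {
    batch.map_records(|records| {
        records
            .into_iter()
            .flat_map(|r| [r.clone(), r])
            .collect()
    })
}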

/// When the driver commits the input source acks.
///
/// The `commit_tokens` carried on the [`WorkBatch`] ARE the input source acks
/// (Kafka offsets, fetch cursors, ...). This enum decides who fires them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitMode {
    /// At-least-once: after the sink returns `Ok` for the WHOLE out-batch, the
    /// engine calls `receiver.commit(&out_batch.commit_tokens)`. A sink error
    /// skips the commit so the block is re-delivered. This is the engine-commits
    /// behaviour of the former mid-tier / raw run loops, lifted onto the block.
    Auto,
    /// The sink owns the commit -- the engine does NOT commit. The sink is
    /// handed the block (which carries `commit_tokens`) and decides when to
    /// acknowledge (e.g. after a downstream flush). The deferred-commit shape of
    /// the former async run loop, lifted onto the block.
    SinkManaged,
}
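
// Illustrative sketch (not the driver's real code path): the ordering that
// `CommitMode` describes, reduced to a synchronous helper. `SketchCommitMode`
// and `sketch_send_then_commit` are HYPOTHETICAL; the real sink is async and
// the real commit goes through the receiver. The assumption shown is only the
// documented contract: send first, commit after `Ok`, and only under `Auto`.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SketchCommitMode {
    Auto,
    SinkManaged,
}

#[allow(dead_code)]
fn sketch_send_then_commit(
    mode: SketchCommitMode,
    tokens: &[u64],
    sink: impl FnOnce(&[u64]) -> Result<(), String>,
    committed: &mut Vec<u64>,
) -> Result<(), String> {
    // A sink error returns here, BEFORE any commit, so the block is re-delivered.
    sink(tokens)?;
    // Under SinkManaged the engine never commits; the sink already saw the tokens.
    if mode == SketchCommitMode::Auto {
        committed.extend_from_slice(tokens);
    }
    Ok(())
}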

/// A pre-parsed block for the opt-in
/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
///
/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
/// for hot routing-field dedup.
///
/// ## Parse-failure contract
///
/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
/// successfully. A record whose payload fails [`codec::parse`] is handled per
/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
/// same contract the legacy `process_mid_tier` honoured:
///
/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
///   [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
///   (no silent drop); the resulting [`WorkBatch`] inherits those entries and
///   the driver routes them through the DLQ policy before commit.
/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
///   errors) -- a deliberate, configured drop, not a silent vanish.
/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
///   terminally (no commit), consistent with the ack barrier.
///
/// The process closure therefore always sees a clean, fully-parsed view.
///
/// `commit_tokens` are the INPUT source acks and are carried through unchanged
/// regardless of how many records survived parsing -- the same fan-out-safe
/// token decoupling as [`WorkBatch`].
pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
    /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
    pub records: Vec<Record>,

    /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
    pub parsed: Vec<ParsedPayload>,

    /// Input source acks for the whole block (decoupled from record count).
    pub commit_tokens: Vec<T>,

    /// Inline-DLQ entries: those carried in on the source batch PLUS any record
    /// that failed to parse (no-silent-drop).
    pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,

    /// Shared interner for hot routing-field-name dedup. The first time a field
    /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
    /// bump. Reused from the engine so dedup persists across blocks.
    pub interner: &'a super::FieldInterner,
}

impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
    /// Number of successfully-parsed records.
    #[must_use]
    pub fn len(&self) -> usize {
        self.records.len()
    }

    /// Whether there are no successfully-parsed records.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.records.is_empty()
    }

    /// Intern a routing-field name through the shared interner.
    ///
    /// Use this to dedup the routing-key field name once per block rather than
    /// re-allocating it per record.
    #[must_use]
    pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
        self.interner.intern(name)
    }
}
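
// Illustrative sketch of the interning idea documented on
// `ParsedBatch::interner`. `SketchInterner` is a HYPOTHETICAL stand-in and
// makes no claim about how the real `FieldInterner` is implemented; it only
// assumes the documented behaviour: the first sighting of a name allocates an
// `Arc<str>`, and later lookups hand back a clone (a refcount bump).
#[allow(dead_code)]
#[derive(Default)]
struct SketchInterner {
    names: std::sync::Mutex<std::collections::HashSet<std::sync::Arc<str>>>,
}

#[allow(dead_code)]
impl SketchInterner {
    fn intern(&self, name: &str) -> std::sync::Arc<str> {
        let mut names = self.names.lock().expect("interner mutex poisoned");
        // Hit: return the shared allocation without copying the string.
        if let Some(existing) = names.get(name) {
            return std::sync::Arc::clone(existing);
        }
        // Miss: allocate once and remember it for every later block.
        let fresh: std::sync::Arc<str> = std::sync::Arc::from(name);
        names.insert(std::sync::Arc::clone(&fresh));
        fresh
    }
}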

/// Optional periodic ticker shared by the run-loops (flush timers, periodic
/// maintenance). Folds away the identical interval-setup + select-arm the four
/// loops would otherwise each carry. The ticker is COLD (fires on the order of
/// the flush interval, not per block), so this extraction does not touch the
/// hot recv path.
#[cfg(feature = "transport")]
struct LoopTicker<F> {
    interval: Option<tokio::time::Interval>,
    callback: Option<F>,
}

#[cfg(feature = "transport")]
impl<F, Fut> LoopTicker<F>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), EngineError>>,
{
    fn new(ticker: Option<(Duration, F)>) -> Self {
        // Start the first tick one period out (not immediately) so the loop
        // does not fire a tick before it has polled the source once.
        let interval = ticker
            .as_ref()
            .map(|(d, _)| tokio::time::interval_at(tokio::time::Instant::now() + *d, *d));
        Self {
            interval,
            callback: ticker.map(|(_, f)| f),
        }
    }

    /// Yield when the next tick is due, or never if no ticker is configured.
    /// Cancel-safe: `Interval::tick` is cancel-safe and the no-ticker arm pends,
    /// so this sits directly in `tokio::select!`.
    async fn wait(&mut self) {
        match self.interval.as_mut() {
            Some(i) => {
                i.tick().await;
            }
            None => std::future::pending::<()>().await,
        }
    }

    /// Run the ticker callback; a callback error is logged, not fatal.
    async fn fire(&mut self, label: &str) {
        if let Some(f) = self.callback.as_mut()
            && let Err(e) = f().await
        {
            tracing::error!(error = %e, ticker = label, "Ticker failed");
        }
    }
}
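
// Illustrative sketch of byte-budget sub-block splitting, as described in the
// docs of `run_workbatch_streaming` further down. `sketch_split_sub_blocks` is
// HYPOTHETICAL and is not the driver's splitter. The greedy rule used here
// (close a sub-block before a record would push it past the target) is an
// ASSUMPTION; the docs only fix the target sum of payload lengths and the
// one-record floor.
#[allow(dead_code)]
fn sketch_split_sub_blocks(
    payload_lens: &[usize],
    sub_block_bytes: u64,
) -> Vec<std::ops::Range<usize>> {
    let mut blocks = Vec::new();
    let mut start = 0usize;
    let mut bytes = 0u64;
    for (i, len) in payload_lens.iter().enumerate() {
        let len = *len as u64;
        // Close the current sub-block if this record would overshoot the
        // target, but never emit an empty one (the one-record floor): an
        // oversized record still becomes its own sub-block.
        if i > start && bytes + len > sub_block_bytes {
            blocks.push(start..i);
            start = i;
            bytes = 0;
        }
        bytes += len;
    }
    if start < payload_lens.len() {
        blocks.push(start..payload_lens.len());
    }
    blocks
}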

impl BatchEngine {
    /// Unified on-demand `WorkBatch` driver -- the default data-plane loop.
    ///
    /// Drives one [`WorkBatch`] at a time through `recv -> filter-DLQ policy ->
    /// ingress lease -> process -> sink -> commit`. The driver does NOT pre-parse:
    /// `process` reads fields on demand via [`codec::parse`]. Pass-through apps
    /// pay zero parse cost.
    ///
    /// - `process` runs on the loop task (cancellation-aware between awaits) and
    ///   may fan records out or in; it MUST preserve `commit_tokens` (use
    ///   [`WorkBatch::map_records`], which does so automatically).
    /// - `sink` is async and receives the WHOLE out-batch by reference.
    /// - `commit` selects [`CommitMode::Auto`] (engine commits after sink `Ok`)
    ///   or [`CommitMode::SinkManaged`] (sink owns commit).
    /// - `ticker` is an optional `(interval, fn)` that fires on the interval
    ///   inside the select loop (flush timers, periodic maintenance).
    ///
    /// Stops cleanly when `shutdown` is cancelled.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Transport`] if `recv` fails fatally,
    /// [`EngineError::FilterDlqUnrouted`] if inline-DLQ entries appear under the
    /// default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject), or
    /// the error returned by `process`.
    ///
    /// A sink error (and, under [`CommitMode::Auto`], a commit error) is
    /// TERMINAL: it stops the run loop and propagates. This is the ack barrier
    /// for the ORDERED/cumulative source commit (Kafka "commit up to offset N"):
    /// the failed block's tokens are NOT committed, and -- crucially -- no LATER
    /// block is fetched and committed past them, which would silently skip the
    /// never-sent records (data loss). On restart the source re-delivers from
    /// the last committed watermark, preserving at-least-once. The app owns
    /// restart/retry policy.
    #[cfg(feature = "transport")]
    #[allow(clippy::too_many_arguments)]
    pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
        &self,
        receiver: &R,
        shutdown: CancellationToken,
        process: P,
        mut sink: Sink,
        commit: CommitMode,
        ticker: Option<(Duration, Ticker)>,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
        Ticker: FnMut() -> TickerFut,
        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        tracing::info!(
            chunk_size = self.config.max_chunk_size,
            commit = ?commit,
            ticker = ticker.is_some(),
            "BatchEngine (workbatch) starting"
        );

        let mut ticker = LoopTicker::new(ticker);

        loop {
            tokio::select! {
                biased;

                () = shutdown.cancelled() => {
                    tracing::info!("BatchEngine (workbatch) shutting down");
                    return Ok(());
                }

                () = ticker.wait() => ticker.fire("workbatch").await,

                recv_result = receiver.recv(self.config.max_chunk_size) => {
                    let work_batch = recv_result.map_err(EngineError::Transport)?;
                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
                        continue;
                    };
                    self.drive_block(receiver, batch, &process, &mut sink, commit).await?;
                }
            }
        }
    }

    /// Streaming `WorkBatch` driver -- the opt-in peak-memory-bounded path.
    ///
    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), but each
    /// received block is processed in consecutive byte-budget-sized SUB-BLOCKS
    /// rather than all at once. Peak in-flight ingress memory is bounded to ONE
    /// sub-block (`~sub_block_bytes`) instead of the whole block: the per-sub-block
    /// ingress lease is dropped (releasing those bytes) BEFORE the next sub-block
    /// is leased and processed.
    ///
    /// The source acks for the WHOLE block are committed EXACTLY ONCE, after the
    /// FINAL sub-block's sink returns `Ok` (under [`CommitMode::Auto`]) -- so
    /// at-least-once is preserved: a sink error on any sub-block stops the block
    /// and skips the commit, so the WHOLE block is re-delivered. The sub-block
    /// views carry EMPTY `commit_tokens`; the batch's tokens are committed once at
    /// the end.
    ///
    /// `sub_block_bytes` is the target sum of `payload.len()` per sub-block (floor
    /// one record, so a record larger than the target is still its own sub-block
    /// and the loop never stalls). Taken as an explicit parameter so the path is
    /// testable in isolation; `run_governed` (`governor` feature) supplies it
    /// from the governor's byte budget.
    ///
    /// Fan-out WITHIN a sub-block's `process` is fine (records grow); the source
    /// acks are still the batch's input tokens, committed once at the end.
    ///
    /// # Errors
    ///
    /// Same as [`run_workbatch`](Self::run_workbatch), plus
    /// [`EngineError::SinkManagedUnsupported`] if `commit` is
    /// [`CommitMode::SinkManaged`].
    #[cfg(feature = "transport")]
    #[allow(clippy::too_many_arguments)]
    pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
        &self,
        receiver: &R,
        shutdown: CancellationToken,
        process: P,
        mut sink: Sink,
        commit: CommitMode,
        sub_block_bytes: u64,
        ticker: Option<(Duration, Ticker)>,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
        Ticker: FnMut() -> TickerFut,
        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        // SinkManaged is unsupported on the streaming path: sub-block views
        // carry EMPTY tokens, so the sink never sees the block's source acks and
        // cannot own the commit. Fail fast at startup instead of freezing the
        // partition at runtime.
        if matches!(commit, CommitMode::SinkManaged) {
            return Err(EngineError::SinkManagedUnsupported);
        }

        tracing::info!(
            chunk_size = self.config.max_chunk_size,
            commit = ?commit,
            sub_block_bytes,
            ticker = ticker.is_some(),
            "BatchEngine (workbatch streaming) starting"
        );

        let mut ticker = LoopTicker::new(ticker);

        loop {
            tokio::select! {
                biased;

                () = shutdown.cancelled() => {
                    tracing::info!("BatchEngine (workbatch streaming) shutting down");
                    return Ok(());
                }

                () = ticker.wait() => ticker.fire("workbatch streaming").await,

                recv_result = receiver.recv(self.config.max_chunk_size) => {
                    let work_batch = recv_result.map_err(EngineError::Transport)?;
                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
                        continue;
                    };
                    self.drive_block_streaming(
                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
                    )
                    .await?;
                }
            }
        }
    }

    /// Governed `WorkBatch` driver -- the default-ON self-regulation run path.
    ///
    /// This is what a self-regulating app calls instead of choosing between
    /// [`run_workbatch`](Self::run_workbatch) and
    /// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
    /// dispatches on whether the byte-budget lever is wired
    /// ([`set_byte_budget`](BatchEngine::set_byte_budget), done by
    /// `ServiceRuntime` when `self_regulation.enabled = true`):
    ///
    /// - **Governor ON** (budget wired): streams each received block in
    ///   sub-blocks sized to the CURRENT byte budget (re-read per block), bounds
    ///   peak in-flight memory to one sub-block, and folds each block's
    ///   `(bytes, process_time, ingest_interval)` into the AIMD loop via
    ///   [`observe`](crate::governor::ByteBudgetController::observe). The recv
    ///   `max` is capped to the budget's poll-safety
    ///   [`record_cap`](crate::governor::ByteBudgetController::record_cap).
    ///   While pressure is LOW the budget sits at its big start value, so the
    ///   block becomes a SINGLE sub-block -- no per-record overhead, behaviour
    ///   matches the whole-batch loop.
    /// - **Governor OFF** (no budget): delegates verbatim to
    ///   [`run_workbatch`](Self::run_workbatch) -- byte-identical to
    ///   pre-governor behaviour.
    ///
    /// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
    /// SEPARATELY into the receive transport, not here -- this method is the
    /// driver-side lever (sub-block sizing + AIMD), the gate is the
    /// transport-side brake. The two share the same `UnifiedPressure`.
    ///
    /// # Errors
    ///
    /// Same as [`run_workbatch`](Self::run_workbatch).
    #[cfg(all(feature = "transport", feature = "governor"))]
    #[allow(clippy::too_many_arguments)]
    pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
        &self,
        receiver: &R,
        shutdown: CancellationToken,
        process: P,
        mut sink: Sink,
        commit: CommitMode,
        ticker: Option<(Duration, Ticker)>,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
        Ticker: FnMut() -> TickerFut,
        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        // Governor OFF -> the original whole-batch loop, byte-for-byte. The
        // whole-batch path DOES support SinkManaged (the sink receives the full
        // block with its tokens), so the guard below must sit AFTER this
        // delegate, not before it.
        let Some(budget) = self.byte_budget.clone() else {
            return self
                .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
                .await;
        };

        // Governor ON streams in sub-blocks whose views carry EMPTY tokens, so
        // the sink can never own a SinkManaged commit -- reject it at startup
        // rather than silently freeze the source offset. (Same guard as
        // run_workbatch_streaming, which this path bypasses.)
        if matches!(commit, CommitMode::SinkManaged) {
            return Err(EngineError::SinkManagedUnsupported);
        }

        tracing::info!(
            chunk_size = self.config.max_chunk_size,
            commit = ?commit,
            ticker = ticker.is_some(),
            start_byte_budget = budget.byte_budget(),
            "BatchEngine (governed) starting -- self-regulation ON"
        );

        let mut ticker = LoopTicker::new(ticker);

        // Track the previous block's arrival instant so we can feed the AIMD
        // loop a real ingest inter-arrival interval.
        let mut last_recv: Option<std::time::Instant> = None;

        loop {
            // The recv limits bound a single poll by BOTH:
            //   - the SMALLER of the config chunk size and the budget's
            //     poll-safety record cap (a tiny-record flood cannot blow the
            //     count even within the byte budget), AND
            //   - the CURRENT byte budget (re-read per block), so a single poll
            //     never RETAINS more than ~one budget's worth of inbound payload
            //     BEFORE the sub-block split. This is the fix for the
            //     "byte budget does not bound RECEIVE memory" gap: without the
            //     byte cap, `recv(max)` could build a WorkBatch (and, for the
            //     Kafka recv-arena, allocate one arena) far larger than the
            //     budget before any sub-block lease ran.
            let recv_limits = crate::transport::RecvLimits {
                max_records: self.config.max_chunk_size.min(budget.record_cap()),
                max_bytes: budget.byte_budget(),
            };

            tokio::select! {
                biased;

                () = shutdown.cancelled() => {
                    tracing::info!("BatchEngine (governed) shutting down");
                    return Ok(());
                }

                () = ticker.wait() => ticker.fire("governed").await,

                recv_result = receiver.recv_limited(recv_limits) => {
                    let now = std::time::Instant::now();
                    let ingest_interval = last_recv
                        .map(|prev| now.saturating_duration_since(prev))
                        .unwrap_or_default();
                    last_recv = Some(now);

                    let work_batch = recv_result.map_err(EngineError::Transport)?;
                    let block_bytes = work_batch.total_payload_bytes() as u64;
                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
                        // Empty block: still fold the timing so a quiet pipeline
                        // can grow its budget back. No bytes -> treated as slack.
                        budget.observe(0, Duration::ZERO, ingest_interval);
                        continue;
                    };

                    // Re-read the budget for THIS block: low pressure -> big
                    // budget -> one sub-block (no overhead); high pressure ->
                    // shrunk budget -> peak in-flight bounded to one sub-block.
                    let sub_block_bytes = budget.byte_budget();

                    let process_start = std::time::Instant::now();
                    self.drive_block_streaming(
                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
                    )
                    .await?;
                    let process_time = process_start.elapsed();

                    // Fold the OBSERVED actual block bytes into the AIMD loop. A
                    // memory HARD override inside observe() shrinks immediately
                    // regardless of rho.
                    budget.observe(block_bytes, process_time, ingest_interval);

                    // Observability: surface the current budget + pressure as
                    // gauges so throttling is visible, not mysterious, AND the
                    // ACTUAL received block bytes so the gap between the budget
                    // (`self_regulation_byte_budget`) and reality (`recv_block_bytes`)
                    // is measurable -- a persistent overshoot means the recv byte
                    // cap is not holding. The gate edges (pause/resume) are
                    // logged by the ObservingActuator.
                    #[cfg(feature = "metrics")]
                    {
                        metrics::gauge!("self_regulation_byte_budget")
                            .set(budget.byte_budget() as f64);
                        metrics::gauge!("self_regulation_recv_block_bytes")
                            .set(block_bytes as f64);
                        // `self_regulation_` domain prefix: a bare `pressure_ratio`
                        // collides with MemoryGuard's and ScalingPressure's own
                        // pressure gauges on the same registry.
                        metrics::gauge!("self_regulation_pressure_ratio")
                            .set(budget.pressure().level());
                    }
                }
            }
        }
    }

    /// Unified pre-parsed `WorkBatch` driver -- the opt-in hot path.
    ///
    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), except the
    /// driver PRE-PARSES the whole block via [`codec::parse`] (SIMD JSON / native
    /// MsgPack) on the worker pool and hands `process_parsed` a [`ParsedBatch`]
    /// (records + aligned parsed payloads + shared
    /// [`FieldInterner`](super::FieldInterner)). This keeps
    /// the batch-parse + interner throughput win for apps that opt in.
    ///
    /// Records that fail to parse are handled per the configured
    /// [`ParseErrorAction`](super::ParseErrorAction) (Dlq -> dlq_entries, Skip ->
    /// drop+counted, FailBatch -> terminal no-commit) -- see [`ParsedBatch`] for
    /// the parse-failure contract. `process_parsed` returns the final
    /// [`WorkBatch`] and MUST preserve the input `commit_tokens`.
    ///
    /// # Errors
    ///
    /// Same as [`run_workbatch`](Self::run_workbatch).
    #[cfg(feature = "transport")]
    #[allow(clippy::too_many_arguments)]
    pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
        &self,
        receiver: &R,
        shutdown: CancellationToken,
        process_parsed: P,
        mut sink: Sink,
        commit: CommitMode,
        ticker: Option<(Duration, Ticker)>,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
        Ticker: FnMut() -> TickerFut,
        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        tracing::info!(
            chunk_size = self.config.max_chunk_size,
            commit = ?commit,
            ticker = ticker.is_some(),
            "BatchEngine (workbatch parsed) starting"
        );

        let mut ticker = LoopTicker::new(ticker);

        loop {
            tokio::select! {
                biased;

                () = shutdown.cancelled() => {
                    tracing::info!("BatchEngine (workbatch parsed) shutting down");
                    return Ok(());
                }

                () = ticker.wait() => ticker.fire("workbatch parsed").await,

                recv_result = receiver.recv(self.config.max_chunk_size) => {
                    let recv_batch = recv_result.map_err(EngineError::Transport)?;
                    let Some(batch) = self.ingest_workbatch(recv_batch)? else {
                        continue;
                    };
                    // Wrap the parse-then-process so drive_block stays generic.
                    // parse_block honours ParseErrorAction: FailBatch surfaces a
                    // terminal EngineError here (no commit), Dlq carries entries
                    // forward for the driver to route, Skip drops the record and
                    // counts it in the error metrics (a configured drop, not a
                    // silent one).
                    let parse = |b: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
                        let parsed = self.parse_block(b)?;
                        process_parsed(parsed)
                    };
                    self.drive_block(receiver, batch, &parse, &mut sink, commit).await?;
                }
            }
        }
    }

    /// Prepare a received [`WorkBatch`] for processing: route its inline-DLQ
    /// entries per the configured policy, then return the batch (with its
    /// `dlq_entries` stripped) ready for the process stage. Returns `None` when
    /// the block has no records (caller should `continue`).
    ///
    /// `recv` now yields a [`WorkBatch`] directly (Task 0.7b), so there is no
    /// `RecvBatch` round-trip: the inbound-filter DLQ entries arrive on
    /// [`WorkBatch::dlq_entries`] and are routed here via
    /// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy)
    /// before processing. Memory accounting is performed in
    /// [`drive_block`](Self::drive_block).
    #[cfg(feature = "transport")]
    fn ingest_workbatch<T: crate::transport::CommitToken>(
        &self,
        batch: WorkBatch<T>,
    ) -> Result<Option<WorkBatch<T>>, EngineError> {
        // Route/discard/reject inline-DLQ entries per the configured policy --
        // never silently dropped. The batch comes back with its dlq_entries
        // consumed so the process stage sees a clean block.
        let batch = self.apply_workbatch_dlq_policy(batch)?;
        // Skip ONLY a truly-empty block. A block with no records but with
        // commit_tokens is the all-filtered case (every record was dropped/
        // DLQ-routed by an inbound filter): those acks must still be committed
        // so the source advances past the filtered records -- returning None
        // here would strand them (stalled Kafka offset / leaked Redis PEL).
        if batch.records.is_empty() && batch.commit_tokens.is_empty() {
            return Ok(None);
        }
        Ok(Some(batch))
    }
649
650    /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
651    ///
652    /// Shared by both [`run_workbatch`](Self::run_workbatch) and
653    /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
654    /// between the two is the `process` closure they pass.
    #[cfg(feature = "transport")]
    async fn drive_block<R, P, Sink, SinkFut>(
        &self,
        receiver: &R,
        batch: WorkBatch<R::Token>,
        process: &P,
        sink: &mut Sink,
        commit: CommitMode,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        // Account the in-flight ingress bytes against the MemoryGuard; the lease
        // releases on every exit path of this block (sink-error early return,
        // commit, ?-return) via Drop.
        #[cfg(feature = "memory")]
        let _ingress_lease = self.lease_ingress_batch(&batch);

        // process() may fan out / fan in; it preserves the input commit_tokens.
        // Capture the input ack count so a contract breach is LOGGED rather than
        // silently freezing the source offset: a closure that rebuilds its
        // output with WorkBatch::from_records (instead of map_records) drops the
        // tokens to zero, the Auto commit below commits `&[]`, and the partition
        // stalls with no diagnostic. One len() compare per block (not per
        // record) -- nil hot-path cost.
        let input_token_count = batch.commit_tokens.len();

        let mut out_batch = process(batch)?;

        if out_batch.commit_tokens.len() != input_token_count {
            tracing::warn!(
                input_tokens = input_token_count,
                output_tokens = out_batch.commit_tokens.len(),
                "process() changed the commit-token count -- the run contract is \
                 that process preserves source acks (transform records, not \
                 tokens). A drop toward zero will under-commit and stall the \
                 source offset; use map_records, not WorkBatch::from_records."
            );
        }

        // Route any parse/process-generated DLQ entries the out-batch carries,
        // through the SAME policy + route point as the inbound-filter entries
        // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
        // sink/commit, so a parse/process dead-letter can never vanish on the
        // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
        // Route sink Err) is a terminal ack-barrier error -- the commit is
        // skipped and the whole block re-delivered, so no later ordered commit
        // advances past these undelivered dead-letters. Silent discard is opt-in
        // only (FilterDlqPolicy::DiscardWithMetric).
        if !out_batch.dlq_entries.is_empty() {
            let entries = std::mem::take(&mut out_batch.dlq_entries);
            if let Err(e) = self.route_dlq_entries(entries) {
                tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
                return Err(e);
            }
        }

        // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
        //
        // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
        // TERMINAL error -- it stops the run loop. The source commit is ordered
        // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
        // logged and continued, the NEXT block's commit would advance the
        // committed watermark PAST this block's never-sent offsets, silently
        // skipping records (data loss). Stopping the loop leaves THIS block's
        // tokens uncommitted, so the source re-delivers from the last committed
        // watermark on restart -- no later block can commit ahead of the
        // failure. The app owns restart/retry policy; the engine never invents
        // a silent skip.
        //
        // Skip the sink when there is nothing to send (e.g. every record in the
        // block was filtered out): the sink has no work, but the block's
        // commit_tokens -- which include the filtered records' acks -- must still
        // be committed below so the source advances past them. (The streaming
        // path gets this for free: a zero-record block runs zero sub-blocks and
        // still commits once at the end.)
        if !out_batch.records.is_empty()
            && let Err(e) = sink(&out_batch).await
        {
            tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
            return Err(e);
        }

        // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
        // output record count. This is the at-least-once block contract.
        match commit {
            CommitMode::Auto => {
                // A commit failure is ALSO a terminal ack-barrier failure: a
                // failed ordered commit must not be followed by a later block's
                // commit advancing the watermark past these uncommitted offsets.
                if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
                    tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
                    return Err(EngineError::Transport(e));
                }
            }
            CommitMode::SinkManaged => {
                // The sink owns the commit -- the engine does not commit here.
            }
        }
        Ok(())
    }

    /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
    /// bounded to ONE sub-block, the source acks commit once after the final
    /// sub-block.
    ///
    /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
    /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
    /// sub-block never multiplies the source acks. Each sub-block's ingress lease
    /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
    /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
    /// block.
    ///
    /// On ANY sub-block sink error the block stops and the commit is skipped (the
    /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
    /// propagates out and stops the run loop, so no LATER block's ordered commit
    /// can advance the cumulative watermark past these never-committed offsets
    /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
    /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
    /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
    /// commit failure is likewise terminal.
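    ///
    /// A minimal sketch of the "tokens carried aside, commit once" shape
    /// described above. `drive_streaming` is a hypothetical toy, not this
    /// method: each `bool` stands for one sub-block's sink outcome, and the
    /// returned `Vec` stands for the acks that would be committed:
    ///
    /// ```rust
    /// // Returns the tokens to commit, or an error with nothing committed.
    /// fn drive_streaming(
    ///     commit_tokens: Vec<u64>,
    ///     sub_block_sink_ok: &[bool],
    /// ) -> Result<Vec<u64>, String> {
    ///     for (index, &ok) in sub_block_sink_ok.iter().enumerate() {
    ///         if !ok {
    ///             // Skip the commit entirely: the whole block is re-delivered.
    ///             return Err(format!("sub-block {index} sink failed"));
    ///         }
    ///     }
    ///     // Every sub-block was sunk: hand back the input acks exactly once.
    ///     Ok(commit_tokens)
    /// }
    /// ```
    ///
    /// A block with zero sub-blocks falls straight through the loop in this toy
    /// and still returns its tokens, matching the zero-record case.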
    #[cfg(feature = "transport")]
    async fn drive_block_streaming<R, P, Sink, SinkFut>(
        &self,
        receiver: &R,
        batch: WorkBatch<R::Token>,
        process: &P,
        sink: &mut Sink,
        commit: CommitMode,
        sub_block_bytes: u64,
    ) -> Result<(), EngineError>
    where
        R: TransportReceiver,
        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    {
        // Carry the WHOLE block's source acks aside; the sub-block views below
        // commit EMPTY tokens. The batch's tokens commit ONCE after the final
        // sub-block (at-least-once on the whole block). dlq_entries were already
        // routed by ingest_workbatch, so the block here carries none.
        let WorkBatch {
            records,
            commit_tokens,
            ..
        } = batch;

        // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
        // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
        // pulls it -- it never pre-materialises every sub-block vector up front,
        // so the only sub-block resident is the one currently being leased and
        // sunk (the streaming peak-memory contract holds for the SPLIT itself,
        // not just the lease).
        let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);

        while let Some(sub_records) = sub_blocks.next_sub_block() {
            // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
            // path of this iteration (sink-error early return, ?-return, or the
            // end of the loop body) via Drop -- BEFORE the next sub-block leases.
            // Peak in-flight lease is therefore one sub-block, never the block.
            let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
            #[cfg(feature = "memory")]
            let _sub_lease = self.lease_ingress_batch(&sub_block);

            // process() may fan out / fan in within the sub-block; it preserves
            // the (empty) commit_tokens of the sub-block view.
            let mut out_sub = process(sub_block)?;

            // Route any parse/process-generated DLQ entries this sub-block
            // carries BEFORE its sink -- same single policy + route point as the
            // whole-batch path and the inbound-filter entries. Fallible: a route
            // failure is terminal (ack barrier) so the commit for the WHOLE block
            // is skipped and it is re-delivered -- a dead-letter is never lost on
            // the path to a source commit.
            if !out_sub.dlq_entries.is_empty() {
                let entries = std::mem::take(&mut out_sub.dlq_entries);
                if let Err(e) = self.route_dlq_entries(entries) {
                    tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
                    return Err(e);
                }
            }

            // Sink this sub-block. A sink error stops the block and skips the
            // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
            // propagate so the run loop stops -- a later block's ordered commit
            // must never advance the cumulative watermark past this block's
            // uncommitted offsets.
            if let Err(e) = sink(&out_sub).await {
                tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
                return Err(e);
            }
            // _sub_lease drops here -> bytes released before the next sub-block.
        }

        // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
        match commit {
            CommitMode::Auto => {
                // Commit failure is terminal (ack barrier) -- same reasoning as
                // the sink-error path above.
                if let Err(e) = receiver.commit(&commit_tokens).await {
                    tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
                    return Err(EngineError::Transport(e));
                }
            }
            CommitMode::SinkManaged => {
                // The sink owns the commit -- the engine does not commit here.
            }
        }
        Ok(())
    }

    /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
    ///
    /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
    /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
    /// sub-block shapes) ergonomic. Same splitting contract as
    /// [`SubBlockDrain::next_sub_block`].
    #[cfg(all(test, feature = "transport"))]
    fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
        let mut drain = SubBlockDrain::new(records, target_bytes);
        let mut out = Vec::new();
        while let Some(sub) = drain.next_sub_block() {
            out.push(sub);
        }
        out
    }

    /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
    /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
    ///
    /// Parses each record's payload via [`codec::parse`] on the worker pool
    /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
    /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
    /// the engine's `parse_error_action` -- the SAME contract the legacy
    /// `process_mid_tier` honoured (previously the parsed path hardcoded
    /// route-to-DLQ, ignoring the config):
    ///
    /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
    ///   appended to the batch's `dlq_entries` (no silent drop) and counted in
    ///   errors + dlq. The driver routes those entries before commit.
    /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
    ///   in errors ONLY (a deliberate, configured drop -- not a silent vanish).
    /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
    ///   failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
    ///   consistent with the P1 ack barrier, so the block is re-delivered rather
    ///   than partially committed.
    ///
    /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
    ///
    /// # Errors
    ///
    /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
    /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
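    ///
    /// # Examples
    ///
    /// A minimal sketch of the three failure actions listed above, assuming a
    /// toy "parse" that reads integers from strings. `ToyAction`, `ToyOutcome`
    /// and `parse_all` are hypothetical and do not use the real codec, stats, or
    /// DLQ types:
    ///
    /// ```rust
    /// // Hypothetical mirror of the three configured actions.
    /// #[derive(Clone, Copy)]
    /// enum ToyAction {
    ///     Dlq,
    ///     Skip,
    ///     FailBatch,
    /// }
    ///
    /// // Result of one pass: parsed values, dead-lettered inputs, error count.
    /// #[derive(Debug, PartialEq)]
    /// struct ToyOutcome {
    ///     kept: Vec<i64>,
    ///     dlq: Vec<String>,
    ///     errors: usize,
    /// }
    ///
    /// // Parses each input in order and applies `action` to every failure.
    /// fn parse_all(inputs: &[&str], action: ToyAction) -> Result<ToyOutcome, String> {
    ///     let mut out = ToyOutcome { kept: Vec::new(), dlq: Vec::new(), errors: 0 };
    ///     for input in inputs {
    ///         match input.parse::<i64>() {
    ///             Ok(value) => out.kept.push(value),
    ///             Err(e) => {
    ///                 out.errors += 1;
    ///                 match action {
    ///                     // Keep the raw input for dead-lettering.
    ///                     ToyAction::Dlq => out.dlq.push((*input).to_string()),
    ///                     // Configured drop: counted, not dead-lettered.
    ///                     ToyAction::Skip => {}
    ///                     // Fail the whole block on the first bad input.
    ///                     ToyAction::FailBatch => return Err(format!("parse error: {e}")),
    ///                 }
    ///             }
    ///         }
    ///     }
    ///     Ok(out)
    /// }
    /// ```
    ///
    /// The kept values stay in input order, which is what lets the real method
    /// keep records and parsed payloads aligned without an explicit index.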
    #[cfg(feature = "transport")]
    fn parse_block<T: crate::transport::CommitToken>(
        &self,
        batch: WorkBatch<T>,
    ) -> Result<ParsedBatch<'_, T>, EngineError> {
        use super::ParseErrorAction;
        use crate::transport::PayloadFormat;

        let WorkBatch {
            records,
            commit_tokens,
            mut dlq_entries,
        } = batch;

        // Parse each record on the pool. The pool's map_owned applies the scaler
        // semaphore per item, so the parse phase obeys the CPU cap exactly as the
        // legacy parsed path does. map_owned preserves input order, so a record
        // and its parse result stay aligned without threading an explicit index.
        let parsed_each: Vec<(Record, Result<ParsedPayload, String>)> =
            self.pool.map_owned(records, |record| {
                let format: PayloadFormat = record.metadata.format;
                let result =
                    codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
                (record, result)
            });

        let action = self.config.parse_error_action;
        let mut keep_records = Vec::new();
        let mut keep_parsed = Vec::new();
        for (record, result) in parsed_each {
            match result {
                Ok(payload) => {
                    keep_records.push(record);
                    keep_parsed.push(payload);
                }
                Err(reason) => match action {
                    ParseErrorAction::Dlq => {
                        // No silent drop: the unparseable record's bytes go to DLQ.
                        self.stats.incr_errors();
                        self.stats.incr_dlq();
                        dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
                            payload: record.payload.to_vec(),
                            key: record.key.clone(),
                            reason,
                        });
                    }
                    ParseErrorAction::Skip => {
                        // Deliberate, configured drop -- counted in errors but NOT
                        // dead-lettered. This is opt-in loss, not a silent vanish.
                        self.stats.incr_errors();
                    }
                    ParseErrorAction::FailBatch => {
                        // Terminal: the whole block fails its commit (ack barrier).
                        self.stats.incr_errors();
                        return Err(EngineError::ParseBatchFailed(reason));
                    }
                },
            }
        }

        Ok(ParsedBatch {
            records: keep_records,
            parsed: keep_parsed,
            commit_tokens,
            dlq_entries,
            interner: &self.interner,
        })
    }

    /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
    /// returning an RAII lease that releases them on drop.
    ///
    /// Drives the in-flight ingress accounting for the WorkBatch driver: the
    /// lease is taken per block in [`drive_block`](Self::drive_block) (and per
    /// sub-block in [`drive_block_streaming`](Self::drive_block_streaming)) and
    /// releases the bytes on every exit path via `Drop`.
    ///
    /// [`MemoryGuard`]: crate::memory::MemoryGuard
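    ///
    /// A minimal sketch of the RAII lease idea, assuming a single-threaded toy
    /// counter. `ToyGuard`, `ToyLease` and `failing_block` are hypothetical;
    /// unlike this method, the toy lease does the add itself in `new`:
    ///
    /// ```rust
    /// use std::cell::Cell;
    ///
    /// // Hypothetical guard that tracks in-flight bytes.
    /// struct ToyGuard {
    ///     in_flight: Cell<u64>,
    /// }
    ///
    /// // Adds `bytes` on creation and subtracts them again on drop.
    /// struct ToyLease<'a> {
    ///     guard: &'a ToyGuard,
    ///     bytes: u64,
    /// }
    ///
    /// impl<'a> ToyLease<'a> {
    ///     fn new(guard: &'a ToyGuard, bytes: u64) -> Self {
    ///         guard.in_flight.set(guard.in_flight.get() + bytes);
    ///         Self { guard, bytes }
    ///     }
    /// }
    ///
    /// impl Drop for ToyLease<'_> {
    ///     fn drop(&mut self) {
    ///         let now = self.guard.in_flight.get().saturating_sub(self.bytes);
    ///         self.guard.in_flight.set(now);
    ///     }
    /// }
    ///
    /// // Simulates a block that fails early: the lease still releases its
    /// // bytes when the function returns the error.
    /// fn failing_block(guard: &ToyGuard) -> Result<(), String> {
    ///     let _lease = ToyLease::new(guard, 128);
    ///     Err("sink failed".to_string())
    /// }
    /// ```
    ///
    /// Binding the lease to a named variable such as `_lease` keeps it alive to
    /// the end of the scope; binding it to a bare `_` would drop it at once.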
    #[cfg(feature = "memory")]
    pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
        &self,
        batch: &WorkBatch<T>,
    ) -> Option<super::IngressLease<'_>> {
        let guard = self.memory_guard.as_ref()?;
        let bytes = batch.total_payload_bytes() as u64;
        guard.add_bytes(bytes);
        Some(super::IngressLease::new(guard, bytes))
    }
}

/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
/// of [`Record`]s at a time, so the streaming driver never pre-materialises
/// every sub-block vector up front.
///
/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records (in
/// order) from the source until the accumulated `payload.len()` would overshoot
/// `target_bytes`, then returns that sub-block; the remaining records stay
/// un-pulled in the source iterator. Splitting contract:
///
/// - records are kept in order;
/// - FLOOR of one record per sub-block: a record whose payload alone exceeds
///   `target_bytes` is its own single-record sub-block (never stalls); one
///   that exactly meets it can be joined only by zero-length payloads;
/// - `target_bytes` of `0` is treated as a floor of one record per sub-block
///   (only consecutive zero-length payloads can share a sub-block);
/// - an exhausted source yields `None`.
///
/// The lazy shape matters: the previous `Vec<Vec<Record>>` allocated every
/// sub-block vector before the loop processed the first one. Here, at most ONE
/// sub-block vector is allocated at a time -- the one the loop is about to lease
/// and sink -- so the SPLIT no longer defeats the streaming peak-memory bound.
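///
/// A minimal sketch of the splitting contract above, working on payload sizes
/// only. `ToyDrain` and `split` are hypothetical mirrors written for
/// illustration; they take plain `u64` sizes instead of real [`Record`]s:
///
/// ```rust
/// // Hypothetical drain over payload sizes (no real records involved).
/// struct ToyDrain {
///     iter: std::vec::IntoIter<u64>,
///     peeked: Option<u64>,
///     target_bytes: u64,
/// }
///
/// impl ToyDrain {
///     fn new(sizes: Vec<u64>, target_bytes: u64) -> Self {
///         Self { iter: sizes.into_iter(), peeked: None, target_bytes }
///     }
///
///     // Yields the next sub-block of sizes, or `None` once exhausted.
///     fn next_sub_block(&mut self) -> Option<Vec<u64>> {
///         // Floor of one: always take a first element if any remains.
///         let first = self.peeked.take().or_else(|| self.iter.next())?;
///         let mut total = first;
///         let mut current = vec![first];
///         for size in self.iter.by_ref() {
///             if total.saturating_add(size) > self.target_bytes {
///                 // Does not fit: carry it over to start the next sub-block.
///                 self.peeked = Some(size);
///                 break;
///             }
///             total = total.saturating_add(size);
///             current.push(size);
///         }
///         Some(current)
///     }
/// }
///
/// // Test-style helper that collects every sub-block eagerly.
/// fn split(sizes: Vec<u64>, target_bytes: u64) -> Vec<Vec<u64>> {
///     let mut drain = ToyDrain::new(sizes, target_bytes);
///     let mut out = Vec::new();
///     while let Some(sub) = drain.next_sub_block() {
///         out.push(sub);
///     }
///     out
/// }
/// ```
///
/// With sizes `4, 4, 4, 10, 1` and a budget of `8`, this toy yields `[4, 4]`,
/// `[4]`, `[10]`, `[1]`: the oversized `10` gets a sub-block of its own.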
#[cfg(feature = "transport")]
struct SubBlockDrain {
    /// Source records, drained in order. `peeked` holds a record we pulled but
    /// could not fit into the sub-block being built (it starts the next one).
    iter: std::vec::IntoIter<Record>,
    peeked: Option<Record>,
    target_bytes: u64,
}

#[cfg(feature = "transport")]
impl SubBlockDrain {
    fn new(records: Vec<Record>, target_bytes: u64) -> Self {
        Self {
            iter: records.into_iter(),
            peeked: None,
            target_bytes,
        }
    }

    /// Yield the next consecutive sub-block, or `None` when the source is
    /// exhausted. Allocates exactly ONE sub-block `Vec` per call.
    fn next_sub_block(&mut self) -> Option<Vec<Record>> {
        // Start with the record carried over from the previous call (if any),
        // else pull the first record of this sub-block from the source.
        let first = self.peeked.take().or_else(|| self.iter.next())?;
        let mut current_bytes = first.payload.len() as u64;
        let mut current = vec![first];

        // Pull more records while they fit. Floor 1: we already took one record
        // above, so an oversized record is still its own sub-block.
        for record in self.iter.by_ref() {
            let record_bytes = record.payload.len() as u64;
            if current_bytes.saturating_add(record_bytes) > self.target_bytes {
                // Does not fit -- carry it to the next sub-block and stop here.
                self.peeked = Some(record);
                break;
            }
            current_bytes = current_bytes.saturating_add(record_bytes);
            current.push(record);
        }
        Some(current)
    }
}

#[cfg(all(test, feature = "transport-memory"))]
#[path = "driver_tests.rs"]
mod tests;