Skip to main content

hyperi_rustlib/worker/engine/
driver.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/driver.rs
3// Purpose:   Unified WorkBatch engine driver (get -> process -> send -> commit)
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Unified `WorkBatch` engine driver
10//!
11//! The single run loop that the four legacy loops (`run` / `run_raw` /
12//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
13//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
14//! through one block at a time:
15//!
16//! ```text
17//!   recv(max) -> WorkBatch        (recv now yields a WorkBatch natively)
18//!     -> apply_workbatch_dlq_policy  (route/discard/reject inline-DLQ entries)
19//!     -> lease_ingress_batch      (memory accounting on the block's bytes)
20//!     -> process(WorkBatch)       (transforms parse ON DEMAND via codec::parse)
21//!     -> sink(&out_batch).await   (async send of the whole block)
22//!     -> commit per CommitMode    (at-least-once, AFTER the block is sent)
23//! ```
24//!
25//! ## Why tokens live on the batch, not the record
26//!
27//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
28//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
29//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
30//! the input tokens after the whole out-batch is sent -- never `2N`, never per
31//! output record. That invariant is the data-plane core; the fan-out
32//! commit-correctness test proves it.
33//!
34//! ## Two parse modes (the hybrid)
35//!
36//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
37//!   does NOT pre-parse. A transform that needs a field calls
38//!   [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
39//!   never pay a parse.
40//!
41//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
42//!   hot pipelines. The driver pre-parses the whole block via `codec::parse`
43//!   (SIMD JSON / native MsgPack) on the worker pool and hands the process
44//!   closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
45//!   shared [`FieldInterner`](super::FieldInterner) for hot routing-field
46//!   dedup. This keeps the batch-parse + interner throughput win for apps that
47//!   opt in.
48//!
49//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
50//! in-process (non-run-loop) callers; only the four legacy run loops were
51//! removed by the 0.7b flip.
52
53use std::time::Duration;
54
55use tokio_util::sync::CancellationToken;
56
57use super::{BatchEngine, EngineError};
58use crate::transport::codec::{self, ParsedPayload};
59use crate::transport::{Record, TransportReceiver, WorkBatch};
60
61/// When the driver commits the input source acks.
62///
63/// The `commit_tokens` carried on the [`WorkBatch`] ARE the input source acks
64/// (Kafka offsets, fetch cursors, ...). This enum decides who fires them.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum CommitMode {
67    /// At-least-once: after the sink returns `Ok` for the WHOLE out-batch, the
68    /// engine calls `receiver.commit(&out_batch.commit_tokens)`. A sink error
69    /// skips the commit so the block is re-delivered. This is the engine-commits
70    /// behaviour of the former mid-tier / raw run loops, lifted onto the block.
71    Auto,
72    /// The sink owns the commit -- the engine does NOT commit. The sink is
73    /// handed the block (which carries `commit_tokens`) and decides when to
74    /// acknowledge (e.g. after a downstream flush). The deferred-commit shape of
75    /// the former async run loop, lifted onto the block.
76    SinkManaged,
77}
78
79/// A pre-parsed block for the opt-in
80/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
81///
82/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
83/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
84/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
85/// for hot routing-field dedup.
86///
87/// ## Parse-failure contract
88///
89/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
90/// successfully. A record whose payload fails [`codec::parse`] is handled per
91/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
92/// same contract the legacy `process_mid_tier` honoured:
93///
94/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
95///   [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
96///   (no silent drop); the resulting [`WorkBatch`] inherits those entries and
97///   the driver routes them through the DLQ policy before commit.
98/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
99///   errors) -- a deliberate, configured drop, not a silent vanish.
100/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
101///   terminally (no commit), consistent with the ack barrier.
102///
103/// The process closure therefore always sees a clean, fully-parsed view.
104///
105/// `commit_tokens` are the INPUT source acks and are carried through unchanged
106/// regardless of how many records survived parsing -- the same fan-out-safe
107/// token decoupling as [`WorkBatch`].
108pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
109    /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
110    pub records: Vec<Record>,
111
112    /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
113    pub parsed: Vec<ParsedPayload>,
114
115    /// Input source acks for the whole block (decoupled from record count).
116    pub commit_tokens: Vec<T>,
117
118    /// Inline-DLQ entries: those carried in on the source batch PLUS any record
119    /// that failed to parse (no-silent-drop).
120    pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
121
122    /// Shared interner for hot routing-field-name dedup. The first time a field
123    /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
124    /// bump. Reused from the engine so dedup persists across blocks.
125    pub interner: &'a super::FieldInterner,
126}
127
128impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
129    /// Number of successfully-parsed records.
130    #[must_use]
131    pub fn len(&self) -> usize {
132        self.records.len()
133    }
134
135    /// Whether there are no successfully-parsed records.
136    #[must_use]
137    pub fn is_empty(&self) -> bool {
138        self.records.is_empty()
139    }
140
141    /// Intern a routing-field name through the shared interner.
142    ///
143    /// Use this to dedup the routing-key field name once per block rather than
144    /// re-allocating it per record.
145    #[must_use]
146    pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
147        self.interner.intern(name)
148    }
149}
150
151/// Optional periodic ticker shared by the run-loops (flush timers, periodic
152/// maintenance). Folds away the identical interval-setup + select-arm the four
153/// loops would otherwise each carry. The ticker is COLD (fires on the order of
154/// the flush interval, not per block), so this extraction does not touch the
155/// hot recv path.
156#[cfg(feature = "transport")]
157struct LoopTicker<F> {
158    interval: Option<tokio::time::Interval>,
159    callback: Option<F>,
160}
161
162#[cfg(feature = "transport")]
163impl<F, Fut> LoopTicker<F>
164where
165    F: FnMut() -> Fut,
166    Fut: std::future::Future<Output = Result<(), EngineError>>,
167{
168    fn new(ticker: Option<(Duration, F)>) -> Self {
169        // Start the first tick one period out (not immediately) so the loop
170        // does not fire a tick before it has polled the source once.
171        let interval = ticker
172            .as_ref()
173            .map(|(d, _)| tokio::time::interval_at(tokio::time::Instant::now() + *d, *d));
174        Self {
175            interval,
176            callback: ticker.map(|(_, f)| f),
177        }
178    }
179
180    /// Yield when the next tick is due, or never if no ticker is configured.
181    /// Cancel-safe: `Interval::tick` is cancel-safe and the no-ticker arm pends,
182    /// so this sits directly in `tokio::select!`.
183    async fn wait(&mut self) {
184        match self.interval.as_mut() {
185            Some(i) => {
186                i.tick().await;
187            }
188            None => std::future::pending::<()>().await,
189        }
190    }
191
192    /// Run the ticker callback; a callback error is logged, not fatal.
193    async fn fire(&mut self, label: &str) {
194        if let Some(f) = self.callback.as_mut()
195            && let Err(e) = f().await
196        {
197            tracing::error!(error = %e, ticker = label, "Ticker failed");
198        }
199    }
200}
201
202impl BatchEngine {
203    /// Unified on-demand `WorkBatch` driver -- the default data-plane loop.
204    ///
205    /// Drives one [`WorkBatch`] at a time through `recv -> filter-DLQ policy ->
206    /// ingress lease -> process -> sink -> commit`. The driver does NOT pre-parse:
207    /// `process` reads fields on demand via [`codec::parse`]. Pass-through apps
208    /// pay zero parse cost.
209    ///
210    /// - `process` runs on the loop task (cancellation-aware between awaits) and
211    ///   may fan records out or in; it MUST preserve `commit_tokens` (use
212    ///   [`WorkBatch::map_records`], which does so automatically).
213    /// - `sink` is async and receives the WHOLE out-batch by reference.
214    /// - `commit` selects [`CommitMode::Auto`] (engine commits after sink `Ok`)
215    ///   or [`CommitMode::SinkManaged`] (sink owns commit).
216    /// - `ticker` is an optional `(interval, fn)` that fires on the interval
217    ///   inside the select loop (flush timers, periodic maintenance).
218    ///
219    /// Stops cleanly when `shutdown` is cancelled.
220    ///
221    /// # Errors
222    ///
223    /// Returns [`EngineError::Transport`] if `recv` fails fatally,
224    /// [`EngineError::FilterDlqUnrouted`] if inline-DLQ entries appear under the
225    /// default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject), or
226    /// the error returned by `process`.
227    ///
228    /// A sink error (and, under [`CommitMode::Auto`], a commit error) is
229    /// TERMINAL: it stops the run loop and propagates. This is the ack barrier
230    /// for the ORDERED/cumulative source commit (Kafka "commit up to offset N"):
231    /// the failed block's tokens are NOT committed, and -- crucially -- no LATER
232    /// block is fetched and committed past them, which would silently skip the
233    /// never-sent records (data loss). On restart the source re-delivers from
234    /// the last committed watermark, preserving at-least-once. The app owns
235    /// restart/retry policy.
236    #[cfg(feature = "transport")]
237    #[allow(clippy::too_many_arguments)]
238    pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
239        &self,
240        receiver: &R,
241        shutdown: CancellationToken,
242        process: P,
243        mut sink: Sink,
244        commit: CommitMode,
245        ticker: Option<(Duration, Ticker)>,
246    ) -> Result<(), EngineError>
247    where
248        R: TransportReceiver,
249        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
250        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
251        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
252        Ticker: FnMut() -> TickerFut,
253        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
254    {
255        tracing::info!(
256            chunk_size = self.config.max_chunk_size,
257            commit = ?commit,
258            ticker = ticker.is_some(),
259            "BatchEngine (workbatch) starting"
260        );
261
262        let mut ticker = LoopTicker::new(ticker);
263
264        loop {
265            tokio::select! {
266                biased;
267
268                () = shutdown.cancelled() => {
269                    tracing::info!("BatchEngine (workbatch) shutting down");
270                    return Ok(());
271                }
272
273                () = ticker.wait() => ticker.fire("workbatch").await,
274
275                recv_result = receiver.recv(self.config.max_chunk_size) => {
276                    let work_batch = recv_result.map_err(EngineError::Transport)?;
277                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
278                        continue;
279                    };
280                    self.drive_block(receiver, batch, &process, &mut sink, commit).await?;
281                }
282            }
283        }
284    }
285
286    /// Streaming `WorkBatch` driver -- the opt-in peak-memory-bounded path.
287    ///
288    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), but each
289    /// received block is processed in consecutive byte-budget-sized SUB-BLOCKS
290    /// rather than all at once. Peak in-flight ingress memory is bounded to ONE
291    /// sub-block (`~sub_block_bytes`) instead of the whole block: the per-sub-block
292    /// ingress lease is dropped (releasing those bytes) BEFORE the next sub-block
293    /// is leased and processed.
294    ///
295    /// The source acks for the WHOLE block are committed EXACTLY ONCE, after the
296    /// FINAL sub-block's sink returns `Ok` (under [`CommitMode::Auto`]) -- so
297    /// at-least-once is preserved: a sink error on any sub-block stops the block
298    /// and skips the commit, so the WHOLE block is re-delivered. The sub-block
299    /// views carry EMPTY `commit_tokens`; the batch's tokens are committed once at
300    /// the end.
301    ///
302    /// `sub_block_bytes` is the target sum of `payload.len()` per sub-block (floor
303    /// one record, so a record larger than the target is still its own sub-block
304    /// and the loop never stalls). Taken as an explicit parameter so the path is
305    /// testable in isolation; `run_governed` (`governor` feature) supplies it
306    /// from the governor's byte budget.
307    ///
308    /// Fan-out WITHIN a sub-block's `process` is fine (records grow); the source
309    /// acks are still the batch's input tokens, committed once at the end.
310    ///
311    /// # Errors
312    ///
313    /// Same as [`run_workbatch`](Self::run_workbatch).
314    #[cfg(feature = "transport")]
315    #[allow(clippy::too_many_arguments)]
316    pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
317        &self,
318        receiver: &R,
319        shutdown: CancellationToken,
320        process: P,
321        mut sink: Sink,
322        commit: CommitMode,
323        sub_block_bytes: u64,
324        ticker: Option<(Duration, Ticker)>,
325    ) -> Result<(), EngineError>
326    where
327        R: TransportReceiver,
328        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
329        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
330        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
331        Ticker: FnMut() -> TickerFut,
332        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
333    {
334        // SinkManaged is unrepresentable on the streaming path: sub-block views
335        // carry EMPTY tokens, so the sink never sees the block's source acks and
336        // cannot own the commit. Fail fast at startup instead of freezing the
337        // partition at runtime.
338        if matches!(commit, CommitMode::SinkManaged) {
339            return Err(EngineError::SinkManagedUnsupported);
340        }
341
342        tracing::info!(
343            chunk_size = self.config.max_chunk_size,
344            commit = ?commit,
345            sub_block_bytes,
346            ticker = ticker.is_some(),
347            "BatchEngine (workbatch streaming) starting"
348        );
349
350        let mut ticker = LoopTicker::new(ticker);
351
352        loop {
353            tokio::select! {
354                biased;
355
356                () = shutdown.cancelled() => {
357                    tracing::info!("BatchEngine (workbatch streaming) shutting down");
358                    return Ok(());
359                }
360
361                () = ticker.wait() => ticker.fire("workbatch streaming").await,
362
363                recv_result = receiver.recv(self.config.max_chunk_size) => {
364                    let work_batch = recv_result.map_err(EngineError::Transport)?;
365                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
366                        continue;
367                    };
368                    self.drive_block_streaming(
369                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
370                    )
371                    .await?;
372                }
373            }
374        }
375    }
376
377    /// Governed `WorkBatch` driver -- the default-ON self-regulation run path.
378    ///
379    /// This is what a self-regulating app calls instead of choosing between
380    /// [`run_workbatch`](Self::run_workbatch) and
381    /// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
382    /// dispatches on whether the byte-budget lever is wired
383    /// ([`set_byte_budget`](BatchEngine::set_byte_budget), done by
384    /// `ServiceRuntime` when `self_regulation.enabled = true`):
385    ///
386    /// - **Governor ON** (budget wired): streams each received block in
387    ///   sub-blocks sized to the CURRENT byte budget (re-read per block), bounds
388    ///   peak in-flight memory to one sub-block, and folds each block's
389    ///   `(bytes, process_time, ingest_interval)` into the AIMD loop via
390    ///   [`observe`](crate::governor::ByteBudgetController::observe). The recv
391    ///   `max` is capped to the budget's poll-safety
392    ///   [`record_cap`](crate::governor::ByteBudgetController::record_cap).
393    ///   While pressure is LOW the budget sits at its big start value, so the
394    ///   block becomes a SINGLE sub-block -- no per-record overhead, behaviour
395    ///   matches the whole-batch loop.
396    /// - **Governor OFF** (no budget): delegates verbatim to
397    ///   [`run_workbatch`](Self::run_workbatch) -- byte-identical to
398    ///   pre-governor behaviour.
399    ///
400    /// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
401    /// SEPARATELY into the receive transport, not here -- this method is the
402    /// driver-side lever (sub-block sizing + AIMD), the gate is the
403    /// transport-side brake. The two share the same `UnifiedPressure`.
404    ///
405    /// # Errors
406    ///
407    /// Same as [`run_workbatch`](Self::run_workbatch).
408    #[cfg(all(feature = "transport", feature = "governor"))]
409    #[allow(clippy::too_many_arguments)]
410    pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
411        &self,
412        receiver: &R,
413        shutdown: CancellationToken,
414        process: P,
415        mut sink: Sink,
416        commit: CommitMode,
417        ticker: Option<(Duration, Ticker)>,
418    ) -> Result<(), EngineError>
419    where
420        R: TransportReceiver,
421        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
422        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
423        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
424        Ticker: FnMut() -> TickerFut,
425        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
426    {
427        // Governor OFF -> the original whole-batch loop, byte-for-byte. The
428        // whole-batch path DOES support SinkManaged (the sink receives the full
429        // block with its tokens), so the guard below must sit AFTER this
430        // delegate, not before it.
431        let Some(budget) = self.byte_budget.clone() else {
432            return self
433                .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
434                .await;
435        };
436
437        // Governor ON streams in sub-blocks whose views carry EMPTY tokens, so
438        // the sink can never own a SinkManaged commit -- reject it at startup
439        // rather than silently freeze the source offset. (Same guard as
440        // run_workbatch_streaming, which this path bypasses.)
441        if matches!(commit, CommitMode::SinkManaged) {
442            return Err(EngineError::SinkManagedUnsupported);
443        }
444
445        tracing::info!(
446            chunk_size = self.config.max_chunk_size,
447            commit = ?commit,
448            ticker = ticker.is_some(),
449            start_byte_budget = budget.byte_budget(),
450            "BatchEngine (governed) starting -- self-regulation ON"
451        );
452
453        let mut ticker = LoopTicker::new(ticker);
454
455        // Track the previous block's arrival instant so we can feed the AIMD
456        // loop a real ingest inter-arrival interval.
457        let mut last_recv: Option<std::time::Instant> = None;
458
459        loop {
460            // The recv limits bound a single poll by BOTH:
461            //   - the SMALLER of the config chunk size and the budget's
462            //     poll-safety record cap (a tiny-record flood cannot blow the
463            //     count even within the byte budget), AND
464            //   - the CURRENT byte budget (re-read per block), so a single poll
465            //     never RETAINS more than ~one budget's worth of inbound payload
466            //     BEFORE the sub-block split. This is the fix for the
467            //     "byte budget does not bound RECEIVE memory" gap: without the
468            //     byte cap, `recv(max)` could build a WorkBatch (and, for the
469            //     Kafka recv-arena, allocate one arena) far larger than the
470            //     budget before any sub-block lease ran.
471            let recv_limits = crate::transport::RecvLimits {
472                max_records: self.config.max_chunk_size.min(budget.record_cap()),
473                max_bytes: budget.byte_budget(),
474            };
475
476            tokio::select! {
477                biased;
478
479                () = shutdown.cancelled() => {
480                    tracing::info!("BatchEngine (governed) shutting down");
481                    return Ok(());
482                }
483
484                () = ticker.wait() => ticker.fire("governed").await,
485
486                recv_result = receiver.recv_limited(recv_limits) => {
487                    let now = std::time::Instant::now();
488                    let ingest_interval = last_recv
489                        .map(|prev| now.saturating_duration_since(prev))
490                        .unwrap_or_default();
491                    last_recv = Some(now);
492
493                    let work_batch = recv_result.map_err(EngineError::Transport)?;
494                    let block_bytes = work_batch.total_payload_bytes() as u64;
495                    let Some(batch) = self.ingest_workbatch(work_batch)? else {
496                        // Empty block: still fold the timing so a quiet pipeline
497                        // can grow its budget back. No bytes -> treated as slack.
498                        budget.observe(0, Duration::ZERO, ingest_interval);
499                        continue;
500                    };
501
502                    // Re-read the budget for THIS block: low pressure -> big
503                    // budget -> one sub-block (no overhead); high pressure ->
504                    // shrunk budget -> peak in-flight bounded to one sub-block.
505                    let sub_block_bytes = budget.byte_budget();
506
507                    let process_start = std::time::Instant::now();
508                    self.drive_block_streaming(
509                        receiver, batch, &process, &mut sink, commit, sub_block_bytes,
510                    )
511                    .await?;
512                    let process_time = process_start.elapsed();
513
514                    // Fold the OBSERVED actual block bytes into the AIMD loop. A
515                    // memory HARD override inside observe() shrinks immediately
516                    // regardless of rho.
517                    budget.observe(block_bytes, process_time, ingest_interval);
518
519                    // Observability: surface the current budget + pressure as
520                    // gauges so throttling is visible, not mysterious, AND the
521                    // ACTUAL received block bytes so the gap between the budget
522                    // (`self_regulation_byte_budget`) and reality (`recv_block_bytes`)
523                    // is measurable -- a persistent overshoot means the recv byte
524                    // cap is not holding. The gate edges (pause/resume) are
525                    // logged by the ObservingActuator.
526                    #[cfg(feature = "metrics")]
527                    {
528                        metrics::gauge!("self_regulation_byte_budget")
529                            .set(budget.byte_budget() as f64);
530                        // dual-emit: drop OLD in next release (MIGRATIONS) --
531                        // `_bytes` is the convention base-unit suffix.
532                        metrics::gauge!("self_regulation_byte_budget_bytes")
533                            .set(budget.byte_budget() as f64);
534                        metrics::gauge!("self_regulation_recv_block_bytes")
535                            .set(block_bytes as f64);
536                        // `self_regulation_` domain prefix: a bare `pressure_ratio`
537                        // collides with MemoryGuard's and ScalingPressure's own
538                        // pressure gauges on the same registry.
539                        metrics::gauge!("self_regulation_pressure_ratio")
540                            .set(budget.pressure().level());
541                    }
542                }
543            }
544        }
545    }
546
547    /// Unified pre-parsed `WorkBatch` driver -- the opt-in hot path.
548    ///
549    /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), except the
550    /// driver PRE-PARSES the whole block via [`codec::parse`] (SIMD JSON / native
551    /// MsgPack) on the worker pool and hands `process_parsed` a [`ParsedBatch`]
552    /// (records + aligned parsed payloads + shared
553    /// [`FieldInterner`](super::FieldInterner)). This keeps
554    /// the batch-parse + interner throughput win for apps that opt in.
555    ///
556    /// Records that fail to parse are handled per the configured
557    /// [`ParseErrorAction`](super::ParseErrorAction) (Dlq -> dlq_entries, Skip ->
558    /// drop+counted, FailBatch -> terminal no-commit) -- see [`ParsedBatch`] for
559    /// the parse-failure contract. `process_parsed` returns the final
560    /// [`WorkBatch`] and MUST preserve the input `commit_tokens`.
561    ///
562    /// # Errors
563    ///
564    /// Same as [`run_workbatch`](Self::run_workbatch).
565    #[cfg(feature = "transport")]
566    #[allow(clippy::too_many_arguments)]
567    pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
568        &self,
569        receiver: &R,
570        shutdown: CancellationToken,
571        process_parsed: P,
572        mut sink: Sink,
573        commit: CommitMode,
574        ticker: Option<(Duration, Ticker)>,
575    ) -> Result<(), EngineError>
576    where
577        R: TransportReceiver,
578        P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
579        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
580        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
581        Ticker: FnMut() -> TickerFut,
582        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
583    {
584        tracing::info!(
585            chunk_size = self.config.max_chunk_size,
586            commit = ?commit,
587            ticker = ticker.is_some(),
588            "BatchEngine (workbatch parsed) starting"
589        );
590
591        let mut ticker = LoopTicker::new(ticker);
592
593        loop {
594            tokio::select! {
595                biased;
596
597                () = shutdown.cancelled() => {
598                    tracing::info!("BatchEngine (workbatch parsed) shutting down");
599                    return Ok(());
600                }
601
602                () = ticker.wait() => ticker.fire("workbatch parsed").await,
603
604                recv_result = receiver.recv(self.config.max_chunk_size) => {
605                    let recv_batch = recv_result.map_err(EngineError::Transport)?;
606                    let Some(batch) = self.ingest_workbatch(recv_batch)? else {
607                        continue;
608                    };
609                    // Wrap the parse-then-process so drive_block stays generic.
610                    // parse_block honours ParseErrorAction: FailBatch surfaces a
611                    // terminal EngineError here (no commit), Dlq carries entries
612                    // forward for the driver to route, Skip drops silently+counted.
613                    let parse = |b: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
614                        let parsed = self.parse_block(b)?;
615                        process_parsed(parsed)
616                    };
617                    self.drive_block(receiver, batch, &parse, &mut sink, commit).await?;
618                }
619            }
620        }
621    }
622
623    /// Prepare a received [`WorkBatch`] for processing: route its inline-DLQ
624    /// entries per the configured policy, then return the batch (with its
625    /// `dlq_entries` stripped) ready for the process stage. Returns `None` when
626    /// the block has no records (caller should `continue`).
627    ///
628    /// `recv` now yields a [`WorkBatch`] directly (Task 0.7b), so there is no
629    /// `RecvBatch` round-trip: the inbound-filter DLQ entries arrive on
630    /// [`WorkBatch::dlq_entries`] and are routed here via
631    /// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy)
632    /// before processing. Memory accounting is performed in
633    /// [`drive_block`](Self::drive_block).
634    #[cfg(feature = "transport")]
635    fn ingest_workbatch<T: crate::transport::CommitToken>(
636        &self,
637        batch: WorkBatch<T>,
638    ) -> Result<Option<WorkBatch<T>>, EngineError> {
639        // Route/discard/reject inline-DLQ entries per the configured policy --
640        // never silently dropped. The batch comes back with its dlq_entries
641        // consumed so the process stage sees a clean block.
642        let batch = self.apply_workbatch_dlq_policy(batch)?;
643        // Skip ONLY a truly-empty block. A block with no records but with
644        // commit_tokens is the all-filtered case (every record was dropped/
645        // DLQ-routed by an inbound filter): those acks must still be committed
646        // so the source advances past the filtered records -- returning None
647        // here would strand them (stalled Kafka offset / leaked Redis PEL).
648        if batch.records.is_empty() && batch.commit_tokens.is_empty() {
649            return Ok(None);
650        }
651        Ok(Some(batch))
652    }
653
654    /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
655    ///
656    /// Shared by both [`run_workbatch`](Self::run_workbatch) and
657    /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
658    /// between the two is the `process` closure they pass.
659    #[cfg(feature = "transport")]
660    async fn drive_block<R, P, Sink, SinkFut>(
661        &self,
662        receiver: &R,
663        batch: WorkBatch<R::Token>,
664        process: &P,
665        sink: &mut Sink,
666        commit: CommitMode,
667    ) -> Result<(), EngineError>
668    where
669        R: TransportReceiver,
670        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
671        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
672        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
673    {
674        // Account the in-flight ingress bytes against the MemoryGuard; the lease
675        // releases on every exit path of this block (sink-error early return,
676        // commit, ?-return) via Drop.
677        #[cfg(feature = "memory")]
678        let _ingress_lease = self.lease_ingress_batch(&batch);
679
680        // process() may fan out / fan in; it preserves the input commit_tokens.
681        // Capture the input ack count so a contract breach is LOGGED rather than
682        // silently freezing the source offset: a closure that rebuilds its
683        // output with WorkBatch::from_records (instead of map_records) drops the
684        // tokens to zero, the Auto commit below commits `&[]`, and the partition
685        // stalls with no diagnostic. One len() compare per block (not per
686        // record) -- nil hot-path cost.
687        let input_token_count = batch.commit_tokens.len();
688
689        let mut out_batch = process(batch)?;
690
691        if out_batch.commit_tokens.len() != input_token_count {
692            tracing::warn!(
693                input_tokens = input_token_count,
694                output_tokens = out_batch.commit_tokens.len(),
695                "process() changed the commit-token count -- the run contract is \
696                 that process preserves source acks (transform records, not \
697                 tokens). A drop toward zero will under-commit and stall the \
698                 source offset; use map_records, not WorkBatch::from_records."
699            );
700        }
701
702        // Route any parse/process-generated DLQ entries the out-batch carries,
703        // through the SAME policy + route point as the inbound-filter entries
704        // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
705        // sink/commit, so a parse/process dead-letter can never vanish on the
706        // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
707        // Route sink Err) is a terminal ack-barrier error -- the commit is
708        // skipped and the whole block re-delivered, so no later ordered commit
709        // advances past these undelivered dead-letters. Silent discard is opt-in
710        // only (FilterDlqPolicy::DiscardWithMetric).
711        if !out_batch.dlq_entries.is_empty() {
712            let entries = std::mem::take(&mut out_batch.dlq_entries);
713            if let Err(e) = self.route_dlq_entries(entries) {
714                tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
715                return Err(e);
716            }
717        }
718
719        // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
720        //
721        // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
722        // TERMINAL error -- it stops the run loop. The source commit is ordered
723        // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
724        // logged and continued, the NEXT block's commit would advance the
725        // committed watermark PAST this block's never-sent offsets, silently
726        // skipping records (data loss). Stopping the loop leaves THIS block's
727        // tokens uncommitted, so the source re-delivers from the last committed
728        // watermark on restart -- no later block can commit ahead of the
729        // failure. The app owns restart/retry policy; the engine never invents
730        // a silent skip.
731        // Skip the sink when there is nothing to send (e.g. every record in the
732        // block was filtered out): the sink has no work, but the block's
733        // commit_tokens -- which include the filtered records' acks -- must still
734        // be committed below so the source advances past them. (The streaming
735        // path gets this for free: a zero-record block runs zero sub-blocks and
736        // still commits once at the end.)
737        if !out_batch.records.is_empty()
738            && let Err(e) = sink(&out_batch).await
739        {
740            tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
741            return Err(e);
742        }
743
744        // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
745        // output record count. This is the at-least-once block contract.
746        match commit {
747            CommitMode::Auto => {
748                // A commit failure is ALSO a terminal ack-barrier failure: a
749                // failed ordered commit must not be followed by a later block's
750                // commit advancing the watermark past these uncommitted offsets.
751                if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
752                    tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
753                    return Err(EngineError::Transport(e));
754                }
755            }
756            CommitMode::SinkManaged => {
757                // The sink owns the commit -- the engine does not commit here.
758            }
759        }
760        Ok(())
761    }
762
763    /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
764    /// bounded to ONE sub-block, the source acks commit once after the final
765    /// sub-block.
766    ///
767    /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
768    /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
769    /// sub-block never multiplies the source acks. Each sub-block's ingress lease
770    /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
771    /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
772    /// block.
773    ///
774    /// On ANY sub-block sink error the block stops and the commit is skipped (the
775    /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
776    /// propagates out and stops the run loop, so no LATER block's ordered commit
777    /// can advance the cumulative watermark past these never-committed offsets
778    /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
779    /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
780    /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
781    /// commit failure is likewise terminal.
782    #[cfg(feature = "transport")]
783    async fn drive_block_streaming<R, P, Sink, SinkFut>(
784        &self,
785        receiver: &R,
786        batch: WorkBatch<R::Token>,
787        process: &P,
788        sink: &mut Sink,
789        commit: CommitMode,
790        sub_block_bytes: u64,
791    ) -> Result<(), EngineError>
792    where
793        R: TransportReceiver,
794        P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
795        Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
796        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
797    {
798        // Carry the WHOLE block's source acks aside; the sub-block views below
799        // commit EMPTY tokens. The batch's tokens commit ONCE after the final
800        // sub-block (at-least-once on the whole block). dlq_entries were already
801        // routed by ingest_workbatch, so the block here carries none.
802        let WorkBatch {
803            records,
804            commit_tokens,
805            ..
806        } = batch;
807
808        // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
809        // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
810        // pulls it -- it never pre-materialises every sub-block vector up front,
811        // so the only sub-block resident is the one currently being leased and
812        // sunk (the streaming peak-memory contract holds for the SPLIT itself,
813        // not just the lease).
814        let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
815
816        while let Some(sub_records) = sub_blocks.next_sub_block() {
817            // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
818            // path of this iteration (sink-error early return, ?-return, or the
819            // end of the loop body) via Drop -- BEFORE the next sub-block leases.
820            // Peak in-flight lease is therefore one sub-block, never the block.
821            let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
822            #[cfg(feature = "memory")]
823            let _sub_lease = self.lease_ingress_batch(&sub_block);
824
825            // process() may fan out / fan in within the sub-block; it preserves
826            // the (empty) commit_tokens of the sub-block view.
827            let mut out_sub = process(sub_block)?;
828
829            // Route any parse/process-generated DLQ entries this sub-block
830            // carries BEFORE its sink -- same single policy + route point as the
831            // whole-batch path and the inbound-filter entries. Fallible: a route
832            // failure is terminal (ack barrier) so the commit for the WHOLE block
833            // is skipped and it is re-delivered -- a dead-letter is never lost on
834            // the path to a source commit.
835            if !out_sub.dlq_entries.is_empty() {
836                let entries = std::mem::take(&mut out_sub.dlq_entries);
837                if let Err(e) = self.route_dlq_entries(entries) {
838                    tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
839                    return Err(e);
840                }
841            }
842
843            // Sink this sub-block. A sink error stops the block and skips the
844            // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
845            // propagate so the run loop stops -- a later block's ordered commit
846            // must never advance the cumulative watermark past this block's
847            // uncommitted offsets.
848            if let Err(e) = sink(&out_sub).await {
849                tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
850                return Err(e);
851            }
852            // _sub_lease drops here -> bytes released before the next sub-block.
853        }
854
855        // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
856        match commit {
857            CommitMode::Auto => {
858                // Commit failure is terminal (ack barrier) -- same reasoning as
859                // the sink-error path above.
860                if let Err(e) = receiver.commit(&commit_tokens).await {
861                    tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
862                    return Err(EngineError::Transport(e));
863                }
864            }
865            CommitMode::SinkManaged => {
866                // The sink owns the commit -- the engine does not commit here.
867            }
868        }
869        Ok(())
870    }
871
872    /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
873    ///
874    /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
875    /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
876    /// sub-block shapes) ergonomic. Same splitting contract as
877    /// [`SubBlockDrain::next_sub_block`].
878    #[cfg(all(test, feature = "transport"))]
879    fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
880        let mut drain = SubBlockDrain::new(records, target_bytes);
881        let mut out = Vec::new();
882        while let Some(sub) = drain.next_sub_block() {
883            out.push(sub);
884        }
885        out
886    }
887
888    /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
889    /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
890    ///
891    /// Parses each record's payload via [`codec::parse`] on the worker pool
892    /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
893    /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
894    /// the engine's `parse_error_action` -- the SAME contract the legacy
895    /// `process_mid_tier` honoured (previously the parsed path hardcoded
896    /// route-to-DLQ, ignoring the config):
897    ///
898    /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
899    ///   appended to the batch's `dlq_entries` (no silent drop) and counted in
900    ///   errors + dlq. The driver routes those entries before commit.
901    /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
902    ///   in errors ONLY (a deliberate, configured drop -- not a silent vanish).
903    /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
904    ///   failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
905    ///   consistent with the P1 ack barrier, so the block is re-delivered rather
906    ///   than partially committed.
907    ///
908    /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
909    ///
910    /// # Errors
911    ///
912    /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
913    /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
914    #[cfg(feature = "transport")]
915    fn parse_block<T: crate::transport::CommitToken>(
916        &self,
917        batch: WorkBatch<T>,
918    ) -> Result<ParsedBatch<'_, T>, EngineError> {
919        use super::ParseErrorAction;
920        use crate::transport::PayloadFormat;
921
922        let WorkBatch {
923            records,
924            commit_tokens,
925            mut dlq_entries,
926        } = batch;
927
928        // Parse each record on the pool. The pool's map_owned applies the scaler
929        // semaphore per item, so the parse phase obeys the CPU cap exactly as the
930        // legacy parsed path does. map_owned preserves input order, so a record
931        // and its parse result stay aligned without threading an explicit index.
932        let parsed_each: Vec<(Record, Result<ParsedPayload, String>)> =
933            self.pool.map_owned(records, |record| {
934                let format: PayloadFormat = record.metadata.format;
935                let result =
936                    codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
937                (record, result)
938            });
939
940        let action = self.config.parse_error_action;
941        let mut keep_records = Vec::new();
942        let mut keep_parsed = Vec::new();
943        for (record, result) in parsed_each {
944            match result {
945                Ok(payload) => {
946                    keep_records.push(record);
947                    keep_parsed.push(payload);
948                }
949                Err(reason) => match action {
950                    ParseErrorAction::Dlq => {
951                        // No silent drop: the unparseable record's bytes go to DLQ.
952                        self.stats.incr_errors();
953                        self.stats.incr_dlq();
954                        dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
955                            payload: record.payload.to_vec(),
956                            key: record.key.clone(),
957                            reason,
958                        });
959                    }
960                    ParseErrorAction::Skip => {
961                        // Deliberate, configured drop -- counted in errors but NOT
962                        // dead-lettered. This is opt-in loss, not a silent vanish.
963                        self.stats.incr_errors();
964                    }
965                    ParseErrorAction::FailBatch => {
966                        // Terminal: the whole block fails its commit (ack barrier).
967                        self.stats.incr_errors();
968                        return Err(EngineError::ParseBatchFailed(reason));
969                    }
970                },
971            }
972        }
973
974        Ok(ParsedBatch {
975            records: keep_records,
976            parsed: keep_parsed,
977            commit_tokens,
978            dlq_entries,
979            interner: &self.interner,
980        })
981    }
982
983    /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
984    /// returning an RAII lease that releases them on drop.
985    ///
986    /// Drives the in-flight ingress accounting for the WorkBatch driver: the
987    /// lease is taken in [`drive_block`](Self::drive_block) and releases the
988    /// bytes on every block exit path via `Drop`.
989    ///
990    /// [`MemoryGuard`]: crate::memory::MemoryGuard
991    #[cfg(feature = "memory")]
992    pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
993        &self,
994        batch: &WorkBatch<T>,
995    ) -> Option<super::IngressLease<'_>> {
996        let guard = self.memory_guard.as_ref()?;
997        let bytes = batch.total_payload_bytes() as u64;
998        guard.add_bytes(bytes);
999        Some(super::IngressLease::new(guard, bytes))
1000    }
1001}
1002
1003/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
1004/// of [`Record`]s at a time, so the streaming driver never pre-materialises
1005/// every sub-block vector up front.
1006///
1007/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records (in
1008/// order) from the source until the accumulated `payload.len()` would overshoot
1009/// `target_bytes`, then returns that sub-block; the remaining records stay
1010/// un-pulled in the source iterator. Splitting contract:
1011///
1012/// - records are kept in order;
1013/// - FLOOR of one record per sub-block: a record whose payload alone meets or
1014///   exceeds `target_bytes` is its own single-record sub-block (never stalls);
1015/// - `target_bytes` of `0` is treated as a floor of one record per sub-block;
1016/// - an exhausted source yields `None`.
1017///
1018/// The lazy shape matters: the previous `Vec<Vec<Record>>` allocated every
1019/// sub-block vector before the loop processed the first one. Here, at most ONE
1020/// sub-block vector is allocated at a time -- the one the loop is about to lease
1021/// and sink -- so the SPLIT no longer defeats the streaming peak-memory bound.
1022#[cfg(feature = "transport")]
1023struct SubBlockDrain {
1024    /// Source records, drained in order. `peeked` holds a record we pulled but
1025    /// could not fit into the sub-block being built (it starts the next one).
1026    iter: std::vec::IntoIter<Record>,
1027    peeked: Option<Record>,
1028    target_bytes: u64,
1029}
1030
1031#[cfg(feature = "transport")]
1032impl SubBlockDrain {
1033    fn new(records: Vec<Record>, target_bytes: u64) -> Self {
1034        Self {
1035            iter: records.into_iter(),
1036            peeked: None,
1037            target_bytes,
1038        }
1039    }
1040
1041    /// Yield the next consecutive sub-block, or `None` when the source is
1042    /// exhausted. Allocates exactly ONE sub-block `Vec` per call.
1043    fn next_sub_block(&mut self) -> Option<Vec<Record>> {
1044        // Start with the record carried over from the previous call (if any),
1045        // else pull the first record of this sub-block from the source.
1046        let first = self.peeked.take().or_else(|| self.iter.next())?;
1047        let mut current_bytes = first.payload.len() as u64;
1048        let mut current = vec![first];
1049
1050        // Pull more records while they fit. Floor 1: we already took one record
1051        // above, so an oversized record is still its own sub-block.
1052        for record in self.iter.by_ref() {
1053            let record_bytes = record.payload.len() as u64;
1054            if current_bytes.saturating_add(record_bytes) > self.target_bytes {
1055                // Does not fit -- carry it to the next sub-block and stop here.
1056                self.peeked = Some(record);
1057                break;
1058            }
1059            current_bytes = current_bytes.saturating_add(record_bytes);
1060            current.push(record);
1061        }
1062        Some(current)
1063    }
1064}
1065
1066#[cfg(all(test, feature = "transport-memory"))]
1067#[path = "driver_tests.rs"]
1068mod tests;