hyperi_rustlib/worker/engine/driver.rs
1// Project: hyperi-rustlib
2// File: src/worker/engine/driver.rs
3// Purpose: Unified WorkBatch engine driver (get -> process -> send -> commit)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Unified `WorkBatch` engine driver
10//!
11//! The single run loop that the four legacy loops (`run` / `run_raw` /
12//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
13//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
14//! through one block at a time:
15//!
16//! ```text
17//! recv(max) -> WorkBatch (recv now yields a WorkBatch natively)
18//! -> apply_workbatch_dlq_policy (route/discard/reject inline-DLQ entries)
19//! -> lease_ingress_batch (memory accounting on the block's bytes)
//!   -> process(WorkBatch)           (transforms parse ON DEMAND via codec::parse)
//!   -> route out-batch DLQ entries  (parse/process dead-letters, same policy, before sink)
21//! -> sink(&out_batch).await (async send of the whole block)
22//! -> commit per CommitMode (at-least-once, AFTER the block is sent)
23//! ```
24//!
25//! ## Why tokens live on the batch, not the record
26//!
27//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
28//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
29//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
30//! the input tokens after the whole out-batch is sent -- never `2N`, never per
31//! output record. That invariant is the data-plane core; the fan-out
32//! commit-correctness test proves it.
33//!
34//! ## Two parse modes (the hybrid)
35//!
36//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
37//! does NOT pre-parse. A transform that needs a field calls
38//! [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
39//! never pay a parse.
40//!
41//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
42//! hot pipelines. The driver pre-parses the whole block via `codec::parse`
43//! (SIMD JSON / native MsgPack) on the worker pool and hands the process
44//! closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
45//! shared [`FieldInterner`](super::FieldInterner) for hot routing-field
46//! dedup. This keeps the batch-parse + interner throughput win for apps that
47//! opt in.
48//!
49//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
50//! in-process (non-run-loop) callers; only the four legacy run loops were
51//! removed by the 0.7b flip.
52
53use std::time::Duration;
54
55use tokio_util::sync::CancellationToken;
56
57use super::{BatchEngine, EngineError};
58use crate::transport::codec::{self, ParsedPayload};
59use crate::transport::{Record, TransportReceiver, WorkBatch};
60
/// Who fires the input source acks once a block has been processed.
///
/// A [`WorkBatch`]'s `commit_tokens` are the acknowledgements owed to the
/// source (Kafka offsets, fetch cursors, ...). This setting chooses whether the
/// engine or the sink is responsible for issuing them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitMode {
    /// Engine-owned, at-least-once commit. Once the sink has returned `Ok` for
    /// the ENTIRE out-batch, the engine calls
    /// `receiver.commit(&out_batch.commit_tokens)`. If the sink errors, no
    /// commit happens and the block is delivered again. This carries the
    /// engine-commits behaviour of the old mid-tier / raw loops over to blocks.
    Auto,
    /// Sink-owned commit: the engine never commits. The sink receives the block
    /// (tokens included) and chooses when to acknowledge, e.g. after a
    /// downstream flush. This is the deferred-commit shape of the old async
    /// loop, now applied per block. The streaming and governed (governor ON)
    /// paths reject this mode with `EngineError::SinkManagedUnsupported`.
    SinkManaged,
}
78
79/// A pre-parsed block for the opt-in
80/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
81///
82/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
83/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
84/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
85/// for hot routing-field dedup.
86///
87/// ## Parse-failure contract
88///
89/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
90/// successfully. A record whose payload fails [`codec::parse`] is handled per
91/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
92/// same contract the legacy `process_mid_tier` honoured:
93///
94/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
95/// [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
96/// (no silent drop); the resulting [`WorkBatch`] inherits those entries and
97/// the driver routes them through the DLQ policy before commit.
98/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
99/// errors) -- a deliberate, configured drop, not a silent vanish.
100/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
101/// terminally (no commit), consistent with the ack barrier.
102///
103/// The process closure therefore always sees a clean, fully-parsed view.
104///
105/// `commit_tokens` are the INPUT source acks and are carried through unchanged
106/// regardless of how many records survived parsing -- the same fan-out-safe
107/// token decoupling as [`WorkBatch`].
108pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
109 /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
110 pub records: Vec<Record>,
111
112 /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
113 pub parsed: Vec<ParsedPayload>,
114
115 /// Input source acks for the whole block (decoupled from record count).
116 pub commit_tokens: Vec<T>,
117
118 /// Inline-DLQ entries: those carried in on the source batch PLUS any record
119 /// that failed to parse (no-silent-drop).
120 pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
121
122 /// Shared interner for hot routing-field-name dedup. The first time a field
123 /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
124 /// bump. Reused from the engine so dedup persists across blocks.
125 pub interner: &'a super::FieldInterner,
126}
127
128impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
129 /// Number of successfully-parsed records.
130 #[must_use]
131 pub fn len(&self) -> usize {
132 self.records.len()
133 }
134
135 /// Whether there are no successfully-parsed records.
136 #[must_use]
137 pub fn is_empty(&self) -> bool {
138 self.records.is_empty()
139 }
140
141 /// Intern a routing-field name through the shared interner.
142 ///
143 /// Use this to dedup the routing-key field name once per block rather than
144 /// re-allocating it per record.
145 #[must_use]
146 pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
147 self.interner.intern(name)
148 }
149}
150
151/// Optional periodic ticker shared by the run-loops (flush timers, periodic
152/// maintenance). Folds away the identical interval-setup + select-arm the four
153/// loops would otherwise each carry. The ticker is COLD (fires on the order of
154/// the flush interval, not per block), so this extraction does not touch the
155/// hot recv path.
156#[cfg(feature = "transport")]
157struct LoopTicker<F> {
158 interval: Option<tokio::time::Interval>,
159 callback: Option<F>,
160}
161
162#[cfg(feature = "transport")]
163impl<F, Fut> LoopTicker<F>
164where
165 F: FnMut() -> Fut,
166 Fut: std::future::Future<Output = Result<(), EngineError>>,
167{
168 fn new(ticker: Option<(Duration, F)>) -> Self {
169 // Start the first tick one period out (not immediately) so the loop
170 // does not fire a tick before it has polled the source once.
171 let interval = ticker
172 .as_ref()
173 .map(|(d, _)| tokio::time::interval_at(tokio::time::Instant::now() + *d, *d));
174 Self {
175 interval,
176 callback: ticker.map(|(_, f)| f),
177 }
178 }
179
180 /// Yield when the next tick is due, or never if no ticker is configured.
181 /// Cancel-safe: `Interval::tick` is cancel-safe and the no-ticker arm pends,
182 /// so this sits directly in `tokio::select!`.
183 async fn wait(&mut self) {
184 match self.interval.as_mut() {
185 Some(i) => {
186 i.tick().await;
187 }
188 None => std::future::pending::<()>().await,
189 }
190 }
191
192 /// Run the ticker callback; a callback error is logged, not fatal.
193 async fn fire(&mut self, label: &str) {
194 if let Some(f) = self.callback.as_mut()
195 && let Err(e) = f().await
196 {
197 tracing::error!(error = %e, ticker = label, "Ticker failed");
198 }
199 }
200}
201
202impl BatchEngine {
203 /// Unified on-demand `WorkBatch` driver -- the default data-plane loop.
204 ///
205 /// Drives one [`WorkBatch`] at a time through `recv -> filter-DLQ policy ->
206 /// ingress lease -> process -> sink -> commit`. The driver does NOT pre-parse:
207 /// `process` reads fields on demand via [`codec::parse`]. Pass-through apps
208 /// pay zero parse cost.
209 ///
210 /// - `process` runs on the loop task (cancellation-aware between awaits) and
211 /// may fan records out or in; it MUST preserve `commit_tokens` (use
212 /// [`WorkBatch::map_records`], which does so automatically).
213 /// - `sink` is async and receives the WHOLE out-batch by reference.
214 /// - `commit` selects [`CommitMode::Auto`] (engine commits after sink `Ok`)
215 /// or [`CommitMode::SinkManaged`] (sink owns commit).
216 /// - `ticker` is an optional `(interval, fn)` that fires on the interval
217 /// inside the select loop (flush timers, periodic maintenance).
218 ///
219 /// Stops cleanly when `shutdown` is cancelled.
220 ///
221 /// # Errors
222 ///
223 /// Returns [`EngineError::Transport`] if `recv` fails fatally,
224 /// [`EngineError::FilterDlqUnrouted`] if inline-DLQ entries appear under the
225 /// default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject), or
226 /// the error returned by `process`.
227 ///
228 /// A sink error (and, under [`CommitMode::Auto`], a commit error) is
229 /// TERMINAL: it stops the run loop and propagates. This is the ack barrier
230 /// for the ORDERED/cumulative source commit (Kafka "commit up to offset N"):
231 /// the failed block's tokens are NOT committed, and -- crucially -- no LATER
232 /// block is fetched and committed past them, which would silently skip the
233 /// never-sent records (data loss). On restart the source re-delivers from
234 /// the last committed watermark, preserving at-least-once. The app owns
235 /// restart/retry policy.
236 #[cfg(feature = "transport")]
237 #[allow(clippy::too_many_arguments)]
238 pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
239 &self,
240 receiver: &R,
241 shutdown: CancellationToken,
242 process: P,
243 mut sink: Sink,
244 commit: CommitMode,
245 ticker: Option<(Duration, Ticker)>,
246 ) -> Result<(), EngineError>
247 where
248 R: TransportReceiver,
249 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
250 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
251 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
252 Ticker: FnMut() -> TickerFut,
253 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
254 {
255 tracing::info!(
256 chunk_size = self.config.max_chunk_size,
257 commit = ?commit,
258 ticker = ticker.is_some(),
259 "BatchEngine (workbatch) starting"
260 );
261
262 let mut ticker = LoopTicker::new(ticker);
263
264 loop {
265 tokio::select! {
266 biased;
267
268 () = shutdown.cancelled() => {
269 tracing::info!("BatchEngine (workbatch) shutting down");
270 return Ok(());
271 }
272
273 () = ticker.wait() => ticker.fire("workbatch").await,
274
275 recv_result = receiver.recv(self.config.max_chunk_size) => {
276 let work_batch = recv_result.map_err(EngineError::Transport)?;
277 let Some(batch) = self.ingest_workbatch(work_batch)? else {
278 continue;
279 };
280 self.drive_block(receiver, batch, &process, &mut sink, commit).await?;
281 }
282 }
283 }
284 }
285
286 /// Streaming `WorkBatch` driver -- the opt-in peak-memory-bounded path.
287 ///
288 /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), but each
289 /// received block is processed in consecutive byte-budget-sized SUB-BLOCKS
290 /// rather than all at once. Peak in-flight ingress memory is bounded to ONE
291 /// sub-block (`~sub_block_bytes`) instead of the whole block: the per-sub-block
292 /// ingress lease is dropped (releasing those bytes) BEFORE the next sub-block
293 /// is leased and processed.
294 ///
295 /// The source acks for the WHOLE block are committed EXACTLY ONCE, after the
296 /// FINAL sub-block's sink returns `Ok` (under [`CommitMode::Auto`]) -- so
297 /// at-least-once is preserved: a sink error on any sub-block stops the block
298 /// and skips the commit, so the WHOLE block is re-delivered. The sub-block
299 /// views carry EMPTY `commit_tokens`; the batch's tokens are committed once at
300 /// the end.
301 ///
302 /// `sub_block_bytes` is the target sum of `payload.len()` per sub-block (floor
303 /// one record, so a record larger than the target is still its own sub-block
304 /// and the loop never stalls). Taken as an explicit parameter so the path is
305 /// testable in isolation; `run_governed` (`governor` feature) supplies it
306 /// from the governor's byte budget.
307 ///
308 /// Fan-out WITHIN a sub-block's `process` is fine (records grow); the source
309 /// acks are still the batch's input tokens, committed once at the end.
310 ///
311 /// # Errors
312 ///
313 /// Same as [`run_workbatch`](Self::run_workbatch).
314 #[cfg(feature = "transport")]
315 #[allow(clippy::too_many_arguments)]
316 pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
317 &self,
318 receiver: &R,
319 shutdown: CancellationToken,
320 process: P,
321 mut sink: Sink,
322 commit: CommitMode,
323 sub_block_bytes: u64,
324 ticker: Option<(Duration, Ticker)>,
325 ) -> Result<(), EngineError>
326 where
327 R: TransportReceiver,
328 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
329 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
330 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
331 Ticker: FnMut() -> TickerFut,
332 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
333 {
334 // SinkManaged is unrepresentable on the streaming path: sub-block views
335 // carry EMPTY tokens, so the sink never sees the block's source acks and
336 // cannot own the commit. Fail fast at startup instead of freezing the
337 // partition at runtime.
338 if matches!(commit, CommitMode::SinkManaged) {
339 return Err(EngineError::SinkManagedUnsupported);
340 }
341
342 tracing::info!(
343 chunk_size = self.config.max_chunk_size,
344 commit = ?commit,
345 sub_block_bytes,
346 ticker = ticker.is_some(),
347 "BatchEngine (workbatch streaming) starting"
348 );
349
350 let mut ticker = LoopTicker::new(ticker);
351
352 loop {
353 tokio::select! {
354 biased;
355
356 () = shutdown.cancelled() => {
357 tracing::info!("BatchEngine (workbatch streaming) shutting down");
358 return Ok(());
359 }
360
361 () = ticker.wait() => ticker.fire("workbatch streaming").await,
362
363 recv_result = receiver.recv(self.config.max_chunk_size) => {
364 let work_batch = recv_result.map_err(EngineError::Transport)?;
365 let Some(batch) = self.ingest_workbatch(work_batch)? else {
366 continue;
367 };
368 self.drive_block_streaming(
369 receiver, batch, &process, &mut sink, commit, sub_block_bytes,
370 )
371 .await?;
372 }
373 }
374 }
375 }
376
377 /// Governed `WorkBatch` driver -- the default-ON self-regulation run path.
378 ///
379 /// This is what a self-regulating app calls instead of choosing between
380 /// [`run_workbatch`](Self::run_workbatch) and
381 /// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
382 /// dispatches on whether the byte-budget lever is wired
383 /// ([`set_byte_budget`](BatchEngine::set_byte_budget), done by
384 /// `ServiceRuntime` when `self_regulation.enabled = true`):
385 ///
386 /// - **Governor ON** (budget wired): streams each received block in
387 /// sub-blocks sized to the CURRENT byte budget (re-read per block), bounds
388 /// peak in-flight memory to one sub-block, and folds each block's
389 /// `(bytes, process_time, ingest_interval)` into the AIMD loop via
390 /// [`observe`](crate::governor::ByteBudgetController::observe). The recv
391 /// `max` is capped to the budget's poll-safety
392 /// [`record_cap`](crate::governor::ByteBudgetController::record_cap).
393 /// While pressure is LOW the budget sits at its big start value, so the
394 /// block becomes a SINGLE sub-block -- no per-record overhead, behaviour
395 /// matches the whole-batch loop.
396 /// - **Governor OFF** (no budget): delegates verbatim to
397 /// [`run_workbatch`](Self::run_workbatch) -- byte-identical to
398 /// pre-governor behaviour.
399 ///
400 /// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
401 /// SEPARATELY into the receive transport, not here -- this method is the
402 /// driver-side lever (sub-block sizing + AIMD), the gate is the
403 /// transport-side brake. The two share the same `UnifiedPressure`.
404 ///
405 /// # Errors
406 ///
407 /// Same as [`run_workbatch`](Self::run_workbatch).
408 #[cfg(all(feature = "transport", feature = "governor"))]
409 #[allow(clippy::too_many_arguments)]
410 pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
411 &self,
412 receiver: &R,
413 shutdown: CancellationToken,
414 process: P,
415 mut sink: Sink,
416 commit: CommitMode,
417 ticker: Option<(Duration, Ticker)>,
418 ) -> Result<(), EngineError>
419 where
420 R: TransportReceiver,
421 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
422 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
423 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
424 Ticker: FnMut() -> TickerFut,
425 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
426 {
427 // Governor OFF -> the original whole-batch loop, byte-for-byte. The
428 // whole-batch path DOES support SinkManaged (the sink receives the full
429 // block with its tokens), so the guard below must sit AFTER this
430 // delegate, not before it.
431 let Some(budget) = self.byte_budget.clone() else {
432 return self
433 .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
434 .await;
435 };
436
437 // Governor ON streams in sub-blocks whose views carry EMPTY tokens, so
438 // the sink can never own a SinkManaged commit -- reject it at startup
439 // rather than silently freeze the source offset. (Same guard as
440 // run_workbatch_streaming, which this path bypasses.)
441 if matches!(commit, CommitMode::SinkManaged) {
442 return Err(EngineError::SinkManagedUnsupported);
443 }
444
445 tracing::info!(
446 chunk_size = self.config.max_chunk_size,
447 commit = ?commit,
448 ticker = ticker.is_some(),
449 start_byte_budget = budget.byte_budget(),
450 "BatchEngine (governed) starting -- self-regulation ON"
451 );
452
453 let mut ticker = LoopTicker::new(ticker);
454
455 // Track the previous block's arrival instant so we can feed the AIMD
456 // loop a real ingest inter-arrival interval.
457 let mut last_recv: Option<std::time::Instant> = None;
458
459 loop {
460 // The recv limits bound a single poll by BOTH:
461 // - the SMALLER of the config chunk size and the budget's
462 // poll-safety record cap (a tiny-record flood cannot blow the
463 // count even within the byte budget), AND
464 // - the CURRENT byte budget (re-read per block), so a single poll
465 // never RETAINS more than ~one budget's worth of inbound payload
466 // BEFORE the sub-block split. This is the fix for the
467 // "byte budget does not bound RECEIVE memory" gap: without the
468 // byte cap, `recv(max)` could build a WorkBatch (and, for the
469 // Kafka recv-arena, allocate one arena) far larger than the
470 // budget before any sub-block lease ran.
471 let recv_limits = crate::transport::RecvLimits {
472 max_records: self.config.max_chunk_size.min(budget.record_cap()),
473 max_bytes: budget.byte_budget(),
474 };
475
476 tokio::select! {
477 biased;
478
479 () = shutdown.cancelled() => {
480 tracing::info!("BatchEngine (governed) shutting down");
481 return Ok(());
482 }
483
484 () = ticker.wait() => ticker.fire("governed").await,
485
486 recv_result = receiver.recv_limited(recv_limits) => {
487 let now = std::time::Instant::now();
488 let ingest_interval = last_recv
489 .map(|prev| now.saturating_duration_since(prev))
490 .unwrap_or_default();
491 last_recv = Some(now);
492
493 let work_batch = recv_result.map_err(EngineError::Transport)?;
494 let block_bytes = work_batch.total_payload_bytes() as u64;
495 let Some(batch) = self.ingest_workbatch(work_batch)? else {
496 // Empty block: still fold the timing so a quiet pipeline
497 // can grow its budget back. No bytes -> treated as slack.
498 budget.observe(0, Duration::ZERO, ingest_interval);
499 continue;
500 };
501
502 // Re-read the budget for THIS block: low pressure -> big
503 // budget -> one sub-block (no overhead); high pressure ->
504 // shrunk budget -> peak in-flight bounded to one sub-block.
505 let sub_block_bytes = budget.byte_budget();
506
507 let process_start = std::time::Instant::now();
508 self.drive_block_streaming(
509 receiver, batch, &process, &mut sink, commit, sub_block_bytes,
510 )
511 .await?;
512 let process_time = process_start.elapsed();
513
514 // Fold the OBSERVED actual block bytes into the AIMD loop. A
515 // memory HARD override inside observe() shrinks immediately
516 // regardless of rho.
517 budget.observe(block_bytes, process_time, ingest_interval);
518
519 // Observability: surface the current budget + pressure as
520 // gauges so throttling is visible, not mysterious, AND the
521 // ACTUAL received block bytes so the gap between the budget
522 // (`self_regulation_byte_budget`) and reality (`recv_block_bytes`)
523 // is measurable -- a persistent overshoot means the recv byte
524 // cap is not holding. The gate edges (pause/resume) are
525 // logged by the ObservingActuator.
526 #[cfg(feature = "metrics")]
527 {
528 metrics::gauge!("self_regulation_byte_budget")
529 .set(budget.byte_budget() as f64);
530 metrics::gauge!("self_regulation_recv_block_bytes")
531 .set(block_bytes as f64);
532 // `self_regulation_` domain prefix: a bare `pressure_ratio`
533 // collides with MemoryGuard's and ScalingPressure's own
534 // pressure gauges on the same registry.
535 metrics::gauge!("self_regulation_pressure_ratio")
536 .set(budget.pressure().level());
537 }
538 }
539 }
540 }
541 }
542
543 /// Unified pre-parsed `WorkBatch` driver -- the opt-in hot path.
544 ///
545 /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), except the
546 /// driver PRE-PARSES the whole block via [`codec::parse`] (SIMD JSON / native
547 /// MsgPack) on the worker pool and hands `process_parsed` a [`ParsedBatch`]
548 /// (records + aligned parsed payloads + shared
549 /// [`FieldInterner`](super::FieldInterner)). This keeps
550 /// the batch-parse + interner throughput win for apps that opt in.
551 ///
552 /// Records that fail to parse are handled per the configured
553 /// [`ParseErrorAction`](super::ParseErrorAction) (Dlq -> dlq_entries, Skip ->
554 /// drop+counted, FailBatch -> terminal no-commit) -- see [`ParsedBatch`] for
555 /// the parse-failure contract. `process_parsed` returns the final
556 /// [`WorkBatch`] and MUST preserve the input `commit_tokens`.
557 ///
558 /// # Errors
559 ///
560 /// Same as [`run_workbatch`](Self::run_workbatch).
561 #[cfg(feature = "transport")]
562 #[allow(clippy::too_many_arguments)]
563 pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
564 &self,
565 receiver: &R,
566 shutdown: CancellationToken,
567 process_parsed: P,
568 mut sink: Sink,
569 commit: CommitMode,
570 ticker: Option<(Duration, Ticker)>,
571 ) -> Result<(), EngineError>
572 where
573 R: TransportReceiver,
574 P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
575 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
576 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
577 Ticker: FnMut() -> TickerFut,
578 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
579 {
580 tracing::info!(
581 chunk_size = self.config.max_chunk_size,
582 commit = ?commit,
583 ticker = ticker.is_some(),
584 "BatchEngine (workbatch parsed) starting"
585 );
586
587 let mut ticker = LoopTicker::new(ticker);
588
589 loop {
590 tokio::select! {
591 biased;
592
593 () = shutdown.cancelled() => {
594 tracing::info!("BatchEngine (workbatch parsed) shutting down");
595 return Ok(());
596 }
597
598 () = ticker.wait() => ticker.fire("workbatch parsed").await,
599
600 recv_result = receiver.recv(self.config.max_chunk_size) => {
601 let recv_batch = recv_result.map_err(EngineError::Transport)?;
602 let Some(batch) = self.ingest_workbatch(recv_batch)? else {
603 continue;
604 };
605 // Wrap the parse-then-process so drive_block stays generic.
606 // parse_block honours ParseErrorAction: FailBatch surfaces a
607 // terminal EngineError here (no commit), Dlq carries entries
608 // forward for the driver to route, Skip drops silently+counted.
609 let parse = |b: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
610 let parsed = self.parse_block(b)?;
611 process_parsed(parsed)
612 };
613 self.drive_block(receiver, batch, &parse, &mut sink, commit).await?;
614 }
615 }
616 }
617 }
618
619 /// Prepare a received [`WorkBatch`] for processing: route its inline-DLQ
620 /// entries per the configured policy, then return the batch (with its
621 /// `dlq_entries` stripped) ready for the process stage. Returns `None` when
622 /// the block has no records (caller should `continue`).
623 ///
624 /// `recv` now yields a [`WorkBatch`] directly (Task 0.7b), so there is no
625 /// `RecvBatch` round-trip: the inbound-filter DLQ entries arrive on
626 /// [`WorkBatch::dlq_entries`] and are routed here via
627 /// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy)
628 /// before processing. Memory accounting is performed in
629 /// [`drive_block`](Self::drive_block).
630 #[cfg(feature = "transport")]
631 fn ingest_workbatch<T: crate::transport::CommitToken>(
632 &self,
633 batch: WorkBatch<T>,
634 ) -> Result<Option<WorkBatch<T>>, EngineError> {
635 // Route/discard/reject inline-DLQ entries per the configured policy --
636 // never silently dropped. The batch comes back with its dlq_entries
637 // consumed so the process stage sees a clean block.
638 let batch = self.apply_workbatch_dlq_policy(batch)?;
639 // Skip ONLY a truly-empty block. A block with no records but with
640 // commit_tokens is the all-filtered case (every record was dropped/
641 // DLQ-routed by an inbound filter): those acks must still be committed
642 // so the source advances past the filtered records -- returning None
643 // here would strand them (stalled Kafka offset / leaked Redis PEL).
644 if batch.records.is_empty() && batch.commit_tokens.is_empty() {
645 return Ok(None);
646 }
647 Ok(Some(batch))
648 }
649
650 /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
651 ///
652 /// Shared by both [`run_workbatch`](Self::run_workbatch) and
653 /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
654 /// between the two is the `process` closure they pass.
655 #[cfg(feature = "transport")]
656 async fn drive_block<R, P, Sink, SinkFut>(
657 &self,
658 receiver: &R,
659 batch: WorkBatch<R::Token>,
660 process: &P,
661 sink: &mut Sink,
662 commit: CommitMode,
663 ) -> Result<(), EngineError>
664 where
665 R: TransportReceiver,
666 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
667 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
668 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
669 {
670 // Account the in-flight ingress bytes against the MemoryGuard; the lease
671 // releases on every exit path of this block (sink-error early return,
672 // commit, ?-return) via Drop.
673 #[cfg(feature = "memory")]
674 let _ingress_lease = self.lease_ingress_batch(&batch);
675
676 // process() may fan out / fan in; it preserves the input commit_tokens.
677 // Capture the input ack count so a contract breach is LOGGED rather than
678 // silently freezing the source offset: a closure that rebuilds its
679 // output with WorkBatch::from_records (instead of map_records) drops the
680 // tokens to zero, the Auto commit below commits `&[]`, and the partition
681 // stalls with no diagnostic. One len() compare per block (not per
682 // record) -- nil hot-path cost.
683 let input_token_count = batch.commit_tokens.len();
684
685 let mut out_batch = process(batch)?;
686
687 if out_batch.commit_tokens.len() != input_token_count {
688 tracing::warn!(
689 input_tokens = input_token_count,
690 output_tokens = out_batch.commit_tokens.len(),
691 "process() changed the commit-token count -- the run contract is \
692 that process preserves source acks (transform records, not \
693 tokens). A drop toward zero will under-commit and stall the \
694 source offset; use map_records, not WorkBatch::from_records."
695 );
696 }
697
698 // Route any parse/process-generated DLQ entries the out-batch carries,
699 // through the SAME policy + route point as the inbound-filter entries
700 // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
701 // sink/commit, so a parse/process dead-letter can never vanish on the
702 // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
703 // Route sink Err) is a terminal ack-barrier error -- the commit is
704 // skipped and the whole block re-delivered, so no later ordered commit
705 // advances past these undelivered dead-letters. Silent discard is opt-in
706 // only (FilterDlqPolicy::DiscardWithMetric).
707 if !out_batch.dlq_entries.is_empty() {
708 let entries = std::mem::take(&mut out_batch.dlq_entries);
709 if let Err(e) = self.route_dlq_entries(entries) {
710 tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
711 return Err(e);
712 }
713 }
714
715 // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
716 //
717 // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
718 // TERMINAL error -- it stops the run loop. The source commit is ordered
719 // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
720 // logged and continued, the NEXT block's commit would advance the
721 // committed watermark PAST this block's never-sent offsets, silently
722 // skipping records (data loss). Stopping the loop leaves THIS block's
723 // tokens uncommitted, so the source re-delivers from the last committed
724 // watermark on restart -- no later block can commit ahead of the
725 // failure. The app owns restart/retry policy; the engine never invents
726 // a silent skip.
727 // Skip the sink when there is nothing to send (e.g. every record in the
728 // block was filtered out): the sink has no work, but the block's
729 // commit_tokens -- which include the filtered records' acks -- must still
730 // be committed below so the source advances past them. (The streaming
731 // path gets this for free: a zero-record block runs zero sub-blocks and
732 // still commits once at the end.)
733 if !out_batch.records.is_empty()
734 && let Err(e) = sink(&out_batch).await
735 {
736 tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
737 return Err(e);
738 }
739
740 // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
741 // output record count. This is the at-least-once block contract.
742 match commit {
743 CommitMode::Auto => {
744 // A commit failure is ALSO a terminal ack-barrier failure: a
745 // failed ordered commit must not be followed by a later block's
746 // commit advancing the watermark past these uncommitted offsets.
747 if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
748 tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
749 return Err(EngineError::Transport(e));
750 }
751 }
752 CommitMode::SinkManaged => {
753 // The sink owns the commit -- the engine does not commit here.
754 }
755 }
756 Ok(())
757 }
758
759 /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
760 /// bounded to ONE sub-block, the source acks commit once after the final
761 /// sub-block.
762 ///
763 /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
764 /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
765 /// sub-block never multiplies the source acks. Each sub-block's ingress lease
766 /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
767 /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
768 /// block.
769 ///
770 /// On ANY sub-block sink error the block stops and the commit is skipped (the
771 /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
772 /// propagates out and stops the run loop, so no LATER block's ordered commit
773 /// can advance the cumulative watermark past these never-committed offsets
774 /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
775 /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
776 /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
777 /// commit failure is likewise terminal.
778 #[cfg(feature = "transport")]
779 async fn drive_block_streaming<R, P, Sink, SinkFut>(
780 &self,
781 receiver: &R,
782 batch: WorkBatch<R::Token>,
783 process: &P,
784 sink: &mut Sink,
785 commit: CommitMode,
786 sub_block_bytes: u64,
787 ) -> Result<(), EngineError>
788 where
789 R: TransportReceiver,
790 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
791 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
792 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
793 {
794 // Carry the WHOLE block's source acks aside; the sub-block views below
795 // commit EMPTY tokens. The batch's tokens commit ONCE after the final
796 // sub-block (at-least-once on the whole block). dlq_entries were already
797 // routed by ingest_workbatch, so the block here carries none.
798 let WorkBatch {
799 records,
800 commit_tokens,
801 ..
802 } = batch;
803
804 // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
805 // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
806 // pulls it -- it never pre-materialises every sub-block vector up front,
807 // so the only sub-block resident is the one currently being leased and
808 // sunk (the streaming peak-memory contract holds for the SPLIT itself,
809 // not just the lease).
810 let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
811
812 while let Some(sub_records) = sub_blocks.next_sub_block() {
813 // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
814 // path of this iteration (sink-error early return, ?-return, or the
815 // end of the loop body) via Drop -- BEFORE the next sub-block leases.
816 // Peak in-flight lease is therefore one sub-block, never the block.
817 let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
818 #[cfg(feature = "memory")]
819 let _sub_lease = self.lease_ingress_batch(&sub_block);
820
821 // process() may fan out / fan in within the sub-block; it preserves
822 // the (empty) commit_tokens of the sub-block view.
823 let mut out_sub = process(sub_block)?;
824
825 // Route any parse/process-generated DLQ entries this sub-block
826 // carries BEFORE its sink -- same single policy + route point as the
827 // whole-batch path and the inbound-filter entries. Fallible: a route
828 // failure is terminal (ack barrier) so the commit for the WHOLE block
829 // is skipped and it is re-delivered -- a dead-letter is never lost on
830 // the path to a source commit.
831 if !out_sub.dlq_entries.is_empty() {
832 let entries = std::mem::take(&mut out_sub.dlq_entries);
833 if let Err(e) = self.route_dlq_entries(entries) {
834 tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
835 return Err(e);
836 }
837 }
838
839 // Sink this sub-block. A sink error stops the block and skips the
840 // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
841 // propagate so the run loop stops -- a later block's ordered commit
842 // must never advance the cumulative watermark past this block's
843 // uncommitted offsets.
844 if let Err(e) = sink(&out_sub).await {
845 tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
846 return Err(e);
847 }
848 // _sub_lease drops here -> bytes released before the next sub-block.
849 }
850
851 // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
852 match commit {
853 CommitMode::Auto => {
854 // Commit failure is terminal (ack barrier) -- same reasoning as
855 // the sink-error path above.
856 if let Err(e) = receiver.commit(&commit_tokens).await {
857 tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
858 return Err(EngineError::Transport(e));
859 }
860 }
861 CommitMode::SinkManaged => {
862 // The sink owns the commit -- the engine does not commit here.
863 }
864 }
865 Ok(())
866 }
867
868 /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
869 ///
870 /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
871 /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
872 /// sub-block shapes) ergonomic. Same splitting contract as
873 /// [`SubBlockDrain::next_sub_block`].
874 #[cfg(all(test, feature = "transport"))]
875 fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
876 let mut drain = SubBlockDrain::new(records, target_bytes);
877 let mut out = Vec::new();
878 while let Some(sub) = drain.next_sub_block() {
879 out.push(sub);
880 }
881 out
882 }
883
884 /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
885 /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
886 ///
887 /// Parses each record's payload via [`codec::parse`] on the worker pool
888 /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
889 /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
890 /// the engine's `parse_error_action` -- the SAME contract the legacy
891 /// `process_mid_tier` honoured (previously the parsed path hardcoded
892 /// route-to-DLQ, ignoring the config):
893 ///
894 /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
895 /// appended to the batch's `dlq_entries` (no silent drop) and counted in
896 /// errors + dlq. The driver routes those entries before commit.
897 /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
898 /// in errors ONLY (a deliberate, configured drop -- not a silent vanish).
899 /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
900 /// failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
901 /// consistent with the P1 ack barrier, so the block is re-delivered rather
902 /// than partially committed.
903 ///
904 /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
905 ///
906 /// # Errors
907 ///
908 /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
909 /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
910 #[cfg(feature = "transport")]
911 fn parse_block<T: crate::transport::CommitToken>(
912 &self,
913 batch: WorkBatch<T>,
914 ) -> Result<ParsedBatch<'_, T>, EngineError> {
915 use super::ParseErrorAction;
916 use crate::transport::PayloadFormat;
917
918 let WorkBatch {
919 records,
920 commit_tokens,
921 mut dlq_entries,
922 } = batch;
923
924 // Parse each record on the pool. The pool's map_owned applies the scaler
925 // semaphore per item, so the parse phase obeys the CPU cap exactly as the
926 // legacy parsed path does. map_owned preserves input order, so a record
927 // and its parse result stay aligned without threading an explicit index.
928 let parsed_each: Vec<(Record, Result<ParsedPayload, String>)> =
929 self.pool.map_owned(records, |record| {
930 let format: PayloadFormat = record.metadata.format;
931 let result =
932 codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
933 (record, result)
934 });
935
936 let action = self.config.parse_error_action;
937 let mut keep_records = Vec::new();
938 let mut keep_parsed = Vec::new();
939 for (record, result) in parsed_each {
940 match result {
941 Ok(payload) => {
942 keep_records.push(record);
943 keep_parsed.push(payload);
944 }
945 Err(reason) => match action {
946 ParseErrorAction::Dlq => {
947 // No silent drop: the unparseable record's bytes go to DLQ.
948 self.stats.incr_errors();
949 self.stats.incr_dlq();
950 dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
951 payload: record.payload.to_vec(),
952 key: record.key.clone(),
953 reason,
954 });
955 }
956 ParseErrorAction::Skip => {
957 // Deliberate, configured drop -- counted in errors but NOT
958 // dead-lettered. This is opt-in loss, not a silent vanish.
959 self.stats.incr_errors();
960 }
961 ParseErrorAction::FailBatch => {
962 // Terminal: the whole block fails its commit (ack barrier).
963 self.stats.incr_errors();
964 return Err(EngineError::ParseBatchFailed(reason));
965 }
966 },
967 }
968 }
969
970 Ok(ParsedBatch {
971 records: keep_records,
972 parsed: keep_parsed,
973 commit_tokens,
974 dlq_entries,
975 interner: &self.interner,
976 })
977 }
978
979 /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
980 /// returning an RAII lease that releases them on drop.
981 ///
982 /// Drives the in-flight ingress accounting for the WorkBatch driver: the
983 /// lease is taken in [`drive_block`](Self::drive_block) and releases the
984 /// bytes on every block exit path via `Drop`.
985 ///
986 /// [`MemoryGuard`]: crate::memory::MemoryGuard
987 #[cfg(feature = "memory")]
988 pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
989 &self,
990 batch: &WorkBatch<T>,
991 ) -> Option<super::IngressLease<'_>> {
992 let guard = self.memory_guard.as_ref()?;
993 let bytes = batch.total_payload_bytes() as u64;
994 guard.add_bytes(bytes);
995 Some(super::IngressLease::new(guard, bytes))
996 }
997}
998
/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
/// of [`Record`]s at a time, so the streaming driver never pre-materialises
/// every sub-block vector up front.
///
/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records in
/// order from the source. It stops when adding the next record's
/// `payload.len()` would push the running total strictly ABOVE `target_bytes`.
/// That record is held back to open the following sub-block, and the remaining
/// records stay un-pulled in the source iterator. Splitting contract:
///
/// - records are kept in order;
/// - FLOOR of one record per sub-block: the opening record is always taken.
///   A record whose payload alone exceeds `target_bytes` therefore becomes its
///   own single-record sub-block, so the drain never stalls on an oversized
///   record;
/// - the budget check is strict (`>`), so a sub-block that sits EXACTLY at
///   `target_bytes` still admits following zero-length payloads;
/// - a `target_bytes` of `0` therefore yields one record per sub-block, except
///   that a run of consecutive zero-length payloads is grouped together;
/// - an exhausted source yields `None`.
///
/// The lazy shape matters. The previous `Vec<Vec<Record>>` approach allocated
/// every sub-block vector before the loop processed the first one. Here at
/// most ONE sub-block vector is allocated at a time: the one the loop is about
/// to lease and sink. As a result, the SPLIT no longer defeats the streaming
/// peak-memory bound.
#[cfg(feature = "transport")]
struct SubBlockDrain {
    /// Source records, drained in order. `peeked` holds a record we pulled but
    /// could not fit into the sub-block being built (it starts the next one).
    iter: std::vec::IntoIter<Record>,
    /// The record held back by the previous `next_sub_block` call because it
    /// would have overshot that sub-block's budget; it opens the next one.
    peeked: Option<Record>,
    /// Per-sub-block budget, compared against the running sum of
    /// `payload.len()` of the records in the sub-block.
    target_bytes: u64,
}
1026
#[cfg(feature = "transport")]
impl SubBlockDrain {
    /// Wrap `records` for lazy, in-order splitting against a per-sub-block
    /// payload budget of `target_bytes`.
    fn new(records: Vec<Record>, target_bytes: u64) -> Self {
        let iter = records.into_iter();
        Self {
            target_bytes,
            peeked: None,
            iter,
        }
    }

    /// Return the next consecutive sub-block, or `None` once every record has
    /// been handed out. Exactly ONE `Vec` is allocated per call.
    ///
    /// The opening record is always accepted (floor of one). Each further
    /// record is accepted only while the running payload total stays at or
    /// below `target_bytes`. The first record that would push the total
    /// strictly above the budget is parked in `peeked`, where it opens the next
    /// call's sub-block.
    fn next_sub_block(&mut self) -> Option<Vec<Record>> {
        // A record carried over from the previous call always opens this
        // sub-block; only when there is none do we pull from the source.
        let head = match self.peeked.take() {
            Some(carried) => carried,
            None => self.iter.next()?,
        };
        let mut used = head.payload.len() as u64;
        let mut block = vec![head];

        for candidate in &mut self.iter {
            let grown = used.saturating_add(candidate.payload.len() as u64);
            if grown > self.target_bytes {
                // Over budget: hold it back for the next sub-block and stop.
                self.peeked = Some(candidate);
                break;
            }
            used = grown;
            block.push(candidate);
        }

        Some(block)
    }
}
1061
1062#[cfg(all(test, feature = "transport-memory"))]
1063#[path = "driver_tests.rs"]
1064mod tests;