hyperi_rustlib/worker/engine/driver.rs
1// Project: hyperi-rustlib
2// File: src/worker/engine/driver.rs
3// Purpose: Unified WorkBatch engine driver (get -> process -> send -> commit)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Unified `WorkBatch` engine driver
10//!
11//! The single run loop that the four legacy loops (`run` / `run_raw` /
12//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
13//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
14//! through one block at a time:
15//!
16//! ```text
17//! recv(max) -> WorkBatch (recv now yields a WorkBatch natively)
18//! -> apply_workbatch_dlq_policy (route/discard/reject inline-DLQ entries)
19//! -> lease_ingress_batch (memory accounting on the block's bytes)
20//! -> process(WorkBatch) (transforms parse ON DEMAND via codec::parse)
21//! -> sink(&out_batch).await (async send of the whole block)
22//! -> commit per CommitMode (at-least-once, AFTER the block is sent)
23//! ```
24//!
25//! ## Why tokens live on the batch, not the record
26//!
27//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
28//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
29//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
30//! the input tokens after the whole out-batch is sent -- never `2N`, never per
31//! output record. That invariant is the data-plane core; the fan-out
32//! commit-correctness test proves it.
33//!
34//! ## Two parse modes (the hybrid)
35//!
36//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
37//! does NOT pre-parse. A transform that needs a field calls
38//! [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
39//! never pay a parse.
40//!
41//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
42//! hot pipelines. The driver pre-parses the whole block via `codec::parse`
43//! (SIMD JSON / native MsgPack) on the worker pool and hands the process
44//! closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
45//! shared [`FieldInterner`](super::FieldInterner) for hot routing-field
46//! dedup. This keeps the batch-parse + interner throughput win for apps that
47//! opt in.
48//!
49//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
50//! in-process (non-run-loop) callers; only the four legacy run loops were
51//! removed by the 0.7b flip.
52
53use std::time::Duration;
54
55use tokio_util::sync::CancellationToken;
56
57use super::{BatchEngine, EngineError};
58use crate::transport::codec::{self, ParsedPayload};
59use crate::transport::{Record, TransportReceiver, WorkBatch};
60
/// Who fires the input source acks once a block has been driven.
///
/// A [`WorkBatch`]'s `commit_tokens` ARE the input source acks (Kafka offsets,
/// fetch cursors, ...). This enum selects whether the engine or the sink is
/// responsible for acknowledging them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitMode {
    /// Engine-commits, at-least-once.
    ///
    /// Once the sink returns `Ok` for the WHOLE out-batch, the engine calls
    /// `receiver.commit(&out_batch.commit_tokens)`. A sink error skips the
    /// commit, so the block is re-delivered. This is the engine-commits shape
    /// of the former mid-tier / raw run loops, lifted onto the block.
    Auto,

    /// Sink-commits: the engine never calls `commit`.
    ///
    /// The sink is handed the block (which carries `commit_tokens`) and
    /// chooses when to acknowledge, e.g. after a downstream flush. This is the
    /// deferred-commit shape of the former async run loop, lifted onto the
    /// block.
    SinkManaged,
}
78
79/// A pre-parsed block for the opt-in
80/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
81///
82/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
83/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
84/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
85/// for hot routing-field dedup.
86///
87/// ## Parse-failure contract
88///
89/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
90/// successfully. A record whose payload fails [`codec::parse`] is handled per
91/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
92/// same contract the legacy `process_mid_tier` honoured:
93///
94/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
95/// [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
96/// (no silent drop); the resulting [`WorkBatch`] inherits those entries and
97/// the driver routes them through the DLQ policy before commit.
98/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
99/// errors) -- a deliberate, configured drop, not a silent vanish.
100/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
101/// terminally (no commit), consistent with the ack barrier.
102///
103/// The process closure therefore always sees a clean, fully-parsed view.
104///
105/// `commit_tokens` are the INPUT source acks and are carried through unchanged
106/// regardless of how many records survived parsing -- the same fan-out-safe
107/// token decoupling as [`WorkBatch`].
108pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
109 /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
110 pub records: Vec<Record>,
111
112 /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
113 pub parsed: Vec<ParsedPayload>,
114
115 /// Input source acks for the whole block (decoupled from record count).
116 pub commit_tokens: Vec<T>,
117
118 /// Inline-DLQ entries: those carried in on the source batch PLUS any record
119 /// that failed to parse (no-silent-drop).
120 pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
121
122 /// Shared interner for hot routing-field-name dedup. The first time a field
123 /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
124 /// bump. Reused from the engine so dedup persists across blocks.
125 pub interner: &'a super::FieldInterner,
126}
127
128impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
129 /// Number of successfully-parsed records.
130 #[must_use]
131 pub fn len(&self) -> usize {
132 self.records.len()
133 }
134
135 /// Whether there are no successfully-parsed records.
136 #[must_use]
137 pub fn is_empty(&self) -> bool {
138 self.records.is_empty()
139 }
140
141 /// Intern a routing-field name through the shared interner.
142 ///
143 /// Use this to dedup the routing-key field name once per block rather than
144 /// re-allocating it per record.
145 #[must_use]
146 pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
147 self.interner.intern(name)
148 }
149}
150
151/// Optional periodic ticker shared by the run-loops (flush timers, periodic
152/// maintenance). Folds away the identical interval-setup + select-arm the four
153/// loops would otherwise each carry. The ticker is COLD (fires on the order of
154/// the flush interval, not per block), so this extraction does not touch the
155/// hot recv path.
156#[cfg(feature = "transport")]
157struct LoopTicker<F> {
158 interval: Option<tokio::time::Interval>,
159 callback: Option<F>,
160}
161
162#[cfg(feature = "transport")]
163impl<F, Fut> LoopTicker<F>
164where
165 F: FnMut() -> Fut,
166 Fut: std::future::Future<Output = Result<(), EngineError>>,
167{
168 fn new(ticker: Option<(Duration, F)>) -> Self {
169 // Start the first tick one period out (not immediately) so the loop
170 // does not fire a tick before it has polled the source once.
171 let interval = ticker
172 .as_ref()
173 .map(|(d, _)| tokio::time::interval_at(tokio::time::Instant::now() + *d, *d));
174 Self {
175 interval,
176 callback: ticker.map(|(_, f)| f),
177 }
178 }
179
180 /// Yield when the next tick is due, or never if no ticker is configured.
181 /// Cancel-safe: `Interval::tick` is cancel-safe and the no-ticker arm pends,
182 /// so this sits directly in `tokio::select!`.
183 async fn wait(&mut self) {
184 match self.interval.as_mut() {
185 Some(i) => {
186 i.tick().await;
187 }
188 None => std::future::pending::<()>().await,
189 }
190 }
191
192 /// Run the ticker callback; a callback error is logged, not fatal.
193 async fn fire(&mut self, label: &str) {
194 if let Some(f) = self.callback.as_mut()
195 && let Err(e) = f().await
196 {
197 tracing::error!(error = %e, ticker = label, "Ticker failed");
198 }
199 }
200}
201
202impl BatchEngine {
/// Default data-plane loop: drives [`WorkBatch`]es on demand, one at a time.
///
/// Each received block flows `recv -> filter-DLQ policy -> ingress lease ->
/// process -> sink -> commit`. Nothing is pre-parsed here: `process` reads
/// fields on demand through [`codec::parse`], so pass-through apps pay no
/// parse cost.
///
/// - `process` is a synchronous closure run on the loop task. It may fan
///   records out or in, but it MUST keep `commit_tokens` intact;
///   [`WorkBatch::map_records`] does this automatically.
/// - `sink` is async and receives the WHOLE out-batch by reference.
/// - `commit` picks [`CommitMode::Auto`] (the engine commits after a sink
///   `Ok`) or [`CommitMode::SinkManaged`] (the sink owns the commit).
/// - `ticker`, when `Some((interval, fn))`, runs `fn` every `interval` inside
///   the select loop (flush timers, periodic maintenance). A ticker error is
///   logged, not fatal.
///
/// Cancelling `shutdown` ends the loop cleanly with `Ok(())`.
///
/// # Errors
///
/// The loop returns an error in any of these cases:
///
/// - [`EngineError::Transport`] when `recv` fails fatally;
/// - [`EngineError::FilterDlqUnrouted`] when inline-DLQ entries appear under
///   the default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject);
/// - whatever error `process` returns.
///
/// A sink error, and under [`CommitMode::Auto`] a commit error, is TERMINAL:
/// the loop stops and the error propagates. This is the ack barrier for an
/// ORDERED, cumulative source commit (Kafka "commit up to offset N"). The
/// failed block's tokens stay uncommitted, and no LATER block is fetched and
/// committed past them, which would otherwise silently skip the never-sent
/// records (data loss). On restart the source re-delivers from the last
/// committed watermark, so at-least-once holds. Restart/retry policy belongs
/// to the app.
#[cfg(feature = "transport")]
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process: P,
    mut sink: Sink,
    commit: CommitMode,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    let has_ticker = ticker.is_some();
    tracing::info!(
        chunk_size = self.config.max_chunk_size,
        commit = ?commit,
        ticker = has_ticker,
        "BatchEngine (workbatch) starting"
    );

    let mut loop_ticker = LoopTicker::new(ticker);

    loop {
        tokio::select! {
            biased;

            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch) shutting down");
                return Ok(());
            }

            () = loop_ticker.wait() => loop_ticker.fire("workbatch").await,

            recv_result = receiver.recv(self.config.max_chunk_size) => {
                let received = recv_result.map_err(EngineError::Transport)?;
                // `None` means a truly empty block: nothing to process or ack.
                if let Some(block) = self.ingest_workbatch(received)? {
                    self.drive_block(receiver, block, &process, &mut sink, commit)
                        .await?;
                }
            }
        }
    }
}
285
/// Streaming `WorkBatch` driver: the opt-in, peak-memory-bounded path.
///
/// The loop has the same shape as [`run_workbatch`](Self::run_workbatch), but
/// each received block is processed as consecutive SUB-BLOCKS sized to a byte
/// budget rather than all at once. Each sub-block's ingress lease is dropped,
/// releasing its bytes, before the next sub-block is leased and processed.
/// Peak in-flight ingress memory is therefore about one sub-block
/// (`~sub_block_bytes`) instead of the whole block.
///
/// Sub-block views carry EMPTY `commit_tokens`. The WHOLE block's source acks
/// are committed EXACTLY ONCE, after the FINAL sub-block's sink returns `Ok`
/// (under [`CommitMode::Auto`]). A sink error on any sub-block aborts the
/// block and skips that commit, so the WHOLE block is re-delivered
/// (at-least-once).
///
/// `sub_block_bytes` is the target sum of `payload.len()` per sub-block. It
/// has a floor of one record, so an oversize record still forms its own
/// sub-block and the loop never stalls. It is an explicit parameter so the
/// path can be tested in isolation; `run_governed` (`governor` feature)
/// derives it from the governor's byte budget.
///
/// Fan-out inside a sub-block's `process` is fine (records grow). The source
/// acks are still the batch's input tokens, committed once at the end.
///
/// # Errors
///
/// Same as [`run_workbatch`](Self::run_workbatch), plus
/// [`EngineError::SinkManagedUnsupported`] when `commit` is
/// [`CommitMode::SinkManaged`], which is rejected before the loop starts.
#[cfg(feature = "transport")]
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process: P,
    mut sink: Sink,
    commit: CommitMode,
    sub_block_bytes: u64,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    // Sub-block views carry no tokens, so a sink could never see the block's
    // source acks and cannot own the commit. Reject SinkManaged up front
    // rather than freezing the partition at runtime.
    match commit {
        CommitMode::SinkManaged => return Err(EngineError::SinkManagedUnsupported),
        CommitMode::Auto => {}
    }

    let has_ticker = ticker.is_some();
    tracing::info!(
        chunk_size = self.config.max_chunk_size,
        commit = ?commit,
        sub_block_bytes,
        ticker = has_ticker,
        "BatchEngine (workbatch streaming) starting"
    );

    let mut loop_ticker = LoopTicker::new(ticker);

    loop {
        tokio::select! {
            biased;

            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch streaming) shutting down");
                return Ok(());
            }

            () = loop_ticker.wait() => loop_ticker.fire("workbatch streaming").await,

            recv_result = receiver.recv(self.config.max_chunk_size) => {
                let received = recv_result.map_err(EngineError::Transport)?;
                if let Some(block) = self.ingest_workbatch(received)? {
                    self.drive_block_streaming(
                        receiver,
                        block,
                        &process,
                        &mut sink,
                        commit,
                        sub_block_bytes,
                    )
                    .await?;
                }
            }
        }
    }
}
376
/// Governed `WorkBatch` driver: the default-ON self-regulation entry point.
///
/// Self-regulating apps call this instead of choosing between
/// [`run_workbatch`](Self::run_workbatch) and
/// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
/// branches on whether a byte budget has been wired via
/// [`set_byte_budget`](BatchEngine::set_byte_budget), which `ServiceRuntime`
/// does when `self_regulation.enabled = true`:
///
/// - **Governor OFF** (no budget): delegates unchanged to
///   [`run_workbatch`](Self::run_workbatch), i.e. exactly pre-governor
///   behaviour.
/// - **Governor ON** (budget wired):
///   - Each poll is bounded by a record cap and by the current byte budget.
///     The record cap is the smaller of `max_chunk_size` and the budget's
///     [`record_cap`](crate::governor::ByteBudgetController::record_cap).
///   - Each block is streamed in sub-blocks sized to the byte budget, re-read
///     for that block.
///   - The block's `(bytes, process_time, ingest_interval)` is folded into the
///     AIMD loop via
///     [`observe`](crate::governor::ByteBudgetController::observe).
///   - While pressure is LOW the budget sits at its big start value, so a
///     block is a SINGLE sub-block and behaves like the whole-batch loop.
///
/// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
/// SEPARATELY into the receive transport, not here. This method is the
/// driver-side lever (sub-block sizing + AIMD); the gate is the
/// transport-side brake. Both share the same `UnifiedPressure`.
///
/// # Errors
///
/// Same as [`run_workbatch`](Self::run_workbatch), plus
/// [`EngineError::SinkManagedUnsupported`] when the governor is ON and
/// `commit` is [`CommitMode::SinkManaged`].
#[cfg(all(feature = "transport", feature = "governor"))]
#[allow(clippy::too_many_arguments)]
pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process: P,
    mut sink: Sink,
    commit: CommitMode,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    // No budget wired means the governor is OFF: hand off to the whole-batch
    // loop unchanged. That loop DOES support SinkManaged (its sink receives
    // the full block with tokens), which is why the SinkManaged rejection
    // below must come AFTER this delegation.
    let budget = match self.byte_budget.clone() {
        Some(budget) => budget,
        None => {
            return self
                .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
                .await;
        }
    };

    // With the governor ON, blocks stream as token-less sub-block views, so a
    // sink can never own a SinkManaged commit. Refuse it at startup instead of
    // silently freezing the source offset; this mirrors the guard in
    // run_workbatch_streaming, which this path bypasses.
    match commit {
        CommitMode::SinkManaged => return Err(EngineError::SinkManagedUnsupported),
        CommitMode::Auto => {}
    }

    tracing::info!(
        chunk_size = self.config.max_chunk_size,
        commit = ?commit,
        ticker = ticker.is_some(),
        start_byte_budget = budget.byte_budget(),
        "BatchEngine (governed) starting -- self-regulation ON"
    );

    let mut loop_ticker = LoopTicker::new(ticker);

    // Arrival instant of the previous block. It lets the AIMD loop see a real
    // ingest inter-arrival interval; the first block reports zero.
    let mut previous_arrival: Option<std::time::Instant> = None;

    loop {
        // Bound one poll two ways:
        // - by count: the smaller of the configured chunk size and the
        //   budget's poll-safety record cap, so a tiny-record flood cannot
        //   blow the count even within the byte budget;
        // - by bytes: the CURRENT byte budget, re-read every iteration.
        // The byte cap keeps a single poll from RETAINING more than about one
        // budget of inbound payload BEFORE the sub-block split. Without it,
        // `recv(max)` could build a WorkBatch (and, for the Kafka recv-arena,
        // one arena) far larger than the budget before any sub-block lease
        // ran.
        let limits = crate::transport::RecvLimits {
            max_records: self.config.max_chunk_size.min(budget.record_cap()),
            max_bytes: budget.byte_budget(),
        };

        tokio::select! {
            biased;

            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (governed) shutting down");
                return Ok(());
            }

            () = loop_ticker.wait() => loop_ticker.fire("governed").await,

            recv_result = receiver.recv_limited(limits) => {
                let arrived = std::time::Instant::now();
                let ingest_interval = previous_arrival
                    .replace(arrived)
                    .map(|prev| arrived.saturating_duration_since(prev))
                    .unwrap_or_default();

                let received = recv_result.map_err(EngineError::Transport)?;
                let block_bytes = received.total_payload_bytes() as u64;

                match self.ingest_workbatch(received)? {
                    None => {
                        // Empty block: still fold in the timing, so a quiet
                        // pipeline can grow its budget back. Zero bytes counts
                        // as slack.
                        budget.observe(0, Duration::ZERO, ingest_interval);
                    }
                    Some(block) => {
                        // The budget is re-read for THIS block. Low pressure
                        // gives a big budget and one sub-block (no overhead);
                        // high pressure shrinks it, bounding peak in-flight
                        // memory to one sub-block.
                        let sub_block_bytes = budget.byte_budget();

                        let started = std::time::Instant::now();
                        self.drive_block_streaming(
                            receiver,
                            block,
                            &process,
                            &mut sink,
                            commit,
                            sub_block_bytes,
                        )
                        .await?;
                        let process_time = started.elapsed();

                        // Feed the OBSERVED block bytes to the AIMD loop. A
                        // memory HARD override inside observe() shrinks the
                        // budget immediately, regardless of rho.
                        budget.observe(block_bytes, process_time, ingest_interval);

                        // Observability: expose the current budget, the ACTUAL
                        // received block bytes, and the pressure as gauges, so
                        // throttling is visible rather than mysterious. A
                        // persistent gap between `self_regulation_byte_budget`
                        // and `recv_block_bytes` means the recv byte cap is not
                        // holding. Gate edges (pause/resume) are logged by the
                        // ObservingActuator.
                        #[cfg(feature = "metrics")]
                        {
                            metrics::gauge!("self_regulation_byte_budget")
                                .set(budget.byte_budget() as f64);
                            // Dual-emit: drop the OLD name in the next release
                            // (MIGRATIONS). `_bytes` is the conventional
                            // base-unit suffix.
                            metrics::gauge!("self_regulation_byte_budget_bytes")
                                .set(budget.byte_budget() as f64);
                            metrics::gauge!("self_regulation_recv_block_bytes")
                                .set(block_bytes as f64);
                            // The `self_regulation_` prefix avoids colliding
                            // with the MemoryGuard and ScalingPressure pressure
                            // gauges on the same registry.
                            metrics::gauge!("self_regulation_pressure_ratio")
                                .set(budget.pressure().level());
                        }
                    }
                }
            }
        }
    }
}
546
/// Pre-parsed `WorkBatch` driver: the opt-in hot path.
///
/// The loop has the same shape as [`run_workbatch`](Self::run_workbatch),
/// except the driver PRE-PARSES each whole block via [`codec::parse`] (SIMD
/// JSON / native MsgPack) on the worker pool. It then hands `process_parsed` a
/// [`ParsedBatch`] containing the records, their aligned parsed payloads, and
/// the shared [`FieldInterner`](super::FieldInterner). Apps that opt in keep
/// the batch-parse + interner throughput win.
///
/// Records that fail to parse follow the configured
/// [`ParseErrorAction`](super::ParseErrorAction):
///
/// - Dlq: the record goes to `dlq_entries`;
/// - Skip: the record is dropped and counted;
/// - FailBatch: terminal failure, no commit.
///
/// [`ParsedBatch`] documents the full contract. `process_parsed` produces the
/// final [`WorkBatch`] and MUST preserve the input `commit_tokens`.
///
/// # Errors
///
/// Same as [`run_workbatch`](Self::run_workbatch).
#[cfg(feature = "transport")]
#[allow(clippy::too_many_arguments)]
pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
    &self,
    receiver: &R,
    shutdown: CancellationToken,
    process_parsed: P,
    mut sink: Sink,
    commit: CommitMode,
    ticker: Option<(Duration, Ticker)>,
) -> Result<(), EngineError>
where
    R: TransportReceiver,
    P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
    Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
    SinkFut: std::future::Future<Output = Result<(), EngineError>>,
    Ticker: FnMut() -> TickerFut,
    TickerFut: std::future::Future<Output = Result<(), EngineError>>,
{
    let has_ticker = ticker.is_some();
    tracing::info!(
        chunk_size = self.config.max_chunk_size,
        commit = ?commit,
        ticker = has_ticker,
        "BatchEngine (workbatch parsed) starting"
    );

    let mut loop_ticker = LoopTicker::new(ticker);

    // Package parse-then-process as a plain WorkBatch -> WorkBatch stage so
    // drive_block stays generic. It is built once and reused for every block.
    // parse_block honours ParseErrorAction:
    // - FailBatch surfaces a terminal EngineError here (no commit);
    // - Dlq carries entries forward for drive_block to route;
    // - Skip drops the record and counts it (a configured drop, not silent).
    let parse_then_process =
        |block: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
            let parsed = self.parse_block(block)?;
            process_parsed(parsed)
        };

    loop {
        tokio::select! {
            biased;

            () = shutdown.cancelled() => {
                tracing::info!("BatchEngine (workbatch parsed) shutting down");
                return Ok(());
            }

            () = loop_ticker.wait() => loop_ticker.fire("workbatch parsed").await,

            recv_result = receiver.recv(self.config.max_chunk_size) => {
                let received = recv_result.map_err(EngineError::Transport)?;
                if let Some(block) = self.ingest_workbatch(received)? {
                    self.drive_block(receiver, block, &parse_then_process, &mut sink, commit)
                        .await?;
                }
            }
        }
    }
}
622
/// Prepare a received [`WorkBatch`] for the process stage.
///
/// The batch's inline-DLQ entries are routed first, via
/// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy),
/// and the batch comes back with those entries consumed. `recv` yields a
/// [`WorkBatch`] directly (Task 0.7b), so there is no `RecvBatch` round-trip:
/// inbound-filter DLQ entries arrive on [`WorkBatch::dlq_entries`].
///
/// Returns `Ok(None)` only for a TRULY empty block: no records AND no
/// `commit_tokens`. The caller should skip it. A record-less block that still
/// carries tokens is the all-filtered case and is returned as `Some`, so its
/// acks still get committed. Memory accounting happens later, in
/// [`drive_block`](Self::drive_block).
///
/// # Errors
///
/// Propagates any error from the DLQ policy (e.g. Reject).
#[cfg(feature = "transport")]
fn ingest_workbatch<T: crate::transport::CommitToken>(
    &self,
    batch: WorkBatch<T>,
) -> Result<Option<WorkBatch<T>>, EngineError> {
    // Route, discard, or reject the inline-DLQ entries per the configured
    // policy. They are never silently dropped.
    let routed = self.apply_workbatch_dlq_policy(batch)?;

    // Keep any block that still has records OR acks. Dropping an all-filtered
    // block (no records, but tokens for the records the inbound filter
    // removed) would strand its acks: a stalled Kafka offset or a leaked
    // Redis PEL.
    let has_work = !routed.records.is_empty() || !routed.commit_tokens.is_empty();
    Ok(has_work.then_some(routed))
}
653
654 /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
655 ///
656 /// Shared by both [`run_workbatch`](Self::run_workbatch) and
657 /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
658 /// between the two is the `process` closure they pass.
659 #[cfg(feature = "transport")]
660 async fn drive_block<R, P, Sink, SinkFut>(
661 &self,
662 receiver: &R,
663 batch: WorkBatch<R::Token>,
664 process: &P,
665 sink: &mut Sink,
666 commit: CommitMode,
667 ) -> Result<(), EngineError>
668 where
669 R: TransportReceiver,
670 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
671 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
672 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
673 {
674 // Account the in-flight ingress bytes against the MemoryGuard; the lease
675 // releases on every exit path of this block (sink-error early return,
676 // commit, ?-return) via Drop.
677 #[cfg(feature = "memory")]
678 let _ingress_lease = self.lease_ingress_batch(&batch);
679
680 // process() may fan out / fan in; it preserves the input commit_tokens.
681 // Capture the input ack count so a contract breach is LOGGED rather than
682 // silently freezing the source offset: a closure that rebuilds its
683 // output with WorkBatch::from_records (instead of map_records) drops the
684 // tokens to zero, the Auto commit below commits `&[]`, and the partition
685 // stalls with no diagnostic. One len() compare per block (not per
686 // record) -- nil hot-path cost.
687 let input_token_count = batch.commit_tokens.len();
688
689 let mut out_batch = process(batch)?;
690
691 if out_batch.commit_tokens.len() != input_token_count {
692 tracing::warn!(
693 input_tokens = input_token_count,
694 output_tokens = out_batch.commit_tokens.len(),
695 "process() changed the commit-token count -- the run contract is \
696 that process preserves source acks (transform records, not \
697 tokens). A drop toward zero will under-commit and stall the \
698 source offset; use map_records, not WorkBatch::from_records."
699 );
700 }
701
702 // Route any parse/process-generated DLQ entries the out-batch carries,
703 // through the SAME policy + route point as the inbound-filter entries
704 // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
705 // sink/commit, so a parse/process dead-letter can never vanish on the
706 // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
707 // Route sink Err) is a terminal ack-barrier error -- the commit is
708 // skipped and the whole block re-delivered, so no later ordered commit
709 // advances past these undelivered dead-letters. Silent discard is opt-in
710 // only (FilterDlqPolicy::DiscardWithMetric).
711 if !out_batch.dlq_entries.is_empty() {
712 let entries = std::mem::take(&mut out_batch.dlq_entries);
713 if let Err(e) = self.route_dlq_entries(entries) {
714 tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
715 return Err(e);
716 }
717 }
718
719 // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
720 //
721 // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
722 // TERMINAL error -- it stops the run loop. The source commit is ordered
723 // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
724 // logged and continued, the NEXT block's commit would advance the
725 // committed watermark PAST this block's never-sent offsets, silently
726 // skipping records (data loss). Stopping the loop leaves THIS block's
727 // tokens uncommitted, so the source re-delivers from the last committed
728 // watermark on restart -- no later block can commit ahead of the
729 // failure. The app owns restart/retry policy; the engine never invents
730 // a silent skip.
731 // Skip the sink when there is nothing to send (e.g. every record in the
732 // block was filtered out): the sink has no work, but the block's
733 // commit_tokens -- which include the filtered records' acks -- must still
734 // be committed below so the source advances past them. (The streaming
735 // path gets this for free: a zero-record block runs zero sub-blocks and
736 // still commits once at the end.)
737 if !out_batch.records.is_empty()
738 && let Err(e) = sink(&out_batch).await
739 {
740 tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
741 return Err(e);
742 }
743
744 // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
745 // output record count. This is the at-least-once block contract.
746 match commit {
747 CommitMode::Auto => {
748 // A commit failure is ALSO a terminal ack-barrier failure: a
749 // failed ordered commit must not be followed by a later block's
750 // commit advancing the watermark past these uncommitted offsets.
751 if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
752 tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
753 return Err(EngineError::Transport(e));
754 }
755 }
756 CommitMode::SinkManaged => {
757 // The sink owns the commit -- the engine does not commit here.
758 }
759 }
760 Ok(())
761 }
762
763 /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
764 /// bounded to ONE sub-block, the source acks commit once after the final
765 /// sub-block.
766 ///
767 /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
768 /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
769 /// sub-block never multiplies the source acks. Each sub-block's ingress lease
770 /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
771 /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
772 /// block.
773 ///
774 /// On ANY sub-block sink error the block stops and the commit is skipped (the
775 /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
776 /// propagates out and stops the run loop, so no LATER block's ordered commit
777 /// can advance the cumulative watermark past these never-committed offsets
778 /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
779 /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
780 /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
781 /// commit failure is likewise terminal.
782 #[cfg(feature = "transport")]
783 async fn drive_block_streaming<R, P, Sink, SinkFut>(
784 &self,
785 receiver: &R,
786 batch: WorkBatch<R::Token>,
787 process: &P,
788 sink: &mut Sink,
789 commit: CommitMode,
790 sub_block_bytes: u64,
791 ) -> Result<(), EngineError>
792 where
793 R: TransportReceiver,
794 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
795 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
796 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
797 {
798 // Carry the WHOLE block's source acks aside; the sub-block views below
799 // commit EMPTY tokens. The batch's tokens commit ONCE after the final
800 // sub-block (at-least-once on the whole block). dlq_entries were already
801 // routed by ingest_workbatch, so the block here carries none.
802 let WorkBatch {
803 records,
804 commit_tokens,
805 ..
806 } = batch;
807
808 // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
809 // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
810 // pulls it -- it never pre-materialises every sub-block vector up front,
811 // so the only sub-block resident is the one currently being leased and
812 // sunk (the streaming peak-memory contract holds for the SPLIT itself,
813 // not just the lease).
814 let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
815
816 while let Some(sub_records) = sub_blocks.next_sub_block() {
817 // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
818 // path of this iteration (sink-error early return, ?-return, or the
819 // end of the loop body) via Drop -- BEFORE the next sub-block leases.
820 // Peak in-flight lease is therefore one sub-block, never the block.
821 let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
822 #[cfg(feature = "memory")]
823 let _sub_lease = self.lease_ingress_batch(&sub_block);
824
825 // process() may fan out / fan in within the sub-block; it preserves
826 // the (empty) commit_tokens of the sub-block view.
827 let mut out_sub = process(sub_block)?;
828
829 // Route any parse/process-generated DLQ entries this sub-block
830 // carries BEFORE its sink -- same single policy + route point as the
831 // whole-batch path and the inbound-filter entries. Fallible: a route
832 // failure is terminal (ack barrier) so the commit for the WHOLE block
833 // is skipped and it is re-delivered -- a dead-letter is never lost on
834 // the path to a source commit.
835 if !out_sub.dlq_entries.is_empty() {
836 let entries = std::mem::take(&mut out_sub.dlq_entries);
837 if let Err(e) = self.route_dlq_entries(entries) {
838 tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
839 return Err(e);
840 }
841 }
842
843 // Sink this sub-block. A sink error stops the block and skips the
844 // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
845 // propagate so the run loop stops -- a later block's ordered commit
846 // must never advance the cumulative watermark past this block's
847 // uncommitted offsets.
848 if let Err(e) = sink(&out_sub).await {
849 tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
850 return Err(e);
851 }
852 // _sub_lease drops here -> bytes released before the next sub-block.
853 }
854
855 // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
856 match commit {
857 CommitMode::Auto => {
858 // Commit failure is terminal (ack barrier) -- same reasoning as
859 // the sink-error path above.
860 if let Err(e) = receiver.commit(&commit_tokens).await {
861 tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
862 return Err(EngineError::Transport(e));
863 }
864 }
865 CommitMode::SinkManaged => {
866 // The sink owns the commit -- the engine does not commit here.
867 }
868 }
869 Ok(())
870 }
871
872 /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
873 ///
874 /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
875 /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
876 /// sub-block shapes) ergonomic. Same splitting contract as
877 /// [`SubBlockDrain::next_sub_block`].
878 #[cfg(all(test, feature = "transport"))]
879 fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
880 let mut drain = SubBlockDrain::new(records, target_bytes);
881 let mut out = Vec::new();
882 while let Some(sub) = drain.next_sub_block() {
883 out.push(sub);
884 }
885 out
886 }
887
888 /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
889 /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
890 ///
891 /// Parses each record's payload via [`codec::parse`] on the worker pool
892 /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
893 /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
894 /// the engine's `parse_error_action` -- the SAME contract the legacy
895 /// `process_mid_tier` honoured (previously the parsed path hardcoded
896 /// route-to-DLQ, ignoring the config):
897 ///
898 /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
899 /// appended to the batch's `dlq_entries` (no silent drop) and counted in
900 /// errors + dlq. The driver routes those entries before commit.
901 /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
902 /// in errors ONLY (a deliberate, configured drop -- not a silent vanish).
903 /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
904 /// failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
905 /// consistent with the P1 ack barrier, so the block is re-delivered rather
906 /// than partially committed.
907 ///
908 /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
909 ///
910 /// # Errors
911 ///
912 /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
913 /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
914 #[cfg(feature = "transport")]
915 fn parse_block<T: crate::transport::CommitToken>(
916 &self,
917 batch: WorkBatch<T>,
918 ) -> Result<ParsedBatch<'_, T>, EngineError> {
919 use super::ParseErrorAction;
920 use crate::transport::PayloadFormat;
921
922 let WorkBatch {
923 records,
924 commit_tokens,
925 mut dlq_entries,
926 } = batch;
927
928 // Parse each record on the pool. The pool's map_owned applies the scaler
929 // semaphore per item, so the parse phase obeys the CPU cap exactly as the
930 // legacy parsed path does. map_owned preserves input order, so a record
931 // and its parse result stay aligned without threading an explicit index.
932 let parsed_each: Vec<(Record, Result<ParsedPayload, String>)> =
933 self.pool.map_owned(records, |record| {
934 let format: PayloadFormat = record.metadata.format;
935 let result =
936 codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
937 (record, result)
938 });
939
940 let action = self.config.parse_error_action;
941 let mut keep_records = Vec::new();
942 let mut keep_parsed = Vec::new();
943 for (record, result) in parsed_each {
944 match result {
945 Ok(payload) => {
946 keep_records.push(record);
947 keep_parsed.push(payload);
948 }
949 Err(reason) => match action {
950 ParseErrorAction::Dlq => {
951 // No silent drop: the unparseable record's bytes go to DLQ.
952 self.stats.incr_errors();
953 self.stats.incr_dlq();
954 dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
955 payload: record.payload.to_vec(),
956 key: record.key.clone(),
957 reason,
958 });
959 }
960 ParseErrorAction::Skip => {
961 // Deliberate, configured drop -- counted in errors but NOT
962 // dead-lettered. This is opt-in loss, not a silent vanish.
963 self.stats.incr_errors();
964 }
965 ParseErrorAction::FailBatch => {
966 // Terminal: the whole block fails its commit (ack barrier).
967 self.stats.incr_errors();
968 return Err(EngineError::ParseBatchFailed(reason));
969 }
970 },
971 }
972 }
973
974 Ok(ParsedBatch {
975 records: keep_records,
976 parsed: keep_parsed,
977 commit_tokens,
978 dlq_entries,
979 interner: &self.interner,
980 })
981 }
982
983 /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
984 /// returning an RAII lease that releases them on drop.
985 ///
986 /// Drives the in-flight ingress accounting for the WorkBatch driver: the
987 /// lease is taken in [`drive_block`](Self::drive_block) and releases the
988 /// bytes on every block exit path via `Drop`.
989 ///
990 /// [`MemoryGuard`]: crate::memory::MemoryGuard
991 #[cfg(feature = "memory")]
992 pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
993 &self,
994 batch: &WorkBatch<T>,
995 ) -> Option<super::IngressLease<'_>> {
996 let guard = self.memory_guard.as_ref()?;
997 let bytes = batch.total_payload_bytes() as u64;
998 guard.add_bytes(bytes);
999 Some(super::IngressLease::new(guard, bytes))
1000 }
1001}
1002
1003/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
1004/// of [`Record`]s at a time, so the streaming driver never pre-materialises
1005/// every sub-block vector up front.
1006///
1007/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records (in
1008/// order) from the source until the accumulated `payload.len()` would overshoot
1009/// `target_bytes`, then returns that sub-block; the remaining records stay
1010/// un-pulled in the source iterator. Splitting contract:
1011///
1012/// - records are kept in order;
1013/// - FLOOR of one record per sub-block: a record whose payload alone meets or
1014/// exceeds `target_bytes` is its own single-record sub-block (never stalls);
1015/// - `target_bytes` of `0` is treated as a floor of one record per sub-block;
1016/// - an exhausted source yields `None`.
1017///
1018/// The lazy shape matters: the previous `Vec<Vec<Record>>` allocated every
1019/// sub-block vector before the loop processed the first one. Here, at most ONE
1020/// sub-block vector is allocated at a time -- the one the loop is about to lease
1021/// and sink -- so the SPLIT no longer defeats the streaming peak-memory bound.
1022#[cfg(feature = "transport")]
1023struct SubBlockDrain {
1024 /// Source records, drained in order. `peeked` holds a record we pulled but
1025 /// could not fit into the sub-block being built (it starts the next one).
1026 iter: std::vec::IntoIter<Record>,
1027 peeked: Option<Record>,
1028 target_bytes: u64,
1029}
1030
1031#[cfg(feature = "transport")]
1032impl SubBlockDrain {
1033 fn new(records: Vec<Record>, target_bytes: u64) -> Self {
1034 Self {
1035 iter: records.into_iter(),
1036 peeked: None,
1037 target_bytes,
1038 }
1039 }
1040
1041 /// Yield the next consecutive sub-block, or `None` when the source is
1042 /// exhausted. Allocates exactly ONE sub-block `Vec` per call.
1043 fn next_sub_block(&mut self) -> Option<Vec<Record>> {
1044 // Start with the record carried over from the previous call (if any),
1045 // else pull the first record of this sub-block from the source.
1046 let first = self.peeked.take().or_else(|| self.iter.next())?;
1047 let mut current_bytes = first.payload.len() as u64;
1048 let mut current = vec![first];
1049
1050 // Pull more records while they fit. Floor 1: we already took one record
1051 // above, so an oversized record is still its own sub-block.
1052 for record in self.iter.by_ref() {
1053 let record_bytes = record.payload.len() as u64;
1054 if current_bytes.saturating_add(record_bytes) > self.target_bytes {
1055 // Does not fit -- carry it to the next sub-block and stop here.
1056 self.peeked = Some(record);
1057 break;
1058 }
1059 current_bytes = current_bytes.saturating_add(record_bytes);
1060 current.push(record);
1061 }
1062 Some(current)
1063 }
1064}
1065
1066#[cfg(all(test, feature = "transport-memory"))]
1067#[path = "driver_tests.rs"]
1068mod tests;