hyperi_rustlib/worker/engine/driver.rs
1// Project: hyperi-rustlib
2// File: src/worker/engine/driver.rs
3// Purpose: Unified WorkBatch engine driver (get -> process -> send -> commit)
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! # Unified `WorkBatch` engine driver
10//!
11//! The single run loop that the four legacy loops (`run` / `run_raw` /
12//! `run_async` / `run_raw_async`) collapsed into when the spine flipped to
13//! `WorkBatch` (Task 0.7b). It drives the canonical currency -- [`WorkBatch`] --
14//! through one block at a time:
15//!
16//! ```text
17//! recv(max) -> WorkBatch (recv now yields a WorkBatch natively)
18//! -> apply_workbatch_dlq_policy (route/discard/reject inline-DLQ entries)
19//! -> lease_ingress_batch (memory accounting on the block's bytes)
20//! -> process(WorkBatch) (transforms parse ON DEMAND via codec::parse)
21//! -> sink(&out_batch).await (async send of the whole block)
22//! -> commit per CommitMode (at-least-once, AFTER the block is sent)
23//! ```
24//!
25//! ## Why tokens live on the batch, not the record
26//!
27//! [`WorkBatch::commit_tokens`] are the INPUT source acks. They are decoupled
28//! from `records.len()`, so a `process` that fans `N` records out to `2N` (or
29//! collapses them) does NOT disturb the source acks. The driver commits EXACTLY
30//! the input tokens after the whole out-batch is sent -- never `2N`, never per
31//! output record. That invariant is the data-plane core; the fan-out
32//! commit-correctness test proves it.
33//!
34//! ## Two parse modes (the hybrid)
35//!
36//! - [`run_workbatch`](BatchEngine::run_workbatch) -- the DEFAULT. The driver
37//! does NOT pre-parse. A transform that needs a field calls
38//! [`codec::parse`] on demand. Pass-through apps (receiver, raw forwarders)
39//! never pay a parse.
40//!
41//! - [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) -- opt-in for
42//! hot pipelines. The driver pre-parses the whole block via `codec::parse`
43//! (SIMD JSON / native MsgPack) on the worker pool and hands the process
44//! closure a [`ParsedBatch`] -- records + their aligned `ParsedPayload`s + a
45//! shared [`FieldInterner`](super::FieldInterner) for hot routing-field
46//! dedup. This keeps the batch-parse + interner throughput win for apps that
47//! opt in.
48//!
49//! `process_mid_tier`, `process_raw` and `ParsedMessage` remain for the
50//! in-process (non-run-loop) callers; only the four legacy run loops were
51//! removed by the 0.7b flip.
52
53use std::time::Duration;
54
55use tokio_util::sync::CancellationToken;
56
57use super::{BatchEngine, EngineError};
58use crate::transport::codec::{self, ParsedPayload};
59use crate::transport::{Record, TransportReceiver, WorkBatch};
60
/// Selects which party acknowledges the input source once a block is sunk.
///
/// Every [`WorkBatch`] carries `commit_tokens` -- the INPUT source acks (Kafka
/// offsets, fetch cursors, ...). This enum names who is responsible for firing
/// them.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CommitMode {
    /// Engine-committed, at-least-once. Once the sink has returned `Ok` for the
    /// WHOLE out-batch the engine calls
    /// `receiver.commit(&out_batch.commit_tokens)`. If the sink errors the
    /// commit is skipped, so the block is re-delivered. This is the
    /// engine-commits behaviour of the former mid-tier / raw run loops, lifted
    /// onto the block.
    Auto,
    /// Sink-committed: the engine itself never commits. The sink receives the
    /// block (including its `commit_tokens`) and chooses when to acknowledge,
    /// e.g. after a downstream flush. This is the deferred-commit shape of the
    /// former async run loop, lifted onto the block.
    SinkManaged,
}
78
79/// A pre-parsed block for the opt-in
80/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed) hot path.
81///
82/// Bundles the surviving [`Record`]s with their aligned [`ParsedPayload`]s
83/// (`records[i]` parsed to `parsed[i]`), the input `commit_tokens`, any inline
84/// DLQ entries carried forward, and a shared [`FieldInterner`](super::FieldInterner)
85/// for hot routing-field dedup.
86///
87/// ## Parse-failure contract
88///
89/// `records` and `parsed` are aligned 1:1 and contain ONLY records that parsed
90/// successfully. A record whose payload fails [`codec::parse`] is handled per
91/// the engine's configured [`ParseErrorAction`](super::ParseErrorAction) -- the
92/// same contract the legacy `process_mid_tier` honoured:
93///
94/// - [`Dlq`](super::ParseErrorAction::Dlq) (default): its bytes are appended to
95/// [`dlq_entries`](Self::dlq_entries) with a `parse error: ...` reason
96/// (no silent drop); the resulting [`WorkBatch`] inherits those entries and
97/// the driver routes them through the DLQ policy before commit.
98/// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped (counted in
99/// errors) -- a deliberate, configured drop, not a silent vanish.
100/// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block fails
101/// terminally (no commit), consistent with the ack barrier.
102///
103/// The process closure therefore always sees a clean, fully-parsed view.
104///
105/// `commit_tokens` are the INPUT source acks and are carried through unchanged
106/// regardless of how many records survived parsing -- the same fan-out-safe
107/// token decoupling as [`WorkBatch`].
108pub struct ParsedBatch<'a, T: crate::transport::CommitToken> {
109 /// Records that parsed successfully (aligned 1:1 with [`parsed`](Self::parsed)).
110 pub records: Vec<Record>,
111
112 /// The parsed payloads, `parsed[i]` being `records[i]` decoded.
113 pub parsed: Vec<ParsedPayload>,
114
115 /// Input source acks for the whole block (decoupled from record count).
116 pub commit_tokens: Vec<T>,
117
118 /// Inline-DLQ entries: those carried in on the source batch PLUS any record
119 /// that failed to parse (no-silent-drop).
120 pub dlq_entries: Vec<crate::transport::filter::FilteredDlqEntry>,
121
122 /// Shared interner for hot routing-field-name dedup. The first time a field
123 /// name is seen it allocates an `Arc<str>`; later lookups are a refcount
124 /// bump. Reused from the engine so dedup persists across blocks.
125 pub interner: &'a super::FieldInterner,
126}
127
128impl<T: crate::transport::CommitToken> ParsedBatch<'_, T> {
129 /// Number of successfully-parsed records.
130 #[must_use]
131 pub fn len(&self) -> usize {
132 self.records.len()
133 }
134
135 /// Whether there are no successfully-parsed records.
136 #[must_use]
137 pub fn is_empty(&self) -> bool {
138 self.records.is_empty()
139 }
140
141 /// Intern a routing-field name through the shared interner.
142 ///
143 /// Use this to dedup the routing-key field name once per block rather than
144 /// re-allocating it per record.
145 #[must_use]
146 pub fn intern(&self, name: &str) -> std::sync::Arc<str> {
147 self.interner.intern(name)
148 }
149}
150
151impl BatchEngine {
152 /// Unified on-demand `WorkBatch` driver -- the default data-plane loop.
153 ///
154 /// Drives one [`WorkBatch`] at a time through `recv -> filter-DLQ policy ->
155 /// ingress lease -> process -> sink -> commit`. The driver does NOT pre-parse:
156 /// `process` reads fields on demand via [`codec::parse`]. Pass-through apps
157 /// pay zero parse cost.
158 ///
159 /// - `process` runs on the loop task (cancellation-aware between awaits) and
160 /// may fan records out or in; it MUST preserve `commit_tokens` (use
161 /// [`WorkBatch::map_records`], which does so automatically).
162 /// - `sink` is async and receives the WHOLE out-batch by reference.
163 /// - `commit` selects [`CommitMode::Auto`] (engine commits after sink `Ok`)
164 /// or [`CommitMode::SinkManaged`] (sink owns commit).
165 /// - `ticker` is an optional `(interval, fn)` that fires on the interval
166 /// inside the select loop (flush timers, periodic maintenance).
167 ///
168 /// Stops cleanly when `shutdown` is cancelled.
169 ///
170 /// # Errors
171 ///
172 /// Returns [`EngineError::Transport`] if `recv` fails fatally,
173 /// [`EngineError::FilterDlqUnrouted`] if inline-DLQ entries appear under the
174 /// default [`FilterDlqPolicy::Reject`](super::FilterDlqPolicy::Reject), or
175 /// the error returned by `process`.
176 ///
177 /// A sink error (and, under [`CommitMode::Auto`], a commit error) is
178 /// TERMINAL: it stops the run loop and propagates. This is the ack barrier
179 /// for the ORDERED/cumulative source commit (Kafka "commit up to offset N"):
180 /// the failed block's tokens are NOT committed, and -- crucially -- no LATER
181 /// block is fetched and committed past them, which would silently skip the
182 /// never-sent records (data loss). On restart the source re-delivers from
183 /// the last committed watermark, preserving at-least-once. The app owns
184 /// restart/retry policy.
185 #[cfg(feature = "transport")]
186 #[allow(clippy::too_many_arguments)]
187 pub async fn run_workbatch<R, P, Sink, SinkFut, Ticker, TickerFut>(
188 &self,
189 receiver: &R,
190 shutdown: CancellationToken,
191 process: P,
192 mut sink: Sink,
193 commit: CommitMode,
194 ticker: Option<(Duration, Ticker)>,
195 ) -> Result<(), EngineError>
196 where
197 R: TransportReceiver,
198 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
199 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
200 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
201 Ticker: FnMut() -> TickerFut,
202 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
203 {
204 tracing::info!(
205 chunk_size = self.config.max_chunk_size,
206 commit = ?commit,
207 ticker = ticker.is_some(),
208 "BatchEngine (workbatch) starting"
209 );
210
211 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
212 let mut ticker_fn = ticker.map(|(_, f)| f);
213 if let Some(ref mut interval) = tick_interval {
214 interval.tick().await; // first tick fires immediately -- consume it
215 }
216
217 loop {
218 tokio::select! {
219 biased;
220
221 () = shutdown.cancelled() => {
222 tracing::info!("BatchEngine (workbatch) shutting down");
223 return Ok(());
224 }
225
226 _ = async {
227 match tick_interval.as_mut() {
228 Some(interval) => interval.tick().await,
229 None => std::future::pending().await,
230 }
231 } => {
232 if let Some(ref mut f) = ticker_fn
233 && let Err(e) = f().await
234 {
235 tracing::error!(error = %e, "Ticker (workbatch) failed");
236 }
237 }
238
239 recv_result = receiver.recv(self.config.max_chunk_size) => {
240 let work_batch = recv_result.map_err(EngineError::Transport)?;
241 let Some(batch) = self.ingest_workbatch(work_batch)? else {
242 continue;
243 };
244 self.drive_block(receiver, batch, &process, &mut sink, commit).await?;
245 }
246 }
247 }
248 }
249
250 /// Streaming `WorkBatch` driver -- the opt-in peak-memory-bounded path.
251 ///
252 /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), but each
253 /// received block is processed in consecutive byte-budget-sized SUB-BLOCKS
254 /// rather than all at once. Peak in-flight ingress memory is bounded to ONE
255 /// sub-block (`~sub_block_bytes`) instead of the whole block: the per-sub-block
256 /// ingress lease is dropped (releasing those bytes) BEFORE the next sub-block
257 /// is leased and processed.
258 ///
259 /// The source acks for the WHOLE block are committed EXACTLY ONCE, after the
260 /// FINAL sub-block's sink returns `Ok` (under [`CommitMode::Auto`]) -- so
261 /// at-least-once is preserved: a sink error on any sub-block stops the block
262 /// and skips the commit, so the WHOLE block is re-delivered. The sub-block
263 /// views carry EMPTY `commit_tokens`; the batch's tokens are committed once at
264 /// the end.
265 ///
266 /// `sub_block_bytes` is the target sum of `payload.len()` per sub-block (floor
267 /// one record, so a record larger than the target is still its own sub-block
268 /// and the loop never stalls). It is taken as a parameter so the path is
269 /// testable; Phase 3 wires the byte budget from the governor.
270 ///
271 /// Fan-out WITHIN a sub-block's `process` is fine (records grow); the source
272 /// acks are still the batch's input tokens, committed once at the end.
273 ///
274 /// # Errors
275 ///
276 /// Same as [`run_workbatch`](Self::run_workbatch).
277 #[cfg(feature = "transport")]
278 #[allow(clippy::too_many_arguments)]
279 pub async fn run_workbatch_streaming<R, P, Sink, SinkFut, Ticker, TickerFut>(
280 &self,
281 receiver: &R,
282 shutdown: CancellationToken,
283 process: P,
284 mut sink: Sink,
285 commit: CommitMode,
286 sub_block_bytes: u64,
287 ticker: Option<(Duration, Ticker)>,
288 ) -> Result<(), EngineError>
289 where
290 R: TransportReceiver,
291 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
292 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
293 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
294 Ticker: FnMut() -> TickerFut,
295 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
296 {
297 tracing::info!(
298 chunk_size = self.config.max_chunk_size,
299 commit = ?commit,
300 sub_block_bytes,
301 ticker = ticker.is_some(),
302 "BatchEngine (workbatch streaming) starting"
303 );
304
305 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
306 let mut ticker_fn = ticker.map(|(_, f)| f);
307 if let Some(ref mut interval) = tick_interval {
308 interval.tick().await; // first tick fires immediately -- consume it
309 }
310
311 loop {
312 tokio::select! {
313 biased;
314
315 () = shutdown.cancelled() => {
316 tracing::info!("BatchEngine (workbatch streaming) shutting down");
317 return Ok(());
318 }
319
320 _ = async {
321 match tick_interval.as_mut() {
322 Some(interval) => interval.tick().await,
323 None => std::future::pending().await,
324 }
325 } => {
326 if let Some(ref mut f) = ticker_fn
327 && let Err(e) = f().await
328 {
329 tracing::error!(error = %e, "Ticker (workbatch streaming) failed");
330 }
331 }
332
333 recv_result = receiver.recv(self.config.max_chunk_size) => {
334 let work_batch = recv_result.map_err(EngineError::Transport)?;
335 let Some(batch) = self.ingest_workbatch(work_batch)? else {
336 continue;
337 };
338 self.drive_block_streaming(
339 receiver, batch, &process, &mut sink, commit, sub_block_bytes,
340 )
341 .await?;
342 }
343 }
344 }
345 }
346
347 /// Governed `WorkBatch` driver -- the default-ON self-regulation run path.
348 ///
349 /// This is what a self-regulating app calls instead of choosing between
350 /// [`run_workbatch`](Self::run_workbatch) and
351 /// [`run_workbatch_streaming`](Self::run_workbatch_streaming) by hand. It
352 /// dispatches on whether the byte-budget lever is wired
353 /// ([`set_byte_budget`](BatchEngine::set_byte_budget), done by
354 /// `ServiceRuntime` when `self_regulation.enabled = true`):
355 ///
356 /// - **Governor ON** (budget wired): streams each received block in
357 /// sub-blocks sized to the CURRENT byte budget (re-read per block), bounds
358 /// peak in-flight memory to one sub-block, and folds each block's
359 /// `(bytes, process_time, ingest_interval)` into the AIMD loop via
360 /// [`observe`](crate::governor::ByteBudgetController::observe). The recv
361 /// `max` is capped to the budget's poll-safety
362 /// [`record_cap`](crate::governor::ByteBudgetController::record_cap).
363 /// While pressure is LOW the budget sits at its big start value, so the
364 /// block becomes a SINGLE sub-block -- no per-record overhead, behaviour
365 /// matches the whole-batch loop.
366 /// - **Governor OFF** (no budget): delegates verbatim to
367 /// [`run_workbatch`](Self::run_workbatch) -- byte-identical to
368 /// pre-governor behaviour.
369 ///
370 /// The inbound GATE (Kafka pause-partitions / HTTP-gRPC 503) is wired
371 /// SEPARATELY into the receive transport, not here -- this method is the
372 /// driver-side lever (sub-block sizing + AIMD), the gate is the
373 /// transport-side brake. The two share the same `UnifiedPressure`.
374 ///
375 /// # Errors
376 ///
377 /// Same as [`run_workbatch`](Self::run_workbatch).
378 #[cfg(all(feature = "transport", feature = "governor"))]
379 #[allow(clippy::too_many_arguments)]
380 pub async fn run_governed<R, P, Sink, SinkFut, Ticker, TickerFut>(
381 &self,
382 receiver: &R,
383 shutdown: CancellationToken,
384 process: P,
385 mut sink: Sink,
386 commit: CommitMode,
387 ticker: Option<(Duration, Ticker)>,
388 ) -> Result<(), EngineError>
389 where
390 R: TransportReceiver,
391 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
392 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
393 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
394 Ticker: FnMut() -> TickerFut,
395 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
396 {
397 // Governor OFF -> the original whole-batch loop, byte-for-byte.
398 let Some(budget) = self.byte_budget.clone() else {
399 return self
400 .run_workbatch(receiver, shutdown, process, sink, commit, ticker)
401 .await;
402 };
403
404 tracing::info!(
405 chunk_size = self.config.max_chunk_size,
406 commit = ?commit,
407 ticker = ticker.is_some(),
408 start_byte_budget = budget.byte_budget(),
409 "BatchEngine (governed) starting -- self-regulation ON"
410 );
411
412 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
413 let mut ticker_fn = ticker.map(|(_, f)| f);
414 if let Some(ref mut interval) = tick_interval {
415 interval.tick().await; // first tick fires immediately -- consume it
416 }
417
418 // Track the previous block's arrival instant so we can feed the AIMD
419 // loop a real ingest inter-arrival interval.
420 let mut last_recv: Option<std::time::Instant> = None;
421
422 loop {
423 // The recv limits bound a single poll by BOTH:
424 // - the SMALLER of the config chunk size and the budget's
425 // poll-safety record cap (a tiny-record flood cannot blow the
426 // count even within the byte budget), AND
427 // - the CURRENT byte budget (re-read per block), so a single poll
428 // never RETAINS more than ~one budget's worth of inbound payload
429 // BEFORE the sub-block split. This is the fix for the
430 // "byte budget does not bound RECEIVE memory" gap: without the
431 // byte cap, `recv(max)` could build a WorkBatch (and, for the
432 // Kafka recv-arena, allocate one arena) far larger than the
433 // budget before any sub-block lease ran.
434 let recv_limits = crate::transport::RecvLimits {
435 max_records: self.config.max_chunk_size.min(budget.record_cap()),
436 max_bytes: budget.byte_budget(),
437 };
438
439 tokio::select! {
440 biased;
441
442 () = shutdown.cancelled() => {
443 tracing::info!("BatchEngine (governed) shutting down");
444 return Ok(());
445 }
446
447 _ = async {
448 match tick_interval.as_mut() {
449 Some(interval) => interval.tick().await,
450 None => std::future::pending().await,
451 }
452 } => {
453 if let Some(ref mut f) = ticker_fn
454 && let Err(e) = f().await
455 {
456 tracing::error!(error = %e, "Ticker (governed) failed");
457 }
458 }
459
460 recv_result = receiver.recv_limited(recv_limits) => {
461 let now = std::time::Instant::now();
462 let ingest_interval = last_recv
463 .map(|prev| now.saturating_duration_since(prev))
464 .unwrap_or_default();
465 last_recv = Some(now);
466
467 let work_batch = recv_result.map_err(EngineError::Transport)?;
468 let block_bytes = work_batch.total_payload_bytes() as u64;
469 let Some(batch) = self.ingest_workbatch(work_batch)? else {
470 // Empty block: still fold the timing so a quiet pipeline
471 // can grow its budget back. No bytes -> treated as slack.
472 budget.observe(0, Duration::ZERO, ingest_interval);
473 continue;
474 };
475
476 // Re-read the budget for THIS block: low pressure -> big
477 // budget -> one sub-block (no overhead); high pressure ->
478 // shrunk budget -> peak in-flight bounded to one sub-block.
479 let sub_block_bytes = budget.byte_budget();
480
481 let process_start = std::time::Instant::now();
482 self.drive_block_streaming(
483 receiver, batch, &process, &mut sink, commit, sub_block_bytes,
484 )
485 .await?;
486 let process_time = process_start.elapsed();
487
488 // Fold the OBSERVED actual block bytes into the AIMD loop. A
489 // memory HARD override inside observe() shrinks immediately
490 // regardless of rho.
491 budget.observe(block_bytes, process_time, ingest_interval);
492
493 // Observability: surface the current budget + pressure as
494 // gauges so throttling is visible, not mysterious, AND the
495 // ACTUAL received block bytes so the gap between the budget
496 // (`self_regulation_byte_budget`) and reality (`recv_block_bytes`)
497 // is measurable -- a persistent overshoot means the recv byte
498 // cap is not holding. The gate edges (pause/resume) are
499 // logged by the ObservingActuator.
500 #[cfg(feature = "metrics")]
501 {
502 metrics::gauge!("self_regulation_byte_budget")
503 .set(budget.byte_budget() as f64);
504 metrics::gauge!("recv_block_bytes").set(block_bytes as f64);
505 metrics::gauge!("pressure_ratio").set(budget.pressure().level());
506 }
507 }
508 }
509 }
510 }
511
512 /// Unified pre-parsed `WorkBatch` driver -- the opt-in hot path.
513 ///
514 /// Identical loop shape to [`run_workbatch`](Self::run_workbatch), except the
515 /// driver PRE-PARSES the whole block via [`codec::parse`] (SIMD JSON / native
516 /// MsgPack) on the worker pool and hands `process_parsed` a [`ParsedBatch`]
517 /// (records + aligned parsed payloads + shared
518 /// [`FieldInterner`](super::FieldInterner)). This keeps
519 /// the batch-parse + interner throughput win for apps that opt in.
520 ///
521 /// Records that fail to parse are handled per the configured
522 /// [`ParseErrorAction`](super::ParseErrorAction) (Dlq -> dlq_entries, Skip ->
523 /// drop+counted, FailBatch -> terminal no-commit) -- see [`ParsedBatch`] for
524 /// the parse-failure contract. `process_parsed` returns the final
525 /// [`WorkBatch`] and MUST preserve the input `commit_tokens`.
526 ///
527 /// # Errors
528 ///
529 /// Same as [`run_workbatch`](Self::run_workbatch).
530 #[cfg(feature = "transport")]
531 #[allow(clippy::too_many_arguments)]
532 pub async fn run_workbatch_parsed<R, P, Sink, SinkFut, Ticker, TickerFut>(
533 &self,
534 receiver: &R,
535 shutdown: CancellationToken,
536 process_parsed: P,
537 mut sink: Sink,
538 commit: CommitMode,
539 ticker: Option<(Duration, Ticker)>,
540 ) -> Result<(), EngineError>
541 where
542 R: TransportReceiver,
543 P: Fn(ParsedBatch<'_, R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
544 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
545 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
546 Ticker: FnMut() -> TickerFut,
547 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
548 {
549 tracing::info!(
550 chunk_size = self.config.max_chunk_size,
551 commit = ?commit,
552 ticker = ticker.is_some(),
553 "BatchEngine (workbatch parsed) starting"
554 );
555
556 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
557 let mut ticker_fn = ticker.map(|(_, f)| f);
558 if let Some(ref mut interval) = tick_interval {
559 interval.tick().await;
560 }
561
562 loop {
563 tokio::select! {
564 biased;
565
566 () = shutdown.cancelled() => {
567 tracing::info!("BatchEngine (workbatch parsed) shutting down");
568 return Ok(());
569 }
570
571 _ = async {
572 match tick_interval.as_mut() {
573 Some(interval) => interval.tick().await,
574 None => std::future::pending().await,
575 }
576 } => {
577 if let Some(ref mut f) = ticker_fn
578 && let Err(e) = f().await
579 {
580 tracing::error!(error = %e, "Ticker (workbatch parsed) failed");
581 }
582 }
583
584 recv_result = receiver.recv(self.config.max_chunk_size) => {
585 let recv_batch = recv_result.map_err(EngineError::Transport)?;
586 let Some(batch) = self.ingest_workbatch(recv_batch)? else {
587 continue;
588 };
589 // Wrap the parse-then-process so drive_block stays generic.
590 // parse_block honours ParseErrorAction: FailBatch surfaces a
591 // terminal EngineError here (no commit), Dlq carries entries
592 // forward for the driver to route, Skip drops silently+counted.
593 let parse = |b: WorkBatch<R::Token>| -> Result<WorkBatch<R::Token>, EngineError> {
594 let parsed = self.parse_block(b)?;
595 process_parsed(parsed)
596 };
597 self.drive_block(receiver, batch, &parse, &mut sink, commit).await?;
598 }
599 }
600 }
601 }
602
603 /// Prepare a received [`WorkBatch`] for processing: route its inline-DLQ
604 /// entries per the configured policy, then return the batch (with its
605 /// `dlq_entries` stripped) ready for the process stage. Returns `None` when
606 /// the block has no records (caller should `continue`).
607 ///
608 /// `recv` now yields a [`WorkBatch`] directly (Task 0.7b), so there is no
609 /// `RecvBatch` round-trip: the inbound-filter DLQ entries arrive on
610 /// [`WorkBatch::dlq_entries`] and are routed here via
611 /// [`apply_workbatch_dlq_policy`](BatchEngine::apply_workbatch_dlq_policy)
612 /// before processing. Memory accounting is performed in
613 /// [`drive_block`](Self::drive_block).
614 #[cfg(feature = "transport")]
615 fn ingest_workbatch<T: crate::transport::CommitToken>(
616 &self,
617 batch: WorkBatch<T>,
618 ) -> Result<Option<WorkBatch<T>>, EngineError> {
619 // Route/discard/reject inline-DLQ entries per the configured policy --
620 // never silently dropped. The batch comes back with its dlq_entries
621 // consumed so the process stage sees a clean block.
622 let batch = self.apply_workbatch_dlq_policy(batch)?;
623 if batch.is_empty() {
624 return Ok(None);
625 }
626 Ok(Some(batch))
627 }
628
629 /// Drive ONE block through `ingress lease -> process -> sink -> commit`.
630 ///
631 /// Shared by both [`run_workbatch`](Self::run_workbatch) and
632 /// [`run_workbatch_parsed`](Self::run_workbatch_parsed); the only difference
633 /// between the two is the `process` closure they pass.
634 #[cfg(feature = "transport")]
635 async fn drive_block<R, P, Sink, SinkFut>(
636 &self,
637 receiver: &R,
638 batch: WorkBatch<R::Token>,
639 process: &P,
640 sink: &mut Sink,
641 commit: CommitMode,
642 ) -> Result<(), EngineError>
643 where
644 R: TransportReceiver,
645 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
646 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
647 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
648 {
649 // Account the in-flight ingress bytes against the MemoryGuard; the lease
650 // releases on every exit path of this block (sink-error early return,
651 // commit, ?-return) via Drop.
652 #[cfg(feature = "memory")]
653 let _ingress_lease = self.lease_ingress_batch(&batch);
654
655 // process() may fan out / fan in; it preserves the input commit_tokens.
656 let mut out_batch = process(batch)?;
657
658 // Route any parse/process-generated DLQ entries the out-batch carries,
659 // through the SAME policy + route point as the inbound-filter entries
660 // (apply_workbatch_dlq_policy). This happens AFTER process and BEFORE the
661 // sink/commit, so a parse/process dead-letter can never vanish on the
662 // path to a source commit. It is FALLIBLE: a route failure (Reject, or a
663 // Route sink Err) is a terminal ack-barrier error -- the commit is
664 // skipped and the whole block re-delivered, so no later ordered commit
665 // advances past these undelivered dead-letters. Silent discard is opt-in
666 // only (FilterDlqPolicy::DiscardWithMetric).
667 if !out_batch.dlq_entries.is_empty() {
668 let entries = std::mem::take(&mut out_batch.dlq_entries);
669 if let Err(e) = self.route_dlq_entries(entries) {
670 tracing::error!(error = %e, "DLQ route failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
671 return Err(e);
672 }
673 }
674
675 // Sink the WHOLE out-batch. Commit only fires after this returns Ok.
676 //
677 // ACK BARRIER (at-least-once on an ORDERED commit): a sink failure is a
678 // TERMINAL error -- it stops the run loop. The source commit is ordered
679 // and CUMULATIVE (Kafka "commit up to offset N"); if the loop merely
680 // logged and continued, the NEXT block's commit would advance the
681 // committed watermark PAST this block's never-sent offsets, silently
682 // skipping records (data loss). Stopping the loop leaves THIS block's
683 // tokens uncommitted, so the source re-delivers from the last committed
684 // watermark on restart -- no later block can commit ahead of the
685 // failure. The app owns restart/retry policy; the engine never invents
686 // a silent skip.
687 if let Err(e) = sink(&out_batch).await {
688 tracing::error!(error = %e, "Sink failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
689 return Err(e);
690 }
691
692 // Commit EXACTLY the input source acks -- never the (possibly fanned-out)
693 // output record count. This is the at-least-once block contract.
694 match commit {
695 CommitMode::Auto => {
696 // A commit failure is ALSO a terminal ack-barrier failure: a
697 // failed ordered commit must not be followed by a later block's
698 // commit advancing the watermark past these uncommitted offsets.
699 if let Err(e) = receiver.commit(&out_batch.commit_tokens).await {
700 tracing::error!(error = %e, "Commit failed (workbatch) -- terminal, stopping the run loop (ack barrier)");
701 return Err(EngineError::Transport(e));
702 }
703 }
704 CommitMode::SinkManaged => {
705 // The sink owns the commit -- the engine does not commit here.
706 }
707 }
708 Ok(())
709 }
710
711 /// Drive ONE block through streaming sub-blocks: peak in-flight memory is
712 /// bounded to ONE sub-block, the source acks commit once after the final
713 /// sub-block.
714 ///
715 /// The whole batch's `commit_tokens` are carried ASIDE; each sub-block view is
716 /// processed and sunk with EMPTY `commit_tokens` so a fan-out within a
717 /// sub-block never multiplies the source acks. Each sub-block's ingress lease
718 /// is dropped (releasing those bytes) BEFORE the next sub-block is leased, so
719 /// the high-water lease never exceeds one sub-block's bytes -- NOT the whole
720 /// block.
721 ///
722 /// On ANY sub-block sink error the block stops and the commit is skipped (the
723 /// WHOLE block is re-delivered -- at-least-once). The error is TERMINAL: it
724 /// propagates out and stops the run loop, so no LATER block's ordered commit
725 /// can advance the cumulative watermark past these never-committed offsets
726 /// (the ack barrier -- see [`drive_block`](Self::drive_block)). The commit
727 /// (under [`CommitMode::Auto`]) fires EXACTLY ONCE after the final
728 /// sub-block's sink returns `Ok`, with ALL the batch's input source acks; a
729 /// commit failure is likewise terminal.
730 #[cfg(feature = "transport")]
731 async fn drive_block_streaming<R, P, Sink, SinkFut>(
732 &self,
733 receiver: &R,
734 batch: WorkBatch<R::Token>,
735 process: &P,
736 sink: &mut Sink,
737 commit: CommitMode,
738 sub_block_bytes: u64,
739 ) -> Result<(), EngineError>
740 where
741 R: TransportReceiver,
742 P: Fn(WorkBatch<R::Token>) -> Result<WorkBatch<R::Token>, EngineError>,
743 Sink: FnMut(&WorkBatch<R::Token>) -> SinkFut,
744 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
745 {
746 // Carry the WHOLE block's source acks aside; the sub-block views below
747 // commit EMPTY tokens. The batch's tokens commit ONCE after the final
748 // sub-block (at-least-once on the whole block). dlq_entries were already
749 // routed by ingest_workbatch, so the block here carries none.
750 let WorkBatch {
751 records,
752 commit_tokens,
753 ..
754 } = batch;
755
756 // Drain into consecutive byte-budget-sized sub-blocks LAZILY (floor 1
757 // record). `SubBlockDrain` yields ONE sub-block at a time as the loop
758 // pulls it -- it never pre-materialises every sub-block vector up front,
759 // so the only sub-block resident is the one currently being leased and
760 // sunk (the streaming peak-memory contract holds for the SPLIT itself,
761 // not just the lease).
762 let mut sub_blocks = SubBlockDrain::new(records, sub_block_bytes);
763
764 while let Some(sub_records) = sub_blocks.next_sub_block() {
765 // Lease ONLY this sub-block's bytes. The lease releases on EVERY exit
766 // path of this iteration (sink-error early return, ?-return, or the
767 // end of the loop body) via Drop -- BEFORE the next sub-block leases.
768 // Peak in-flight lease is therefore one sub-block, never the block.
769 let sub_block: WorkBatch<R::Token> = WorkBatch::from_records(sub_records);
770 #[cfg(feature = "memory")]
771 let _sub_lease = self.lease_ingress_batch(&sub_block);
772
773 // process() may fan out / fan in within the sub-block; it preserves
774 // the (empty) commit_tokens of the sub-block view.
775 let mut out_sub = process(sub_block)?;
776
777 // Route any parse/process-generated DLQ entries this sub-block
778 // carries BEFORE its sink -- same single policy + route point as the
779 // whole-batch path and the inbound-filter entries. Fallible: a route
780 // failure is terminal (ack barrier) so the commit for the WHOLE block
781 // is skipped and it is re-delivered -- a dead-letter is never lost on
782 // the path to a source commit.
783 if !out_sub.dlq_entries.is_empty() {
784 let entries = std::mem::take(&mut out_sub.dlq_entries);
785 if let Err(e) = self.route_dlq_entries(entries) {
786 tracing::error!(error = %e, "DLQ route failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
787 return Err(e);
788 }
789 }
790
791 // Sink this sub-block. A sink error stops the block and skips the
792 // commit so the WHOLE block is re-delivered. TERMINAL (ack barrier):
793 // propagate so the run loop stops -- a later block's ordered commit
794 // must never advance the cumulative watermark past this block's
795 // uncommitted offsets.
796 if let Err(e) = sink(&out_sub).await {
797 tracing::error!(error = %e, "Sink failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
798 return Err(e);
799 }
800 // _sub_lease drops here -> bytes released before the next sub-block.
801 }
802
803 // All sub-blocks sunk Ok. Commit EXACTLY the input source acks ONCE.
804 match commit {
805 CommitMode::Auto => {
806 // Commit failure is terminal (ack barrier) -- same reasoning as
807 // the sink-error path above.
808 if let Err(e) = receiver.commit(&commit_tokens).await {
809 tracing::error!(error = %e, "Commit failed (workbatch streaming) -- terminal, stopping the run loop (ack barrier)");
810 return Err(EngineError::Transport(e));
811 }
812 }
813 CommitMode::SinkManaged => {
814 // The sink owns the commit -- the engine does not commit here.
815 }
816 }
817 Ok(())
818 }
819
820 /// Collect a [`SubBlockDrain`] into a `Vec<Vec<Record>>` (test convenience).
821 ///
822 /// The driver itself uses [`SubBlockDrain`] LAZILY and never collects all
823 /// sub-blocks; this wrapper keeps the byte-split unit tests (which assert the
824 /// sub-block shapes) ergonomic. Same splitting contract as
825 /// [`SubBlockDrain::next_sub_block`].
826 #[cfg(all(test, feature = "transport"))]
827 fn split_into_sub_blocks(records: Vec<Record>, target_bytes: u64) -> Vec<Vec<Record>> {
828 let mut drain = SubBlockDrain::new(records, target_bytes);
829 let mut out = Vec::new();
830 while let Some(sub) = drain.next_sub_block() {
831 out.push(sub);
832 }
833 out
834 }
835
836 /// Pre-parse a whole [`WorkBatch`] into a [`ParsedBatch`] (the hot-path step),
837 /// honouring the configured [`ParseErrorAction`](super::ParseErrorAction).
838 ///
839 /// Parses each record's payload via [`codec::parse`] on the worker pool
840 /// (SIMD JSON / native MsgPack), keeping the surviving records aligned 1:1
841 /// with their [`ParsedPayload`]s. A record that FAILS to parse is handled per
842 /// the engine's `parse_error_action` -- the SAME contract the legacy
843 /// `process_mid_tier` honoured (previously the parsed path hardcoded
844 /// route-to-DLQ, ignoring the config):
845 ///
846 /// - [`Dlq`](super::ParseErrorAction::Dlq) (default): the record's bytes are
847 /// appended to the batch's `dlq_entries` (no silent drop) and counted in
848 /// errors + dlq. The driver routes those entries before commit.
849 /// - [`Skip`](super::ParseErrorAction::Skip): the record is dropped, counted
850 /// in errors ONLY (a deliberate, configured drop -- not a silent vanish).
851 /// - [`FailBatch`](super::ParseErrorAction::FailBatch): the whole block is
852 /// failed via [`EngineError::ParseBatchFailed`] -- terminal/no-commit,
853 /// consistent with the P1 ack barrier, so the block is re-delivered rather
854 /// than partially committed.
855 ///
856 /// Input `commit_tokens` and any carried-in `dlq_entries` are preserved.
857 ///
858 /// # Errors
859 ///
860 /// [`EngineError::ParseBatchFailed`] when a parse failure occurs under
861 /// [`ParseErrorAction::FailBatch`](super::ParseErrorAction::FailBatch).
862 #[cfg(feature = "transport")]
863 fn parse_block<T: crate::transport::CommitToken>(
864 &self,
865 batch: WorkBatch<T>,
866 ) -> Result<ParsedBatch<'_, T>, EngineError> {
867 use super::ParseErrorAction;
868 use crate::transport::PayloadFormat;
869
870 let WorkBatch {
871 records,
872 commit_tokens,
873 mut dlq_entries,
874 } = batch;
875
876 // Parse each record on the pool. The pool's map_owned applies the scaler
877 // semaphore per item, so the parse phase obeys the CPU cap exactly as the
878 // legacy parsed path does. Carry the index so failures keep their bytes.
879 let indexed: Vec<(usize, Record)> = records.into_iter().enumerate().collect();
880 let parsed_each: Vec<(usize, Record, Result<ParsedPayload, String>)> =
881 self.pool.map_owned(indexed, |(idx, record)| {
882 let format: PayloadFormat = record.metadata.format;
883 let result =
884 codec::parse(&record.payload, format).map_err(|e| format!("parse error: {e}"));
885 (idx, record, result)
886 });
887
888 let action = self.config.parse_error_action;
889 let mut keep_records = Vec::new();
890 let mut keep_parsed = Vec::new();
891 for (_idx, record, result) in parsed_each {
892 match result {
893 Ok(payload) => {
894 keep_records.push(record);
895 keep_parsed.push(payload);
896 }
897 Err(reason) => match action {
898 ParseErrorAction::Dlq => {
899 // No silent drop: the unparseable record's bytes go to DLQ.
900 self.stats.incr_errors();
901 self.stats.incr_dlq();
902 dlq_entries.push(crate::transport::filter::FilteredDlqEntry {
903 payload: record.payload.to_vec(),
904 key: record.key.clone(),
905 reason,
906 });
907 }
908 ParseErrorAction::Skip => {
909 // Deliberate, configured drop -- counted in errors but NOT
910 // dead-lettered. This is opt-in loss, not a silent vanish.
911 self.stats.incr_errors();
912 }
913 ParseErrorAction::FailBatch => {
914 // Terminal: the whole block fails its commit (ack barrier).
915 self.stats.incr_errors();
916 return Err(EngineError::ParseBatchFailed(reason));
917 }
918 },
919 }
920 }
921
922 Ok(ParsedBatch {
923 records: keep_records,
924 parsed: keep_parsed,
925 commit_tokens,
926 dlq_entries,
927 interner: &self.interner,
928 })
929 }
930
931 /// Account a [`WorkBatch`]'s payload bytes against the [`MemoryGuard`],
932 /// returning an RAII lease that releases them on drop.
933 ///
934 /// Drives the in-flight ingress accounting for the WorkBatch driver: the
935 /// lease is taken in [`drive_block`](Self::drive_block) and releases the
936 /// bytes on every block exit path via `Drop`.
937 ///
938 /// [`MemoryGuard`]: crate::memory::MemoryGuard
939 #[cfg(feature = "memory")]
940 pub(crate) fn lease_ingress_batch<T: crate::transport::CommitToken>(
941 &self,
942 batch: &WorkBatch<T>,
943 ) -> Option<super::IngressLease<'_>> {
944 let guard = self.memory_guard.as_ref()?;
945 let bytes = batch.total_payload_bytes() as u64;
946 guard.add_bytes(bytes);
947 Some(super::IngressLease::new(guard, bytes))
948 }
949}
950
/// A LAZY sub-block drain: yields one consecutive byte-budget-sized sub-block
/// of [`Record`]s at a time, so the streaming driver never pre-materialises
/// every sub-block vector up front.
///
/// Each call to [`next_sub_block`](Self::next_sub_block) pulls records (in
/// order) from the source until adding the next record's `payload.len()` would
/// push the running total past `target_bytes`, then returns that sub-block. The
/// record that did not fit is parked in `peeked` and opens the next sub-block;
/// everything after it stays un-pulled in the source iterator. Splitting
/// contract:
///
/// - records are kept in order;
/// - FLOOR of one record per sub-block: the first record is always taken, so a
///   record whose payload alone exceeds `target_bytes` is its own
///   single-record sub-block (never stalls);
/// - a sub-block that is exactly full can still absorb following records with
///   a zero-length payload, because they do not push the total past the budget;
/// - `target_bytes` of `0` therefore degenerates to one record per sub-block
///   for non-empty payloads;
/// - an exhausted source yields `None`.
///
/// The lazy shape matters: collecting into a `Vec<Vec<Record>>` would allocate
/// every sub-block vector before the loop processed the first one. Here, at
/// most ONE sub-block vector is allocated per call -- the one the loop is about
/// to lease and sink -- so the SPLIT does not defeat the streaming peak-memory
/// bound.
#[cfg(feature = "transport")]
struct SubBlockDrain {
    /// Source records, drained in order.
    iter: std::vec::IntoIter<Record>,
    /// A record that was pulled from `iter` but did not fit into the sub-block
    /// being built; it becomes the first record of the next sub-block.
    peeked: Option<Record>,
    /// Byte budget per sub-block, compared against the summed `payload.len()`.
    target_bytes: u64,
}
978
979#[cfg(feature = "transport")]
980impl SubBlockDrain {
981 fn new(records: Vec<Record>, target_bytes: u64) -> Self {
982 Self {
983 iter: records.into_iter(),
984 peeked: None,
985 target_bytes,
986 }
987 }
988
989 /// Yield the next consecutive sub-block, or `None` when the source is
990 /// exhausted. Allocates exactly ONE sub-block `Vec` per call.
991 fn next_sub_block(&mut self) -> Option<Vec<Record>> {
992 // Start with the record carried over from the previous call (if any),
993 // else pull the first record of this sub-block from the source.
994 let first = self.peeked.take().or_else(|| self.iter.next())?;
995 let mut current_bytes = first.payload.len() as u64;
996 let mut current = vec![first];
997
998 // Pull more records while they fit. Floor 1: we already took one record
999 // above, so an oversized record is still its own sub-block.
1000 for record in self.iter.by_ref() {
1001 let record_bytes = record.payload.len() as u64;
1002 if current_bytes.saturating_add(record_bytes) > self.target_bytes {
1003 // Does not fit -- carry it to the next sub-block and stop here.
1004 self.peeked = Some(record);
1005 break;
1006 }
1007 current_bytes = current_bytes.saturating_add(record_bytes);
1008 current.push(record);
1009 }
1010 Some(current)
1011 }
1012}
1013
1014#[cfg(all(test, feature = "transport-memory"))]
1015mod tests {
1016 use super::*;
1017 use crate::transport::memory::{MemoryConfig, MemoryTransport};
1018 use crate::transport::{CommitToken, PayloadFormat, RecordMeta};
1019 use crate::worker::engine::BatchProcessingConfig;
1020 use bytes::Bytes;
1021 use std::sync::Arc;
1022 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
1023
1024 fn default_engine() -> BatchEngine {
1025 BatchEngine::new(BatchProcessingConfig::default())
1026 }
1027
1028 fn mem_transport(timeout_ms: u64) -> MemoryTransport {
1029 MemoryTransport::new(&MemoryConfig {
1030 recv_timeout_ms: timeout_ms,
1031 ..Default::default()
1032 })
1033 .expect("memory transport with valid config must construct")
1034 }
1035
1036 /// Cancel `shutdown` after `ms` to stop the run loop cleanly.
1037 fn cancel_after(shutdown: CancellationToken, ms: u64) {
1038 tokio::spawn(async move {
1039 tokio::time::sleep(Duration::from_millis(ms)).await;
1040 shutdown.cancel();
1041 });
1042 }
1043
1044 /// Clone one record into `factor` copies (1->N fan-out).
1045 fn fan_out(records: Vec<Record>, factor: usize) -> Vec<Record> {
1046 let mut out = Vec::with_capacity(records.len() * factor);
1047 for r in records {
1048 for _ in 0..factor {
1049 out.push(r.clone());
1050 }
1051 }
1052 out
1053 }
1054
    /// THE proving test: N source records, each with a distinct ack; a process
    /// that fans 1->2; assert all 2N records hit the sink AND commit acked
    /// EXACTLY N source tokens (committed_sequence advanced by the source acks,
    /// not the doubled output count).
    #[tokio::test]
    async fn fan_out_commits_source_tokens_not_output_count() {
        let n = 5usize;
        let transport = mem_transport(50);
        // Seed N JSON messages before the loop starts.
        for i in 0..n {
            transport
                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
                .await
                .unwrap();
        }

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // Totals observed by the sink across every block it is handed.
        let sink_records = Arc::new(AtomicUsize::new(0));
        let sink_tokens = Arc::new(AtomicUsize::new(0));
        let sr = Arc::clone(&sink_records);
        let st = Arc::clone(&sink_tokens);

        engine
            .run_workbatch(
                &transport,
                shutdown,
                // Process: double every record; the batch's tokens are untouched.
                |batch| Ok(batch.map_records(|recs| fan_out(recs, 2))),
                |out: &WorkBatch<_>| {
                    let sr = Arc::clone(&sr);
                    let st = Arc::clone(&st);
                    // Read the counts while `out` is still borrowed; the
                    // returned future owns only the numbers.
                    let records = out.records.len();
                    let tokens = out.commit_tokens.len();
                    async move {
                        sr.fetch_add(records, Ordering::Relaxed);
                        st.fetch_add(tokens, Ordering::Relaxed);
                        Ok(())
                    }
                },
                CommitMode::Auto,
                // No ticker; the annotation only pins the otherwise
                // uninferable ticker types.
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        // (a) all 2N records reached the sink.
        assert_eq!(
            sink_records.load(Ordering::Relaxed),
            2 * n,
            "all 2N records sunk"
        );
        // (b) the out-batch carried exactly N source tokens (fan-out did not
        // multiply the acks).
        assert_eq!(
            sink_tokens.load(Ordering::Relaxed),
            n,
            "N source tokens carried"
        );
        // (b cont.) commit acked exactly the N source tokens: MemoryToken seq is
        // 0..N, so committed_sequence (a fetch_max) lands on N-1.
        assert_eq!(
            transport.committed_sequence(),
            (n - 1) as u64,
            "commit advanced to the highest of the N source acks, not the 2N output count"
        );
    }
1125
    /// On a sink error the commit must NOT fire (the block is re-delivered) AND
    /// the run loop stops -- the sink error is a TERMINAL ack-barrier error.
    ///
    /// Two passes: the first proves the run returns the sink error; the second
    /// proves no commit happened, using a block whose token seq is above the
    /// transport's initial committed sequence.
    #[tokio::test]
    async fn sink_error_does_not_commit() {
        let transport = mem_transport(50);
        transport
            .inject(None, br#"{"id":1}"#.to_vec())
            .await
            .unwrap();

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // Pass 1: the sink always fails, so the run must surface that error.
        let result = engine
            .run_workbatch(
                &transport,
                shutdown,
                |batch| Ok(batch),
                |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await;
        assert!(
            matches!(result, Err(EngineError::Sink(_))),
            "sink error is terminal: the run returns the sink error, got {result:?}"
        );

        // committed_sequence is a fetch_max seeded at 0 and the only injected
        // message had seq 0; a commit would still leave it at 0, so pass 1
        // cannot PROVE the commit did not fire. Pass 2 uses a fresh transport
        // with two messages (seq 0 and seq 1) and drains seq 0 by hand, so the
        // failing block carries seq 1 -- a commit would move the sequence to 1.
        let transport = mem_transport(50);
        transport
            .inject(None, br#"{"a":1}"#.to_vec())
            .await
            .unwrap(); // seq 0
        transport
            .inject(None, br#"{"b":2}"#.to_vec())
            .await
            .unwrap(); // seq 1
        // drain seq 0 first so the failing block carries seq 1.
        let _ = transport.recv(1).await.unwrap();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);
        let result = engine
            .run_workbatch(
                &transport,
                shutdown,
                |batch| Ok(batch),
                |_out: &WorkBatch<_>| async { Err(EngineError::Sink("boom".into())) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await;
        assert!(result.is_err(), "sink error is terminal");
        assert_eq!(
            transport.committed_sequence(),
            0,
            "sink error must skip commit -- sequence stays at its initial 0"
        );
    }
1195
    /// `CommitMode::Auto` commits after a successful sink.
    #[tokio::test]
    async fn auto_commits_after_sink_ok() {
        let transport = mem_transport(50);
        // Three messages, which the transport numbers seq 0, 1 and 2.
        for i in 0..3u64 {
            transport
                .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
                .await
                .unwrap();
        }

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // Pass-through process and an always-Ok sink: the only thing under
        // test is that the engine itself commits.
        engine
            .run_workbatch(
                &transport,
                shutdown,
                |batch| Ok(batch),
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        // Three messages seq 0..=2 -> committed sequence is 2.
        assert_eq!(transport.committed_sequence(), 2);
    }
1229
    /// `CommitMode::SinkManaged` leaves the commit to the sink -- the engine
    /// does not commit.
    #[tokio::test]
    async fn sink_managed_does_not_commit_in_engine() {
        let transport = mem_transport(50);
        transport
            .inject(None, br#"{"a":1}"#.to_vec())
            .await
            .unwrap(); // seq 0
        transport
            .inject(None, br#"{"b":2}"#.to_vec())
            .await
            .unwrap(); // seq 1
        // Drain seq 0 so the block carries seq 1 -- a commit would push the
        // sequence past its initial 0.
        let _ = transport.recv(1).await.unwrap();

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        engine
            .run_workbatch(
                &transport,
                shutdown,
                |batch| Ok(batch),
                // Sink does NOT commit here -- it could, but we prove the engine
                // does not commit on its behalf.
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::SinkManaged,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        // Nobody committed seq 1, so the sequence never left its start value.
        assert_eq!(
            transport.committed_sequence(),
            0,
            "SinkManaged: engine must not commit -- sequence stays at initial 0"
        );
    }
1274
    /// The ticker fires on its interval; shutdown stops the loop cleanly.
    ///
    /// No messages are injected, so the loop only ever services the ticker
    /// until the shutdown token is cancelled.
    #[tokio::test]
    async fn ticker_fires_and_shutdown_stops_loop() {
        let transport = mem_transport(50);
        let engine = default_engine();
        let shutdown = CancellationToken::new();
        // Cancel after 350ms: room for several 100ms ticks.
        cancel_after(shutdown.clone(), 350);

        let ticks = Arc::new(AtomicU64::new(0));
        let tc = Arc::clone(&ticks);

        let result = engine
            .run_workbatch(
                &transport,
                shutdown,
                |batch| Ok(batch),
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::Auto,
                // Ticker: (interval, callback). The callback just counts calls.
                Some((Duration::from_millis(100), move || {
                    let tc = Arc::clone(&tc);
                    async move {
                        tc.fetch_add(1, Ordering::Relaxed);
                        Ok(())
                    }
                })),
            )
            .await;

        assert!(result.is_ok(), "shutdown stops the loop cleanly");
        assert!(
            ticks.load(Ordering::Relaxed) >= 2,
            "ticker fired at least twice over 350ms at 100ms interval"
        );
    }
1309
    /// On-demand path: a transform that calls codec::parse reads the right
    /// field itself, without the driver pre-parsing. The records pass through
    /// unchanged; the test only captures the `_table` value it saw.
    #[tokio::test]
    async fn on_demand_transform_reads_field_via_codec_parse() {
        let transport = mem_transport(50);
        transport
            .inject(None, br#"{"_table":"events","id":1}"#.to_vec())
            .await
            .unwrap();

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // Last `_table` value observed by the transform.
        let seen_table = Arc::new(std::sync::Mutex::new(String::new()));
        let st = Arc::clone(&seen_table);

        engine
            .run_workbatch(
                &transport,
                shutdown,
                move |batch| {
                    let st = Arc::clone(&st);
                    Ok(batch.map_records(move |recs| {
                        recs.into_iter()
                            .inspect(|r| {
                                // Parse ON DEMAND inside the transform.
                                let parsed = codec::parse(&r.payload, r.metadata.format)
                                    .expect("valid json");
                                if let Some(t) = parsed.field_str("_table") {
                                    *st.lock().unwrap() = t.to_string();
                                }
                            })
                            .collect()
                    }))
                },
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        assert_eq!(*seen_table.lock().unwrap(), "events");
    }
1358
    /// Batch-parse path: the driver pre-parses; the process closure sees
    /// parsed payloads aligned 1:1 with the records and looks the routing field
    /// up through a name interned once for the block. Asserts that all four
    /// records match on `_table` and that all four source acks are committed.
    #[tokio::test]
    async fn parsed_path_pre_parses_and_interner_dedups() {
        let transport = mem_transport(50);
        for i in 0..4 {
            transport
                .inject(
                    None,
                    format!(r#"{{"_table":"events","id":{i}}}"#).into_bytes(),
                )
                .await
                .unwrap();
        }

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // Number of records whose `_table` field equals "events".
        let tables = Arc::new(AtomicUsize::new(0));
        let tc = Arc::clone(&tables);

        engine
            .run_workbatch_parsed(
                &transport,
                shutdown,
                move |pb: ParsedBatch<'_, _>| {
                    // Records are aligned 1:1 with parsed payloads.
                    assert_eq!(pb.records.len(), pb.parsed.len());
                    // Intern the routing-field name once for the whole block.
                    let field = pb.intern("_table");
                    let mut hits = 0;
                    for parsed in &pb.parsed {
                        if parsed.field_str(&field) == Some("events") {
                            hits += 1;
                        }
                    }
                    tc.fetch_add(hits, Ordering::Relaxed);
                    // Re-assemble a WorkBatch preserving the source acks.
                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
                        .with_dlq_entries(pb.dlq_entries))
                },
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        assert_eq!(
            tables.load(Ordering::Relaxed),
            4,
            "all 4 records routed on _table"
        );
        // Four messages seq 0..=3 -> committed sequence is 3.
        assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
    }
1419
    /// Parsed path no-silent-drop (default `ParseErrorAction::Dlq`): an
    /// unparseable record is routed to the out-batch DLQ entries, the process
    /// closure sees them, AND they reach the DLQ route point (a `Route` policy
    /// sink) before commit -- not dropped -- while source acks stay intact.
    #[tokio::test]
    async fn parsed_path_routes_parse_failures_to_dlq() {
        use crate::worker::engine::FilterDlqPolicy;
        let transport = mem_transport(50);
        transport
            .inject(None, br#"{"id":1}"#.to_vec())
            .await
            .unwrap(); // seq 0 ok
        transport
            .inject(None, b"not json {{{".to_vec())
            .await
            .unwrap(); // seq 1 bad
        transport
            .inject(None, br#"{"id":3}"#.to_vec())
            .await
            .unwrap(); // seq 2 ok

        // A Route policy captures the entries that reach the DLQ route point.
        let routed = Arc::new(AtomicUsize::new(0));
        let rc = Arc::clone(&routed);
        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
            move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
                rc.fetch_add(entries.len(), Ordering::Relaxed);
                Ok(())
            },
        )));
        let shutdown = CancellationToken::new();
        cancel_after(shutdown.clone(), 200);

        // What the process closure itself observes: DLQ entries handed to it
        // and records that survived the pre-parse.
        let dlq_seen = Arc::new(AtomicUsize::new(0));
        let kept = Arc::new(AtomicUsize::new(0));
        let ds = Arc::clone(&dlq_seen);
        let kp = Arc::clone(&kept);

        engine
            .run_workbatch_parsed(
                &transport,
                shutdown,
                move |pb: ParsedBatch<'_, _>| {
                    ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
                    kp.fetch_add(pb.records.len(), Ordering::Relaxed);
                    // Forward the DLQ entries on the out-batch so the driver
                    // can route them.
                    Ok(WorkBatch::new(pb.records, pb.commit_tokens)
                        .with_dlq_entries(pb.dlq_entries))
                },
                |_out: &WorkBatch<_>| async { Ok(()) },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await
            .unwrap();

        assert_eq!(kept.load(Ordering::Relaxed), 2, "2 records parsed cleanly");
        assert_eq!(
            dlq_seen.load(Ordering::Relaxed),
            1,
            "1 parse failure carried to the process closure as a DLQ entry"
        );
        assert_eq!(
            routed.load(Ordering::Relaxed),
            1,
            "the parse-failure DLQ entry reached the DLQ route point before commit"
        );
        // All three source acks are still committed -- a parse failure does not
        // lose the source ack (at-least-once on the WHOLE block).
        assert_eq!(transport.committed_sequence(), 2);
    }
1493
    /// Lease accounting on a WorkBatch: the guard's byte count rises by the
    /// batch's total payload bytes while the lease is held and returns to zero
    /// once the lease is dropped.
    #[cfg(feature = "memory")]
    #[tokio::test]
    async fn lease_ingress_batch_accounts_and_releases() {
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let mut engine = default_engine();
        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        }));
        engine.set_memory_guard_for_test(Arc::clone(&guard));

        // Four small JSON records built directly -- no transport recv involved.
        let payloads: Vec<Record> = (0..4)
            .map(|i| Record {
                payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
                key: None,
                headers: vec![],
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            })
            .collect();
        let batch = WorkBatch::<MemTok>::from_records(payloads);
        let expected = batch.total_payload_bytes() as u64;

        assert_eq!(guard.current_bytes(), 0);
        {
            // The inner scope bounds the lease's lifetime.
            let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
            assert_eq!(guard.current_bytes(), expected, "accounted while held");
        }
        assert_eq!(guard.current_bytes(), 0, "released on drop");
    }
1528
1529 /// A minimal CommitToken for the memory-lease unit test (no transport recv).
1530 #[cfg(feature = "memory")]
1531 #[derive(Debug, Clone)]
1532 struct MemTok;
1533 #[cfg(feature = "memory")]
1534 impl std::fmt::Display for MemTok {
1535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536 f.write_str("memtok")
1537 }
1538 }
1539 #[cfg(feature = "memory")]
1540 impl CommitToken for MemTok {}
1541
1542 // ---- Remediation Phase 1: ordered-commit ack barrier -----------------
1543 //
1544 // Kafka (and MemoryTransport) commit is CUMULATIVE: `commit up to offset N`
1545 // advances a watermark via fetch_max. So if a block carrying token 0 fails
1546 // its sink/commit, a LATER block carrying token 1 must NEVER be committed --
1547 // doing so advances the watermark past token 0's never-sent records, which
1548 // silently skips them (data loss, at-least-once violated). These tests pin
1549 // the ack barrier: the committed watermark never advances past the last
1550 // successfully-sunk-and-committed block.
1551
    /// A real ORDERED receiver test double (real `Record`/`WorkBatch`/`MemoryToken`
    /// types, no internal-code mock). It hands out ONE record per `recv` with
    /// MONOTONIC tokens (seq 0, 1, 2, ...) and a CUMULATIVE commit modelled on
    /// a Kafka offset commit. This isolates the ordered-commit semantics from
    /// MemoryTransport's channel batching (which would coalesce all pending
    /// messages into a single block).
    struct OrderedReceiver {
        /// Next seq to deliver; one record per recv until exhausted.
        next_seq: Arc<AtomicU64>,
        /// How many records to deliver before recv blocks (pending) forever.
        total: u64,
        /// Cumulative committed watermark, stored as a raw token seq (not
        /// seq + 1). It is initialised to `u64::MAX`, which is the "no commit
        /// yet" sentinel the tests compare against; `commit` folds each
        /// block's highest token seq into it.
        committed_hwm: Arc<AtomicU64>,
        /// Count of commit calls (to prove a later block's commit did not fire).
        commit_calls: Arc<AtomicUsize>,
        /// If set, `commit` returns an error (broker commit failure) for any
        /// block whose highest token seq equals this value.
        fail_commit_on_seq: Option<u64>,
    }
1573
1574 impl OrderedReceiver {
1575 fn new(total: u64) -> Self {
1576 Self {
1577 next_seq: Arc::new(AtomicU64::new(0)),
1578 total,
1579 committed_hwm: Arc::new(AtomicU64::new(u64::MAX)),
1580 commit_calls: Arc::new(AtomicUsize::new(0)),
1581 fail_commit_on_seq: None,
1582 }
1583 }
1584 }
1585
1586 impl crate::transport::TransportBase for OrderedReceiver {
1587 fn close(
1588 &self,
1589 ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
1590 {
1591 std::future::ready(Ok(()))
1592 }
1593 fn is_healthy(&self) -> bool {
1594 true
1595 }
1596 fn name(&self) -> &'static str {
1597 "ordered-test"
1598 }
1599 }
1600
1601 impl TransportReceiver for OrderedReceiver {
1602 type Token = crate::transport::memory::MemoryToken;
1603
1604 fn recv(
1605 &self,
1606 _max: usize,
1607 ) -> impl std::future::Future<
1608 Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
1609 > + Send {
1610 let next_seq = Arc::clone(&self.next_seq);
1611 let total = self.total;
1612 async move {
1613 let seq = next_seq.fetch_add(1, Ordering::Relaxed);
1614 if seq >= total {
1615 // Exhausted: block forever so the loop only exits on shutdown
1616 // (mirrors a quiet broker -- never an error/EOF).
1617 next_seq.fetch_sub(1, Ordering::Relaxed);
1618 std::future::pending::<()>().await;
1619 }
1620 let record = Record {
1621 payload: Bytes::from(format!(r#"{{"seq":{seq}}}"#)),
1622 key: None,
1623 headers: vec![],
1624 metadata: RecordMeta {
1625 timestamp_ms: None,
1626 format: PayloadFormat::Json,
1627 },
1628 };
1629 Ok(WorkBatch::new(
1630 vec![record],
1631 vec![crate::transport::memory::MemoryToken { seq }],
1632 ))
1633 }
1634 }
1635
1636 async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
1637 self.commit_calls.fetch_add(1, Ordering::Relaxed);
1638 let Some(max_seq) = tokens.iter().map(|t| t.seq).max() else {
1639 return Ok(());
1640 };
1641 if self.fail_commit_on_seq == Some(max_seq) {
1642 return Err(crate::transport::TransportError::Commit(format!(
1643 "broker commit failed for seq {max_seq}"
1644 )));
1645 }
1646 // Cumulative: watermark = max(current, this block's highest seq).
1647 self.committed_hwm.fetch_max(max_seq, Ordering::Relaxed);
1648 Ok(())
1649 }
1650 }
1651
    /// THE ack-barrier bug test (sink failure). Token 0's block fails at the
    /// sink; token 1's block would succeed. With an ORDERED/cumulative commit,
    /// the engine must NEVER commit token 1 (which would advance the watermark
    /// past the never-sent token 0). Assert: the committed watermark never
    /// advances past the last successfully-sunk block -- i.e. NOTHING is
    /// committed, and the run STOPS (terminal) rather than draining token 1.
    #[tokio::test]
    async fn sink_error_blocks_later_ordered_commits() {
        let receiver = OrderedReceiver::new(3);
        // Keep handles on the receiver's counters so they can be inspected
        // after the run.
        let committed = Arc::clone(&receiver.committed_hwm);
        let commit_calls = Arc::clone(&receiver.commit_calls);

        let engine = default_engine();
        let shutdown = CancellationToken::new();
        // Safety net: if the loop wrongly continued, shutdown stops it so the
        // test cannot hang. The assertions still catch the data-loss advance.
        cancel_after(shutdown.clone(), 500);

        let sink_calls = Arc::new(AtomicUsize::new(0));
        let sc = Arc::clone(&sink_calls);

        let result = engine
            .run_workbatch(
                &receiver,
                shutdown,
                |batch| Ok(batch),
                move |out: &WorkBatch<_>| {
                    let sc = Arc::clone(&sc);
                    // Fail the sink for the block carrying token 0.
                    let carries_zero = out.commit_tokens.iter().any(|t| t.seq == 0);
                    async move {
                        sc.fetch_add(1, Ordering::Relaxed);
                        if carries_zero {
                            Err(EngineError::Sink("boom on token 0".into()))
                        } else {
                            Ok(())
                        }
                    }
                },
                CommitMode::Auto,
                None::<(
                    Duration,
                    fn() -> std::future::Ready<Result<(), EngineError>>,
                )>,
            )
            .await;

        // The ack barrier: token 0 failed, so the watermark must NOT advance
        // past it. NOTHING may be committed (token 1 must never commit ahead),
        // so the watermark still holds the `u64::MAX` value it was created with.
        assert_eq!(
            committed.load(Ordering::Relaxed),
            u64::MAX,
            "sink error on token 0 must leave the committed watermark unmoved -- \
             a later token must NOT be committed past the failed offset"
        );
        assert_eq!(
            commit_calls.load(Ordering::Relaxed),
            0,
            "no commit may fire while token 0's block is unsent"
        );
        // The fix makes the sink error TERMINAL: the run returns Err and the
        // loop never advances to deliver token 1.
        assert!(
            result.is_err(),
            "sink failure under Auto must be a terminal engine error (ack barrier), \
             not a logged continue that drains later blocks"
        );
        assert_eq!(
            sink_calls.load(Ordering::Relaxed),
            1,
            "loop must stop at the failed block -- token 1 must not be fetched+sunk"
        );
    }
1725
1726 /// Ack-barrier on COMMIT failure. The sink succeeds but the COMMIT for
1727 /// token 0's block fails (broker commit error). The engine must treat this
1728 /// as a terminal ack-barrier failure and NOT advance to fetch+commit
1729 /// token 1 past the failed offset.
1730 #[tokio::test]
1731 async fn commit_error_blocks_later_ordered_commits() {
1732 let mut receiver = OrderedReceiver::new(3);
1733 receiver.fail_commit_on_seq = Some(0);
1734 let committed = Arc::clone(&receiver.committed_hwm);
1735
1736 let engine = default_engine();
1737 let shutdown = CancellationToken::new();
1738 cancel_after(shutdown.clone(), 500);
1739
1740 let sink_calls = Arc::new(AtomicUsize::new(0));
1741 let sc = Arc::clone(&sink_calls);
1742
1743 let result = engine
1744 .run_workbatch(
1745 &receiver,
1746 shutdown,
1747 |batch| Ok(batch),
1748 move |_out: &WorkBatch<_>| {
1749 let sc = Arc::clone(&sc);
1750 async move {
1751 sc.fetch_add(1, Ordering::Relaxed);
1752 Ok(())
1753 }
1754 },
1755 CommitMode::Auto,
1756 None::<(
1757 Duration,
1758 fn() -> std::future::Ready<Result<(), EngineError>>,
1759 )>,
1760 )
1761 .await;
1762
1763 // Commit of token 0 failed -> watermark unmoved, run terminates, token 1
1764 // is never fetched/committed past the failed offset.
1765 assert_eq!(
1766 committed.load(Ordering::Relaxed),
1767 u64::MAX,
1768 "failed commit must not leave a later commit to advance past it"
1769 );
1770 assert!(
1771 result.is_err(),
1772 "commit failure must be a terminal ack-barrier error"
1773 );
1774 assert_eq!(
1775 sink_calls.load(Ordering::Relaxed),
1776 1,
1777 "loop must stop at the failed commit -- token 1 must not be processed"
1778 );
1779 }
1780
1781 /// Streaming variant of the ack barrier: a sink error on token 0's block
1782 /// (streamed in sub-blocks) must block any later ordered commit. Mid-block
1783 /// sink failure stops the block AND must not let a later block's commit
1784 /// advance the watermark past it.
1785 #[tokio::test]
1786 async fn streaming_sink_error_blocks_later_ordered_commits() {
1787 let receiver = OrderedReceiver::new(3);
1788 let committed = Arc::clone(&receiver.committed_hwm);
1789 let commit_calls = Arc::clone(&receiver.commit_calls);
1790
1791 let engine = default_engine();
1792 let shutdown = CancellationToken::new();
1793 cancel_after(shutdown.clone(), 500);
1794
1795 let sink_calls = Arc::new(AtomicUsize::new(0));
1796 let sc = Arc::clone(&sink_calls);
1797
1798 let result = engine
1799 .run_workbatch_streaming(
1800 &receiver,
1801 shutdown,
1802 |batch| Ok(batch),
1803 move |out: &WorkBatch<_>| {
1804 let sc = Arc::clone(&sc);
1805 // Streaming sub-block views carry EMPTY commit_tokens, so we
1806 // identify token 0's block by its payload bytes ({"seq":0}).
1807 let carries_zero = out
1808 .records
1809 .iter()
1810 .any(|r| r.payload.as_ref() == br#"{"seq":0}"#);
1811 async move {
1812 sc.fetch_add(1, Ordering::Relaxed);
1813 if carries_zero {
1814 Err(EngineError::Sink("boom on token 0 (streaming)".into()))
1815 } else {
1816 Ok(())
1817 }
1818 }
1819 },
1820 CommitMode::Auto,
1821 64, // one record per sub-block (records are tiny)
1822 None::<(
1823 Duration,
1824 fn() -> std::future::Ready<Result<(), EngineError>>,
1825 )>,
1826 )
1827 .await;
1828
1829 assert_eq!(
1830 committed.load(Ordering::Relaxed),
1831 u64::MAX,
1832 "streaming sink error on token 0 must not let a later token commit ahead"
1833 );
1834 assert_eq!(
1835 commit_calls.load(Ordering::Relaxed),
1836 0,
1837 "no commit may fire while token 0's block is unsent (streaming)"
1838 );
1839 assert!(
1840 result.is_err(),
1841 "streaming sink failure under Auto must be a terminal ack-barrier error"
1842 );
1843 assert_eq!(
1844 sink_calls.load(Ordering::Relaxed),
1845 1,
1846 "streaming loop must stop at the failed block"
1847 );
1848 }
1849
1850 // ---- Task G4: per-unit streaming -------------------------------------
1851
/// Unit coverage for `split_into_sub_blocks`: five 10-byte records with a
/// 25-byte target are expected to come back grouped as 2, 2 and 1 records.
#[test]
fn split_groups_by_byte_target() {
    // Factory for one record with a 10-byte payload.
    let ten_byte_record = || Record {
        payload: Bytes::from_static(b"0123456789"), // 10 bytes
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    // Two records fit (20 <= 25); a third would reach 30 > 25, so each
    // sub-block closes at two, leaving a final single record.
    let records: Vec<Record> = std::iter::repeat_with(ten_byte_record).take(5).collect();

    let sub = BatchEngine::split_into_sub_blocks(records, 25);
    let lens: Vec<usize> = sub.iter().map(|block| block.len()).collect();
    assert_eq!(lens, vec![2, 2, 1], "20<=25 per block, never overshoot 25");
}
1872
/// A record whose payload exceeds the byte target must still be emitted as
/// its own sub-block; the following small record gets a sub-block of its own.
#[test]
fn split_floor_one_oversized_record() {
    // Builds a keyless, header-less JSON-format record around `bytes`.
    let record_of = |bytes: &'static [u8]| Record {
        payload: Bytes::from_static(bytes),
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    let records = vec![
        record_of(b"this-payload-is-way-over-the-target"),
        record_of(b"small"),
    ];

    let sub = BatchEngine::split_into_sub_blocks(records, 4);
    let lens: Vec<usize> = sub.iter().map(|block| block.len()).collect();
    assert_eq!(lens, vec![1, 1], "oversized record floors to one-per-block");
}
1900
/// Splitting an empty record list produces no sub-blocks at all.
#[test]
fn split_empty_yields_no_sub_blocks() {
    let no_records = vec![];
    let sub = BatchEngine::split_into_sub_blocks(no_records, 100);
    assert!(sub.is_empty());
}
1906
/// A batch whose total size is under the target stays together: three 3-byte
/// records with a 10_000-byte target yield exactly one sub-block of three.
#[test]
fn split_smaller_than_target_is_one_sub_block() {
    let tiny_record = || Record {
        payload: Bytes::from_static(b"abc"),
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    let records: Vec<Record> = std::iter::repeat_with(tiny_record).take(3).collect();

    let sub = BatchEngine::split_into_sub_blocks(records, 10_000);
    assert_eq!(sub.len(), 1, "whole batch under target -> single sub-block");
    let only_block = &sub[0];
    assert_eq!(only_block.len(), 3);
}
1924
/// Peak-memory test for the streaming path.
///
/// Sixteen 64-byte records (1024 bytes in total) are streamed with a
/// sub-block target of a quarter of that. The sink samples
/// `guard.current_bytes()` on every call and keeps the maximum; the test
/// requires that maximum to be non-zero, at most one sub-block (256 bytes),
/// and strictly below the whole-batch total. After the run the guard must
/// report zero bytes again.
#[cfg(feature = "memory")]
#[tokio::test]
async fn streaming_peak_lease_bounded_to_one_sub_block() {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    type NoneArg = (
        Duration,
        fn() -> std::future::Ready<Result<(), EngineError>>,
    );

    // 16 records x 64 bytes = 1024 bytes overall.
    const RECORD_BYTES: usize = 64;
    const N: usize = 16;
    let total: u64 = (RECORD_BYTES * N) as u64; // 1024

    let transport = mem_transport(50);
    let payload = vec![b'x'; RECORD_BYTES];
    for _ in 0..N {
        transport.inject(None, payload.clone()).await.unwrap();
    }

    let guard_config = MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    let mut engine = default_engine();
    engine.set_memory_guard_for_test(Arc::clone(&guard));

    let stop = CancellationToken::new();
    cancel_after(stop.clone(), 200);

    // Target of B/4 = 256 bytes, i.e. four 64-byte records per sub-block.
    let sub_block_bytes = total / 4; // 256
    let one_sub_block_bytes = sub_block_bytes; // 4 records * 64 = 256

    // Largest `current_bytes()` value seen from inside the sink.
    let high_water = Arc::new(AtomicU64::new(0));
    let sink_guard = Arc::clone(&guard);
    let sink_high_water = Arc::clone(&high_water);

    engine
        .run_workbatch_streaming(
            &transport,
            stop,
            |block| Ok(block),
            move |_sent: &WorkBatch<_>| {
                let guard = Arc::clone(&sink_guard);
                let peak_so_far = Arc::clone(&sink_high_water);
                async move {
                    peak_so_far.fetch_max(guard.current_bytes(), Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            sub_block_bytes,
            None::<NoneArg>,
        )
        .await
        .unwrap();

    let peak = high_water.load(Ordering::Relaxed);
    // The in-flight peak is a single sub-block, not the entire batch.
    assert!(
        peak <= one_sub_block_bytes,
        "peak lease {peak} exceeded one sub-block {one_sub_block_bytes} \
         (a whole-batch lease would be {total})"
    );
    assert!(
        peak > 0 && peak < total,
        "peak {peak} must be a partial sub-block, strictly less than the \
         whole batch {total}"
    );
    // Nothing may remain accounted once the run is over.
    assert_eq!(guard.current_bytes(), 0, "all leases released after run");
}
2006
/// A counting receiver: delegates recv/lifecycle to an inner MemoryTransport,
/// but records EACH commit call (count + the tokens + how many sink calls had
/// happened by then) so the test can prove "commit fires exactly once, after
/// the final sub-block, with all N source tokens".
struct CountingReceiver {
    /// Transport that actually serves `recv`, `commit` and lifecycle calls.
    inner: MemoryTransport,
    /// Number of `commit` invocations seen so far.
    commit_calls: Arc<AtomicUsize>,
    /// Running total of tokens passed across all `commit` invocations.
    commit_token_count: Arc<AtomicUsize>,
    /// Sink-call counter shared with the test's sink closure; read (never
    /// written) by this receiver.
    sink_calls: Arc<AtomicUsize>,
    /// Value of `sink_calls` captured at the most recent `commit`.
    sink_calls_at_commit: Arc<AtomicUsize>,
}
2018
2019 impl crate::transport::TransportBase for CountingReceiver {
2020 fn close(
2021 &self,
2022 ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
2023 {
2024 self.inner.close()
2025 }
2026 fn is_healthy(&self) -> bool {
2027 self.inner.is_healthy()
2028 }
2029 fn name(&self) -> &'static str {
2030 self.inner.name()
2031 }
2032 }
2033
2034 impl TransportReceiver for CountingReceiver {
2035 type Token = <MemoryTransport as TransportReceiver>::Token;
2036
2037 fn recv(
2038 &self,
2039 max: usize,
2040 ) -> impl std::future::Future<
2041 Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2042 > + Send {
2043 self.inner.recv(max)
2044 }
2045
2046 async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
2047 self.commit_calls.fetch_add(1, Ordering::Relaxed);
2048 self.commit_token_count
2049 .fetch_add(tokens.len(), Ordering::Relaxed);
2050 self.sink_calls_at_commit
2051 .store(self.sink_calls.load(Ordering::Relaxed), Ordering::Relaxed);
2052 self.inner.commit(tokens).await
2053 }
2054 }
2055
2056 /// Commit-once-after-final: N source tokens streamed across multiple
2057 /// sub-blocks. Commit must fire EXACTLY once, AFTER the last sub-block's sink,
2058 /// carrying ALL N source tokens (at-least-once on the whole block).
2059 #[tokio::test]
2060 async fn streaming_commits_once_after_final_sub_block() {
2061 const N: usize = 12;
2062 const RECORD_BYTES: usize = 32;
2063 let payload = vec![b'y'; RECORD_BYTES];
2064
2065 let inner = mem_transport(50);
2066 for _ in 0..N {
2067 inner.inject(None, payload.clone()).await.unwrap();
2068 }
2069
2070 let commit_calls = Arc::new(AtomicUsize::new(0));
2071 let commit_token_count = Arc::new(AtomicUsize::new(0));
2072 let sink_calls = Arc::new(AtomicUsize::new(0));
2073 let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
2074 let receiver = CountingReceiver {
2075 inner,
2076 commit_calls: Arc::clone(&commit_calls),
2077 commit_token_count: Arc::clone(&commit_token_count),
2078 sink_calls: Arc::clone(&sink_calls),
2079 sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
2080 };
2081
2082 let engine = default_engine();
2083 let shutdown = CancellationToken::new();
2084 cancel_after(shutdown.clone(), 200);
2085
2086 let sc = Arc::clone(&sink_calls);
2087 // ~3 records per sub-block (96 bytes) -> 4 sub-blocks for 12 records.
2088 let sub_block_bytes = (RECORD_BYTES * 3) as u64;
2089
2090 engine
2091 .run_workbatch_streaming(
2092 &receiver,
2093 shutdown,
2094 |batch| Ok(batch),
2095 move |_out: &WorkBatch<_>| {
2096 let sc = Arc::clone(&sc);
2097 async move {
2098 sc.fetch_add(1, Ordering::Relaxed);
2099 Ok(())
2100 }
2101 },
2102 CommitMode::Auto,
2103 sub_block_bytes,
2104 None::<(
2105 Duration,
2106 fn() -> std::future::Ready<Result<(), EngineError>>,
2107 )>,
2108 )
2109 .await
2110 .unwrap();
2111
2112 let total_sinks = sink_calls.load(Ordering::Relaxed);
2113 assert!(
2114 total_sinks >= 4,
2115 "expected multiple sub-block sinks, got {total_sinks}"
2116 );
2117 // Commit fired exactly ONCE.
2118 assert_eq!(commit_calls.load(Ordering::Relaxed), 1, "commit fires once");
2119 // It carried ALL N source tokens.
2120 assert_eq!(
2121 commit_token_count.load(Ordering::Relaxed),
2122 N,
2123 "commit carried all N source tokens"
2124 );
2125 // It fired AFTER the final sub-block sink (all sinks done by commit time).
2126 assert_eq!(
2127 sink_calls_at_commit.load(Ordering::Relaxed),
2128 total_sinks,
2129 "commit fired after the last sub-block sink"
2130 );
2131 }
2132
2133 /// A sink error on a MIDDLE sub-block stops the block and skips the commit
2134 /// (the whole block is re-delivered -- at-least-once).
2135 #[tokio::test]
2136 async fn streaming_mid_sub_block_sink_error_skips_commit() {
2137 const N: usize = 9;
2138 const RECORD_BYTES: usize = 32;
2139 let payload = vec![b'z'; RECORD_BYTES];
2140
2141 let inner = mem_transport(50);
2142 for _ in 0..N {
2143 inner.inject(None, payload.clone()).await.unwrap();
2144 }
2145
2146 let commit_calls = Arc::new(AtomicUsize::new(0));
2147 let commit_token_count = Arc::new(AtomicUsize::new(0));
2148 let sink_calls = Arc::new(AtomicUsize::new(0));
2149 let sink_calls_at_commit = Arc::new(AtomicUsize::new(0));
2150 let receiver = CountingReceiver {
2151 inner,
2152 commit_calls: Arc::clone(&commit_calls),
2153 commit_token_count: Arc::clone(&commit_token_count),
2154 sink_calls: Arc::clone(&sink_calls),
2155 sink_calls_at_commit: Arc::clone(&sink_calls_at_commit),
2156 };
2157
2158 let engine = default_engine();
2159 let shutdown = CancellationToken::new();
2160 cancel_after(shutdown.clone(), 200);
2161
2162 let sc = Arc::clone(&sink_calls);
2163 // ~3 records per sub-block -> 3 sub-blocks; fail on the 2nd (middle).
2164 let sub_block_bytes = (RECORD_BYTES * 3) as u64;
2165
2166 let result = engine
2167 .run_workbatch_streaming(
2168 &receiver,
2169 shutdown,
2170 |batch| Ok(batch),
2171 move |_out: &WorkBatch<_>| {
2172 let sc = Arc::clone(&sc);
2173 async move {
2174 let nth = sc.fetch_add(1, Ordering::Relaxed) + 1;
2175 if nth == 2 {
2176 Err(EngineError::Sink("boom on middle sub-block".into()))
2177 } else {
2178 Ok(())
2179 }
2180 }
2181 },
2182 CommitMode::Auto,
2183 sub_block_bytes,
2184 None::<(
2185 Duration,
2186 fn() -> std::future::Ready<Result<(), EngineError>>,
2187 )>,
2188 )
2189 .await;
2190
2191 // The sink error is TERMINAL (ack barrier): the run returns the error.
2192 assert!(
2193 matches!(result, Err(EngineError::Sink(_))),
2194 "mid sub-block sink error is terminal, got {result:?}"
2195 );
2196 // The block stopped at the failing sub-block: no commit, and the 3rd
2197 // sub-block was never sunk.
2198 assert_eq!(
2199 commit_calls.load(Ordering::Relaxed),
2200 0,
2201 "mid sub-block sink error must skip commit"
2202 );
2203 assert_eq!(
2204 sink_calls.load(Ordering::Relaxed),
2205 2,
2206 "stopped after the failing 2nd sub-block (3rd never sunk)"
2207 );
2208 }
2209
2210 /// Floor case: a batch smaller than sub_block_bytes streams as ONE sub-block
2211 /// and behaves like drive_block (all records sunk once, commit once).
2212 #[tokio::test]
2213 async fn streaming_small_batch_is_single_sub_block() {
2214 let transport = mem_transport(50);
2215 for i in 0..3u64 {
2216 transport
2217 .inject(None, format!(r#"{{"id":{i}}}"#).into_bytes())
2218 .await
2219 .unwrap();
2220 }
2221
2222 let engine = default_engine();
2223 let shutdown = CancellationToken::new();
2224 cancel_after(shutdown.clone(), 200);
2225
2226 let sink_calls = Arc::new(AtomicUsize::new(0));
2227 let sink_records = Arc::new(AtomicUsize::new(0));
2228 let scz = Arc::clone(&sink_calls);
2229 let srz = Arc::clone(&sink_records);
2230
2231 engine
2232 .run_workbatch_streaming(
2233 &transport,
2234 shutdown,
2235 |batch| Ok(batch),
2236 move |out: &WorkBatch<_>| {
2237 let scz = Arc::clone(&scz);
2238 let srz = Arc::clone(&srz);
2239 let n = out.records.len();
2240 async move {
2241 scz.fetch_add(1, Ordering::Relaxed);
2242 srz.fetch_add(n, Ordering::Relaxed);
2243 Ok(())
2244 }
2245 },
2246 CommitMode::Auto,
2247 10_000, // target far larger than the whole batch
2248 None::<(
2249 Duration,
2250 fn() -> std::future::Ready<Result<(), EngineError>>,
2251 )>,
2252 )
2253 .await
2254 .unwrap();
2255
2256 assert_eq!(
2257 sink_calls.load(Ordering::Relaxed),
2258 1,
2259 "under-target batch sinks once (single sub-block)"
2260 );
2261 assert_eq!(
2262 sink_records.load(Ordering::Relaxed),
2263 3,
2264 "all 3 records sunk"
2265 );
2266 assert_eq!(
2267 transport.committed_sequence(),
2268 2,
2269 "all 3 acks committed once"
2270 );
2271 }
2272
2273 // ---- Phase 3: governed run path (default-on self-regulation) ----------
2274
/// Test fixture: creates a governor from the default self-regulation config
/// over a `MemoryGuard` limited to 1 MiB, installs the governor's byte
/// budget on a default engine, and returns both as `(engine, governor)`.
#[cfg(feature = "governor")]
fn governed_engine() -> (BatchEngine, crate::governor::SelfRegulationGovernor) {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    let guard_config = MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    };
    let governor = crate::governor::SelfRegulationConfig::default()
        .build(Arc::new(MemoryGuard::new(guard_config)))
        .expect("enabled by default");

    let mut engine = default_engine();
    engine.set_byte_budget(governor.budget());
    (engine, governor)
}
2291
/// Governor ON: six records injected into a `MemoryTransport` are driven
/// through `run_governed` on an engine with a byte budget wired in.
///
/// The test requires that the engine reports itself as self-regulated, that
/// all six records reach the sink, and that the transport's committed
/// sequence ends at 5.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_on_streams_and_commits_via_memory_transport() {
    type NoneArg = (
        Duration,
        fn() -> std::future::Ready<Result<(), EngineError>>,
    );

    let transport = mem_transport(50);
    for i in 0..6u64 {
        let body = format!(r#"{{"id":{i}}}"#).into_bytes();
        transport.inject(None, body).await.unwrap();
    }

    // The governor binding stays alive for the duration of the test.
    let (engine, _governor) = governed_engine();
    assert!(engine.is_self_regulated(), "budget wired -> governed path");

    let stop = CancellationToken::new();
    cancel_after(stop.clone(), 200);

    let sink_records = Arc::new(AtomicUsize::new(0));
    let record_counter = Arc::clone(&sink_records);

    engine
        .run_governed(
            &transport,
            stop,
            |block| Ok(block),
            move |out: &WorkBatch<_>| {
                let records = Arc::clone(&record_counter);
                let record_count = out.records.len();
                async move {
                    records.fetch_add(record_count, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<NoneArg>,
        )
        .await
        .unwrap();

    let records_sunk = sink_records.load(Ordering::Relaxed);
    assert_eq!(
        records_sunk, 6,
        "all records streamed to the sink under the governor"
    );
    assert_eq!(transport.committed_sequence(), 5, "all 6 acks committed");
}
2344
/// Governor OFF: a default engine (no byte budget set) runs `run_governed`
/// over four injected records.
///
/// The test requires that the engine reports itself as NOT self-regulated,
/// that the sink is called exactly once with all four records, and that the
/// transport's committed sequence ends at 3.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_off_is_whole_batch_passthrough() {
    type NoneArg = (
        Duration,
        fn() -> std::future::Ready<Result<(), EngineError>>,
    );

    let transport = mem_transport(50);
    for i in 0..4u64 {
        let body = format!(r#"{{"id":{i}}}"#).into_bytes();
        transport.inject(None, body).await.unwrap();
    }

    // set_byte_budget is never called here, so this is the OFF path.
    let engine = default_engine();
    assert!(
        !engine.is_self_regulated(),
        "no budget wired -> whole-batch path"
    );

    let stop = CancellationToken::new();
    cancel_after(stop.clone(), 200);

    let sink_calls = Arc::new(AtomicUsize::new(0));
    let sink_records = Arc::new(AtomicUsize::new(0));
    let call_counter = Arc::clone(&sink_calls);
    let record_counter = Arc::clone(&sink_records);

    engine
        .run_governed(
            &transport,
            stop,
            |block| Ok(block),
            move |out: &WorkBatch<_>| {
                let calls = Arc::clone(&call_counter);
                let records = Arc::clone(&record_counter);
                let record_count = out.records.len();
                async move {
                    calls.fetch_add(1, Ordering::Relaxed);
                    records.fetch_add(record_count, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<NoneArg>,
        )
        .await
        .unwrap();

    let calls_made = sink_calls.load(Ordering::Relaxed);
    assert_eq!(
        calls_made, 1,
        "OFF path = whole-batch: the block sinks ONCE (not per sub-block)"
    );
    let records_sunk = sink_records.load(Ordering::Relaxed);
    assert_eq!(records_sunk, 4, "all records sunk");
    assert_eq!(transport.committed_sequence(), 3, "all 4 acks committed");
}
2406
/// One pressure signal drives both the inbound gate and the byte budget.
///
/// With an empty guard (limit 1000, threshold 0.80) the gate must admit.
/// After 950 bytes are added to the guard, the same gate must hold, and a
/// single `observe` call must leave the byte budget below its starting value.
#[cfg(feature = "governor")]
#[test]
fn governed_gate_and_budget_share_pressure() {
    use crate::governor::{Admit, InboundGate, NoopActuator};
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    let guard_config = MemoryGuardConfig {
        limit_bytes: 1000,
        pressure_threshold: 0.80,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    let governor = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled");

    let gate = InboundGate::new(governor.pressure(), Box::new(NoopActuator));
    let budget = governor.budget();
    let start = budget.byte_budget();

    // Nothing accounted yet: the gate lets traffic in.
    assert_eq!(gate.evaluate(), Admit::Yes, "low pressure admits");

    // Push the guard to 95% of its limit. The very same pressure must now
    // hold the gate and, once observed, shrink the budget.
    guard.add_bytes(950); // 95% of limit
    assert_eq!(gate.evaluate(), Admit::Hold, "high pressure holds the gate");
    budget.observe(0, Duration::from_millis(1), Duration::from_millis(100));
    let shrunk = budget.byte_budget();
    assert!(
        shrunk < start,
        "high memory shrinks the shared budget (HARD override)"
    );
}
2442
2443 // ---- Phase 4: validation ---------------------------------------------
2444
/// Send-unaffected invariant: outbound sends keep working while the
/// pressure latch is held.
///
/// A `UnifiedPressure` built over a guard filled to 95% of its limit must
/// report `should_hold()`. While it does, `send` and `send_batch` on a
/// separate `MemoryTransport` must both succeed, the latch must still hold
/// afterwards, and a following `recv` must return all six sent records.
#[cfg(feature = "governor")]
#[tokio::test]
async fn send_unaffected_by_pressure_pinned_high() {
    use crate::governor::{Hysteresis, MemoryPressureSource, PressureSource, UnifiedPressure};
    use crate::memory::{MemoryGuard, MemoryGuardConfig};
    use crate::transport::TransportSender;

    // Drive a real memory-backed source to 95% so the latch engages.
    let guard_config = MemoryGuardConfig {
        limit_bytes: 1000,
        pressure_threshold: 0.80,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));
    guard.add_bytes(950); // 95% -> HARD high

    let memory_source: Arc<dyn PressureSource> =
        Arc::new(MemoryPressureSource::new(Arc::clone(&guard)));
    let band = Hysteresis::new(0.80, 0.65).expect("valid band");
    let pressure = Arc::new(UnifiedPressure::new(vec![memory_source], band));
    assert!(
        pressure.should_hold(),
        "pinned-high governor must hold the INBOUND gate"
    );

    // Outbound side: both send flavours must go through while held.
    let transport = mem_transport(50);

    let single = transport
        .send("k", Bytes::from_static(br#"{"id":1}"#))
        .await;
    assert!(
        single.is_ok(),
        "single send must succeed under pressure (sink never gated), got {single:?}"
    );

    let mut records: Vec<Record> = Vec::with_capacity(5);
    for i in 0..5 {
        records.push(Record {
            payload: Bytes::from(format!(r#"{{"id":{i}}}"#)),
            key: Some(Arc::from(format!("k{i}").as_str())),
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        });
    }
    let batch_res = transport.send_batch(&records).await;
    assert!(
        batch_res.is_ok(),
        "send_batch must succeed under pressure (sink never gated), got {batch_res:?}"
    );

    // The latch is unchanged by the sends.
    assert!(
        pressure.should_hold(),
        "send does not touch the pressure latch"
    );

    // Everything that was sent can be read back.
    let got = transport.recv(10).await.unwrap().records;
    let drained = got.len();
    assert_eq!(drained, 6, "1 single + 5 batched records all drained");
}
2515
/// Test fixture for low-limit governed runs.
///
/// Creates one `MemoryGuard` with the given `limit_bytes` (pressure
/// threshold 0.80) and shares it between a default-config governor and an
/// engine built with `max_chunk_size: 16`. The engine gets the governor's
/// byte budget and the same guard installed via
/// `set_memory_guard_for_test`. Returns `(engine, governor, guard)`.
#[cfg(all(feature = "governor", feature = "memory"))]
fn governed_engine_low_limit(
    limit_bytes: u64,
) -> (
    BatchEngine,
    crate::governor::SelfRegulationGovernor,
    Arc<crate::memory::MemoryGuard>,
) {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    let guard_config = MemoryGuardConfig {
        limit_bytes,
        pressure_threshold: 0.80,
        ..Default::default()
    };
    let guard = Arc::new(MemoryGuard::new(guard_config));

    // The governor is built over this very guard.
    let governor = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled by default");

    // A small recv chunk makes the load arrive across many blocks instead of
    // one large block, so the budget can be adjusted from block to block.
    let engine_config = BatchProcessingConfig {
        max_chunk_size: 16,
        ..Default::default()
    };
    let mut engine = BatchEngine::new(engine_config);
    engine.set_byte_budget(governor.budget());
    // Ingress accounting goes against the SAME guard the governor reads.
    engine.set_memory_guard_for_test(Arc::clone(&guard));

    (engine, governor, guard)
}
2552
2553 /// THE operational never-OOM test (in-process logical form).
2554 ///
2555 /// Drives sustained, large load through `run_governed` over a real
2556 /// `MemoryTransport`, governor ON, with a `MemoryGuard` on a LOW limit. It
2557 /// proves the four never-OOM invariants without a cgroup harness:
2558 ///
2559 /// 1. the inbound GATE engages -- with the governor's pressure pinned by
2560 /// sustained ingress, an `InboundGate` over the SAME pressure returns
2561 /// `Admit::Hold` (the brake the transport would apply);
2562 /// 2. the sink/drain KEEPS RUNNING -- every record reaches the sink and
2563 /// the source acks commit (the drain is never gated);
2564 /// 3. `MemoryGuard::current_bytes()` stays BOUNDED -- the streaming
2565 /// peak-lease holds at most ~one shrunk sub-block in flight, well under
2566 /// the whole-batch footprint, sampled at its high-water inside the sink;
2567 /// 4. the pipeline does NOT panic and the budget never collapses below its
2568 /// floor (>= 1, never 0).
2569 ///
2570 /// A full OS-level cgroup OOM-kill test (a memory-limited container + a real
2571 /// broker or transport under load) is FLAGGED for a CI harness (Phase 5.5);
2572 /// see the report.
#[cfg(all(feature = "governor", feature = "memory"))]
#[tokio::test]
async fn operational_never_oom_governed_pipeline_bounds_memory() {
    use crate::governor::{Admit, InboundGate, NoopActuator};

    // Sizing rationale: the guard limit is tiny on purpose. One in-flight
    // poll-chunk (16 x 1 KiB = 16 KiB) is 16/18 ~= 0.89 of the limit, above
    // the 80% pressure threshold, so the gate engages while a sub-block is
    // leased. The streaming peak-lease still keeps only one chunk resident,
    // which is the never-OOM shape: inbound is braked, memory stays bounded.
    const LIMIT: u64 = 18 * 1024; // 18 KiB
    const RECORD_BYTES: usize = 1024; // 1 KiB per record
    const N: usize = 256; // 256 KiB offered in total -- 14x the limit
    let total_payload = (RECORD_BYTES * N) as u64;

    // Queue the whole load on the source before the engine starts.
    let transport = mem_transport(50);
    let record_body = vec![b'q'; RECORD_BYTES];
    for _ in 0..N {
        transport.inject(None, record_body.clone()).await.unwrap();
    }

    let (engine, gov, guard) = governed_engine_low_limit(LIMIT);
    assert!(engine.is_self_regulated(), "budget wired -> governed path");

    // Stand-in for the gate a transport would wire over the governor's shared
    // pressure; it is evaluated from inside the sink to observe the brake.
    let gate = Arc::new(InboundGate::new(gov.pressure(), Box::new(NoopActuator)));

    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 600);

    // Observations collected by the sink and checked after the run.
    let sink_records = Arc::new(AtomicUsize::new(0));
    let high_water = Arc::new(AtomicU64::new(0));
    let gate_held_ever = Arc::new(std::sync::atomic::AtomicBool::new(false));

    // Handles owned by the sink closure; each call clones them again so the
    // returned future owns its own set.
    let drained_in_sink = Arc::clone(&sink_records);
    let peak_in_sink = Arc::clone(&high_water);
    let held_in_sink = Arc::clone(&gate_held_ever);
    let guard_in_sink = Arc::clone(&guard);
    let gate_in_sink = Arc::clone(&gate);

    engine
        .run_governed(
            &transport,
            shutdown,
            |batch| Ok(batch),
            move |out: &WorkBatch<_>| {
                let drained = Arc::clone(&drained_in_sink);
                let peak_seen = Arc::clone(&peak_in_sink);
                let held = Arc::clone(&held_in_sink);
                let guard = Arc::clone(&guard_in_sink);
                let gate = Arc::clone(&gate_in_sink);
                let batch_len = out.records.len();
                async move {
                    // (3) in-flight high-water: sample the guard while the
                    // sub-block lease is still held.
                    peak_seen.fetch_max(guard.current_bytes(), Ordering::Relaxed);
                    // (1) the gate reads the SAME pressure; remember if it
                    // ever answered Hold.
                    let holding = gate.evaluate() == Admit::Hold;
                    if holding {
                        held.store(true, Ordering::Relaxed);
                    }
                    // (2) the drain itself is never gated: count what sinks.
                    drained.fetch_add(batch_len, Ordering::Relaxed);
                    Ok(())
                }
            },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();

    // (2) Drain kept running: every record was sunk and every ack committed.
    assert_eq!(
        sink_records.load(Ordering::Relaxed),
        N,
        "all {N} records drained through the governed sink"
    );
    assert_eq!(
        transport.committed_sequence(),
        (N - 1) as u64,
        "all source acks committed (drain never stalled)"
    );

    // (1) The inbound brake fired at least once under the sustained load.
    assert!(
        gate_held_ever.load(Ordering::Relaxed),
        "inbound gate must engage (Admit::Hold) under sustained pressure"
    );

    // (3) Peak in-flight bytes are non-zero yet a small fraction of the whole
    // load; the bound is generous but rules out whole-batch residency.
    let peak = high_water.load(Ordering::Relaxed);
    assert!(
        peak > 0,
        "some bytes must be accounted while a sub-block is in flight"
    );
    assert!(
        peak < total_payload / 2,
        "peak in-flight {peak} must stay well under half the whole payload \
         {total_payload} (streaming peak-lease bounds it, never OOM)"
    );

    // (4) The budget kept its floor, and no lease outlived the run. Reaching
    // this point also shows the run did not panic.
    assert!(
        gov.budget().byte_budget() >= 1,
        "byte budget never collapses below its floor"
    );
    assert_eq!(
        guard.current_bytes(),
        0,
        "all ingress leases released after the run -- no leak"
    );
}
2697
2698 // ---- Remediation Phase 2: byte-aware recv bounds RECEIVE memory -------
2699 //
2700 // The gap (Codex finding): the governed driver bounds memory by the
2701 // post-recv SUB-BLOCK lease, but `recv(max)` is RECORD-bounded only -- a
2702 // single poll can build a WorkBatch whose total bytes >> byte_budget BEFORE
2703 // any sub-block split, so the byte budget did NOT bound RECEIVE memory. The
2704 // fix routes the governed recv through `recv_limited(RecvLimits)` so the poll
2705 // is bounded by BOTH the record cap AND the byte budget.
2706
2707 /// A REAL test transport (not a mock of internal code -- a concrete
2708 /// `TransportReceiver` over owned `Record`/`WorkBatch`/`MemoryToken`) that
2709 /// makes the gap observable:
2710 ///
2711 /// - `recv(max)` is RECORD-bounded: it hands out up to `max` records in ONE
2712 /// block regardless of their bytes -- exactly the pre-fix behaviour that
2713 /// let a single poll retain bytes >> budget.
2714 /// - `recv_limited(limits)` is BYTE-bounded: it accumulates records until the
2715 /// payload bytes reach `limits.max_bytes`, FLOOR one record.
2716 ///
2717 /// Every handed-out block's total payload bytes are folded into a shared
2718 /// high-water so the test can assert the bytes RETAINED at recv time.
2719 struct ByteAwareSource {
2720 /// Remaining records to hand out (front = next).
2721 remaining: std::sync::Mutex<std::collections::VecDeque<Record>>,
2722 /// High-water of the bytes handed out in any single recv/recv_limited.
2723 recv_high_water: Arc<AtomicU64>,
2724 committed: Arc<AtomicU64>,
2725 }
2726
2727 impl ByteAwareSource {
2728 fn new(records: Vec<Record>, recv_high_water: Arc<AtomicU64>) -> Self {
2729 Self {
2730 remaining: std::sync::Mutex::new(records.into_iter().collect()),
2731 recv_high_water,
2732 committed: Arc::new(AtomicU64::new(0)),
2733 }
2734 }
2735
2736 /// Pull a block (front records) bounded by an optional byte cap and a
2737 /// record cap, folding its total bytes into the high-water. Returns
2738 /// `None` when the source is exhausted (the caller then PENDS forever so
2739 /// the run loop parks until shutdown -- never a busy spin).
2740 fn pull(&self, max_records: usize, max_bytes: Option<u64>) -> Option<WorkBatch<MemTok2>> {
2741 let mut q = self.remaining.lock().unwrap();
2742 if q.is_empty() {
2743 return None;
2744 }
2745 let mut records = Vec::new();
2746 let mut bytes: u64 = 0;
2747 while records.len() < max_records {
2748 let Some(front) = q.front() else { break };
2749 let rb = front.payload.len() as u64;
2750 // Byte cap with floor-1: stop only once we already hold >= 1.
2751 if let Some(cap) = max_bytes
2752 && !records.is_empty()
2753 && bytes.saturating_add(rb) > cap
2754 {
2755 break;
2756 }
2757 bytes = bytes.saturating_add(rb);
2758 records.push(q.pop_front().expect("front exists"));
2759 }
2760 self.recv_high_water.fetch_max(bytes, Ordering::Relaxed);
2761 let n = records.len() as u64;
2762 let base = self.committed.load(Ordering::Relaxed);
2763 let tokens: Vec<MemTok2> = (0..n).map(|i| MemTok2 { seq: base + i }).collect();
2764 Some(WorkBatch::new(records, tokens))
2765 }
2766 }
2767
/// Commit token used by the byte-aware test source: a bare sequence number.
#[derive(Debug, Clone, Copy)]
struct MemTok2 {
    /// Sequence number carried by this token.
    seq: u64,
}

/// Renders the token as `memtok2:<seq>`.
impl std::fmt::Display for MemTok2 {
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("memtok2:{}", self.seq))
    }
}
// Empty impl: `MemTok2` provides no items of its own for `CommitToken`.
// NOTE(review): presumably this is what allows `MemTok2` to be used as a
// `TransportReceiver::Token` -- confirm against the trait definition.
impl crate::transport::CommitToken for MemTok2 {}
2778
2779 impl crate::transport::TransportBase for ByteAwareSource {
2780 fn close(
2781 &self,
2782 ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
2783 {
2784 std::future::ready(Ok(()))
2785 }
2786 fn is_healthy(&self) -> bool {
2787 true
2788 }
2789 fn name(&self) -> &'static str {
2790 "byte-aware-source"
2791 }
2792 }
2793
2794 impl TransportReceiver for ByteAwareSource {
2795 type Token = MemTok2;
2796
2797 fn recv(
2798 &self,
2799 max: usize,
2800 ) -> impl std::future::Future<
2801 Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2802 > + Send {
2803 // RECORD-bounded only -- ignores bytes. This is the pre-fix shape: a
2804 // single poll can retain bytes >> any budget.
2805 let pulled = self.pull(max, None);
2806 async move {
2807 match pulled {
2808 Some(batch) => Ok(batch),
2809 // Exhausted: park forever so the loop only exits on shutdown
2810 // (mirrors a quiet source -- never a busy spin).
2811 None => std::future::pending().await,
2812 }
2813 }
2814 }
2815
2816 fn recv_limited(
2817 &self,
2818 limits: crate::transport::RecvLimits,
2819 ) -> impl std::future::Future<
2820 Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
2821 > + Send {
2822 // BYTE-bounded (floor one record): the fix path.
2823 let pulled = self.pull(limits.max_records, Some(limits.max_bytes));
2824 async move {
2825 match pulled {
2826 Some(batch) => Ok(batch),
2827 None => std::future::pending().await,
2828 }
2829 }
2830 }
2831
2832 async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
2833 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
2834 self.committed.fetch_max(max_seq, Ordering::Relaxed);
2835 }
2836 Ok(())
2837 }
2838 }
2839
/// Reproduce/fix test for byte-bounded governed receive: the GOVERNED loop is
/// driven over a source able to deliver a block far larger than the byte
/// budget.
///
/// PRE-FIX (governed recv == `recv(record_cap)`): the source's record-bounded
/// `recv` hands out the whole backlog in one poll, so the bytes RETAINED at
/// recv time equal the whole block >> budget and the high-water assertion
/// FAILS (the reproduction).
///
/// POST-FIX (governed recv == `recv_limited(record_cap, byte_budget)`): the
/// source's byte-bounded `recv_limited` caps each poll at the budget (+ one
/// record), so the retained bytes stay ~<= budget + one record.
#[cfg(feature = "governor")]
#[tokio::test]
async fn governed_recv_is_byte_bounded_not_record_bounded() {
    use crate::memory::{MemoryGuard, MemoryGuardConfig};

    // Backlog: 64 records x 4 KiB = 256 KiB sitting in the source.
    const RECORD_BYTES: usize = 4 * 1024;
    const N: usize = 64;
    // Byte budget: 16 KiB, i.e. four records' worth. Every record-count limit
    // configured below is 4096 (far above N), so only the byte cap can bound
    // a poll.
    const BUDGET: u64 = 16 * 1024;

    let total: u64 = (RECORD_BYTES * N) as u64; // 256 KiB

    // Source preloaded with N identical records; it reports the largest
    // number of bytes it handed out in a single poll.
    let recv_high_water = Arc::new(AtomicU64::new(0));
    let source = {
        let body = vec![b'b'; RECORD_BYTES];
        let backlog: Vec<Record> = std::iter::repeat_with(|| Record {
            payload: Bytes::from(body.clone()),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        })
        .take(N)
        .collect();
        ByteAwareSource::new(backlog, Arc::clone(&recv_high_water))
    };

    // Byte budget pinned at BUDGET, fed by pressure over a 1 MiB guard.
    let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
        limit_bytes: 1024 * 1024,
        ..Default::default()
    }));
    let pressure = crate::governor::SelfRegulationConfig::default()
        .build(Arc::clone(&guard))
        .expect("enabled")
        .pressure();
    let budget_cfg = crate::governor::ByteBudgetConfig {
        start_bytes: BUDGET,
        max_bytes: BUDGET, // pinned: the budget cannot grow past BUDGET
        floor_records: 1,
        nominal_record_bytes: RECORD_BYTES as u64,
        record_cap: 4096, // far above N -- count never bounds the poll
        ..Default::default()
    };
    let budget = Arc::new(crate::governor::ByteBudgetController::new(
        budget_cfg,
        Arc::clone(&pressure),
    ));

    // Large chunk size so the engine config does not bound the poll either.
    let mut engine = BatchEngine::new(BatchProcessingConfig {
        max_chunk_size: 4096,
        ..Default::default()
    });
    engine.set_byte_budget(budget);

    let shutdown = CancellationToken::new();
    cancel_after(shutdown.clone(), 250);

    engine
        .run_governed(
            &source,
            shutdown,
            |batch| Ok(batch),
            |_out: &WorkBatch<_>| async { Ok(()) },
            CommitMode::Auto,
            None::<(
                Duration,
                fn() -> std::future::Ready<Result<(), EngineError>>,
            )>,
        )
        .await
        .unwrap();

    // A single governed poll may retain the byte budget plus one record of
    // floor -- never the whole 256 KiB backlog.
    let peak = recv_high_water.load(Ordering::Relaxed);
    assert!(
        peak <= BUDGET + RECORD_BYTES as u64,
        "governed recv retained {peak} bytes at recv time -- must be bounded \
         by the byte budget {BUDGET} (+ one record {RECORD_BYTES}), not the \
         whole {total}-byte block (record-bounded recv would retain all of it)"
    );
    assert!(
        peak > 0,
        "the source did hand out records (sanity: the loop ran)"
    );
}
2941
/// The sub-block drain is LAZY: sub-blocks come out one pull at a time rather
/// than being split up front. The test checks incremental yield -- each
/// `next_sub_block()` returns one budget-sized sub-block while the records of
/// later sub-blocks are still inside the drain.
#[test]
fn sub_block_drain_yields_incrementally() {
    // Six 10-byte records against a 25-byte target -> sub-blocks {2, 2, 2}.
    let ten_byte_record = || Record {
        payload: Bytes::from_static(b"0123456789"),
        key: None,
        headers: vec![],
        metadata: RecordMeta {
            timestamp_ms: None,
            format: PayloadFormat::Json,
        },
    };
    let records: Vec<Record> = std::iter::repeat_with(ten_byte_record).take(6).collect();
    let mut drain = SubBlockDrain::new(records, 25);

    // Pull 1: exactly one sub-block (2 records); the other 4 records have not
    // been materialised into sub-block vectors yet.
    let first = drain.next_sub_block().expect("first sub-block");
    assert_eq!(first.len(), 2, "first sub-block is one budget's worth");

    // Pulls 2 and 3: the drain still had records to give, which shows it did
    // not split eagerly.
    let second = drain.next_sub_block().expect("second sub-block");
    assert_eq!(second.len(), 2);
    let third = drain.next_sub_block().expect("third sub-block");
    assert_eq!(third.len(), 2);

    // Pull 4: nothing left.
    assert!(drain.next_sub_block().is_none(), "drain exhausted");
}
2974
2975 // ---- Remediation Phase 3: DLQ + parse-error-action semantics ----------
2976 //
// Findings the parsed/process paths had (the tests below are labelled
// Finding 1-3):
// 1. parse_block hardcoded route-to-DLQ, ignoring ParseErrorAction.
// 2. out_batch.dlq_entries from process were never routed before commit
//    (silent-drop path) -- only inbound-filter entries were routed.
// 3. a DLQ-route failure under `Route` must be terminal and skip the source
//    commit.
// These tests pin the fixed contract: one route point, one policy, fallible
// route, parse_error_action honoured on the parsed path.
2983
2984 use crate::worker::engine::FilterDlqPolicy;
2985 use crate::worker::engine::config::ParseErrorAction;
2986
2987 /// An engine with a specific `ParseErrorAction` (default config otherwise).
2988 fn engine_with_parse_action(action: ParseErrorAction) -> BatchEngine {
2989 BatchEngine::new(BatchProcessingConfig {
2990 parse_error_action: action,
2991 ..Default::default()
2992 })
2993 }
2994
2995 /// Finding 1 -- `ParseErrorAction::Skip`: a parse failure on the parsed path
2996 /// is DROPPED silently (NO DLQ entry routed) yet the survivors are kept and
2997 /// ALL source acks commit (the block's tokens are decoupled from records).
2998 #[tokio::test]
2999 async fn parsed_parse_error_skip_drops_without_dlq_and_commits_survivors() {
3000 let transport = mem_transport(50);
3001 transport
3002 .inject(None, br#"{"id":1}"#.to_vec())
3003 .await
3004 .unwrap(); // seq 0 ok
3005 transport
3006 .inject(None, b"not json {{{".to_vec())
3007 .await
3008 .unwrap(); // seq 1 bad
3009 transport
3010 .inject(None, br#"{"id":3}"#.to_vec())
3011 .await
3012 .unwrap(); // seq 2 ok
3013
3014 // Route policy so we can PROVE no entry is routed under Skip.
3015 let routed = Arc::new(AtomicUsize::new(0));
3016 let rc = Arc::clone(&routed);
3017 let engine = engine_with_parse_action(ParseErrorAction::Skip).with_filter_dlq_policy(
3018 FilterDlqPolicy::Route(Arc::new(
3019 move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3020 rc.fetch_add(entries.len(), Ordering::Relaxed);
3021 Ok(())
3022 },
3023 )),
3024 );
3025 let shutdown = CancellationToken::new();
3026 cancel_after(shutdown.clone(), 200);
3027
3028 let dlq_seen = Arc::new(AtomicUsize::new(0));
3029 let kept = Arc::new(AtomicUsize::new(0));
3030 let ds = Arc::clone(&dlq_seen);
3031 let kp = Arc::clone(&kept);
3032
3033 engine
3034 .run_workbatch_parsed(
3035 &transport,
3036 shutdown,
3037 move |pb: ParsedBatch<'_, _>| {
3038 ds.fetch_add(pb.dlq_entries.len(), Ordering::Relaxed);
3039 kp.fetch_add(pb.records.len(), Ordering::Relaxed);
3040 Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3041 .with_dlq_entries(pb.dlq_entries))
3042 },
3043 |_out: &WorkBatch<_>| async { Ok(()) },
3044 CommitMode::Auto,
3045 None::<(
3046 Duration,
3047 fn() -> std::future::Ready<Result<(), EngineError>>,
3048 )>,
3049 )
3050 .await
3051 .unwrap();
3052
3053 assert_eq!(kept.load(Ordering::Relaxed), 2, "2 survivors kept");
3054 assert_eq!(
3055 dlq_seen.load(Ordering::Relaxed),
3056 0,
3057 "Skip: parse failure produces NO DLQ entry (dropped, not dead-lettered)"
3058 );
3059 assert_eq!(
3060 routed.load(Ordering::Relaxed),
3061 0,
3062 "Skip: nothing reaches the DLQ route point"
3063 );
3064 // All three source acks committed -- survivors and the dropped record's
3065 // ack alike (at-least-once on the whole block; Skip is opt-in loss).
3066 assert_eq!(transport.committed_sequence(), 2);
3067 }
3068
3069 /// Finding 1 -- `ParseErrorAction::FailBatch`: a parse failure fails the
3070 /// WHOLE block terminally (no commit), consistent with the ack barrier. The
3071 /// run returns the terminal error and the source watermark does not advance.
3072 #[tokio::test]
3073 async fn parsed_parse_error_fail_batch_skips_commit() {
3074 // OrderedReceiver hands one record per recv with monotonic tokens and a
3075 // cumulative watermark, so we can prove the commit never fired.
3076 let receiver = OrderedReceiverBad::new();
3077 let committed = Arc::clone(&receiver.committed_hwm);
3078
3079 let engine = engine_with_parse_action(ParseErrorAction::FailBatch);
3080 let shutdown = CancellationToken::new();
3081 cancel_after(shutdown.clone(), 500);
3082
3083 let sink_calls = Arc::new(AtomicUsize::new(0));
3084 let sc = Arc::clone(&sink_calls);
3085
3086 let result = engine
3087 .run_workbatch_parsed(
3088 &receiver,
3089 shutdown,
3090 |pb: ParsedBatch<'_, _>| {
3091 Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3092 .with_dlq_entries(pb.dlq_entries))
3093 },
3094 move |_out: &WorkBatch<_>| {
3095 let sc = Arc::clone(&sc);
3096 async move {
3097 sc.fetch_add(1, Ordering::Relaxed);
3098 Ok(())
3099 }
3100 },
3101 CommitMode::Auto,
3102 None::<(
3103 Duration,
3104 fn() -> std::future::Ready<Result<(), EngineError>>,
3105 )>,
3106 )
3107 .await;
3108
3109 assert!(
3110 matches!(result, Err(EngineError::ParseBatchFailed(_))),
3111 "FailBatch: a parse failure is a terminal engine error, got {result:?}"
3112 );
3113 assert_eq!(
3114 committed.load(Ordering::Relaxed),
3115 u64::MAX,
3116 "FailBatch: the whole block fails its commit -- watermark unmoved"
3117 );
3118 assert_eq!(
3119 sink_calls.load(Ordering::Relaxed),
3120 0,
3121 "FailBatch: the block never reaches the sink (parse fails first)"
3122 );
3123 }
3124
3125 /// Finding 1 -- `ParseErrorAction::Dlq`: a parse failure routes to the DLQ
3126 /// route point BEFORE commit, survivors are sunk, all source acks commit.
3127 #[tokio::test]
3128 async fn parsed_parse_error_dlq_routes_before_commit() {
3129 let transport = Arc::new(mem_transport(50));
3130 transport
3131 .inject(None, br#"{"id":1}"#.to_vec())
3132 .await
3133 .unwrap(); // seq 0 ok
3134 transport
3135 .inject(None, b"not json {{{".to_vec())
3136 .await
3137 .unwrap(); // seq 1 bad
3138 transport
3139 .inject(None, br#"{"id":3}"#.to_vec())
3140 .await
3141 .unwrap(); // seq 2 ok
3142
3143 // Sample committed_sequence at DLQ-route time to prove route precedes
3144 // commit: when the route sink fires, the commit must NOT yet have run.
3145 let routed = Arc::new(AtomicUsize::new(0));
3146 let committed_at_route = Arc::new(AtomicU64::new(u64::MAX));
3147 let rc = Arc::clone(&routed);
3148 let car = Arc::clone(&committed_at_route);
3149 let transport_for_route = Arc::clone(&transport);
3150 let engine = engine_with_parse_action(ParseErrorAction::Dlq).with_filter_dlq_policy(
3151 FilterDlqPolicy::Route(Arc::new(
3152 move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3153 car.store(transport_for_route.committed_sequence(), Ordering::Relaxed);
3154 rc.fetch_add(entries.len(), Ordering::Relaxed);
3155 Ok(())
3156 },
3157 )),
3158 );
3159 let shutdown = CancellationToken::new();
3160 cancel_after(shutdown.clone(), 200);
3161
3162 engine
3163 .run_workbatch_parsed(
3164 &*transport,
3165 shutdown,
3166 |pb: ParsedBatch<'_, _>| {
3167 Ok(WorkBatch::new(pb.records, pb.commit_tokens)
3168 .with_dlq_entries(pb.dlq_entries))
3169 },
3170 |_out: &WorkBatch<_>| async { Ok(()) },
3171 CommitMode::Auto,
3172 None::<(
3173 Duration,
3174 fn() -> std::future::Ready<Result<(), EngineError>>,
3175 )>,
3176 )
3177 .await
3178 .unwrap();
3179
3180 assert_eq!(
3181 routed.load(Ordering::Relaxed),
3182 1,
3183 "Dlq: the parse failure reached the DLQ route point"
3184 );
3185 // The route fired BEFORE the commit: MemoryTransport's committed_sequence
3186 // starts at 0; the block's highest seq is 2, so a commit would set it to
3187 // 2. At route time it must still be its pre-commit value (0).
3188 assert_eq!(
3189 committed_at_route.load(Ordering::Relaxed),
3190 0,
3191 "DLQ route ran BEFORE the source commit advanced the watermark"
3192 );
3193 assert_eq!(
3194 transport.committed_sequence(),
3195 2,
3196 "all 3 acks committed after"
3197 );
3198 }
3199
3200 /// Finding 2 -- the STANDARD (on-demand) `run_workbatch` path must NOT
3201 /// silently drop DLQ entries that `process` emits on the out-batch. A
3202 /// process closure that attaches a dlq_entry has it ROUTED (reaches the DLQ
3203 /// route point) before the sink-success leads to a source commit -- it does
3204 /// not depend on the sink closure remembering to carry it.
3205 #[tokio::test]
3206 async fn standard_send_batch_sink_does_not_silently_drop_dlq_entries() {
3207 let transport = mem_transport(50);
3208 transport
3209 .inject(None, br#"{"id":1}"#.to_vec())
3210 .await
3211 .unwrap();
3212 transport
3213 .inject(None, br#"{"id":2}"#.to_vec())
3214 .await
3215 .unwrap();
3216
3217 let routed = Arc::new(AtomicUsize::new(0));
3218 let rc = Arc::clone(&routed);
3219 let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
3220 move |entries: Vec<crate::transport::filter::FilteredDlqEntry>| {
3221 rc.fetch_add(entries.len(), Ordering::Relaxed);
3222 Ok(())
3223 },
3224 )));
3225 let shutdown = CancellationToken::new();
3226 cancel_after(shutdown.clone(), 200);
3227
3228 // The SINK ignores dlq_entries entirely (the realistic app shape). The
3229 // PROCESS closure emits a dlq_entry on the out-batch. Pre-fix this entry
3230 // would vanish; post-fix the driver routes it before commit.
3231 engine
3232 .run_workbatch(
3233 &transport,
3234 shutdown,
3235 |batch| {
3236 let dlq = vec![crate::transport::filter::FilteredDlqEntry {
3237 payload: b"process-emitted dead-letter".to_vec(),
3238 key: None,
3239 reason: "process decided this record is bad".to_string(),
3240 }];
3241 let tokens = batch.commit_tokens;
3242 let records = batch.records;
3243 Ok(WorkBatch::new(records, tokens).with_dlq_entries(dlq))
3244 },
3245 |_out: &WorkBatch<_>| async { Ok(()) },
3246 CommitMode::Auto,
3247 None::<(
3248 Duration,
3249 fn() -> std::future::Ready<Result<(), EngineError>>,
3250 )>,
3251 )
3252 .await
3253 .unwrap();
3254
3255 assert!(
3256 routed.load(Ordering::Relaxed) >= 1,
3257 "process-emitted DLQ entry must reach the DLQ route point, not be \
3258 silently dropped on the path to commit"
3259 );
3260 // Source acks still commit -- the dead-letter routing is independent of
3261 // the source ack (at-least-once on the whole block).
3262 assert_eq!(transport.committed_sequence(), 1);
3263 }
3264
3265 /// Finding 3 -- a DLQ-route FAILURE under `Route` is a terminal ack-barrier
3266 /// error: the source commit is skipped (no later ordered commit advances
3267 /// past the undelivered dead-letters). Silent discard is opt-in only.
3268 #[tokio::test]
3269 async fn dlq_route_failure_is_terminal_and_blocks_commit() {
3270 let receiver = OrderedReceiverBad::without_parse_fail();
3271 let committed = Arc::clone(&receiver.committed_hwm);
3272
3273 // A Route sink that FAILS, simulating a DLQ transport outage.
3274 let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(Arc::new(
3275 |_e: Vec<crate::transport::filter::FilteredDlqEntry>| {
3276 Err(EngineError::Sink("dlq transport down".into()))
3277 },
3278 )));
3279 let shutdown = CancellationToken::new();
3280 cancel_after(shutdown.clone(), 500);
3281
3282 let result = engine
3283 .run_workbatch(
3284 &receiver,
3285 shutdown,
3286 |batch| {
3287 // process emits a dlq entry; routing it will fail.
3288 let dlq = vec![crate::transport::filter::FilteredDlqEntry {
3289 payload: b"bad".to_vec(),
3290 key: None,
3291 reason: "process dlq".to_string(),
3292 }];
3293 Ok(WorkBatch::new(batch.records, batch.commit_tokens).with_dlq_entries(dlq))
3294 },
3295 |_out: &WorkBatch<_>| async { Ok(()) },
3296 CommitMode::Auto,
3297 None::<(
3298 Duration,
3299 fn() -> std::future::Ready<Result<(), EngineError>>,
3300 )>,
3301 )
3302 .await;
3303
3304 assert!(
3305 result.is_err(),
3306 "DLQ route failure must be a terminal ack-barrier error, got {result:?}"
3307 );
3308 assert_eq!(
3309 committed.load(Ordering::Relaxed),
3310 u64::MAX,
3311 "DLQ route failure must skip the commit -- watermark unmoved"
3312 );
3313 }
3314
/// A test receiver that delivers exactly ONE record and then parks. The
/// record is unparseable by default (`new`) or parseable
/// (`without_parse_fail`). Its commit watermark is shared through an `Arc` so
/// tests can inspect it after a run; the FailBatch and DLQ-route-failure
/// tests use it to check the watermark.
struct OrderedReceiverBad {
    /// Sequence counter consumed by `recv`.
    next: Arc<AtomicU64>,
    /// Commit watermark; initialised to `u64::MAX`.
    committed_hwm: Arc<AtomicU64>,
    /// `true` -> the delivered payload is valid JSON; `false` -> malformed.
    good_payload: bool,
}

impl OrderedReceiverBad {
    /// Receiver whose single record FAILS to parse.
    fn new() -> Self {
        Self::with_payload_kind(false)
    }

    /// Receiver whose single record is PARSEABLE (for the DLQ-route-failure
    /// test, where the dead-letter comes from the process closure rather than
    /// from a parse failure).
    fn without_parse_fail() -> Self {
        Self::with_payload_kind(true)
    }

    /// Shared constructor: fresh counter at 0, watermark at `u64::MAX`.
    fn with_payload_kind(good_payload: bool) -> Self {
        Self {
            next: Arc::new(AtomicU64::new(0)),
            committed_hwm: Arc::new(AtomicU64::new(u64::MAX)),
            good_payload,
        }
    }
}
3342
3343 impl crate::transport::TransportBase for OrderedReceiverBad {
3344 fn close(
3345 &self,
3346 ) -> impl std::future::Future<Output = crate::transport::TransportResult<()>> + Send
3347 {
3348 std::future::ready(Ok(()))
3349 }
3350 fn is_healthy(&self) -> bool {
3351 true
3352 }
3353 fn name(&self) -> &'static str {
3354 "ordered-bad-test"
3355 }
3356 }
3357
3358 impl TransportReceiver for OrderedReceiverBad {
3359 type Token = crate::transport::memory::MemoryToken;
3360
3361 fn recv(
3362 &self,
3363 _max: usize,
3364 ) -> impl std::future::Future<
3365 Output = crate::transport::TransportResult<WorkBatch<Self::Token>>,
3366 > + Send {
3367 let next = Arc::clone(&self.next);
3368 let good = self.good_payload;
3369 async move {
3370 let seq = next.fetch_add(1, Ordering::Relaxed);
3371 if seq >= 1 {
3372 next.fetch_sub(1, Ordering::Relaxed);
3373 std::future::pending::<()>().await;
3374 }
3375 let payload = if good {
3376 Bytes::from_static(br#"{"ok":1}"#)
3377 } else {
3378 Bytes::from_static(b"not json {{{")
3379 };
3380 let record = Record {
3381 payload,
3382 key: None,
3383 headers: vec![],
3384 metadata: RecordMeta {
3385 timestamp_ms: None,
3386 format: PayloadFormat::Json,
3387 },
3388 };
3389 Ok(WorkBatch::new(
3390 vec![record],
3391 vec![crate::transport::memory::MemoryToken { seq }],
3392 ))
3393 }
3394 }
3395
3396 async fn commit(&self, tokens: &[Self::Token]) -> crate::transport::TransportResult<()> {
3397 if let Some(max_seq) = tokens.iter().map(|t| t.seq).max() {
3398 self.committed_hwm.fetch_max(max_seq, Ordering::Relaxed);
3399 }
3400 Ok(())
3401 }
3402 }
3403}