Skip to main content

hyperi_rustlib/worker/engine/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/mod.rs
3// Purpose:   SIMD-optimised batch processing engine for DFE pipelines
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9pub mod config;
10#[cfg(feature = "transport")]
11pub mod driver;
12pub mod intern;
13pub mod metrics;
14pub mod parse;
15pub mod pre_route;
16pub mod types;
17
18pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
19#[cfg(feature = "transport")]
20pub use driver::{CommitMode, ParsedBatch};
21pub use intern::FieldInterner;
22pub use types::{MessageMetadata, ParsedMessage, PreRouteResult};
23
/// Errors returned by the [`BatchEngine`] `WorkBatch` drivers
/// ([`run_workbatch`](BatchEngine::run_workbatch) /
/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed)).
///
/// Only available when the `transport` feature is enabled.
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// Transport receive or commit failed. Converts from
    /// `crate::TransportError` via `?` (`#[from]`).
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),
    /// Sink callback returned an error.
    #[error("sink error: {0}")]
    Sink(String),
    /// Shutdown was requested via cancellation token.
    #[error("shutdown")]
    Shutdown,
    /// DLQ entries reached the engine's routing point but no routing policy was
    /// configured (the default [`FilterDlqPolicy::Reject`]). Metrics are not
    /// delivery, so the engine fails fast rather than silently dropping
    /// dead-letters. Carries the number of unrouted entries.
    ///
    /// Despite the variant name (and the message text), this is raised for ANY
    /// DLQ entries routed under `Reject`. That covers the inbound-filter
    /// entries and also the parse/process-generated entries that the drivers
    /// route through the same policy before commit.
    #[error(
        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
         (Route to forward, or DiscardWithMetric to deliberately drop)"
    )]
    FilterDlqUnrouted(usize),
    /// A [`FilterDlqPolicy::Route`] sink failed to deliver DLQ entries.
    ///
    /// This is a TERMINAL ack-barrier failure: the source commit is skipped, so
    /// the whole block is re-delivered. A DLQ-route failure must NOT let a later
    /// ordered commit advance past these undelivered dead-letters. Silent
    /// discard is OPT-IN only, via [`FilterDlqPolicy::DiscardWithMetric`].
    #[error("DLQ route failed: {0}")]
    DlqRouteFailed(String),
    /// A parse failure occurred under [`ParseErrorAction::FailBatch`].
    ///
    /// TERMINAL: the whole block fails its commit (consistent with the ack
    /// barrier), so it is re-delivered rather than partially committed.
    #[error("parse failed (fail_batch): {0}")]
    ParseBatchFailed(String),
    /// [`CommitMode::SinkManaged`] was requested on the streaming/governed
    /// driver.
    ///
    /// Sub-blocks there carry no tokens, so the sink can't own the commit. Use
    /// [`CommitMode::Auto`], or `run_workbatch` for sink-managed commits.
    #[error(
        "CommitMode::SinkManaged unsupported on the streaming/governed driver; \
         use CommitMode::Auto, or run_workbatch for sink-managed commits"
    )]
    SinkManagedUnsupported,
}
71
/// What a [`BatchEngine`] run loop does with DLQ entries.
///
/// Inbound `action: dlq` filters remove messages from the normal batch and
/// surface them on [`WorkBatch::dlq_entries`](crate::transport::WorkBatch).
/// The parse/process-generated DLQ entries that the drivers produce are routed
/// through this same policy before commit. Either way, the entries must go
/// somewhere.
///
/// The default is [`Reject`](Self::Reject), so a data-loss-shaped config never
/// passes silently.
#[cfg(feature = "transport")]
#[derive(Clone, Default)]
pub enum FilterDlqPolicy {
    /// Fail the run loop with [`EngineError::FilterDlqUnrouted`] as soon as any
    /// DLQ entries appear. This is the safe default: it forces a deliberate
    /// choice.
    #[default]
    Reject,
    /// Deliberately discard DLQ entries and count them in the
    /// `dfe_engine_filter_dlq_discarded_total` metric. The counter is only
    /// emitted when the `metrics` feature is enabled. Explicit opt-in.
    DiscardWithMetric,
    /// Hand each batch's DLQ entries to a sink, e.g. to enqueue them onto a DLQ
    /// transport or to `tokio::spawn` an async send.
    ///
    /// The sink is called on the run loop, so keep it cheap and offload slow
    /// work.
    ///
    /// The sink is FALLIBLE. Returning `Err` raises
    /// [`EngineError::DlqRouteFailed`], a terminal ack-barrier failure: the
    /// source commit is skipped and the whole block is re-delivered. A sink
    /// that deliberately tolerates loss should count the drop and return `Ok`.
    /// To discard ALL entries by policy, use
    /// [`DiscardWithMetric`](Self::DiscardWithMetric) instead.
    Route(
        Arc<
            dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) -> Result<(), EngineError>
                + Send
                + Sync,
        >,
    ),
}
106
#[cfg(feature = "transport")]
impl std::fmt::Debug for FilterDlqPolicy {
    // Renders just the variant name. The `Route` sink is an opaque closure, so
    // its payload is elided as `Route(..)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Reject => "Reject",
            Self::DiscardWithMetric => "DiscardWithMetric",
            Self::Route(_) => "Route(..)",
        };
        f.write_str(label)
    }
}
117
118use std::sync::Arc;
119
120use super::pool::AdaptiveWorkerPool;
121use super::stats::PipelineStats;
122
123use self::pre_route::filters_from_config;
124// Pre-route + parse helpers are used only by the in-process process_* methods,
125// which take the canonical transport Record and so are transport-gated.
126#[cfg(feature = "transport")]
127use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field};
128#[cfg(feature = "transport")]
129use self::types::PayloadFormat;
130use super::config::WorkerPoolConfig;
131
132/// Core batch processing engine for DFE pipelines.
133///
134/// Provides two in-process processing modes (the run-loop drivers live in the
135/// `driver` module, gated on the `transport` feature):
136///
137/// - [`process_mid_tier`](Self::process_mid_tier) -- parse JSON via SIMD, extract
138///   known fields, apply pre-route filters, then parallel transform via rayon.
139///   The standard path for most DFE apps (loader, archiver, transforms).
140///
141/// - [`process_raw`](Self::process_raw) -- skip parsing, apply pre-route on raw
142///   bytes, then parallel transform via rayon. For apps that handle raw bytes
143///   (receiver, binary protocols).
144///
145/// Both take the canonical [`Record`](crate::transport::Record) slice (the same
146/// currency the [`WorkBatch`](crate::transport::WorkBatch) carries), chunk large
147/// batches, and track stats atomically.
148///
149/// Inbound braking under memory pressure is no longer a blocking pause between
150/// chunks (the retired `check_memory_pressure` proto-actuator). It is now the
151/// self-regulation governor's job: the inbound GATE pauses the source transport
152/// and the streaming byte-budget lever bounds peak in-flight memory. See
153/// `run_governed` (`transport` + `governor` features). With the governor OFF,
154/// there is no active brake -- that is the deliberate opt-out.
155pub struct BatchEngine {
156    config: BatchProcessingConfig,
157    pool: Arc<AdaptiveWorkerPool>,
158    stats: Arc<PipelineStats>,
159    interner: Arc<FieldInterner>,
160    filters: Vec<pre_route::PreRouteFilter>,
161    #[cfg(feature = "memory")]
162    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
163    /// What the run loops do with inbound-filter DLQ entries. Default
164    /// [`FilterDlqPolicy::Reject`] (no silent data loss).
165    #[cfg(feature = "transport")]
166    filter_dlq_policy: FilterDlqPolicy,
167    /// Self-regulation byte-budget lever (`governor` feature). `None` (the
168    /// default, and whenever self-regulation is OFF) keeps the engine on the
169    /// whole-batch [`run_workbatch`](Self::run_workbatch) loop -- byte-identical
170    /// to pre-governor behaviour. When wired (by `ServiceRuntime` when the
171    /// governor is enabled), `run_governed` streams in
172    /// budget-sized sub-blocks and feeds the AIMD loop per block.
173    #[cfg(feature = "governor")]
174    byte_budget: Option<Arc<crate::governor::ByteBudgetController>>,
175}
176
177impl BatchEngine {
178    /// Create a standalone engine with its own worker pool.
179    ///
180    /// Uses `WorkerPoolConfig::default()` for the pool. Prefer
181    /// [`with_pool`](Self::with_pool) when a `ServiceRuntime` pool exists.
182    #[must_use]
183    pub fn new(config: BatchProcessingConfig) -> Self {
184        let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
185        Self::with_pool(pool, config)
186    }
187
188    /// Create an engine that reuses an existing worker pool.
189    ///
190    /// This is the preferred constructor when `ServiceRuntime` is available,
191    /// as it avoids creating a second rayon thread pool.
192    #[must_use]
193    pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
194        let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
195        let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
196        let filters = filters_from_config(&config.pre_route_filters);
197        Self {
198            config,
199            pool,
200            stats: Arc::new(PipelineStats::new()),
201            interner,
202            filters,
203            #[cfg(feature = "memory")]
204            memory_guard: None,
205            #[cfg(feature = "transport")]
206            filter_dlq_policy: FilterDlqPolicy::default(),
207            #[cfg(feature = "governor")]
208            byte_budget: None,
209        }
210    }
211
212    /// Wire the self-regulation byte-budget lever (`governor` feature).
213    ///
214    /// Called by `ServiceRuntime::build()` when the governor is enabled. Once
215    /// wired, `run_governed` streams the input in
216    /// budget-sized sub-blocks and drives the AIMD loop per block. Without it
217    /// (governor off), `run_governed` falls back to the whole-batch loop.
218    #[cfg(feature = "governor")]
219    pub fn set_byte_budget(&mut self, budget: Arc<crate::governor::ByteBudgetController>) {
220        self.byte_budget = Some(budget);
221    }
222
223    /// Whether the self-regulation byte-budget lever is wired (`governor`
224    /// feature). When `false`, `run_governed` is the
225    /// whole-batch loop.
226    #[cfg(feature = "governor")]
227    #[must_use]
228    pub fn is_self_regulated(&self) -> bool {
229        self.byte_budget.is_some()
230    }
231
232    /// Set the policy for inbound-filter DLQ entries in the run loops.
233    ///
234    /// Default is [`FilterDlqPolicy::Reject`] -- the run loop errors if an
235    /// inbound `action: dlq` filter produces entries and no routing is set, so
236    /// dead-letters are never silently dropped.
237    #[cfg(feature = "transport")]
238    #[must_use]
239    pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
240        self.filter_dlq_policy = policy;
241        self
242    }
243
244    /// Load configuration from the cascade and create a standalone engine.
245    ///
246    /// # Errors
247    ///
248    /// Returns `ConfigError` if the cascade contains invalid data.
249    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
250        let config = BatchProcessingConfig::from_cascade(key)?;
251        Ok(Self::new(config))
252    }
253
254    /// Pipeline statistics (atomic, lock-free).
255    #[must_use]
256    pub fn stats(&self) -> &Arc<PipelineStats> {
257        &self.stats
258    }
259
260    /// Underlying worker pool.
261    #[must_use]
262    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
263        &self.pool
264    }
265
266    /// Engine configuration.
267    #[must_use]
268    pub fn config(&self) -> &BatchProcessingConfig {
269        &self.config
270    }
271
272    /// Auto-wire engine with infrastructure components.
273    ///
274    /// Called by `ServiceRuntime::build()`. Apps never call this directly.
275    pub fn auto_wire(
276        &mut self,
277        metrics_manager: &crate::metrics::MetricsManager,
278        #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
279    ) {
280        metrics::register(metrics_manager, &self.config);
281
282        #[cfg(feature = "memory")]
283        if let Some(guard) = memory_guard {
284            self.memory_guard = Some(Arc::clone(guard));
285        }
286    }
287
288    /// Test-only: wire a `MemoryGuard` directly so memory-lease behaviour can be
289    /// exercised without a full `auto_wire`.
290    #[cfg(all(test, feature = "memory"))]
291    pub(crate) fn set_memory_guard_for_test(&mut self, guard: Arc<crate::memory::MemoryGuard>) {
292        self.memory_guard = Some(guard);
293    }
294
295    /// Parse, filter, and transform a batch of raw messages.
296    ///
297    /// Pipeline phases per chunk:
298    /// 1. **Pre-route** -- SIMD field extraction + filter evaluation (sequential, ~100 ns/msg)
299    /// 2. **Parse** -- `sonic_rs::from_slice` + known-field extraction (sequential, ~1-5 µs/msg)
300    /// 3. **Transform** -- user closure via rayon `par_iter_mut` (parallel)
301    ///
302    /// Results contain one entry per non-filtered message. Filtered messages are
303    /// silently removed (their commit tokens remain accessible via the original
304    /// slice). DLQ'd and parse-error messages produce `Err` entries.
305    #[cfg(feature = "transport")]
306    pub fn process_mid_tier<O, E, F>(
307        &self,
308        messages: &[crate::transport::Record],
309        transform: F,
310    ) -> Vec<Result<O, E>>
311    where
312        O: Send,
313        E: Send + From<String>,
314        F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
315    {
316        if messages.is_empty() {
317            return Vec::new();
318        }
319
320        let chunk_size = if self.config.max_chunk_size == 0 {
321            messages.len()
322        } else {
323            self.config.max_chunk_size
324        };
325
326        let has_routing = self.config.routing_field.is_some();
327        let mut all_results = Vec::with_capacity(messages.len());
328
329        for chunk in messages.chunks(chunk_size) {
330            self.stats.add_received(chunk.len() as u64);
331
332            // Accumulate bytes received
333            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
334            self.stats.add_bytes_received(chunk_bytes);
335
336            // Phase 1 + 2: Pre-route and parse, building ParsedMessage vec.
337            // Track which messages are included (not filtered).
338            let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
339                Vec::with_capacity(chunk.len());
340
341            for msg in chunk {
342                // Phase 1: Pre-route
343                if has_routing {
344                    let field_name = self.config.routing_field.as_ref().expect("checked above");
345                    let extraction = extract_routing_field(&msg.payload, field_name);
346                    let outcome = apply_filters(&extraction, &self.filters);
347
348                    match outcome {
349                        PreRouteOutcome::Continue => {}
350                        PreRouteOutcome::Filtered => {
351                            self.stats.incr_filtered();
352                            continue; // skip this message entirely
353                        }
354                        PreRouteOutcome::Dlq(reason) => {
355                            self.stats.incr_dlq();
356                            self.stats.incr_errors();
357                            parsed_msgs.push(Err(reason));
358                            continue;
359                        }
360                    }
361                }
362
363                // Phase 2: Parse. The Record carries the transport PayloadFormat;
364                // convert it to the engine's local enum, resolving Auto.
365                let format: PayloadFormat = match PayloadFormat::from(msg.metadata.format) {
366                    PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
367                    other => other,
368                };
369
370                match parse::parse_payload(&msg.payload, format) {
371                    Ok(value) => {
372                        let extracted = self.interner.extract_known(&value);
373                        // Rebuild a MessageMetadata for ParsedMessage. Commit
374                        // tokens live on the WorkBatch, not on individual records.
375                        let metadata = MessageMetadata {
376                            timestamp_ms: msg.metadata.timestamp_ms,
377                            format,
378                        };
379                        parsed_msgs.push(Ok(ParsedMessage {
380                            value,
381                            raw: msg.payload.clone(),
382                            format,
383                            key: msg.key.clone(),
384                            headers: msg.headers.clone(),
385                            metadata,
386                            extracted,
387                        }));
388                    }
389                    Err(e) => {
390                        self.stats.incr_errors();
391                        match self.config.parse_error_action {
392                            ParseErrorAction::Dlq => {
393                                self.stats.incr_dlq();
394                                parsed_msgs.push(Err(format!("parse error: {e}")));
395                            }
396                            ParseErrorAction::Skip => {
397                                // Counted in errors above, not added to results
398                            }
399                            ParseErrorAction::FailBatch => {
400                                // Return all accumulated results + this error,
401                                // then stop processing the chunk.
402                                parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
403                                let results: Vec<Result<O, E>> = parsed_msgs
404                                    .into_iter()
405                                    .map(|r| match r {
406                                        Ok(_) => Err(E::from(
407                                            "batch failed due to parse error".to_string(),
408                                        )),
409                                        Err(reason) => Err(E::from(reason)),
410                                    })
411                                    .collect();
412                                all_results.extend(results);
413                                return all_results;
414                            }
415                        }
416                    }
417                }
418            }
419
420            // Phase 3: Parallel transform via rayon.
421            // Split into ok/err: transform only the Ok entries.
422            let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
423                parsed_msgs.into_iter().enumerate().collect();
424
425            // Separate errors from parseable messages
426            let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
427            let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
428
429            for (idx, item) in indexed.drain(..) {
430                match item {
431                    Ok(pm) => to_transform.push((idx, pm)),
432                    Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
433                }
434            }
435
436            // Parallel transform, throttled by the scaler target (map_owned
437            // applies the semaphore per item -- unlike the old install() path,
438            // which bypassed it and let the parsed path ignore the CPU cap).
439            let transformed: Vec<(usize, Result<O, E>)> =
440                self.pool.map_owned(to_transform, |(idx, mut pm)| {
441                    let result = transform(&mut pm);
442                    (idx, result)
443                });
444
445            chunk_results.extend(transformed);
446
447            // Sort by original index to preserve order
448            chunk_results.sort_by_key(|(idx, _)| *idx);
449
450            // Update stats
451            let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
452            self.stats.add_processed(ok_count as u64);
453
454            all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
455        }
456
457        all_results
458    }
459
460    /// Pre-route and transform a batch of records without parsing.
461    ///
462    /// The transform closure receives immutable [`Record`](crate::transport::Record)
463    /// references. Use this for apps that handle raw bytes directly (e.g. receiver
464    /// forwarding).
465    #[cfg(feature = "transport")]
466    pub fn process_raw<O, E, F>(
467        &self,
468        messages: &[crate::transport::Record],
469        transform: F,
470    ) -> Vec<Result<O, E>>
471    where
472        O: Send,
473        E: Send + From<String>,
474        F: Fn(&crate::transport::Record) -> Result<O, E> + Sync,
475    {
476        if messages.is_empty() {
477            return Vec::new();
478        }
479
480        let chunk_size = if self.config.max_chunk_size == 0 {
481            messages.len()
482        } else {
483            self.config.max_chunk_size
484        };
485
486        let has_routing = self.config.routing_field.is_some();
487        let mut all_results = Vec::with_capacity(messages.len());
488
489        for chunk in messages.chunks(chunk_size) {
490            self.stats.add_received(chunk.len() as u64);
491
492            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
493            self.stats.add_bytes_received(chunk_bytes);
494
495            // Phase 1: Pre-route filter
496            let to_process: Vec<&crate::transport::Record> = if has_routing {
497                let field_name = self.config.routing_field.as_ref().expect("checked above");
498                let mut passed = Vec::with_capacity(chunk.len());
499                for msg in chunk {
500                    let extraction = extract_routing_field(&msg.payload, field_name);
501                    let outcome = apply_filters(&extraction, &self.filters);
502                    match outcome {
503                        PreRouteOutcome::Continue => passed.push(msg),
504                        PreRouteOutcome::Filtered => {
505                            self.stats.incr_filtered();
506                        }
507                        PreRouteOutcome::Dlq(reason) => {
508                            self.stats.incr_dlq();
509                            self.stats.incr_errors();
510                            all_results.push(Err(E::from(reason)));
511                        }
512                    }
513                }
514                passed
515            } else {
516                chunk.iter().collect()
517            };
518
519            // Phase 2: Parallel transform via process_batch
520            let results = self.pool.process_batch(&to_process, |msg| transform(msg));
521
522            let ok_count = results.iter().filter(|r| r.is_ok()).count();
523            self.stats.add_processed(ok_count as u64);
524
525            all_results.extend(results);
526        }
527
528        all_results
529    }
530
531    /// Apply the [`FilterDlqPolicy`] to a received [`WorkBatch`](crate::transport::WorkBatch),
532    /// routing/discarding/rejecting its inline-DLQ entries per the policy and
533    /// returning the batch with `dlq_entries` consumed.
534    ///
535    /// Now that `recv` yields a `WorkBatch` directly (Task 0.7b), the
536    /// inbound-filter DLQ entries arrive on
537    /// [`WorkBatch::dlq_entries`](crate::transport::WorkBatch) rather than on a
538    /// `RecvBatch`. Records are never touched -- only the DLQ entries are routed
539    /// -- so dead-letters are never silently dropped.
540    ///
541    /// # Errors
542    ///
543    /// [`EngineError::FilterDlqUnrouted`] when entries appear under
544    /// [`FilterDlqPolicy::Reject`].
545    #[cfg(feature = "transport")]
546    fn apply_workbatch_dlq_policy<T: crate::transport::CommitToken>(
547        &self,
548        mut batch: crate::transport::WorkBatch<T>,
549    ) -> Result<crate::transport::WorkBatch<T>, EngineError> {
550        if !batch.dlq_entries.is_empty() {
551            let entries = std::mem::take(&mut batch.dlq_entries);
552            self.route_dlq_entries(entries)?;
553        }
554        Ok(batch)
555    }
556
557    /// Route a set of DLQ entries through the configured [`FilterDlqPolicy`].
558    ///
559    /// THE single DLQ route point: both the inbound-filter entries (routed
560    /// before `process` by
561    /// [`apply_workbatch_dlq_policy`](Self::apply_workbatch_dlq_policy)) AND the
562    /// parse/process-generated entries (routed after `process`, before commit,
563    /// inline in the driver's `drive_block` / `drive_block_streaming`) flow
564    /// through here, so the two paths share ONE policy and ONE behaviour. There
565    /// is no
566    /// silent-drop default: `Reject` fails fast, `DiscardWithMetric` is the only
567    /// deliberate opt-in to drop, and `Route` is fallible (a delivery failure is
568    /// a terminal ack-barrier error, never a silent loss).
569    ///
570    /// # Errors
571    ///
572    /// - [`EngineError::FilterDlqUnrouted`] under [`FilterDlqPolicy::Reject`].
573    /// - [`EngineError::DlqRouteFailed`] when a [`FilterDlqPolicy::Route`] sink
574    ///   returns `Err`.
575    #[cfg(feature = "transport")]
576    pub(crate) fn route_dlq_entries(
577        &self,
578        entries: Vec<crate::transport::filter::FilteredDlqEntry>,
579    ) -> Result<(), EngineError> {
580        if entries.is_empty() {
581            return Ok(());
582        }
583        match &self.filter_dlq_policy {
584            FilterDlqPolicy::Reject => Err(EngineError::FilterDlqUnrouted(entries.len())),
585            FilterDlqPolicy::DiscardWithMetric => {
586                #[cfg(feature = "metrics")]
587                ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
588                    .increment(entries.len() as u64);
589                Ok(())
590            }
591            FilterDlqPolicy::Route(sink) => sink(entries),
592        }
593    }
594}
595
/// RAII lease over in-flight ingress bytes tracked by a [`MemoryGuard`].
///
/// The WorkBatch driver's `lease_ingress_batch` hands these out. When a lease
/// is dropped, its accounted bytes go back to the guard, so no exit path out
/// of a block can leak the reservation.
///
/// [`MemoryGuard`]: crate::memory::MemoryGuard
#[cfg(feature = "memory")]
#[must_use = "the lease must be held for the lifetime of the in-flight block; \
              dropping it immediately releases the reservation and corrupts the \
              in-flight byte accounting"]
pub(crate) struct IngressLease<'a> {
    /// Guard the bytes were accounted against, and are released to on drop.
    guard: &'a crate::memory::MemoryGuard,
    /// Number of bytes this lease gives back when dropped.
    bytes: u64,
}
611
#[cfg(feature = "memory")]
impl<'a> IngressLease<'a> {
    /// Wrap `bytes` that the caller has ALREADY added to `guard` (via
    /// `guard.add_bytes(bytes)`). Dropping the lease releases them again.
    /// The WorkBatch driver's `lease_ingress_batch` is the intended caller.
    fn new(guard: &'a crate::memory::MemoryGuard, bytes: u64) -> Self {
        IngressLease { bytes, guard }
    }
}
621
#[cfg(feature = "memory")]
impl Drop for IngressLease<'_> {
    // Hand the leased bytes back to the guard that accounted them.
    fn drop(&mut self) {
        let (guard, bytes) = (self.guard, self.bytes);
        guard.release(bytes);
    }
}
628
629impl std::fmt::Debug for BatchEngine {
630    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
631        let mut s = f.debug_struct("BatchEngine");
632        s.field("config", &self.config)
633            .field("pool_max_threads", &self.pool.max_threads())
634            .field("stats", &self.stats.snapshot())
635            .field("interner_len", &self.interner.len())
636            .field("filters", &self.filters);
637        #[cfg(feature = "memory")]
638        s.field("memory_guard", &self.memory_guard.is_some());
639        #[cfg(feature = "transport")]
640        s.field("filter_dlq_policy", &self.filter_dlq_policy);
641        #[cfg(feature = "governor")]
642        s.field("self_regulated", &self.byte_budget.is_some());
643        s.finish()
644    }
645}
646
647#[cfg(all(test, feature = "transport"))]
648mod engine_tests {
649    use super::*;
650    use crate::transport::{PayloadFormat as TPayloadFormat, Record, RecordMeta};
651    use bytes::Bytes;
652
653    fn make_json_messages(n: usize) -> Vec<Record> {
654        (0..n)
655            .map(|i| Record {
656                payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
657                key: None,
658                headers: vec![],
659                metadata: RecordMeta {
660                    timestamp_ms: None,
661                    format: TPayloadFormat::Json,
662                },
663            })
664            .collect()
665    }
666
667    fn default_engine() -> BatchEngine {
668        BatchEngine::new(BatchProcessingConfig::default())
669    }
670
671    #[cfg(feature = "transport")]
672    #[test]
673    fn filter_dlq_policy_routes_discards_or_rejects() {
674        use crate::transport::WorkBatch;
675        use crate::transport::filter::FilteredDlqEntry;
676        use std::sync::Arc as StdArc;
677        use std::sync::atomic::{AtomicUsize, Ordering};
678
679        // Minimal CommitToken for the generic helper.
680        #[derive(Clone, Debug)]
681        struct TestTok;
682        impl std::fmt::Display for TestTok {
683            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
684                f.write_str("test")
685            }
686        }
687        impl crate::transport::CommitToken for TestTok {}
688
689        let entry = || FilteredDlqEntry {
690            payload: b"x".to_vec(),
691            key: None,
692            reason: "r".to_string(),
693        };
694        let batch_with = |n: usize| {
695            WorkBatch::<TestTok>::from_records(vec![])
696                .with_dlq_entries((0..n).map(|_| entry()).collect())
697        };
698
699        // Reject (default): any DLQ entries -> fail fast, not silent drop.
700        let eng = default_engine();
701        assert!(matches!(
702            eng.apply_workbatch_dlq_policy(batch_with(1)),
703            Err(EngineError::FilterDlqUnrouted(1))
704        ));
705        // Reject + no entries -> ok (batch passes through with no DLQ entries).
706        let passed = eng
707            .apply_workbatch_dlq_policy(WorkBatch::<TestTok>::from_records(vec![]))
708            .expect("no entries -> ok");
709        assert!(passed.dlq_entries.is_empty());
710
711        // DiscardWithMetric -> ok (deliberately dropped); entries consumed.
712        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
713        let passed = eng
714            .apply_workbatch_dlq_policy(batch_with(1))
715            .expect("discard -> ok");
716        assert!(
717            passed.dlq_entries.is_empty(),
718            "entries consumed after routing"
719        );
720
721        // Route -> the sink receives every entry.
722        let seen = StdArc::new(AtomicUsize::new(0));
723        let s = StdArc::clone(&seen);
724        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
725            move |e: Vec<FilteredDlqEntry>| {
726                s.fetch_add(e.len(), Ordering::Relaxed);
727                Ok(())
728            },
729        )));
730        let passed = eng
731            .apply_workbatch_dlq_policy(batch_with(2))
732            .expect("route -> ok");
733        assert!(passed.dlq_entries.is_empty());
734        assert_eq!(
735            seen.load(Ordering::Relaxed),
736            2,
737            "Route sink received all entries"
738        );
739    }
740
741    /// Build a `WorkBatch` of `n` JSON records (no commit tokens) for the
742    /// `WorkBatch`-shaped ingress-lease tests.
743    #[cfg(all(feature = "memory", feature = "transport"))]
744    fn make_record_batch(n: usize) -> crate::transport::WorkBatch<TestTok> {
745        use crate::transport::{PayloadFormat, Record, RecordMeta};
746        let records = (0..n)
747            .map(|i| Record {
748                payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
749                key: None,
750                headers: vec![],
751                metadata: RecordMeta {
752                    timestamp_ms: None,
753                    format: PayloadFormat::Json,
754                },
755            })
756            .collect();
757        crate::transport::WorkBatch::from_records(records)
758    }
759
760    /// Minimal commit token for the `WorkBatch` ingress-lease tests.
761    #[cfg(all(feature = "memory", feature = "transport"))]
762    #[derive(Debug, Clone)]
763    struct TestTok;
764    #[cfg(all(feature = "memory", feature = "transport"))]
765    impl std::fmt::Display for TestTok {
766        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
767            f.write_str("test")
768        }
769    }
770    #[cfg(all(feature = "memory", feature = "transport"))]
771    impl crate::transport::CommitToken for TestTok {}
772
773    #[cfg(all(feature = "memory", feature = "transport"))]
774    #[test]
775    fn ingress_lease_accounts_and_releases() {
776        use crate::memory::{MemoryGuard, MemoryGuardConfig};
777
778        let mut engine = default_engine();
779        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
780            limit_bytes: 1024 * 1024,
781            ..Default::default()
782        }));
783        engine.memory_guard = Some(Arc::clone(&guard));
784
785        let batch = make_record_batch(10);
786        let expected = batch.total_payload_bytes() as u64;
787        assert_eq!(guard.current_bytes(), 0, "starts at zero");
788
789        {
790            let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
791            assert_eq!(
792                guard.current_bytes(),
793                expected,
794                "bytes accounted while lease held"
795            );
796        }
797        // Lease dropped -> bytes released.
798        assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
799    }
800
801    #[cfg(all(feature = "memory", feature = "transport"))]
802    #[test]
803    fn ingress_lease_none_without_guard() {
804        let engine = default_engine();
805        let batch = make_record_batch(5);
806        assert!(
807            engine.lease_ingress_batch(&batch).is_none(),
808            "no lease when no guard wired"
809        );
810    }
811
812    #[test]
813    fn process_mid_tier_basic() {
814        let engine = default_engine();
815        let msgs = make_json_messages(100);
816
817        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
818            Ok(pm
819                .field("_table")
820                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
821                .unwrap_or("unknown")
822                .to_string())
823        });
824
825        assert_eq!(results.len(), 100);
826        assert!(results.iter().all(|r| r.is_ok()));
827        assert_eq!(results[0].as_ref().unwrap(), "events");
828    }
829
830    #[test]
831    fn process_mid_tier_parse_error() {
832        let engine = default_engine();
833        let mut msgs = make_json_messages(2);
834        // Insert an invalid JSON message
835        msgs.insert(
836            1,
837            Record {
838                payload: Bytes::from_static(b"not json {{{"),
839                key: None,
840                headers: vec![],
841                metadata: RecordMeta {
842                    timestamp_ms: None,
843                    format: TPayloadFormat::Json,
844                },
845            },
846        );
847
848        let results: Vec<Result<String, String>> =
849            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));
850
851        // 2 successful + 1 error (DLQ by default)
852        assert_eq!(results.len(), 3);
853        assert!(results[0].is_ok());
854        assert!(results[1].is_err());
855        assert!(results[1].as_ref().unwrap_err().contains("parse error"));
856        assert!(results[2].is_ok());
857    }
858
859    #[test]
860    fn process_mid_tier_empty_batch() {
861        let engine = default_engine();
862        let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
863        assert!(results.is_empty());
864    }
865
866    #[test]
867    fn process_mid_tier_respects_chunk_size() {
868        let config = BatchProcessingConfig {
869            max_chunk_size: 50,
870            ..Default::default()
871        };
872        let engine = BatchEngine::new(config);
873        let msgs = make_json_messages(120);
874
875        let results: Vec<Result<usize, String>> =
876            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));
877
878        assert_eq!(results.len(), 120);
879        assert!(results.iter().all(|r| r.is_ok()));
880        // Stats should show 120 received across 3 chunks (50+50+20)
881        let snap = engine.stats().snapshot();
882        assert_eq!(snap.received, 120);
883    }
884
885    #[test]
886    fn stats_updated_after_processing() {
887        let engine = default_engine();
888        let msgs = make_json_messages(10);
889
890        let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
891
892        let snap = engine.stats().snapshot();
893        assert_eq!(snap.received, 10);
894        assert_eq!(snap.processed, 10);
895        assert_eq!(snap.errors, 0);
896        assert_eq!(snap.filtered, 0);
897    }
898
899    #[test]
900    fn process_raw_passthrough() {
901        let engine = default_engine();
902        let msgs = make_json_messages(50);
903
904        let results: Vec<Result<usize, String>> =
905            engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));
906
907        assert_eq!(results.len(), 50);
908        assert!(results.iter().all(|r| r.is_ok()));
909        // All JSON messages have the same format: {"_table":"events","id":N}
910        assert!(results[0].as_ref().unwrap() > &0);
911
912        let snap = engine.stats().snapshot();
913        assert_eq!(snap.received, 50);
914        assert_eq!(snap.processed, 50);
915    }
916
917    #[test]
918    fn process_mid_tier_with_pre_route() {
919        let config = BatchProcessingConfig {
920            routing_field: Some("_table".to_string()),
921            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
922                field: "_table".to_string(),
923                value: "poison".to_string(),
924            }],
925            ..Default::default()
926        };
927        let engine = BatchEngine::new(config);
928
929        let mut msgs = make_json_messages(3);
930        // Replace middle message with a poison value
931        msgs[1] = Record {
932            payload: Bytes::from(r#"{"_table":"poison","id":999}"#),
933            key: None,
934            headers: vec![],
935            metadata: RecordMeta {
936                timestamp_ms: None,
937                format: TPayloadFormat::Json,
938            },
939        };
940
941        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
942            Ok(pm
943                .field("_table")
944                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
945                .unwrap_or("?")
946                .to_string())
947        });
948
949        // 2 ok + 1 DLQ error
950        assert_eq!(results.len(), 3);
951        assert!(results[0].is_ok());
952        assert!(results[1].is_err());
953        assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
954        assert!(results[2].is_ok());
955
956        let snap = engine.stats().snapshot();
957        assert_eq!(snap.dlq, 1);
958        assert_eq!(snap.errors, 1);
959    }
960
961    #[test]
962    fn process_mid_tier_filtered_not_in_results() {
963        let config = BatchProcessingConfig {
964            routing_field: Some("_table".to_string()),
965            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
966                field: "_table".to_string(),
967            }],
968            ..Default::default()
969        };
970        let engine = BatchEngine::new(config);
971
972        let mut msgs = make_json_messages(3);
973        // Replace middle message with one missing _table
974        msgs[1] = Record {
975            payload: Bytes::from(r#"{"host":"web1"}"#),
976            key: None,
977            headers: vec![],
978            metadata: RecordMeta {
979                timestamp_ms: None,
980                format: TPayloadFormat::Json,
981            },
982        };
983
984        let results: Vec<Result<String, String>> =
985            engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));
986
987        // Filtered messages are removed -- only 2 results
988        assert_eq!(results.len(), 2);
989        assert!(results.iter().all(|r| r.is_ok()));
990
991        let snap = engine.stats().snapshot();
992        assert_eq!(snap.filtered, 1);
993        assert_eq!(snap.received, 3);
994    }
995
996    #[test]
997    fn from_cascade_creates_engine() {
998        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
999        assert_eq!(engine.config().max_chunk_size, 10_000);
1000    }
1001
1002    #[test]
1003    fn accessors_return_expected_types() {
1004        let engine = default_engine();
1005        let _stats = engine.stats();
1006        let _pool = engine.pool();
1007        let _config = engine.config();
1008        assert_eq!(engine.stats().snapshot().received, 0);
1009    }
1010
1011    #[test]
1012    fn auto_wire_does_not_panic() {
1013        let mut engine = default_engine();
1014        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
1015        engine.auto_wire(
1016            &mgr,
1017            #[cfg(feature = "memory")]
1018            None,
1019        );
1020        // Engine should still work after auto_wire
1021        let msgs = make_json_messages(5);
1022        let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
1023        assert_eq!(results.len(), 5);
1024    }
1025
1026    #[test]
1027    fn debug_impl_works() {
1028        let engine = default_engine();
1029        let debug = format!("{engine:?}");
1030        assert!(debug.contains("BatchEngine"));
1031        assert!(debug.contains("config"));
1032    }
1033
    /// The driver run loops (`run_workbatch` / `run_workbatch_parsed`) replaced
    /// the four legacy loops in Task 0.7b. These tests exercise the same
    /// behaviours -- process+sink, ticker, on-demand (no-parse) pass-through, and
    /// sink-error resilience -- through the surviving WorkBatch driver.
    ///
    /// Compiled only with the `transport-memory` feature. Every test runs
    /// against an in-memory transport and ends its run by cancelling the
    /// shutdown token from a spawned timer task (`cancel_after`). Each test
    /// therefore takes a few hundred milliseconds of wall-clock time.
    #[cfg(feature = "transport-memory")]
    mod driver_engine_tests {
        use super::*;
        use crate::transport::WorkBatch;
        use crate::worker::engine::CommitMode;
        use std::sync::atomic::{AtomicU64, Ordering};

        /// Encode `{"_table":"<table>","id":<id>}` as the raw bytes injected
        /// into the memory transport.
        fn json_payload(table: &str, id: usize) -> Vec<u8> {
            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
        }

        /// No-ticker placeholder for the `ticker` argument.
        ///
        /// Returns `None` with a concrete fn-pointer closure type. Presumably
        /// this lets the driver's generic ticker parameters be inferred, which a
        /// bare `None` at the call site would leave unconstrained.
        /// `type_complexity` is allowed because spelling out that type is the
        /// whole point of this helper.
        #[allow(clippy::type_complexity)]
        fn no_ticker() -> Option<(
            std::time::Duration,
            fn() -> std::future::Ready<Result<(), EngineError>>,
        )> {
            None
        }

        /// Spawn a background task that cancels `shutdown` after `ms`
        /// milliseconds. Uses `tokio::spawn`, so it must be called from inside
        /// a Tokio runtime.
        fn cancel_after(shutdown: tokio_util::sync::CancellationToken, ms: u64) {
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
                shutdown.cancel();
            });
        }

        /// Five injected records reach the sink together with five commit
        /// tokens in total, and the driver returns `Ok` when the run ends.
        #[tokio::test]
        async fn run_workbatch_processes_and_passes_tokens_to_sink() {
            // Short receive timeout -- presumably so the receive loop wakes
            // regularly and can observe cancellation; TODO confirm.
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..5 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            cancel_after(shutdown.clone(), 200);

            let record_count = Arc::new(AtomicU64::new(0));
            let token_count = Arc::new(AtomicU64::new(0));
            let rc = Arc::clone(&record_count);
            let tc = Arc::clone(&token_count);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    |out: &WorkBatch<_>| {
                        // Read the counts synchronously, so the returned
                        // `async move` future captures only owned values (Arc
                        // clones and integers) and never a borrow of `out`.
                        let rc = Arc::clone(&rc);
                        let tc = Arc::clone(&tc);
                        let records = out.records.len();
                        let tokens = out.commit_tokens.len();
                        async move {
                            rc.fetch_add(records as u64, Ordering::Relaxed);
                            tc.fetch_add(tokens as u64, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    // SinkManaged mirrors the legacy run_async sink-owns-commit shape.
                    CommitMode::SinkManaged,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(record_count.load(Ordering::Relaxed), 5);
            assert_eq!(token_count.load(Ordering::Relaxed), 5);
        }

        /// With a 100 ms ticker and shutdown after 350 ms, the tick callback
        /// must run at least twice even though no records are injected.
        #[tokio::test]
        async fn run_workbatch_ticker_fires() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // 350 ms of runtime leaves headroom over the two 100 ms periods
            // the assertion below requires.
            cancel_after(shutdown.clone(), 350);

            let tick_count = Arc::new(AtomicU64::new(0));
            let tick_count_clone = Arc::clone(&tick_count);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    |_out: &WorkBatch<_>| async { Ok(()) },
                    CommitMode::Auto,
                    Some((std::time::Duration::from_millis(100), move || {
                        // Clone per tick so each returned future owns its Arc.
                        let tc = Arc::clone(&tick_count_clone);
                        async move {
                            tc.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(result.is_ok());
            let ticks = tick_count.load(Ordering::Relaxed);
            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
        }

        /// An identity `process` closure forwards the raw records unchanged.
        /// The sink must see a non-zero total payload size.
        #[tokio::test]
        async fn run_workbatch_passthrough_without_parse() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..3 {
                transport
                    .inject(None, json_payload("logs", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            cancel_after(shutdown.clone(), 200);

            let total_bytes = Arc::new(AtomicU64::new(0));
            let total_bytes_clone = Arc::clone(&total_bytes);

            // On-demand driver: pass-through process pays no parse cost.
            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    |out: &WorkBatch<_>| {
                        // Sum the payload sizes before the `async move`, so
                        // the future does not borrow `out`.
                        let tb = Arc::clone(&total_bytes_clone);
                        let sum: u64 = out.records.iter().map(|r| r.payload.len() as u64).sum();
                        async move {
                            tb.fetch_add(sum, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert!(total_bytes.load(Ordering::Relaxed) > 0);
        }

        /// `run_workbatch_parsed` gives the process closure a pre-parsed batch.
        /// An interned `_table` lookup must read "events" on all four records.
        #[tokio::test]
        async fn run_workbatch_parsed_reads_field() {
            // The parsed path is the analogue of the old mid-tier run_async: the
            // driver pre-parses and the process closure reads a routing field.
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..4 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            cancel_after(shutdown.clone(), 200);

            let hits = Arc::new(AtomicU64::new(0));
            let hc = Arc::clone(&hits);

            let result = engine
                .run_workbatch_parsed(
                    &transport,
                    shutdown,
                    move |pb| {
                        let field = pb.intern("_table");
                        let mut local = 0u64;
                        for parsed in &pb.parsed {
                            if parsed.field_str(&field) == Some("events") {
                                local += 1;
                            }
                        }
                        hc.fetch_add(local, Ordering::Relaxed);
                        // Rebuild a plain WorkBatch for the sink, forwarding
                        // the records, commit tokens and DLQ entries.
                        Ok(WorkBatch::new(pb.records, pb.commit_tokens)
                            .with_dlq_entries(pb.dlq_entries))
                    },
                    |_out: &WorkBatch<_>| async { Ok(()) },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(hits.load(Ordering::Relaxed), 4);
        }

        /// A sink that always fails makes the driver return `EngineError::Sink`
        /// instead of panicking.
        #[tokio::test]
        async fn run_workbatch_sink_error_does_not_crash() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            cancel_after(shutdown.clone(), 200);

            // Sink always errors -- the driver returns the error cleanly (no
            // crash/panic). Post Remediation Phase 1 the sink error is a
            // TERMINAL ack-barrier error rather than a logged continue.
            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    |_out: &WorkBatch<_>| async {
                        Err(EngineError::Sink("test sink error".into()))
                    },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(
                matches!(result, Err(EngineError::Sink(_))),
                "sink error returns terminally without crashing: {result:?}"
            );
        }
    }
1287}