Skip to main content

hyperi_rustlib/worker/engine/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/mod.rs
3// Purpose:   SIMD-optimised batch processing engine for DFE pipelines
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9pub mod config;
10#[cfg(feature = "transport")]
11pub mod driver;
12pub mod intern;
13pub mod metrics;
14pub mod parse;
15pub mod pre_route;
16pub mod types;
17
18pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
19#[cfg(feature = "transport")]
20pub use driver::{CommitMode, ParsedBatch};
21pub use intern::FieldInterner;
22pub use types::{MessageMetadata, ParsedMessage, PreRouteResult};
23
24/// Errors returned by the [`BatchEngine`] `WorkBatch` drivers
25/// ([`run_workbatch`](BatchEngine::run_workbatch) /
26/// [`run_workbatch_parsed`](BatchEngine::run_workbatch_parsed)).
27///
28/// Only available when the `transport` feature is enabled.
29#[cfg(feature = "transport")]
30#[derive(Debug, thiserror::Error)]
31pub enum EngineError {
32    /// Transport receive or commit failed.
33    #[error("transport error: {0}")]
34    Transport(#[from] crate::TransportError),
35    /// Sink callback returned an error.
36    #[error("sink error: {0}")]
37    Sink(String),
38    /// Shutdown was requested via cancellation token.
39    #[error("shutdown")]
40    Shutdown,
41    /// Inbound-filter DLQ entries appeared but no routing policy was configured
42    /// (the default [`FilterDlqPolicy::Reject`]). Metrics are not delivery, so
43    /// the engine fails fast rather than silently dropping dead-letters.
44    #[error(
45        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
46         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
47         (Route to forward, or DiscardWithMetric to deliberately drop)"
48    )]
49    FilterDlqUnrouted(usize),
50    /// A [`FilterDlqPolicy::Route`] sink failed to deliver DLQ entries. This is
51    /// a TERMINAL ack-barrier failure: the source commit is skipped so the whole
52    /// block is re-delivered -- a DLQ-route failure must NOT let a later ordered
53    /// commit advance past these undelivered dead-letters. Silent discard is
54    /// OPT-IN only, via [`FilterDlqPolicy::DiscardWithMetric`].
55    #[error("DLQ route failed: {0}")]
56    DlqRouteFailed(String),
57    /// A parse failure occurred under [`ParseErrorAction::FailBatch`]. TERMINAL:
58    /// the whole block fails its commit (consistent with the ack barrier) so it
59    /// is re-delivered rather than partially committed.
60    #[error("parse failed (fail_batch): {0}")]
61    ParseBatchFailed(String),
62}
63
64/// What a [`BatchEngine`] run loop does with inbound-filter DLQ entries
65/// ([`RecvBatch::dlq_entries`](crate::transport::RecvBatch)).
66///
67/// Inbound `action: dlq` filters remove messages from the normal batch; those
68/// entries must go somewhere. The default is [`Reject`](Self::Reject) so a
69/// data-loss-shaped config never passes silently.
70#[cfg(feature = "transport")]
71#[derive(Clone, Default)]
72pub enum FilterDlqPolicy {
73    /// Fail the run loop ([`EngineError::FilterDlqUnrouted`]) if any DLQ entries
74    /// appear. The safe default -- forces a deliberate choice.
75    #[default]
76    Reject,
77    /// Deliberately discard DLQ entries, counting them in the
78    /// `dfe_engine_filter_dlq_discarded_total` metric. Explicit opt-in.
79    DiscardWithMetric,
80    /// Hand each batch's DLQ entries to a sink (e.g. enqueue onto a DLQ
81    /// transport, or `tokio::spawn` an async send). Called on the run loop, so
82    /// keep it cheap -- offload slow work.
83    ///
84    /// The sink is FALLIBLE: returning `Err` raises
85    /// [`EngineError::DlqRouteFailed`], which is a terminal ack-barrier failure
86    /// (the source commit is skipped, the whole block re-delivered). A sink that
87    /// deliberately tolerates loss should return `Ok` after counting the drop;
88    /// to discard ALL entries by policy use
89    /// [`DiscardWithMetric`](Self::DiscardWithMetric) instead.
90    Route(
91        Arc<
92            dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) -> Result<(), EngineError>
93                + Send
94                + Sync,
95        >,
96    ),
97}
98
99#[cfg(feature = "transport")]
100impl std::fmt::Debug for FilterDlqPolicy {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        match self {
103            Self::Reject => f.write_str("Reject"),
104            Self::DiscardWithMetric => f.write_str("DiscardWithMetric"),
105            Self::Route(_) => f.write_str("Route(..)"),
106        }
107    }
108}
109
110use std::sync::Arc;
111
112use super::pool::AdaptiveWorkerPool;
113use super::stats::PipelineStats;
114
115use self::pre_route::filters_from_config;
116// Pre-route + parse helpers are used only by the in-process process_* methods,
117// which take the canonical transport Record and so are transport-gated.
118#[cfg(feature = "transport")]
119use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field};
120#[cfg(feature = "transport")]
121use self::types::PayloadFormat;
122use super::config::WorkerPoolConfig;
123
124/// Core batch processing engine for DFE pipelines.
125///
126/// Provides two in-process processing modes (the run-loop drivers live in the
127/// `driver` module, gated on the `transport` feature):
128///
129/// - [`process_mid_tier`](Self::process_mid_tier) -- parse JSON via SIMD, extract
130///   known fields, apply pre-route filters, then parallel transform via rayon.
131///   The standard path for most DFE apps (loader, archiver, transforms).
132///
133/// - [`process_raw`](Self::process_raw) -- skip parsing, apply pre-route on raw
134///   bytes, then parallel transform via rayon. For apps that handle raw bytes
135///   (receiver, binary protocols).
136///
137/// Both take the canonical [`Record`](crate::transport::Record) slice (the same
138/// currency the [`WorkBatch`](crate::transport::WorkBatch) carries), chunk large
139/// batches, and track stats atomically.
140///
141/// Inbound braking under memory pressure is no longer a blocking pause between
142/// chunks (the retired `check_memory_pressure` proto-actuator). It is now the
143/// self-regulation governor's job: the inbound GATE pauses the source transport
144/// and the streaming byte-budget lever bounds peak in-flight memory. See
145/// [`run_governed`](Self::run_governed). With the governor OFF, there is no
146/// active brake -- that is the deliberate opt-out.
147pub struct BatchEngine {
148    config: BatchProcessingConfig,
149    pool: Arc<AdaptiveWorkerPool>,
150    stats: Arc<PipelineStats>,
151    interner: Arc<FieldInterner>,
152    filters: Vec<pre_route::PreRouteFilter>,
153    #[cfg(feature = "memory")]
154    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
155    /// What the run loops do with inbound-filter DLQ entries. Default
156    /// [`FilterDlqPolicy::Reject`] (no silent data loss).
157    #[cfg(feature = "transport")]
158    filter_dlq_policy: FilterDlqPolicy,
159    /// Self-regulation byte-budget lever (`governor` feature). `None` (the
160    /// default, and whenever self-regulation is OFF) keeps the engine on the
161    /// whole-batch [`run_workbatch`](Self::run_workbatch) loop -- byte-identical
162    /// to pre-governor behaviour. When wired (by `ServiceRuntime` when the
163    /// governor is enabled), [`run_governed`](Self::run_governed) streams in
164    /// budget-sized sub-blocks and feeds the AIMD loop per block.
165    #[cfg(feature = "governor")]
166    byte_budget: Option<Arc<crate::governor::ByteBudgetController>>,
167}
168
169impl BatchEngine {
170    /// Create a standalone engine with its own worker pool.
171    ///
172    /// Uses `WorkerPoolConfig::default()` for the pool. Prefer
173    /// [`with_pool`](Self::with_pool) when a `ServiceRuntime` pool exists.
174    #[must_use]
175    pub fn new(config: BatchProcessingConfig) -> Self {
176        let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
177        Self::with_pool(pool, config)
178    }
179
180    /// Create an engine that reuses an existing worker pool.
181    ///
182    /// This is the preferred constructor when `ServiceRuntime` is available,
183    /// as it avoids creating a second rayon thread pool.
184    #[must_use]
185    pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
186        let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
187        let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
188        let filters = filters_from_config(&config.pre_route_filters);
189        Self {
190            config,
191            pool,
192            stats: Arc::new(PipelineStats::new()),
193            interner,
194            filters,
195            #[cfg(feature = "memory")]
196            memory_guard: None,
197            #[cfg(feature = "transport")]
198            filter_dlq_policy: FilterDlqPolicy::default(),
199            #[cfg(feature = "governor")]
200            byte_budget: None,
201        }
202    }
203
204    /// Wire the self-regulation byte-budget lever (`governor` feature).
205    ///
206    /// Called by `ServiceRuntime::build()` when the governor is enabled. Once
207    /// wired, [`run_governed`](Self::run_governed) streams the input in
208    /// budget-sized sub-blocks and drives the AIMD loop per block. Without it
209    /// (governor off), `run_governed` falls back to the whole-batch loop.
210    #[cfg(feature = "governor")]
211    pub fn set_byte_budget(&mut self, budget: Arc<crate::governor::ByteBudgetController>) {
212        self.byte_budget = Some(budget);
213    }
214
215    /// Whether the self-regulation byte-budget lever is wired (`governor`
216    /// feature). When `false`, [`run_governed`](Self::run_governed) is the
217    /// whole-batch loop.
218    #[cfg(feature = "governor")]
219    #[must_use]
220    pub fn is_self_regulated(&self) -> bool {
221        self.byte_budget.is_some()
222    }
223
224    /// Set the policy for inbound-filter DLQ entries in the run loops.
225    ///
226    /// Default is [`FilterDlqPolicy::Reject`] -- the run loop errors if an
227    /// inbound `action: dlq` filter produces entries and no routing is set, so
228    /// dead-letters are never silently dropped.
229    #[cfg(feature = "transport")]
230    #[must_use]
231    pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
232        self.filter_dlq_policy = policy;
233        self
234    }
235
236    /// Load configuration from the cascade and create a standalone engine.
237    ///
238    /// # Errors
239    ///
240    /// Returns `ConfigError` if the cascade contains invalid data.
241    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
242        let config = BatchProcessingConfig::from_cascade(key)?;
243        Ok(Self::new(config))
244    }
245
246    /// Pipeline statistics (atomic, lock-free).
247    #[must_use]
248    pub fn stats(&self) -> &Arc<PipelineStats> {
249        &self.stats
250    }
251
252    /// Underlying worker pool.
253    #[must_use]
254    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
255        &self.pool
256    }
257
258    /// Engine configuration.
259    #[must_use]
260    pub fn config(&self) -> &BatchProcessingConfig {
261        &self.config
262    }
263
264    /// Auto-wire engine with infrastructure components.
265    ///
266    /// Called by `ServiceRuntime::build()`. Apps never call this directly.
267    pub fn auto_wire(
268        &mut self,
269        metrics_manager: &crate::metrics::MetricsManager,
270        #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
271    ) {
272        metrics::register(metrics_manager, &self.config);
273
274        #[cfg(feature = "memory")]
275        if let Some(guard) = memory_guard {
276            self.memory_guard = Some(Arc::clone(guard));
277        }
278    }
279
280    /// Test-only: wire a `MemoryGuard` directly so memory-lease behaviour can be
281    /// exercised without a full `auto_wire`.
282    #[cfg(all(test, feature = "memory"))]
283    pub(crate) fn set_memory_guard_for_test(&mut self, guard: Arc<crate::memory::MemoryGuard>) {
284        self.memory_guard = Some(guard);
285    }
286
287    /// Parse, filter, and transform a batch of raw messages.
288    ///
289    /// Pipeline phases per chunk:
290    /// 1. **Pre-route** -- SIMD field extraction + filter evaluation (sequential, ~100 ns/msg)
291    /// 2. **Parse** -- `sonic_rs::from_slice` + known-field extraction (sequential, ~1-5 µs/msg)
292    /// 3. **Transform** -- user closure via rayon `par_iter_mut` (parallel)
293    ///
294    /// Results contain one entry per non-filtered message. Filtered messages are
295    /// silently removed (their commit tokens remain accessible via the original
296    /// slice). DLQ'd and parse-error messages produce `Err` entries.
297    #[cfg(feature = "transport")]
298    pub fn process_mid_tier<O, E, F>(
299        &self,
300        messages: &[crate::transport::Record],
301        transform: F,
302    ) -> Vec<Result<O, E>>
303    where
304        O: Send,
305        E: Send + From<String>,
306        F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
307    {
308        if messages.is_empty() {
309            return Vec::new();
310        }
311
312        let chunk_size = if self.config.max_chunk_size == 0 {
313            messages.len()
314        } else {
315            self.config.max_chunk_size
316        };
317
318        let has_routing = self.config.routing_field.is_some();
319        let mut all_results = Vec::with_capacity(messages.len());
320
321        for chunk in messages.chunks(chunk_size) {
322            self.stats.add_received(chunk.len() as u64);
323
324            // Accumulate bytes received
325            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
326            self.stats.add_bytes_received(chunk_bytes);
327
328            // Phase 1 + 2: Pre-route and parse, building ParsedMessage vec.
329            // Track which messages are included (not filtered).
330            let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
331                Vec::with_capacity(chunk.len());
332
333            for msg in chunk {
334                // Phase 1: Pre-route
335                if has_routing {
336                    let field_name = self.config.routing_field.as_ref().expect("checked above");
337                    let extraction = extract_routing_field(&msg.payload, field_name);
338                    let outcome = apply_filters(&extraction, &self.filters);
339
340                    match outcome {
341                        PreRouteOutcome::Continue => {}
342                        PreRouteOutcome::Filtered => {
343                            self.stats.incr_filtered();
344                            continue; // skip this message entirely
345                        }
346                        PreRouteOutcome::Dlq(reason) => {
347                            self.stats.incr_dlq();
348                            self.stats.incr_errors();
349                            parsed_msgs.push(Err(reason));
350                            continue;
351                        }
352                    }
353                }
354
355                // Phase 2: Parse. The Record carries the transport PayloadFormat;
356                // convert it to the engine's local enum, resolving Auto.
357                let format: PayloadFormat = match PayloadFormat::from(msg.metadata.format) {
358                    PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
359                    other => other,
360                };
361
362                match parse::parse_payload(&msg.payload, format) {
363                    Ok(value) => {
364                        let extracted = self.interner.extract_known(&value);
365                        // Rebuild a MessageMetadata for ParsedMessage. Commit
366                        // tokens live on the WorkBatch, not on individual records.
367                        let metadata = MessageMetadata {
368                            timestamp_ms: msg.metadata.timestamp_ms,
369                            format,
370                        };
371                        parsed_msgs.push(Ok(ParsedMessage::Parsed {
372                            value,
373                            raw: msg.payload.clone(),
374                            format,
375                            key: msg.key.clone(),
376                            headers: msg.headers.clone(),
377                            metadata,
378                            extracted,
379                        }));
380                    }
381                    Err(e) => {
382                        self.stats.incr_errors();
383                        match self.config.parse_error_action {
384                            ParseErrorAction::Dlq => {
385                                self.stats.incr_dlq();
386                                parsed_msgs.push(Err(format!("parse error: {e}")));
387                            }
388                            ParseErrorAction::Skip => {
389                                // Counted in errors above, not added to results
390                            }
391                            ParseErrorAction::FailBatch => {
392                                // Return all accumulated results + this error,
393                                // then stop processing the chunk.
394                                parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
395                                let results: Vec<Result<O, E>> = parsed_msgs
396                                    .into_iter()
397                                    .map(|r| match r {
398                                        Ok(_) => Err(E::from(
399                                            "batch failed due to parse error".to_string(),
400                                        )),
401                                        Err(reason) => Err(E::from(reason)),
402                                    })
403                                    .collect();
404                                all_results.extend(results);
405                                return all_results;
406                            }
407                        }
408                    }
409                }
410            }
411
412            // Phase 3: Parallel transform via rayon.
413            // Split into ok/err: transform only the Ok entries.
414            let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
415                parsed_msgs.into_iter().enumerate().collect();
416
417            // Separate errors from parseable messages
418            let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
419            let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
420
421            for (idx, item) in indexed.drain(..) {
422                match item {
423                    Ok(pm) => to_transform.push((idx, pm)),
424                    Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
425                }
426            }
427
428            // Parallel transform, throttled by the scaler target (map_owned
429            // applies the semaphore per item -- unlike the old install() path,
430            // which bypassed it and let the parsed path ignore the CPU cap).
431            let transformed: Vec<(usize, Result<O, E>)> =
432                self.pool.map_owned(to_transform, |(idx, mut pm)| {
433                    let result = transform(&mut pm);
434                    (idx, result)
435                });
436
437            chunk_results.extend(transformed);
438
439            // Sort by original index to preserve order
440            chunk_results.sort_by_key(|(idx, _)| *idx);
441
442            // Update stats
443            let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
444            self.stats.add_processed(ok_count as u64);
445
446            all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
447        }
448
449        all_results
450    }
451
452    /// Pre-route and transform a batch of records without parsing.
453    ///
454    /// The transform closure receives immutable [`Record`](crate::transport::Record)
455    /// references. Use this for apps that handle raw bytes directly (e.g. receiver
456    /// forwarding).
457    #[cfg(feature = "transport")]
458    pub fn process_raw<O, E, F>(
459        &self,
460        messages: &[crate::transport::Record],
461        transform: F,
462    ) -> Vec<Result<O, E>>
463    where
464        O: Send,
465        E: Send + From<String>,
466        F: Fn(&crate::transport::Record) -> Result<O, E> + Sync,
467    {
468        if messages.is_empty() {
469            return Vec::new();
470        }
471
472        let chunk_size = if self.config.max_chunk_size == 0 {
473            messages.len()
474        } else {
475            self.config.max_chunk_size
476        };
477
478        let has_routing = self.config.routing_field.is_some();
479        let mut all_results = Vec::with_capacity(messages.len());
480
481        for chunk in messages.chunks(chunk_size) {
482            self.stats.add_received(chunk.len() as u64);
483
484            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
485            self.stats.add_bytes_received(chunk_bytes);
486
487            // Phase 1: Pre-route filter
488            let to_process: Vec<&crate::transport::Record> = if has_routing {
489                let field_name = self.config.routing_field.as_ref().expect("checked above");
490                let mut passed = Vec::with_capacity(chunk.len());
491                for msg in chunk {
492                    let extraction = extract_routing_field(&msg.payload, field_name);
493                    let outcome = apply_filters(&extraction, &self.filters);
494                    match outcome {
495                        PreRouteOutcome::Continue => passed.push(msg),
496                        PreRouteOutcome::Filtered => {
497                            self.stats.incr_filtered();
498                        }
499                        PreRouteOutcome::Dlq(reason) => {
500                            self.stats.incr_dlq();
501                            self.stats.incr_errors();
502                            all_results.push(Err(E::from(reason)));
503                        }
504                    }
505                }
506                passed
507            } else {
508                chunk.iter().collect()
509            };
510
511            // Phase 2: Parallel transform via process_batch
512            let results = self.pool.process_batch(&to_process, |msg| transform(msg));
513
514            let ok_count = results.iter().filter(|r| r.is_ok()).count();
515            self.stats.add_processed(ok_count as u64);
516
517            all_results.extend(results);
518        }
519
520        all_results
521    }
522
523    /// Apply the [`FilterDlqPolicy`] to a received [`WorkBatch`](crate::transport::WorkBatch),
524    /// routing/discarding/rejecting its inline-DLQ entries per the policy and
525    /// returning the batch with `dlq_entries` consumed.
526    ///
527    /// Now that `recv` yields a `WorkBatch` directly (Task 0.7b), the
528    /// inbound-filter DLQ entries arrive on
529    /// [`WorkBatch::dlq_entries`](crate::transport::WorkBatch) rather than on a
530    /// `RecvBatch`. Records are never touched -- only the DLQ entries are routed
531    /// -- so dead-letters are never silently dropped.
532    ///
533    /// # Errors
534    ///
535    /// [`EngineError::FilterDlqUnrouted`] when entries appear under
536    /// [`FilterDlqPolicy::Reject`].
537    #[cfg(feature = "transport")]
538    fn apply_workbatch_dlq_policy<T: crate::transport::CommitToken>(
539        &self,
540        mut batch: crate::transport::WorkBatch<T>,
541    ) -> Result<crate::transport::WorkBatch<T>, EngineError> {
542        if !batch.dlq_entries.is_empty() {
543            let entries = std::mem::take(&mut batch.dlq_entries);
544            self.route_dlq_entries(entries)?;
545        }
546        Ok(batch)
547    }
548
549    /// Route a set of DLQ entries through the configured [`FilterDlqPolicy`].
550    ///
551    /// THE single DLQ route point: both the inbound-filter entries (routed
552    /// before `process` by
553    /// [`apply_workbatch_dlq_policy`](Self::apply_workbatch_dlq_policy)) AND the
554    /// parse/process-generated entries (routed after `process`, before commit,
555    /// inline in the driver's `drive_block` / `drive_block_streaming`) flow
556    /// through here, so the two paths share ONE policy and ONE behaviour. There
557    /// is no
558    /// silent-drop default: `Reject` fails fast, `DiscardWithMetric` is the only
559    /// deliberate opt-in to drop, and `Route` is fallible (a delivery failure is
560    /// a terminal ack-barrier error, never a silent loss).
561    ///
562    /// # Errors
563    ///
564    /// - [`EngineError::FilterDlqUnrouted`] under [`FilterDlqPolicy::Reject`].
565    /// - [`EngineError::DlqRouteFailed`] when a [`FilterDlqPolicy::Route`] sink
566    ///   returns `Err`.
567    #[cfg(feature = "transport")]
568    pub(crate) fn route_dlq_entries(
569        &self,
570        entries: Vec<crate::transport::filter::FilteredDlqEntry>,
571    ) -> Result<(), EngineError> {
572        if entries.is_empty() {
573            return Ok(());
574        }
575        match &self.filter_dlq_policy {
576            FilterDlqPolicy::Reject => Err(EngineError::FilterDlqUnrouted(entries.len())),
577            FilterDlqPolicy::DiscardWithMetric => {
578                #[cfg(feature = "metrics")]
579                ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
580                    .increment(entries.len() as u64);
581                Ok(())
582            }
583            FilterDlqPolicy::Route(sink) => sink(entries),
584        }
585    }
586}
587
588/// RAII lease over in-flight ingress bytes tracked by a [`MemoryGuard`].
589///
590/// Created by the WorkBatch driver's `lease_ingress_batch`; releases the
591/// accounted bytes back to the guard on drop so no block exit path can leak
592/// the reservation.
593///
594/// [`MemoryGuard`]: crate::memory::MemoryGuard
595#[cfg(feature = "memory")]
596pub(crate) struct IngressLease<'a> {
597    guard: &'a crate::memory::MemoryGuard,
598    bytes: u64,
599}
600
601#[cfg(feature = "memory")]
602impl<'a> IngressLease<'a> {
603    /// Construct a lease over already-accounted ingress bytes. The caller must
604    /// have already called `guard.add_bytes(bytes)`; `Drop` releases them. Used
605    /// by the WorkBatch driver's `lease_ingress_batch`.
606    fn new(guard: &'a crate::memory::MemoryGuard, bytes: u64) -> Self {
607        Self { guard, bytes }
608    }
609}
610
611#[cfg(feature = "memory")]
612impl Drop for IngressLease<'_> {
613    fn drop(&mut self) {
614        self.guard.release(self.bytes);
615    }
616}
617
618impl std::fmt::Debug for BatchEngine {
619    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620        let mut s = f.debug_struct("BatchEngine");
621        s.field("config", &self.config)
622            .field("pool_max_threads", &self.pool.max_threads())
623            .field("stats", &self.stats.snapshot())
624            .field("interner_len", &self.interner.len())
625            .field("filters", &self.filters);
626        #[cfg(feature = "memory")]
627        s.field("memory_guard", &self.memory_guard.is_some());
628        #[cfg(feature = "transport")]
629        s.field("filter_dlq_policy", &self.filter_dlq_policy);
630        #[cfg(feature = "governor")]
631        s.field("self_regulated", &self.byte_budget.is_some());
632        s.finish()
633    }
634}
635
636#[cfg(all(test, feature = "transport"))]
637mod engine_tests {
638    use super::*;
639    use crate::transport::{PayloadFormat as TPayloadFormat, Record, RecordMeta};
640    use bytes::Bytes;
641
642    fn make_json_messages(n: usize) -> Vec<Record> {
643        (0..n)
644            .map(|i| Record {
645                payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
646                key: None,
647                headers: vec![],
648                metadata: RecordMeta {
649                    timestamp_ms: None,
650                    format: TPayloadFormat::Json,
651                },
652            })
653            .collect()
654    }
655
656    fn default_engine() -> BatchEngine {
657        BatchEngine::new(BatchProcessingConfig::default())
658    }
659
660    #[cfg(feature = "transport")]
661    #[test]
662    fn filter_dlq_policy_routes_discards_or_rejects() {
663        use crate::transport::WorkBatch;
664        use crate::transport::filter::FilteredDlqEntry;
665        use std::sync::Arc as StdArc;
666        use std::sync::atomic::{AtomicUsize, Ordering};
667
668        // Minimal CommitToken for the generic helper.
669        #[derive(Clone, Debug)]
670        struct TestTok;
671        impl std::fmt::Display for TestTok {
672            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673                f.write_str("test")
674            }
675        }
676        impl crate::transport::CommitToken for TestTok {}
677
678        let entry = || FilteredDlqEntry {
679            payload: b"x".to_vec(),
680            key: None,
681            reason: "r".to_string(),
682        };
683        let batch_with = |n: usize| {
684            WorkBatch::<TestTok>::from_records(vec![])
685                .with_dlq_entries((0..n).map(|_| entry()).collect())
686        };
687
688        // Reject (default): any DLQ entries -> fail fast, not silent drop.
689        let eng = default_engine();
690        assert!(matches!(
691            eng.apply_workbatch_dlq_policy(batch_with(1)),
692            Err(EngineError::FilterDlqUnrouted(1))
693        ));
694        // Reject + no entries -> ok (batch passes through with no DLQ entries).
695        let passed = eng
696            .apply_workbatch_dlq_policy(WorkBatch::<TestTok>::from_records(vec![]))
697            .expect("no entries -> ok");
698        assert!(passed.dlq_entries.is_empty());
699
700        // DiscardWithMetric -> ok (deliberately dropped); entries consumed.
701        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
702        let passed = eng
703            .apply_workbatch_dlq_policy(batch_with(1))
704            .expect("discard -> ok");
705        assert!(
706            passed.dlq_entries.is_empty(),
707            "entries consumed after routing"
708        );
709
710        // Route -> the sink receives every entry.
711        let seen = StdArc::new(AtomicUsize::new(0));
712        let s = StdArc::clone(&seen);
713        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
714            move |e: Vec<FilteredDlqEntry>| {
715                s.fetch_add(e.len(), Ordering::Relaxed);
716                Ok(())
717            },
718        )));
719        let passed = eng
720            .apply_workbatch_dlq_policy(batch_with(2))
721            .expect("route -> ok");
722        assert!(passed.dlq_entries.is_empty());
723        assert_eq!(
724            seen.load(Ordering::Relaxed),
725            2,
726            "Route sink received all entries"
727        );
728    }
729
730    /// Build a `WorkBatch` of `n` JSON records (no commit tokens) for the
731    /// `WorkBatch`-shaped ingress-lease tests.
732    #[cfg(all(feature = "memory", feature = "transport"))]
733    fn make_record_batch(n: usize) -> crate::transport::WorkBatch<TestTok> {
734        use crate::transport::{PayloadFormat, Record, RecordMeta};
735        let records = (0..n)
736            .map(|i| Record {
737                payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
738                key: None,
739                headers: vec![],
740                metadata: RecordMeta {
741                    timestamp_ms: None,
742                    format: PayloadFormat::Json,
743                },
744            })
745            .collect();
746        crate::transport::WorkBatch::from_records(records)
747    }
748
749    /// Minimal commit token for the `WorkBatch` ingress-lease tests.
750    #[cfg(all(feature = "memory", feature = "transport"))]
751    #[derive(Debug, Clone)]
752    struct TestTok;
753    #[cfg(all(feature = "memory", feature = "transport"))]
754    impl std::fmt::Display for TestTok {
755        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756            f.write_str("test")
757        }
758    }
759    #[cfg(all(feature = "memory", feature = "transport"))]
760    impl crate::transport::CommitToken for TestTok {}
761
762    #[cfg(all(feature = "memory", feature = "transport"))]
763    #[test]
764    fn ingress_lease_accounts_and_releases() {
765        use crate::memory::{MemoryGuard, MemoryGuardConfig};
766
767        let mut engine = default_engine();
768        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
769            limit_bytes: 1024 * 1024,
770            ..Default::default()
771        }));
772        engine.memory_guard = Some(Arc::clone(&guard));
773
774        let batch = make_record_batch(10);
775        let expected = batch.total_payload_bytes() as u64;
776        assert_eq!(guard.current_bytes(), 0, "starts at zero");
777
778        {
779            let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
780            assert_eq!(
781                guard.current_bytes(),
782                expected,
783                "bytes accounted while lease held"
784            );
785        }
786        // Lease dropped -> bytes released.
787        assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
788    }
789
790    #[cfg(all(feature = "memory", feature = "transport"))]
791    #[test]
792    fn ingress_lease_none_without_guard() {
793        let engine = default_engine();
794        let batch = make_record_batch(5);
795        assert!(
796            engine.lease_ingress_batch(&batch).is_none(),
797            "no lease when no guard wired"
798        );
799    }
800
801    #[test]
802    fn process_mid_tier_basic() {
803        let engine = default_engine();
804        let msgs = make_json_messages(100);
805
806        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
807            Ok(pm
808                .field("_table")
809                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
810                .unwrap_or("unknown")
811                .to_string())
812        });
813
814        assert_eq!(results.len(), 100);
815        assert!(results.iter().all(|r| r.is_ok()));
816        assert_eq!(results[0].as_ref().unwrap(), "events");
817    }
818
819    #[test]
820    fn process_mid_tier_parse_error() {
821        let engine = default_engine();
822        let mut msgs = make_json_messages(2);
823        // Insert an invalid JSON message
824        msgs.insert(
825            1,
826            Record {
827                payload: Bytes::from_static(b"not json {{{"),
828                key: None,
829                headers: vec![],
830                metadata: RecordMeta {
831                    timestamp_ms: None,
832                    format: TPayloadFormat::Json,
833                },
834            },
835        );
836
837        let results: Vec<Result<String, String>> =
838            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));
839
840        // 2 successful + 1 error (DLQ by default)
841        assert_eq!(results.len(), 3);
842        assert!(results[0].is_ok());
843        assert!(results[1].is_err());
844        assert!(results[1].as_ref().unwrap_err().contains("parse error"));
845        assert!(results[2].is_ok());
846    }
847
848    #[test]
849    fn process_mid_tier_empty_batch() {
850        let engine = default_engine();
851        let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
852        assert!(results.is_empty());
853    }
854
855    #[test]
856    fn process_mid_tier_respects_chunk_size() {
857        let config = BatchProcessingConfig {
858            max_chunk_size: 50,
859            ..Default::default()
860        };
861        let engine = BatchEngine::new(config);
862        let msgs = make_json_messages(120);
863
864        let results: Vec<Result<usize, String>> =
865            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));
866
867        assert_eq!(results.len(), 120);
868        assert!(results.iter().all(|r| r.is_ok()));
869        // Stats should show 120 received across 3 chunks (50+50+20)
870        let snap = engine.stats().snapshot();
871        assert_eq!(snap.received, 120);
872    }
873
874    #[test]
875    fn stats_updated_after_processing() {
876        let engine = default_engine();
877        let msgs = make_json_messages(10);
878
879        let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
880
881        let snap = engine.stats().snapshot();
882        assert_eq!(snap.received, 10);
883        assert_eq!(snap.processed, 10);
884        assert_eq!(snap.errors, 0);
885        assert_eq!(snap.filtered, 0);
886    }
887
888    #[test]
889    fn process_raw_passthrough() {
890        let engine = default_engine();
891        let msgs = make_json_messages(50);
892
893        let results: Vec<Result<usize, String>> =
894            engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));
895
896        assert_eq!(results.len(), 50);
897        assert!(results.iter().all(|r| r.is_ok()));
898        // All JSON messages have the same format: {"_table":"events","id":N}
899        assert!(results[0].as_ref().unwrap() > &0);
900
901        let snap = engine.stats().snapshot();
902        assert_eq!(snap.received, 50);
903        assert_eq!(snap.processed, 50);
904    }
905
906    #[test]
907    fn process_mid_tier_with_pre_route() {
908        let config = BatchProcessingConfig {
909            routing_field: Some("_table".to_string()),
910            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
911                field: "_table".to_string(),
912                value: "poison".to_string(),
913            }],
914            ..Default::default()
915        };
916        let engine = BatchEngine::new(config);
917
918        let mut msgs = make_json_messages(3);
919        // Replace middle message with a poison value
920        msgs[1] = Record {
921            payload: Bytes::from(r#"{"_table":"poison","id":999}"#),
922            key: None,
923            headers: vec![],
924            metadata: RecordMeta {
925                timestamp_ms: None,
926                format: TPayloadFormat::Json,
927            },
928        };
929
930        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
931            Ok(pm
932                .field("_table")
933                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
934                .unwrap_or("?")
935                .to_string())
936        });
937
938        // 2 ok + 1 DLQ error
939        assert_eq!(results.len(), 3);
940        assert!(results[0].is_ok());
941        assert!(results[1].is_err());
942        assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
943        assert!(results[2].is_ok());
944
945        let snap = engine.stats().snapshot();
946        assert_eq!(snap.dlq, 1);
947        assert_eq!(snap.errors, 1);
948    }
949
950    #[test]
951    fn process_mid_tier_filtered_not_in_results() {
952        let config = BatchProcessingConfig {
953            routing_field: Some("_table".to_string()),
954            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
955                field: "_table".to_string(),
956            }],
957            ..Default::default()
958        };
959        let engine = BatchEngine::new(config);
960
961        let mut msgs = make_json_messages(3);
962        // Replace middle message with one missing _table
963        msgs[1] = Record {
964            payload: Bytes::from(r#"{"host":"web1"}"#),
965            key: None,
966            headers: vec![],
967            metadata: RecordMeta {
968                timestamp_ms: None,
969                format: TPayloadFormat::Json,
970            },
971        };
972
973        let results: Vec<Result<String, String>> =
974            engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));
975
976        // Filtered messages are removed -- only 2 results
977        assert_eq!(results.len(), 2);
978        assert!(results.iter().all(|r| r.is_ok()));
979
980        let snap = engine.stats().snapshot();
981        assert_eq!(snap.filtered, 1);
982        assert_eq!(snap.received, 3);
983    }
984
985    #[test]
986    fn from_cascade_creates_engine() {
987        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
988        assert_eq!(engine.config().max_chunk_size, 10_000);
989    }
990
991    #[test]
992    fn accessors_return_expected_types() {
993        let engine = default_engine();
994        let _stats = engine.stats();
995        let _pool = engine.pool();
996        let _config = engine.config();
997        assert_eq!(engine.stats().snapshot().received, 0);
998    }
999
1000    #[test]
1001    fn auto_wire_does_not_panic() {
1002        let mut engine = default_engine();
1003        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
1004        engine.auto_wire(
1005            &mgr,
1006            #[cfg(feature = "memory")]
1007            None,
1008        );
1009        // Engine should still work after auto_wire
1010        let msgs = make_json_messages(5);
1011        let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
1012        assert_eq!(results.len(), 5);
1013    }
1014
1015    #[test]
1016    fn debug_impl_works() {
1017        let engine = default_engine();
1018        let debug = format!("{engine:?}");
1019        assert!(debug.contains("BatchEngine"));
1020        assert!(debug.contains("config"));
1021    }
1022
1023    /// The driver run loops (`run_workbatch` / `run_workbatch_parsed`) replaced
1024    /// the four legacy loops in Task 0.7b. These tests exercise the same
1025    /// behaviours -- process+sink, ticker, on-demand (no-parse) pass-through, and
1026    /// sink-error resilience -- through the surviving WorkBatch driver.
1027    #[cfg(feature = "transport-memory")]
1028    mod driver_engine_tests {
1029        use super::*;
1030        use crate::transport::WorkBatch;
1031        use crate::worker::engine::CommitMode;
1032        use std::sync::atomic::{AtomicU64, Ordering};
1033
1034        fn json_payload(table: &str, id: usize) -> Vec<u8> {
1035            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
1036        }
1037
1038        /// No-ticker placeholder for the `ticker` argument.
1039        #[allow(clippy::type_complexity)]
1040        fn no_ticker() -> Option<(
1041            std::time::Duration,
1042            fn() -> std::future::Ready<Result<(), EngineError>>,
1043        )> {
1044            None
1045        }
1046
1047        fn cancel_after(shutdown: tokio_util::sync::CancellationToken, ms: u64) {
1048            tokio::spawn(async move {
1049                tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
1050                shutdown.cancel();
1051            });
1052        }
1053
1054        #[tokio::test]
1055        async fn run_workbatch_processes_and_passes_tokens_to_sink() {
1056            let config = crate::transport::memory::MemoryConfig {
1057                recv_timeout_ms: 50,
1058                ..Default::default()
1059            };
1060            let transport = crate::transport::memory::MemoryTransport::new(&config)
1061                .expect("memory transport with valid config must construct");
1062            for i in 0..5 {
1063                transport
1064                    .inject(None, json_payload("events", i))
1065                    .await
1066                    .unwrap();
1067            }
1068
1069            let engine = default_engine();
1070            let shutdown = tokio_util::sync::CancellationToken::new();
1071            cancel_after(shutdown.clone(), 200);
1072
1073            let record_count = Arc::new(AtomicU64::new(0));
1074            let token_count = Arc::new(AtomicU64::new(0));
1075            let rc = Arc::clone(&record_count);
1076            let tc = Arc::clone(&token_count);
1077
1078            let result = engine
1079                .run_workbatch(
1080                    &transport,
1081                    shutdown,
1082                    |batch| Ok(batch),
1083                    |out: &WorkBatch<_>| {
1084                        let rc = Arc::clone(&rc);
1085                        let tc = Arc::clone(&tc);
1086                        let records = out.records.len();
1087                        let tokens = out.commit_tokens.len();
1088                        async move {
1089                            rc.fetch_add(records as u64, Ordering::Relaxed);
1090                            tc.fetch_add(tokens as u64, Ordering::Relaxed);
1091                            Ok(())
1092                        }
1093                    },
1094                    // SinkManaged mirrors the legacy run_async sink-owns-commit shape.
1095                    CommitMode::SinkManaged,
1096                    no_ticker(),
1097                )
1098                .await;
1099
1100            assert!(result.is_ok());
1101            assert_eq!(record_count.load(Ordering::Relaxed), 5);
1102            assert_eq!(token_count.load(Ordering::Relaxed), 5);
1103        }
1104
1105        #[tokio::test]
1106        async fn run_workbatch_ticker_fires() {
1107            let config = crate::transport::memory::MemoryConfig {
1108                recv_timeout_ms: 50,
1109                ..Default::default()
1110            };
1111            let transport = crate::transport::memory::MemoryTransport::new(&config)
1112                .expect("memory transport with valid config must construct");
1113            let engine = default_engine();
1114            let shutdown = tokio_util::sync::CancellationToken::new();
1115            cancel_after(shutdown.clone(), 350);
1116
1117            let tick_count = Arc::new(AtomicU64::new(0));
1118            let tick_count_clone = Arc::clone(&tick_count);
1119
1120            let result = engine
1121                .run_workbatch(
1122                    &transport,
1123                    shutdown,
1124                    |batch| Ok(batch),
1125                    |_out: &WorkBatch<_>| async { Ok(()) },
1126                    CommitMode::Auto,
1127                    Some((std::time::Duration::from_millis(100), move || {
1128                        let tc = Arc::clone(&tick_count_clone);
1129                        async move {
1130                            tc.fetch_add(1, Ordering::Relaxed);
1131                            Ok(())
1132                        }
1133                    })),
1134                )
1135                .await;
1136
1137            assert!(result.is_ok());
1138            let ticks = tick_count.load(Ordering::Relaxed);
1139            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
1140        }
1141
1142        #[tokio::test]
1143        async fn run_workbatch_passthrough_without_parse() {
1144            let config = crate::transport::memory::MemoryConfig {
1145                recv_timeout_ms: 50,
1146                ..Default::default()
1147            };
1148            let transport = crate::transport::memory::MemoryTransport::new(&config)
1149                .expect("memory transport with valid config must construct");
1150            for i in 0..3 {
1151                transport
1152                    .inject(None, json_payload("logs", i))
1153                    .await
1154                    .unwrap();
1155            }
1156
1157            let engine = default_engine();
1158            let shutdown = tokio_util::sync::CancellationToken::new();
1159            cancel_after(shutdown.clone(), 200);
1160
1161            let total_bytes = Arc::new(AtomicU64::new(0));
1162            let total_bytes_clone = Arc::clone(&total_bytes);
1163
1164            // On-demand driver: pass-through process pays no parse cost.
1165            let result = engine
1166                .run_workbatch(
1167                    &transport,
1168                    shutdown,
1169                    |batch| Ok(batch),
1170                    |out: &WorkBatch<_>| {
1171                        let tb = Arc::clone(&total_bytes_clone);
1172                        let sum: u64 = out.records.iter().map(|r| r.payload.len() as u64).sum();
1173                        async move {
1174                            tb.fetch_add(sum, Ordering::Relaxed);
1175                            Ok(())
1176                        }
1177                    },
1178                    CommitMode::Auto,
1179                    no_ticker(),
1180                )
1181                .await;
1182
1183            assert!(result.is_ok());
1184            assert!(total_bytes.load(Ordering::Relaxed) > 0);
1185        }
1186
1187        #[tokio::test]
1188        async fn run_workbatch_parsed_reads_field() {
1189            // The parsed path is the analogue of the old mid-tier run_async: the
1190            // driver pre-parses and the process closure reads a routing field.
1191            let config = crate::transport::memory::MemoryConfig {
1192                recv_timeout_ms: 50,
1193                ..Default::default()
1194            };
1195            let transport = crate::transport::memory::MemoryTransport::new(&config)
1196                .expect("memory transport with valid config must construct");
1197            for i in 0..4 {
1198                transport
1199                    .inject(None, json_payload("events", i))
1200                    .await
1201                    .unwrap();
1202            }
1203
1204            let engine = default_engine();
1205            let shutdown = tokio_util::sync::CancellationToken::new();
1206            cancel_after(shutdown.clone(), 200);
1207
1208            let hits = Arc::new(AtomicU64::new(0));
1209            let hc = Arc::clone(&hits);
1210
1211            let result = engine
1212                .run_workbatch_parsed(
1213                    &transport,
1214                    shutdown,
1215                    move |pb| {
1216                        let field = pb.intern("_table");
1217                        let mut local = 0u64;
1218                        for parsed in &pb.parsed {
1219                            if parsed.field_str(&field) == Some("events") {
1220                                local += 1;
1221                            }
1222                        }
1223                        hc.fetch_add(local, Ordering::Relaxed);
1224                        Ok(WorkBatch::new(pb.records, pb.commit_tokens)
1225                            .with_dlq_entries(pb.dlq_entries))
1226                    },
1227                    |_out: &WorkBatch<_>| async { Ok(()) },
1228                    CommitMode::Auto,
1229                    no_ticker(),
1230                )
1231                .await;
1232
1233            assert!(result.is_ok());
1234            assert_eq!(hits.load(Ordering::Relaxed), 4);
1235        }
1236
1237        #[tokio::test]
1238        async fn run_workbatch_sink_error_does_not_crash() {
1239            let config = crate::transport::memory::MemoryConfig {
1240                recv_timeout_ms: 50,
1241                ..Default::default()
1242            };
1243            let transport = crate::transport::memory::MemoryTransport::new(&config)
1244                .expect("memory transport with valid config must construct");
1245            transport
1246                .inject(None, json_payload("events", 0))
1247                .await
1248                .unwrap();
1249
1250            let engine = default_engine();
1251            let shutdown = tokio_util::sync::CancellationToken::new();
1252            cancel_after(shutdown.clone(), 200);
1253
1254            // Sink always errors -- the driver returns the error cleanly (no
1255            // crash/panic). Post Remediation Phase 1 the sink error is a
1256            // TERMINAL ack-barrier error rather than a logged continue.
1257            let result = engine
1258                .run_workbatch(
1259                    &transport,
1260                    shutdown,
1261                    |batch| Ok(batch),
1262                    |_out: &WorkBatch<_>| async {
1263                        Err(EngineError::Sink("test sink error".into()))
1264                    },
1265                    CommitMode::Auto,
1266                    no_ticker(),
1267                )
1268                .await;
1269
1270            assert!(
1271                matches!(result, Err(EngineError::Sink(_))),
1272                "sink error returns terminally without crashing: {result:?}"
1273            );
1274        }
1275    }
1276}