Skip to main content

hyperi_rustlib/worker/engine/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/mod.rs
3// Purpose:   SIMD-optimised batch processing engine for DFE pipelines
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9pub mod config;
10pub mod intern;
11pub mod metrics;
12pub mod parse;
13pub mod pre_route;
14pub mod types;
15
16pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
17pub use intern::FieldInterner;
18pub use types::{MessageMetadata, ParsedMessage, PreRouteResult, RawMessage};
19
/// Errors returned by the transport-wired run loops ([`BatchEngine::run`],
/// [`BatchEngine::run_raw`], [`BatchEngine::run_async`] and
/// [`BatchEngine::run_raw_async`]) and by the sink/ticker callbacks they drive.
///
/// Only available when the `transport` feature is enabled.
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// A transport operation (receive or commit) failed. Converts from
    /// [`crate::TransportError`] via `?`.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),
    /// A sink callback reported a failure. The run loops log errors returned
    /// by their sink and keep running; they do not propagate them.
    #[error("sink error: {0}")]
    Sink(String),
    /// Shutdown was requested via cancellation token.
    ///
    /// Note: the run loops return `Ok(())` when their cancellation token
    /// fires rather than this variant.
    #[error("shutdown")]
    Shutdown,
    /// Inbound-filter DLQ entries appeared but no routing policy was configured
    /// (the default [`FilterDlqPolicy::Reject`]). Metrics are not delivery, so
    /// the engine fails fast rather than silently dropping dead-letters.
    /// Carries the number of unrouted entries.
    #[error(
        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
         (Route to forward, or DiscardWithMetric to deliberately drop)"
    )]
    FilterDlqUnrouted(usize),
}
45
/// Policy a [`BatchEngine`] run loop applies to inbound-filter DLQ entries
/// ([`RecvBatch::dlq_entries`](crate::transport::RecvBatch)).
///
/// An inbound filter with `action: dlq` pulls messages out of the normal
/// batch, so the engine has to be told where those entries go. The default,
/// [`Reject`](Self::Reject), refuses to guess: a configuration that would
/// otherwise lose data fails loudly instead of passing silently.
#[cfg(feature = "transport")]
#[derive(Clone, Default)]
pub enum FilterDlqPolicy {
    /// Stop the run loop with [`EngineError::FilterDlqUnrouted`] if any DLQ
    /// entries show up. This is the default, so dropping or routing
    /// dead-letters is always an explicit decision.
    #[default]
    Reject,
    /// Drop DLQ entries on purpose, counting them in the
    /// `dfe_engine_filter_dlq_discarded_total` metric. Must be opted into.
    DiscardWithMetric,
    /// Hand each batch's DLQ entries to this callback (for example to enqueue
    /// them on a DLQ transport, or to `tokio::spawn` an async send). The
    /// callback runs on the run loop itself, so it should return quickly and
    /// push slow work elsewhere.
    Route(Arc<dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) + Send + Sync>),
}
67
/// Hand-written `Debug`: the `Route` callback is an opaque closure with no
/// `Debug` impl, so only the variant name is printed.
#[cfg(feature = "transport")]
impl std::fmt::Debug for FilterDlqPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Reject => "Reject",
            Self::DiscardWithMetric => "DiscardWithMetric",
            Self::Route(_) => "Route(..)",
        };
        f.write_str(label)
    }
}
78
79use std::sync::Arc;
80
81use super::pool::AdaptiveWorkerPool;
82use super::stats::PipelineStats;
83
84use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field, filters_from_config};
85use self::types::PayloadFormat;
86use super::config::WorkerPoolConfig;
87
88/// Core batch processing engine for DFE pipelines.
89///
90/// Provides two processing modes:
91///
92/// - [`process_mid_tier`](Self::process_mid_tier) -- parse JSON via SIMD, extract
93///   known fields, apply pre-route filters, then parallel transform via rayon.
94///   The standard path for most DFE apps (loader, archiver, transforms).
95///
96/// - [`process_raw`](Self::process_raw) -- skip parsing, apply pre-route on raw
97///   bytes, then parallel transform via rayon. For apps that handle raw bytes
98///   (receiver, binary protocols).
99///
100/// Both modes chunk large batches, track stats atomically, and pause between
101/// chunks when memory pressure is detected.
102pub struct BatchEngine {
103    config: BatchProcessingConfig,
104    pool: Arc<AdaptiveWorkerPool>,
105    stats: Arc<PipelineStats>,
106    interner: Arc<FieldInterner>,
107    filters: Vec<pre_route::PreRouteFilter>,
108    #[cfg(feature = "memory")]
109    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
110    /// What the run loops do with inbound-filter DLQ entries. Default
111    /// [`FilterDlqPolicy::Reject`] (no silent data loss).
112    #[cfg(feature = "transport")]
113    filter_dlq_policy: FilterDlqPolicy,
114}
115
116impl BatchEngine {
117    /// Create a standalone engine with its own worker pool.
118    ///
119    /// Uses `WorkerPoolConfig::default()` for the pool. Prefer
120    /// [`with_pool`](Self::with_pool) when a `ServiceRuntime` pool exists.
121    #[must_use]
122    pub fn new(config: BatchProcessingConfig) -> Self {
123        let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
124        Self::with_pool(pool, config)
125    }
126
127    /// Create an engine that reuses an existing worker pool.
128    ///
129    /// This is the preferred constructor when `ServiceRuntime` is available,
130    /// as it avoids creating a second rayon thread pool.
131    #[must_use]
132    pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
133        let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
134        let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
135        let filters = filters_from_config(&config.pre_route_filters);
136        Self {
137            config,
138            pool,
139            stats: Arc::new(PipelineStats::new()),
140            interner,
141            filters,
142            #[cfg(feature = "memory")]
143            memory_guard: None,
144            #[cfg(feature = "transport")]
145            filter_dlq_policy: FilterDlqPolicy::default(),
146        }
147    }
148
149    /// Set the policy for inbound-filter DLQ entries in the run loops.
150    ///
151    /// Default is [`FilterDlqPolicy::Reject`] -- the run loop errors if an
152    /// inbound `action: dlq` filter produces entries and no routing is set, so
153    /// dead-letters are never silently dropped.
154    #[cfg(feature = "transport")]
155    #[must_use]
156    pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
157        self.filter_dlq_policy = policy;
158        self
159    }
160
161    /// Load configuration from the cascade and create a standalone engine.
162    ///
163    /// # Errors
164    ///
165    /// Returns `ConfigError` if the cascade contains invalid data.
166    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
167        let config = BatchProcessingConfig::from_cascade(key)?;
168        Ok(Self::new(config))
169    }
170
171    /// Pipeline statistics (atomic, lock-free).
172    #[must_use]
173    pub fn stats(&self) -> &Arc<PipelineStats> {
174        &self.stats
175    }
176
177    /// Underlying worker pool.
178    #[must_use]
179    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
180        &self.pool
181    }
182
183    /// Engine configuration.
184    #[must_use]
185    pub fn config(&self) -> &BatchProcessingConfig {
186        &self.config
187    }
188
189    /// Auto-wire engine with infrastructure components.
190    ///
191    /// Called by `ServiceRuntime::build()`. Apps never call this directly.
192    pub fn auto_wire(
193        &mut self,
194        metrics_manager: &crate::metrics::MetricsManager,
195        #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
196    ) {
197        metrics::register(metrics_manager, &self.config);
198
199        #[cfg(feature = "memory")]
200        if let Some(guard) = memory_guard {
201            self.memory_guard = Some(Arc::clone(guard));
202        }
203    }
204
205    /// Parse, filter, and transform a batch of raw messages.
206    ///
207    /// Pipeline phases per chunk:
208    /// 1. **Pre-route** -- SIMD field extraction + filter evaluation (sequential, ~100 ns/msg)
209    /// 2. **Parse** -- `sonic_rs::from_slice` + known-field extraction (sequential, ~1-5 µs/msg)
210    /// 3. **Transform** -- user closure via rayon `par_iter_mut` (parallel)
211    ///
212    /// Results contain one entry per non-filtered message. Filtered messages are
213    /// silently removed (their commit tokens remain accessible via the original
214    /// slice). DLQ'd and parse-error messages produce `Err` entries.
215    pub fn process_mid_tier<O, E, F>(
216        &self,
217        messages: &[RawMessage],
218        transform: F,
219    ) -> Vec<Result<O, E>>
220    where
221        O: Send,
222        E: Send + From<String>,
223        F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
224    {
225        if messages.is_empty() {
226            return Vec::new();
227        }
228
229        let chunk_size = if self.config.max_chunk_size == 0 {
230            messages.len()
231        } else {
232            self.config.max_chunk_size
233        };
234
235        let has_routing = self.config.routing_field.is_some();
236        let mut all_results = Vec::with_capacity(messages.len());
237
238        for chunk in messages.chunks(chunk_size) {
239            self.stats.add_received(chunk.len() as u64);
240
241            // Accumulate bytes received
242            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
243            self.stats.add_bytes_received(chunk_bytes);
244
245            // Phase 1 + 2: Pre-route and parse, building ParsedMessage vec.
246            // Track which messages are included (not filtered).
247            let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
248                Vec::with_capacity(chunk.len());
249
250            for msg in chunk {
251                // Phase 1: Pre-route
252                if has_routing {
253                    let field_name = self.config.routing_field.as_ref().expect("checked above");
254                    let extraction = extract_routing_field(&msg.payload, field_name);
255                    let outcome = apply_filters(&extraction, &self.filters);
256
257                    match outcome {
258                        PreRouteOutcome::Continue => {}
259                        PreRouteOutcome::Filtered => {
260                            self.stats.incr_filtered();
261                            continue; // skip this message entirely
262                        }
263                        PreRouteOutcome::Dlq(reason) => {
264                            self.stats.incr_dlq();
265                            self.stats.incr_errors();
266                            parsed_msgs.push(Err(reason));
267                            continue;
268                        }
269                    }
270                }
271
272                // Phase 2: Parse
273                let format = match msg.metadata.format {
274                    PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
275                    other => other,
276                };
277
278                match parse::parse_payload(&msg.payload, format) {
279                    Ok(value) => {
280                        let extracted = self.interner.extract_known(&value);
281                        parsed_msgs.push(Ok(ParsedMessage::Parsed {
282                            value,
283                            raw: msg.payload.clone(),
284                            format,
285                            key: msg.key.clone(),
286                            headers: msg.headers.clone(),
287                            metadata: msg.metadata.clone(),
288                            extracted,
289                        }));
290                    }
291                    Err(e) => {
292                        self.stats.incr_errors();
293                        match self.config.parse_error_action {
294                            ParseErrorAction::Dlq => {
295                                self.stats.incr_dlq();
296                                parsed_msgs.push(Err(format!("parse error: {e}")));
297                            }
298                            ParseErrorAction::Skip => {
299                                // Counted in errors above, not added to results
300                            }
301                            ParseErrorAction::FailBatch => {
302                                // Return all accumulated results + this error,
303                                // then stop processing the chunk.
304                                parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
305                                let results: Vec<Result<O, E>> = parsed_msgs
306                                    .into_iter()
307                                    .map(|r| match r {
308                                        Ok(_) => Err(E::from(
309                                            "batch failed due to parse error".to_string(),
310                                        )),
311                                        Err(reason) => Err(E::from(reason)),
312                                    })
313                                    .collect();
314                                all_results.extend(results);
315                                return all_results;
316                            }
317                        }
318                    }
319                }
320            }
321
322            // Phase 3: Parallel transform via rayon.
323            // Split into ok/err: transform only the Ok entries.
324            let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
325                parsed_msgs.into_iter().enumerate().collect();
326
327            // Separate errors from parseable messages
328            let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
329            let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
330
331            for (idx, item) in indexed.drain(..) {
332                match item {
333                    Ok(pm) => to_transform.push((idx, pm)),
334                    Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
335                }
336            }
337
338            // Parallel transform, throttled by the scaler target (map_owned
339            // applies the semaphore per item -- unlike the old install() path,
340            // which bypassed it and let the parsed path ignore the CPU cap).
341            let transformed: Vec<(usize, Result<O, E>)> =
342                self.pool.map_owned(to_transform, |(idx, mut pm)| {
343                    let result = transform(&mut pm);
344                    (idx, result)
345                });
346
347            chunk_results.extend(transformed);
348
349            // Sort by original index to preserve order
350            chunk_results.sort_by_key(|(idx, _)| *idx);
351
352            // Update stats
353            let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
354            self.stats.add_processed(ok_count as u64);
355
356            all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
357
358            // Memory pressure check between chunks
359            self.check_memory_pressure();
360        }
361
362        all_results
363    }
364
365    /// Pre-route and transform a batch of raw messages without parsing.
366    ///
367    /// The transform closure receives immutable `&RawMessage` references.
368    /// Use this for apps that handle raw bytes directly (e.g. receiver forwarding).
369    pub fn process_raw<O, E, F>(&self, messages: &[RawMessage], transform: F) -> Vec<Result<O, E>>
370    where
371        O: Send,
372        E: Send + From<String>,
373        F: Fn(&RawMessage) -> Result<O, E> + Sync,
374    {
375        if messages.is_empty() {
376            return Vec::new();
377        }
378
379        let chunk_size = if self.config.max_chunk_size == 0 {
380            messages.len()
381        } else {
382            self.config.max_chunk_size
383        };
384
385        let has_routing = self.config.routing_field.is_some();
386        let mut all_results = Vec::with_capacity(messages.len());
387
388        for chunk in messages.chunks(chunk_size) {
389            self.stats.add_received(chunk.len() as u64);
390
391            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
392            self.stats.add_bytes_received(chunk_bytes);
393
394            // Phase 1: Pre-route filter
395            let to_process: Vec<&RawMessage> = if has_routing {
396                let field_name = self.config.routing_field.as_ref().expect("checked above");
397                let mut passed = Vec::with_capacity(chunk.len());
398                for msg in chunk {
399                    let extraction = extract_routing_field(&msg.payload, field_name);
400                    let outcome = apply_filters(&extraction, &self.filters);
401                    match outcome {
402                        PreRouteOutcome::Continue => passed.push(msg),
403                        PreRouteOutcome::Filtered => {
404                            self.stats.incr_filtered();
405                        }
406                        PreRouteOutcome::Dlq(reason) => {
407                            self.stats.incr_dlq();
408                            self.stats.incr_errors();
409                            all_results.push(Err(E::from(reason)));
410                        }
411                    }
412                }
413                passed
414            } else {
415                chunk.iter().collect()
416            };
417
418            // Phase 2: Parallel transform via process_batch
419            let results = self.pool.process_batch(&to_process, |msg| transform(msg));
420
421            let ok_count = results.iter().filter(|r| r.is_ok()).count();
422            self.stats.add_processed(ok_count as u64);
423
424            all_results.extend(results);
425
426            self.check_memory_pressure();
427        }
428
429        all_results
430    }
431
432    /// Transport-wired async run loop for mid-tier (parsed JSON) processing.
433    ///
434    /// Receives up to `config.max_chunk_size` messages per iteration, converts them
435    /// to `RawMessage`, runs [`process_mid_tier`](Self::process_mid_tier), calls the
436    /// sink, then commits transport tokens only on sink success.
437    ///
438    /// Stops cleanly when `shutdown` is cancelled.
439    ///
440    /// # Errors
441    ///
442    /// Returns `EngineError::Transport` if recv or commit fails fatally.
443    /// Returns `EngineError::Sink` if the sink callback errors (commit skipped).
444    #[cfg(feature = "transport")]
445    pub async fn run<R, O, E, Transform, Sink>(
446        &self,
447        receiver: &R,
448        shutdown: tokio_util::sync::CancellationToken,
449        transform: Transform,
450        mut sink: Sink,
451    ) -> Result<(), EngineError>
452    where
453        R: crate::transport::TransportReceiver,
454        O: Send + 'static,
455        E: Send + From<String> + std::fmt::Display + 'static,
456        Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
457        Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
458    {
459        tracing::info!(
460            chunk_size = self.config.max_chunk_size,
461            routing_field = ?self.config.routing_field,
462            "BatchEngine starting"
463        );
464
465        loop {
466            tokio::select! {
467                biased;
468                () = shutdown.cancelled() => {
469                    tracing::info!("BatchEngine shutting down");
470                    return Ok(());
471                }
472                recv_result = receiver.recv(self.config.max_chunk_size) => {
473                    let batch = recv_result.map_err(EngineError::Transport)?;
474                    // Route/discard/reject inbound-filter DLQ entries per the
475                    // configured policy -- never silently dropped.
476                    let messages = self.apply_filter_dlq_policy(batch)?;
477                    if messages.is_empty() {
478                        continue;
479                    }
480
481                    // Collect commit tokens before converting (move happens below).
482                    let tokens: Vec<R::Token> = messages.iter()
483                        .map(|m| m.token.clone())
484                        .collect();
485
486                    // Convert to RawMessage (type-erased).
487                    let raw: Vec<RawMessage> = messages.into_iter()
488                        .map(RawMessage::from)
489                        .collect();
490
491                    // Account in-flight ingress bytes against the MemoryGuard;
492                    // released on every exit path of this arm via Drop.
493                    #[cfg(feature = "memory")]
494                    let _ingress_lease = self.lease_ingress(&raw);
495
496                    // Process: pre-route + parse + parallel transform.
497                    let results = self.process_mid_tier(&raw, &transform);
498
499                    // Sink: commit only on success.
500                    if let Err(e) = sink(results) {
501                        tracing::error!(error = %e, "Sink failed, skipping commit");
502                        continue;
503                    }
504
505                    if let Err(e) = receiver.commit(&tokens).await {
506                        tracing::error!(error = %e, "Commit failed");
507                    }
508                }
509            }
510        }
511    }
512
513    /// Transport-wired async run loop for raw byte processing.
514    ///
515    /// Like [`run`](Self::run) but uses [`process_raw`](Self::process_raw): the
516    /// transform closure receives `&RawMessage` bytes without JSON parsing.
517    ///
518    /// # Errors
519    ///
520    /// Returns `EngineError::Transport` if recv or commit fails fatally.
521    /// Returns `EngineError::Sink` if the sink callback errors (commit skipped).
522    #[cfg(feature = "transport")]
523    pub async fn run_raw<R, O, E, Transform, Sink>(
524        &self,
525        receiver: &R,
526        shutdown: tokio_util::sync::CancellationToken,
527        transform: Transform,
528        mut sink: Sink,
529    ) -> Result<(), EngineError>
530    where
531        R: crate::transport::TransportReceiver,
532        O: Send + 'static,
533        E: Send + From<String> + std::fmt::Display + 'static,
534        Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
535        Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
536    {
537        tracing::info!(
538            chunk_size = self.config.max_chunk_size,
539            "BatchEngine (raw) starting"
540        );
541
542        loop {
543            tokio::select! {
544                biased;
545                () = shutdown.cancelled() => {
546                    tracing::info!("BatchEngine (raw) shutting down");
547                    return Ok(());
548                }
549                recv_result = receiver.recv(self.config.max_chunk_size) => {
550                    let batch = recv_result.map_err(EngineError::Transport)?;
551                    // Route/discard/reject inbound-filter DLQ entries per the
552                    // configured policy -- never silently dropped.
553                    let messages = self.apply_filter_dlq_policy(batch)?;
554                    if messages.is_empty() {
555                        continue;
556                    }
557
558                    let tokens: Vec<R::Token> = messages.iter()
559                        .map(|m| m.token.clone())
560                        .collect();
561
562                    let raw: Vec<RawMessage> = messages.into_iter()
563                        .map(RawMessage::from)
564                        .collect();
565
566                    // Account in-flight ingress bytes against the MemoryGuard;
567                    // released on every exit path of this arm via Drop.
568                    #[cfg(feature = "memory")]
569                    let _ingress_lease = self.lease_ingress(&raw);
570
571                    let results = self.process_raw(&raw, &transform);
572
573                    if let Err(e) = sink(results) {
574                        tracing::error!(error = %e, "Sink failed (raw), skipping commit");
575                        continue;
576                    }
577
578                    if let Err(e) = receiver.commit(&tokens).await {
579                        tracing::error!(error = %e, "Commit failed (raw)");
580                    }
581                }
582            }
583        }
584    }
585
586    /// Transport-wired async run loop with full control over commit semantics.
587    ///
588    /// Like [`run`](Self::run) but with two key differences:
589    ///
590    /// 1. **Async sink** -- the sink is an async closure, enabling async I/O
591    ///    (ClickHouse inserts, Kafka produce, storage writes) inside the sink.
592    ///
593    /// 2. **Sink-managed commits** -- the sink receives commit tokens and decides
594    ///    when to commit. The engine does NOT auto-commit. This enables deferred
595    ///    commit patterns (e.g., commit after ClickHouse flush, not after buffer push).
596    ///
597    /// An optional ticker fires on a fixed interval within the select! loop,
598    /// enabling flush timers and periodic maintenance without breaking out of
599    /// the engine loop.
600    ///
601    /// # Errors
602    ///
603    /// Returns `EngineError::Transport` if recv fails fatally.
604    /// Returns `EngineError::Sink` if the sink callback errors.
605    #[cfg(feature = "transport")]
606    pub async fn run_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
607        &self,
608        receiver: &R,
609        shutdown: tokio_util::sync::CancellationToken,
610        transform: Transform,
611        mut sink: Sink,
612        ticker: Option<(std::time::Duration, Ticker)>,
613    ) -> Result<(), EngineError>
614    where
615        R: crate::transport::TransportReceiver,
616        O: Send + 'static,
617        E: Send + From<String> + std::fmt::Display + 'static,
618        Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
619        Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
620        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
621        Ticker: FnMut() -> TickerFut,
622        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
623    {
624        tracing::info!(
625            chunk_size = self.config.max_chunk_size,
626            routing_field = ?self.config.routing_field,
627            ticker = ticker.is_some(),
628            "BatchEngine (async) starting"
629        );
630
631        // Ticker interval -- if None, create a dummy that never fires.
632        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
633        let mut ticker_fn = ticker.map(|(_, f)| f);
634
635        // Consume the first tick (fires immediately).
636        if let Some(ref mut interval) = tick_interval {
637            interval.tick().await;
638        }
639
640        loop {
641            tokio::select! {
642                biased;
643
644                () = shutdown.cancelled() => {
645                    tracing::info!("BatchEngine (async) shutting down");
646                    return Ok(());
647                }
648
649                _ = async {
650                    match tick_interval.as_mut() {
651                        Some(interval) => interval.tick().await,
652                        None => std::future::pending().await,
653                    }
654                } => {
655                    if let Some(ref mut f) = ticker_fn
656                        && let Err(e) = f().await
657                    {
658                        tracing::error!(error = %e, "Ticker failed");
659                    }
660                }
661
662                recv_result = receiver.recv(self.config.max_chunk_size) => {
663                    let batch = recv_result.map_err(EngineError::Transport)?;
664                    // Route/discard/reject inbound-filter DLQ entries per the
665                    // configured policy -- never silently dropped.
666                    let messages = self.apply_filter_dlq_policy(batch)?;
667                    if messages.is_empty() {
668                        continue;
669                    }
670
671                    // Collect commit tokens before converting (move happens below).
672                    let tokens: Vec<R::Token> = messages.iter()
673                        .map(|m| m.token.clone())
674                        .collect();
675
676                    // Convert to RawMessage (type-erased).
677                    let raw: Vec<RawMessage> = messages.into_iter()
678                        .map(RawMessage::from)
679                        .collect();
680
681                    // Account in-flight ingress bytes against the MemoryGuard;
682                    // released on every exit path of this arm via Drop.
683                    #[cfg(feature = "memory")]
684                    let _ingress_lease = self.lease_ingress(&raw);
685
686                    // Process: pre-route + parse + parallel transform.
687                    let results = self.process_mid_tier(&raw, &transform);
688
689                    // Sink receives results AND tokens -- sink decides when to commit.
690                    if let Err(e) = sink(results, tokens).await {
691                        tracing::error!(error = %e, "Sink failed (async)");
692                    }
693                }
694            }
695        }
696    }
697
698    /// Transport-wired async run loop for raw byte processing with full control.
699    ///
700    /// Like [`run_async`](Self::run_async) but uses [`process_raw`](Self::process_raw):
701    /// the transform closure receives `&RawMessage` bytes without JSON parsing.
702    ///
703    /// # Errors
704    ///
705    /// Returns `EngineError::Transport` if recv fails fatally.
706    /// Returns `EngineError::Sink` if the sink callback errors.
707    #[cfg(feature = "transport")]
708    pub async fn run_raw_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
709        &self,
710        receiver: &R,
711        shutdown: tokio_util::sync::CancellationToken,
712        transform: Transform,
713        mut sink: Sink,
714        ticker: Option<(std::time::Duration, Ticker)>,
715    ) -> Result<(), EngineError>
716    where
717        R: crate::transport::TransportReceiver,
718        O: Send + 'static,
719        E: Send + From<String> + std::fmt::Display + 'static,
720        Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
721        Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
722        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
723        Ticker: FnMut() -> TickerFut,
724        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
725    {
726        tracing::info!(
727            chunk_size = self.config.max_chunk_size,
728            ticker = ticker.is_some(),
729            "BatchEngine (raw async) starting"
730        );
731
732        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
733        let mut ticker_fn = ticker.map(|(_, f)| f);
734
735        if let Some(ref mut interval) = tick_interval {
736            interval.tick().await;
737        }
738
739        loop {
740            tokio::select! {
741                biased;
742
743                () = shutdown.cancelled() => {
744                    tracing::info!("BatchEngine (raw async) shutting down");
745                    return Ok(());
746                }
747
748                _ = async {
749                    match tick_interval.as_mut() {
750                        Some(interval) => interval.tick().await,
751                        None => std::future::pending().await,
752                    }
753                } => {
754                    if let Some(ref mut f) = ticker_fn
755                        && let Err(e) = f().await
756                    {
757                        tracing::error!(error = %e, "Ticker (raw) failed");
758                    }
759                }
760
761                recv_result = receiver.recv(self.config.max_chunk_size) => {
762                    let batch = recv_result.map_err(EngineError::Transport)?;
763                    // Route/discard/reject inbound-filter DLQ entries per the
764                    // configured policy -- never silently dropped.
765                    let messages = self.apply_filter_dlq_policy(batch)?;
766                    if messages.is_empty() {
767                        continue;
768                    }
769
770                    let tokens: Vec<R::Token> = messages.iter()
771                        .map(|m| m.token.clone())
772                        .collect();
773
774                    let raw: Vec<RawMessage> = messages.into_iter()
775                        .map(RawMessage::from)
776                        .collect();
777
778                    // Account in-flight ingress bytes against the MemoryGuard;
779                    // released on every exit path of this arm via Drop.
780                    #[cfg(feature = "memory")]
781                    let _ingress_lease = self.lease_ingress(&raw);
782
783                    let results = self.process_raw(&raw, &transform);
784
785                    if let Err(e) = sink(results, tokens).await {
786                        tracing::error!(error = %e, "Sink failed (raw async)");
787                    }
788                }
789            }
790        }
791    }
792
793    /// Account a received batch's payload bytes against the [`MemoryGuard`]
794    /// and return an RAII lease that releases them on drop.
795    ///
796    /// Ingress is consume-side: by the time `recv()` returns we already hold
797    /// the bytes and cannot reject them, so this TRACKS (`add_bytes`) rather
798    /// than gates (`try_reserve`). The accounting is what drives
799    /// `under_pressure()`, which both the inter-chunk
800    /// [`check_memory_pressure`](Self::check_memory_pressure) pause and the
801    /// worker-pool scaler's memory signal consult -- so a deep in-flight
802    /// ingress backlog now actually throttles processing and down-scales the
803    /// pool, instead of the guard only ever seeing externally-sampled RSS.
804    /// The returned lease releases the bytes on every loop exit path (commit,
805    /// sink-error `continue`, `?`-return) via `Drop`.
806    ///
807    /// [`MemoryGuard`]: crate::memory::MemoryGuard
808    #[cfg(feature = "memory")]
809    fn lease_ingress(&self, raw: &[RawMessage]) -> Option<IngressLease<'_>> {
810        let guard = self.memory_guard.as_ref()?;
811        let bytes: u64 = raw.iter().map(|m| m.payload.len() as u64).sum();
812        guard.add_bytes(bytes);
813        Some(IngressLease { guard, bytes })
814    }
815
816    /// Apply the [`FilterDlqPolicy`] to a received batch, returning the passing
817    /// messages. DLQ entries are routed/discarded/rejected per the policy --
818    /// never silently dropped (Codex review 2026-06-03).
819    ///
820    /// # Errors
821    ///
822    /// [`EngineError::FilterDlqUnrouted`] when entries appear under
823    /// [`FilterDlqPolicy::Reject`].
824    #[cfg(feature = "transport")]
825    fn apply_filter_dlq_policy<T: crate::transport::CommitToken>(
826        &self,
827        batch: crate::transport::RecvBatch<T>,
828    ) -> Result<Vec<crate::transport::Message<T>>, EngineError> {
829        if !batch.dlq_entries.is_empty() {
830            match &self.filter_dlq_policy {
831                FilterDlqPolicy::Reject => {
832                    return Err(EngineError::FilterDlqUnrouted(batch.dlq_entries.len()));
833                }
834                FilterDlqPolicy::DiscardWithMetric => {
835                    #[cfg(feature = "metrics")]
836                    ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
837                        .increment(batch.dlq_entries.len() as u64);
838                }
839                FilterDlqPolicy::Route(sink) => sink(batch.dlq_entries),
840            }
841        }
842        Ok(batch.messages)
843    }
844
845    /// Pause between chunks when memory pressure is detected.
846    ///
847    /// Uses `std::thread::sleep` (not tokio) because `process_mid_tier` and
848    /// `process_raw` are sync methods that run within rayon context. The pause
849    /// happens between chunks (cold path), not per message.
850    #[allow(clippy::unused_self)]
851    fn check_memory_pressure(&self) {
852        #[cfg(feature = "memory")]
853        if let Some(guard) = &self.memory_guard
854            && guard.under_pressure()
855        {
856            tracing::warn!(
857                pause_ms = self.config.memory_pressure_pause_ms,
858                "BatchEngine: memory pressure detected, pausing between chunks"
859            );
860            std::thread::sleep(std::time::Duration::from_millis(
861                self.config.memory_pressure_pause_ms,
862            ));
863        }
864    }
865}
866
/// Drop guard for in-flight ingress bytes that were added to a [`MemoryGuard`].
///
/// Produced by [`BatchEngine::lease_ingress`]. When it goes out of scope the
/// same byte count is handed back to the guard via `release`. As a result,
/// no exit path of the holding scope can leave the bytes accounted.
///
/// [`MemoryGuard`]: crate::memory::MemoryGuard
#[cfg(feature = "memory")]
struct IngressLease<'a> {
    // Guard the bytes were accounted against.
    guard: &'a crate::memory::MemoryGuard,
    // Byte count passed to `add_bytes` when the lease was created.
    bytes: u64,
}

#[cfg(feature = "memory")]
impl Drop for IngressLease<'_> {
    fn drop(&mut self) {
        let Self { guard, bytes } = self;
        guard.release(*bytes);
    }
}
885
886impl std::fmt::Debug for BatchEngine {
887    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
888        let mut s = f.debug_struct("BatchEngine");
889        s.field("config", &self.config)
890            .field("pool_max_threads", &self.pool.max_threads())
891            .field("stats", &self.stats.snapshot())
892            .field("interner_len", &self.interner.len())
893            .field("filters", &self.filters);
894        #[cfg(feature = "memory")]
895        s.field("memory_guard", &self.memory_guard.is_some());
896        #[cfg(feature = "transport")]
897        s.field("filter_dlq_policy", &self.filter_dlq_policy);
898        s.finish()
899    }
900}
901
#[cfg(test)]
mod engine_tests {
    use super::*;
    use bytes::Bytes;

    /// Builds `n` JSON messages with payload `{"_table":"events","id":<i>}`,
    /// no key, no headers, no timestamp and no commit token.
    fn make_json_messages(n: usize) -> Vec<RawMessage> {
        (0..n)
            .map(|i| RawMessage {
                payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
                key: None,
                headers: vec![],
                metadata: MessageMetadata {
                    timestamp_ms: None,
                    format: types::PayloadFormat::Json,
                    commit_token: None,
                },
            })
            .collect()
    }

    /// Engine built from `BatchProcessingConfig::default()`.
    fn default_engine() -> BatchEngine {
        BatchEngine::new(BatchProcessingConfig::default())
    }

    /// Exercises each `FilterDlqPolicy` variant against `apply_filter_dlq_policy`.
    #[cfg(feature = "transport")]
    #[test]
    fn filter_dlq_policy_routes_discards_or_rejects() {
        use crate::transport::RecvBatch;
        use crate::transport::filter::FilteredDlqEntry;
        use std::sync::Arc as StdArc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Minimal CommitToken for the generic helper.
        #[derive(Clone, Debug)]
        struct TestTok;
        impl std::fmt::Display for TestTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str("test")
            }
        }
        impl crate::transport::CommitToken for TestTok {}

        // Factory for a trivial DLQ entry.
        let entry = || FilteredDlqEntry {
            payload: b"x".to_vec(),
            key: None,
            reason: "r".to_string(),
        };

        // Reject (default): any DLQ entries -> fail fast, not silent drop.
        let eng = default_engine();
        let batch = RecvBatch::<TestTok> {
            messages: vec![],
            dlq_entries: vec![entry()],
        };
        assert!(matches!(
            eng.apply_filter_dlq_policy(batch),
            Err(EngineError::FilterDlqUnrouted(1))
        ));
        // Reject + no entries -> ok.
        assert!(
            eng.apply_filter_dlq_policy(RecvBatch::<TestTok>::from_messages(vec![]))
                .is_ok()
        );

        // DiscardWithMetric -> ok (deliberately dropped).
        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
        let batch = RecvBatch::<TestTok> {
            messages: vec![],
            dlq_entries: vec![entry()],
        };
        assert!(eng.apply_filter_dlq_policy(batch).is_ok());

        // Route -> the sink receives every entry.
        let seen = StdArc::new(AtomicUsize::new(0));
        let s = StdArc::clone(&seen);
        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
            move |e: Vec<FilteredDlqEntry>| {
                s.fetch_add(e.len(), Ordering::Relaxed);
            },
        )));
        let batch = RecvBatch::<TestTok> {
            messages: vec![],
            dlq_entries: vec![entry(), entry()],
        };
        assert!(eng.apply_filter_dlq_policy(batch).is_ok());
        assert_eq!(
            seen.load(Ordering::Relaxed),
            2,
            "Route sink received all entries"
        );
    }

    /// A lease adds the batch's payload bytes to the guard while held and
    /// releases them when dropped.
    #[cfg(feature = "memory")]
    #[test]
    fn ingress_lease_accounts_and_releases() {
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let mut engine = default_engine();
        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        }));
        engine.memory_guard = Some(Arc::clone(&guard));

        let msgs = make_json_messages(10);
        let expected: u64 = msgs.iter().map(|m| m.payload.len() as u64).sum();
        assert_eq!(guard.current_bytes(), 0, "starts at zero");

        {
            let _lease = engine.lease_ingress(&msgs).expect("guard present");
            assert_eq!(
                guard.current_bytes(),
                expected,
                "bytes accounted while lease held"
            );
        }
        // Lease dropped -> bytes released.
        assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
    }

    /// Without a wired guard, no lease is produced.
    #[cfg(feature = "memory")]
    #[test]
    fn ingress_lease_none_without_guard() {
        let engine = default_engine();
        let msgs = make_json_messages(5);
        assert!(
            engine.lease_ingress(&msgs).is_none(),
            "no lease when no guard wired"
        );
    }

    /// Every valid message yields an `Ok` carrying its `_table` value.
    #[test]
    fn process_mid_tier_basic() {
        let engine = default_engine();
        let msgs = make_json_messages(100);

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
            Ok(pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("unknown")
                .to_string())
        });

        assert_eq!(results.len(), 100);
        assert!(results.iter().all(|r| r.is_ok()));
        assert_eq!(results[0].as_ref().unwrap(), "events");
    }

    /// An unparsable payload becomes an `Err` at its own position while its
    /// neighbours still succeed.
    #[test]
    fn process_mid_tier_parse_error() {
        let engine = default_engine();
        let mut msgs = make_json_messages(2);
        // Insert an invalid JSON message
        msgs.insert(
            1,
            RawMessage {
                payload: Bytes::from_static(b"not json {{{"),
                key: None,
                headers: vec![],
                metadata: MessageMetadata {
                    timestamp_ms: None,
                    format: types::PayloadFormat::Json,
                    commit_token: None,
                },
            },
        );

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));

        // 2 successful + 1 error (DLQ by default)
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("parse error"));
        assert!(results[2].is_ok());
    }

    /// An empty batch produces no results.
    #[test]
    fn process_mid_tier_empty_batch() {
        let engine = default_engine();
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
        assert!(results.is_empty());
    }

    /// A batch larger than `max_chunk_size` is still fully processed.
    #[test]
    fn process_mid_tier_respects_chunk_size() {
        let config = BatchProcessingConfig {
            max_chunk_size: 50,
            ..Default::default()
        };
        let engine = BatchEngine::new(config);
        let msgs = make_json_messages(120);

        let results: Vec<Result<usize, String>> =
            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));

        assert_eq!(results.len(), 120);
        assert!(results.iter().all(|r| r.is_ok()));
        // 120 messages at chunk size 50 (expected split 50+50+20); only the
        // total received count is asserted, not the number of chunks.
        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 120);
    }

    /// Stats counters reflect a fully successful batch.
    #[test]
    fn stats_updated_after_processing() {
        let engine = default_engine();
        let msgs = make_json_messages(10);

        let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 10);
        assert_eq!(snap.processed, 10);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.filtered, 0);
    }

    /// `process_raw` hands each message to the transform without parsing
    /// and updates stats.
    #[test]
    fn process_raw_passthrough() {
        let engine = default_engine();
        let msgs = make_json_messages(50);

        let results: Vec<Result<usize, String>> =
            engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));

        assert_eq!(results.len(), 50);
        assert!(results.iter().all(|r| r.is_ok()));
        // All JSON messages have the same format: {"_table":"events","id":N}
        assert!(results[0].as_ref().unwrap() > &0);

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 50);
        assert_eq!(snap.processed, 50);
    }

    /// A `DlqFieldValue` pre-route filter turns a matching message into a DLQ
    /// error and counts it in both `dlq` and `errors`.
    #[test]
    fn process_mid_tier_with_pre_route() {
        let config = BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
                field: "_table".to_string(),
                value: "poison".to_string(),
            }],
            ..Default::default()
        };
        let engine = BatchEngine::new(config);

        let mut msgs = make_json_messages(3);
        // Replace middle message with a poison value
        msgs[1] = RawMessage {
            payload: Bytes::from(r#"{"_table":"poison","id":999}"#),
            key: None,
            headers: vec![],
            metadata: MessageMetadata {
                timestamp_ms: None,
                format: types::PayloadFormat::Json,
                commit_token: None,
            },
        };

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
            Ok(pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("?")
                .to_string())
        });

        // 2 ok + 1 DLQ error
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
        assert!(results[2].is_ok());

        let snap = engine.stats().snapshot();
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.errors, 1);
    }

    /// A `DropFieldMissing` filter removes the message from the results
    /// entirely and counts it as `filtered`.
    #[test]
    fn process_mid_tier_filtered_not_in_results() {
        let config = BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            }],
            ..Default::default()
        };
        let engine = BatchEngine::new(config);

        let mut msgs = make_json_messages(3);
        // Replace middle message with one missing _table
        msgs[1] = RawMessage {
            payload: Bytes::from(r#"{"host":"web1"}"#),
            key: None,
            headers: vec![],
            metadata: MessageMetadata {
                timestamp_ms: None,
                format: types::PayloadFormat::Json,
                commit_token: None,
            },
        };

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));

        // Filtered messages are removed -- only 2 results
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.filtered, 1);
        assert_eq!(snap.received, 3);
    }

    /// `from_cascade` builds an engine whose default chunk size is 10_000.
    #[test]
    fn from_cascade_creates_engine() {
        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
        assert_eq!(engine.config().max_chunk_size, 10_000);
    }

    /// Accessors are callable and stats start at zero.
    #[test]
    fn accessors_return_expected_types() {
        let engine = default_engine();
        let _stats = engine.stats();
        let _pool = engine.pool();
        let _config = engine.config();
        assert_eq!(engine.stats().snapshot().received, 0);
    }

    /// `auto_wire` with a test metrics manager leaves the engine usable.
    #[test]
    fn auto_wire_does_not_panic() {
        let mut engine = default_engine();
        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
        engine.auto_wire(
            &mgr,
            #[cfg(feature = "memory")]
            None,
        );
        // Engine should still work after auto_wire
        let msgs = make_json_messages(5);
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
        assert_eq!(results.len(), 5);
    }

    /// The `Debug` output names the type and includes the config field.
    #[test]
    fn debug_impl_works() {
        let engine = default_engine();
        let debug = format!("{engine:?}");
        assert!(debug.contains("BatchEngine"));
        assert!(debug.contains("config"));
    }

    // End-to-end run-loop tests against the in-memory transport. Each test
    // cancels the shutdown token from a spawned task after a fixed delay.
    #[cfg(feature = "transport-memory")]
    mod async_engine_tests {
        use super::*;
        use std::sync::atomic::{AtomicU64, Ordering};

        /// JSON payload bytes `{"_table":"<table>","id":<id>}`.
        fn json_payload(table: &str, id: usize) -> Vec<u8> {
            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
        }

        /// Every injected message reaches the sink, with one token per result.
        #[tokio::test]
        async fn run_async_processes_and_passes_tokens_to_sink() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            // Inject 5 messages
            for i in 0..5 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            let shutdown_clone = shutdown.clone();

            let sink_count = Arc::new(AtomicU64::new(0));
            let token_count = Arc::new(AtomicU64::new(0));
            let sink_count_clone = Arc::clone(&sink_count);
            let token_count_clone = Arc::clone(&token_count);

            // Shut down after a short delay
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                shutdown_clone.cancel();
            });

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |pm: &mut ParsedMessage| -> Result<String, String> {
                        Ok(pm
                            .field("_table")
                            .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                            .unwrap_or("?")
                            .to_string())
                    },
                    |results, tokens| {
                        let sc = Arc::clone(&sink_count_clone);
                        let tc = Arc::clone(&token_count_clone);
                        async move {
                            sc.fetch_add(results.len() as u64, Ordering::Relaxed);
                            tc.fetch_add(tokens.len() as u64, Ordering::Relaxed);
                            // A real app would commit `tokens` via the receiver
                            // here; this test only counts them.
                            Ok(())
                        }
                    },
                    None::<(
                        std::time::Duration,
                        fn() -> std::future::Ready<Result<(), EngineError>>,
                    )>,
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(sink_count.load(Ordering::Relaxed), 5);
            assert_eq!(token_count.load(Ordering::Relaxed), 5);
        }

        /// The ticker callback fires periodically even with no traffic.
        /// Timing-based: assumes the runtime is not heavily loaded.
        #[tokio::test]
        async fn run_async_ticker_fires() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            let shutdown_clone = shutdown.clone();

            let tick_count = Arc::new(AtomicU64::new(0));
            let tick_count_clone = Arc::clone(&tick_count);

            // Shut down after 350ms -- ticker at 100ms should fire ~2-3 times
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(350)).await;
                shutdown_clone.cancel();
            });

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Ok(()) },
                    Some((std::time::Duration::from_millis(100), move || {
                        let tc = Arc::clone(&tick_count_clone);
                        async move {
                            tc.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(result.is_ok());
            let ticks = tick_count.load(Ordering::Relaxed);
            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
        }

        /// The raw loop delivers unparsed payload lengths to the sink.
        #[tokio::test]
        async fn run_raw_async_processes_without_parse() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..3 {
                transport
                    .inject(None, json_payload("logs", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            let shutdown_clone = shutdown.clone();

            let total_bytes = Arc::new(AtomicU64::new(0));
            let total_bytes_clone = Arc::clone(&total_bytes);

            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                shutdown_clone.cancel();
            });

            let result = engine
                .run_raw_async(
                    &transport,
                    shutdown,
                    |msg: &RawMessage| -> Result<usize, String> { Ok(msg.payload.len()) },
                    |results, _tokens| {
                        let tb = Arc::clone(&total_bytes_clone);
                        async move {
                            for len in results.iter().flatten() {
                                tb.fetch_add(*len as u64, Ordering::Relaxed);
                            }
                            Ok(())
                        }
                    },
                    None::<(
                        std::time::Duration,
                        fn() -> std::future::Ready<Result<(), EngineError>>,
                    )>,
                )
                .await;

            assert!(result.is_ok());
            assert!(total_bytes.load(Ordering::Relaxed) > 0);
        }

        /// A failing sink is logged, not propagated; the loop still exits
        /// cleanly on shutdown.
        #[tokio::test]
        async fn run_async_sink_error_does_not_crash() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");

            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            let shutdown_clone = shutdown.clone();

            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                shutdown_clone.cancel();
            });

            // Sink always errors -- engine should continue (not crash)
            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Err(EngineError::Sink("test sink error".into())) },
                    None::<(
                        std::time::Duration,
                        fn() -> std::future::Ready<Result<(), EngineError>>,
                    )>,
                )
                .await;

            // Should shut down cleanly (not propagate sink error)
            assert!(result.is_ok());
        }
    }
}