Skip to main content

hyperi_rustlib/worker/engine/
mod.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/engine/mod.rs
3// Purpose:   SIMD-optimised batch processing engine for DFE pipelines
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9pub mod config;
10pub mod intern;
11pub mod metrics;
12pub mod parse;
13pub mod pre_route;
14pub mod types;
15
16pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
17pub use intern::FieldInterner;
18pub use types::{MessageMetadata, ParsedMessage, PreRouteResult, RawMessage};
19
/// Errors returned by the transport-wired run loops:
/// [`BatchEngine::run`], [`BatchEngine::run_raw`], [`BatchEngine::run_async`]
/// and [`BatchEngine::run_raw_async`].
///
/// Sink and ticker callbacks also return this type. The run loops in this
/// module log those callback errors (and transport commit errors) instead of
/// propagating them; only a failed receive ends a loop with an error.
///
/// Only available when the `transport` feature is enabled.
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// Transport receive or commit failed.
    ///
    /// Converted automatically from `crate::TransportError` via `?`/`From`.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),
    /// Sink callback returned an error.
    #[error("sink error: {0}")]
    Sink(String),
    /// Shutdown was requested via cancellation token.
    ///
    /// NOTE(review): the run loops return `Ok(())` on cancellation and never
    /// construct this variant themselves; presumably it is meant for sink or
    /// ticker callbacks — confirm.
    #[error("shutdown")]
    Shutdown,
}
36
37use std::sync::Arc;
38
39use rayon::prelude::*;
40
41use super::pool::AdaptiveWorkerPool;
42use super::stats::PipelineStats;
43
44use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field, filters_from_config};
45use self::types::PayloadFormat;
46use super::config::WorkerPoolConfig;
47
/// Core batch processing engine for DFE pipelines.
///
/// Provides two processing modes:
///
/// - [`process_mid_tier`](Self::process_mid_tier) -- parse each payload (format
///   taken from its metadata, or auto-detected), extract known fields, apply
///   pre-route filters, then parallel transform via rayon. The standard path
///   for most DFE apps (loader, archiver, transforms).
///
/// - [`process_raw`](Self::process_raw) -- skip parsing, apply pre-route on raw
///   bytes, then parallel transform via rayon. For apps that handle raw bytes
///   (receiver, binary protocols).
///
/// Both modes split batches into chunks of at most `max_chunk_size` messages
/// (`0` disables chunking) and record counters in the shared [`PipelineStats`].
/// With the `memory` feature and a wired guard, they also pause after each
/// chunk while the guard reports memory pressure.
///
/// Pre-route filters are only evaluated when `routing_field` is configured.
pub struct BatchEngine {
    // Processing configuration: chunking, routing field, filters, parse-error policy.
    config: BatchProcessingConfig,
    // Worker pool used for the parallel transform phase; may be shared with other users.
    pool: Arc<AdaptiveWorkerPool>,
    // Counters updated by every processing call; exposed via `stats()`.
    stats: Arc<PipelineStats>,
    // Known-field extractor seeded from `config.known_fields`.
    interner: Arc<FieldInterner>,
    // Pre-route filters built from `config.pre_route_filters`.
    filters: Vec<pre_route::PreRouteFilter>,
    // Memory guard attached by `auto_wire`; `None` until wired.
    #[cfg(feature = "memory")]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
}
71
72impl BatchEngine {
73    /// Create a standalone engine with its own worker pool.
74    ///
75    /// Uses `WorkerPoolConfig::default()` for the pool. Prefer
76    /// [`with_pool`](Self::with_pool) when a `ServiceRuntime` pool exists.
77    #[must_use]
78    pub fn new(config: BatchProcessingConfig) -> Self {
79        let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
80        Self::with_pool(pool, config)
81    }
82
83    /// Create an engine that reuses an existing worker pool.
84    ///
85    /// This is the preferred constructor when `ServiceRuntime` is available,
86    /// as it avoids creating a second rayon thread pool.
87    #[must_use]
88    pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
89        let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
90        let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
91        let filters = filters_from_config(&config.pre_route_filters);
92        Self {
93            config,
94            pool,
95            stats: Arc::new(PipelineStats::new()),
96            interner,
97            filters,
98            #[cfg(feature = "memory")]
99            memory_guard: None,
100        }
101    }
102
103    /// Load configuration from the cascade and create a standalone engine.
104    ///
105    /// # Errors
106    ///
107    /// Returns `ConfigError` if the cascade contains invalid data.
108    pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
109        let config = BatchProcessingConfig::from_cascade(key)?;
110        Ok(Self::new(config))
111    }
112
113    /// Pipeline statistics (atomic, lock-free).
114    #[must_use]
115    pub fn stats(&self) -> &Arc<PipelineStats> {
116        &self.stats
117    }
118
119    /// Underlying worker pool.
120    #[must_use]
121    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
122        &self.pool
123    }
124
125    /// Engine configuration.
126    #[must_use]
127    pub fn config(&self) -> &BatchProcessingConfig {
128        &self.config
129    }
130
131    /// Auto-wire engine with infrastructure components.
132    ///
133    /// Called by `ServiceRuntime::build()`. Apps never call this directly.
134    pub fn auto_wire(
135        &mut self,
136        metrics_manager: &crate::metrics::MetricsManager,
137        #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
138    ) {
139        metrics::register(metrics_manager, &self.config);
140
141        #[cfg(feature = "memory")]
142        if let Some(guard) = memory_guard {
143            self.memory_guard = Some(Arc::clone(guard));
144        }
145    }
146
147    /// Parse, filter, and transform a batch of raw messages.
148    ///
149    /// Pipeline phases per chunk:
150    /// 1. **Pre-route** -- SIMD field extraction + filter evaluation (sequential, ~100 ns/msg)
151    /// 2. **Parse** -- `sonic_rs::from_slice` + known-field extraction (sequential, ~1-5 µs/msg)
152    /// 3. **Transform** -- user closure via rayon `par_iter_mut` (parallel)
153    ///
154    /// Results contain one entry per non-filtered message. Filtered messages are
155    /// silently removed (their commit tokens remain accessible via the original
156    /// slice). DLQ'd and parse-error messages produce `Err` entries.
157    pub fn process_mid_tier<O, E, F>(
158        &self,
159        messages: &[RawMessage],
160        transform: F,
161    ) -> Vec<Result<O, E>>
162    where
163        O: Send,
164        E: Send + From<String>,
165        F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
166    {
167        if messages.is_empty() {
168            return Vec::new();
169        }
170
171        let chunk_size = if self.config.max_chunk_size == 0 {
172            messages.len()
173        } else {
174            self.config.max_chunk_size
175        };
176
177        let has_routing = self.config.routing_field.is_some();
178        let mut all_results = Vec::with_capacity(messages.len());
179
180        for chunk in messages.chunks(chunk_size) {
181            self.stats.add_received(chunk.len() as u64);
182
183            // Accumulate bytes received
184            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
185            self.stats.add_bytes_received(chunk_bytes);
186
187            // Phase 1 + 2: Pre-route and parse, building ParsedMessage vec.
188            // Track which messages are included (not filtered).
189            let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
190                Vec::with_capacity(chunk.len());
191
192            for msg in chunk {
193                // Phase 1: Pre-route
194                if has_routing {
195                    let field_name = self.config.routing_field.as_ref().expect("checked above");
196                    let extraction = extract_routing_field(&msg.payload, field_name);
197                    let outcome = apply_filters(&extraction, &self.filters);
198
199                    match outcome {
200                        PreRouteOutcome::Continue => {}
201                        PreRouteOutcome::Filtered => {
202                            self.stats.incr_filtered();
203                            continue; // skip this message entirely
204                        }
205                        PreRouteOutcome::Dlq(reason) => {
206                            self.stats.incr_dlq();
207                            self.stats.incr_errors();
208                            parsed_msgs.push(Err(reason));
209                            continue;
210                        }
211                    }
212                }
213
214                // Phase 2: Parse
215                let format = match msg.metadata.format {
216                    PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
217                    other => other,
218                };
219
220                match parse::parse_payload(&msg.payload, format) {
221                    Ok(value) => {
222                        let extracted = self.interner.extract_known(&value);
223                        parsed_msgs.push(Ok(ParsedMessage::Parsed {
224                            value,
225                            raw: msg.payload.clone(),
226                            format,
227                            key: msg.key.clone(),
228                            headers: msg.headers.clone(),
229                            metadata: msg.metadata.clone(),
230                            extracted,
231                        }));
232                    }
233                    Err(e) => {
234                        self.stats.incr_errors();
235                        match self.config.parse_error_action {
236                            ParseErrorAction::Dlq => {
237                                self.stats.incr_dlq();
238                                parsed_msgs.push(Err(format!("parse error: {e}")));
239                            }
240                            ParseErrorAction::Skip => {
241                                // Counted in errors above, not added to results
242                            }
243                            ParseErrorAction::FailBatch => {
244                                // Return all accumulated results + this error,
245                                // then stop processing the chunk.
246                                parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
247                                let results: Vec<Result<O, E>> = parsed_msgs
248                                    .into_iter()
249                                    .map(|r| match r {
250                                        Ok(_) => Err(E::from(
251                                            "batch failed due to parse error".to_string(),
252                                        )),
253                                        Err(reason) => Err(E::from(reason)),
254                                    })
255                                    .collect();
256                                all_results.extend(results);
257                                return all_results;
258                            }
259                        }
260                    }
261                }
262            }
263
264            // Phase 3: Parallel transform via rayon.
265            // Split into ok/err: transform only the Ok entries.
266            let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
267                parsed_msgs.into_iter().enumerate().collect();
268
269            // Separate errors from parseable messages
270            let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
271            let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
272
273            for (idx, item) in indexed.drain(..) {
274                match item {
275                    Ok(pm) => to_transform.push((idx, pm)),
276                    Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
277                }
278            }
279
280            // Parallel transform
281            let transformed: Vec<(usize, Result<O, E>)> = self.pool.install(|| {
282                to_transform
283                    .into_par_iter()
284                    .map(|(idx, mut pm)| {
285                        let result = transform(&mut pm);
286                        (idx, result)
287                    })
288                    .collect()
289            });
290
291            chunk_results.extend(transformed);
292
293            // Sort by original index to preserve order
294            chunk_results.sort_by_key(|(idx, _)| *idx);
295
296            // Update stats
297            let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
298            self.stats.add_processed(ok_count as u64);
299
300            all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
301
302            // Memory pressure check between chunks
303            self.check_memory_pressure();
304        }
305
306        all_results
307    }
308
309    /// Pre-route and transform a batch of raw messages without parsing.
310    ///
311    /// The transform closure receives immutable `&RawMessage` references.
312    /// Use this for apps that handle raw bytes directly (e.g. receiver forwarding).
313    pub fn process_raw<O, E, F>(&self, messages: &[RawMessage], transform: F) -> Vec<Result<O, E>>
314    where
315        O: Send,
316        E: Send + From<String>,
317        F: Fn(&RawMessage) -> Result<O, E> + Sync,
318    {
319        if messages.is_empty() {
320            return Vec::new();
321        }
322
323        let chunk_size = if self.config.max_chunk_size == 0 {
324            messages.len()
325        } else {
326            self.config.max_chunk_size
327        };
328
329        let has_routing = self.config.routing_field.is_some();
330        let mut all_results = Vec::with_capacity(messages.len());
331
332        for chunk in messages.chunks(chunk_size) {
333            self.stats.add_received(chunk.len() as u64);
334
335            let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
336            self.stats.add_bytes_received(chunk_bytes);
337
338            // Phase 1: Pre-route filter
339            let to_process: Vec<&RawMessage> = if has_routing {
340                let field_name = self.config.routing_field.as_ref().expect("checked above");
341                let mut passed = Vec::with_capacity(chunk.len());
342                for msg in chunk {
343                    let extraction = extract_routing_field(&msg.payload, field_name);
344                    let outcome = apply_filters(&extraction, &self.filters);
345                    match outcome {
346                        PreRouteOutcome::Continue => passed.push(msg),
347                        PreRouteOutcome::Filtered => {
348                            self.stats.incr_filtered();
349                        }
350                        PreRouteOutcome::Dlq(reason) => {
351                            self.stats.incr_dlq();
352                            self.stats.incr_errors();
353                            all_results.push(Err(E::from(reason)));
354                        }
355                    }
356                }
357                passed
358            } else {
359                chunk.iter().collect()
360            };
361
362            // Phase 2: Parallel transform via process_batch
363            let results = self.pool.process_batch(&to_process, |msg| transform(msg));
364
365            let ok_count = results.iter().filter(|r| r.is_ok()).count();
366            self.stats.add_processed(ok_count as u64);
367
368            all_results.extend(results);
369
370            self.check_memory_pressure();
371        }
372
373        all_results
374    }
375
376    /// Transport-wired async run loop for mid-tier (parsed JSON) processing.
377    ///
378    /// Receives up to `config.max_chunk_size` messages per iteration, converts them
379    /// to `RawMessage`, runs [`process_mid_tier`](Self::process_mid_tier), calls the
380    /// sink, then commits transport tokens only on sink success.
381    ///
382    /// Stops cleanly when `shutdown` is cancelled.
383    ///
384    /// # Errors
385    ///
386    /// Returns `EngineError::Transport` if recv or commit fails fatally.
387    /// Returns `EngineError::Sink` if the sink callback errors (commit skipped).
388    #[cfg(feature = "transport")]
389    pub async fn run<R, O, E, Transform, Sink>(
390        &self,
391        receiver: &R,
392        shutdown: tokio_util::sync::CancellationToken,
393        transform: Transform,
394        mut sink: Sink,
395    ) -> Result<(), EngineError>
396    where
397        R: crate::transport::TransportReceiver,
398        O: Send + 'static,
399        E: Send + From<String> + std::fmt::Display + 'static,
400        Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
401        Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
402    {
403        tracing::info!(
404            chunk_size = self.config.max_chunk_size,
405            routing_field = ?self.config.routing_field,
406            "BatchEngine starting"
407        );
408
409        loop {
410            tokio::select! {
411                biased;
412                () = shutdown.cancelled() => {
413                    tracing::info!("BatchEngine shutting down");
414                    return Ok(());
415                }
416                recv_result = receiver.recv(self.config.max_chunk_size) => {
417                    let messages = recv_result.map_err(EngineError::Transport)?;
418                    if messages.is_empty() {
419                        continue;
420                    }
421
422                    // Collect commit tokens before converting (move happens below).
423                    let tokens: Vec<R::Token> = messages.iter()
424                        .map(|m| m.token.clone())
425                        .collect();
426
427                    // Convert to RawMessage (type-erased).
428                    let raw: Vec<RawMessage> = messages.into_iter()
429                        .map(RawMessage::from)
430                        .collect();
431
432                    // Process: pre-route + parse + parallel transform.
433                    let results = self.process_mid_tier(&raw, &transform);
434
435                    // Sink: commit only on success.
436                    if let Err(e) = sink(results) {
437                        tracing::error!(error = %e, "Sink failed, skipping commit");
438                        continue;
439                    }
440
441                    if let Err(e) = receiver.commit(&tokens).await {
442                        tracing::error!(error = %e, "Commit failed");
443                    }
444                }
445            }
446        }
447    }
448
449    /// Transport-wired async run loop for raw byte processing.
450    ///
451    /// Like [`run`](Self::run) but uses [`process_raw`](Self::process_raw): the
452    /// transform closure receives `&RawMessage` bytes without JSON parsing.
453    ///
454    /// # Errors
455    ///
456    /// Returns `EngineError::Transport` if recv or commit fails fatally.
457    /// Returns `EngineError::Sink` if the sink callback errors (commit skipped).
458    #[cfg(feature = "transport")]
459    pub async fn run_raw<R, O, E, Transform, Sink>(
460        &self,
461        receiver: &R,
462        shutdown: tokio_util::sync::CancellationToken,
463        transform: Transform,
464        mut sink: Sink,
465    ) -> Result<(), EngineError>
466    where
467        R: crate::transport::TransportReceiver,
468        O: Send + 'static,
469        E: Send + From<String> + std::fmt::Display + 'static,
470        Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
471        Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
472    {
473        tracing::info!(
474            chunk_size = self.config.max_chunk_size,
475            "BatchEngine (raw) starting"
476        );
477
478        loop {
479            tokio::select! {
480                biased;
481                () = shutdown.cancelled() => {
482                    tracing::info!("BatchEngine (raw) shutting down");
483                    return Ok(());
484                }
485                recv_result = receiver.recv(self.config.max_chunk_size) => {
486                    let messages = recv_result.map_err(EngineError::Transport)?;
487                    if messages.is_empty() {
488                        continue;
489                    }
490
491                    let tokens: Vec<R::Token> = messages.iter()
492                        .map(|m| m.token.clone())
493                        .collect();
494
495                    let raw: Vec<RawMessage> = messages.into_iter()
496                        .map(RawMessage::from)
497                        .collect();
498
499                    let results = self.process_raw(&raw, &transform);
500
501                    if let Err(e) = sink(results) {
502                        tracing::error!(error = %e, "Sink failed (raw), skipping commit");
503                        continue;
504                    }
505
506                    if let Err(e) = receiver.commit(&tokens).await {
507                        tracing::error!(error = %e, "Commit failed (raw)");
508                    }
509                }
510            }
511        }
512    }
513
514    /// Transport-wired async run loop with full control over commit semantics.
515    ///
516    /// Like [`run`](Self::run) but with two key differences:
517    ///
518    /// 1. **Async sink** -- the sink is an async closure, enabling async I/O
519    ///    (ClickHouse inserts, Kafka produce, storage writes) inside the sink.
520    ///
521    /// 2. **Sink-managed commits** -- the sink receives commit tokens and decides
522    ///    when to commit. The engine does NOT auto-commit. This enables deferred
523    ///    commit patterns (e.g., commit after ClickHouse flush, not after buffer push).
524    ///
525    /// An optional ticker fires on a fixed interval within the select! loop,
526    /// enabling flush timers and periodic maintenance without breaking out of
527    /// the engine loop.
528    ///
529    /// # Errors
530    ///
531    /// Returns `EngineError::Transport` if recv fails fatally.
532    /// Returns `EngineError::Sink` if the sink callback errors.
533    #[cfg(feature = "transport")]
534    pub async fn run_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
535        &self,
536        receiver: &R,
537        shutdown: tokio_util::sync::CancellationToken,
538        transform: Transform,
539        mut sink: Sink,
540        ticker: Option<(std::time::Duration, Ticker)>,
541    ) -> Result<(), EngineError>
542    where
543        R: crate::transport::TransportReceiver,
544        O: Send + 'static,
545        E: Send + From<String> + std::fmt::Display + 'static,
546        Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
547        Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
548        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
549        Ticker: FnMut() -> TickerFut,
550        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
551    {
552        tracing::info!(
553            chunk_size = self.config.max_chunk_size,
554            routing_field = ?self.config.routing_field,
555            ticker = ticker.is_some(),
556            "BatchEngine (async) starting"
557        );
558
559        // Ticker interval -- if None, create a dummy that never fires.
560        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
561        let mut ticker_fn = ticker.map(|(_, f)| f);
562
563        // Consume the first tick (fires immediately).
564        if let Some(ref mut interval) = tick_interval {
565            interval.tick().await;
566        }
567
568        loop {
569            tokio::select! {
570                biased;
571
572                () = shutdown.cancelled() => {
573                    tracing::info!("BatchEngine (async) shutting down");
574                    return Ok(());
575                }
576
577                _ = async {
578                    match tick_interval.as_mut() {
579                        Some(interval) => interval.tick().await,
580                        None => std::future::pending().await,
581                    }
582                } => {
583                    if let Some(ref mut f) = ticker_fn
584                        && let Err(e) = f().await
585                    {
586                        tracing::error!(error = %e, "Ticker failed");
587                    }
588                }
589
590                recv_result = receiver.recv(self.config.max_chunk_size) => {
591                    let messages = recv_result.map_err(EngineError::Transport)?;
592                    if messages.is_empty() {
593                        continue;
594                    }
595
596                    // Collect commit tokens before converting (move happens below).
597                    let tokens: Vec<R::Token> = messages.iter()
598                        .map(|m| m.token.clone())
599                        .collect();
600
601                    // Convert to RawMessage (type-erased).
602                    let raw: Vec<RawMessage> = messages.into_iter()
603                        .map(RawMessage::from)
604                        .collect();
605
606                    // Process: pre-route + parse + parallel transform.
607                    let results = self.process_mid_tier(&raw, &transform);
608
609                    // Sink receives results AND tokens -- sink decides when to commit.
610                    if let Err(e) = sink(results, tokens).await {
611                        tracing::error!(error = %e, "Sink failed (async)");
612                    }
613                }
614            }
615        }
616    }
617
618    /// Transport-wired async run loop for raw byte processing with full control.
619    ///
620    /// Like [`run_async`](Self::run_async) but uses [`process_raw`](Self::process_raw):
621    /// the transform closure receives `&RawMessage` bytes without JSON parsing.
622    ///
623    /// # Errors
624    ///
625    /// Returns `EngineError::Transport` if recv fails fatally.
626    /// Returns `EngineError::Sink` if the sink callback errors.
627    #[cfg(feature = "transport")]
628    pub async fn run_raw_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
629        &self,
630        receiver: &R,
631        shutdown: tokio_util::sync::CancellationToken,
632        transform: Transform,
633        mut sink: Sink,
634        ticker: Option<(std::time::Duration, Ticker)>,
635    ) -> Result<(), EngineError>
636    where
637        R: crate::transport::TransportReceiver,
638        O: Send + 'static,
639        E: Send + From<String> + std::fmt::Display + 'static,
640        Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
641        Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
642        SinkFut: std::future::Future<Output = Result<(), EngineError>>,
643        Ticker: FnMut() -> TickerFut,
644        TickerFut: std::future::Future<Output = Result<(), EngineError>>,
645    {
646        tracing::info!(
647            chunk_size = self.config.max_chunk_size,
648            ticker = ticker.is_some(),
649            "BatchEngine (raw async) starting"
650        );
651
652        let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
653        let mut ticker_fn = ticker.map(|(_, f)| f);
654
655        if let Some(ref mut interval) = tick_interval {
656            interval.tick().await;
657        }
658
659        loop {
660            tokio::select! {
661                biased;
662
663                () = shutdown.cancelled() => {
664                    tracing::info!("BatchEngine (raw async) shutting down");
665                    return Ok(());
666                }
667
668                _ = async {
669                    match tick_interval.as_mut() {
670                        Some(interval) => interval.tick().await,
671                        None => std::future::pending().await,
672                    }
673                } => {
674                    if let Some(ref mut f) = ticker_fn
675                        && let Err(e) = f().await
676                    {
677                        tracing::error!(error = %e, "Ticker (raw) failed");
678                    }
679                }
680
681                recv_result = receiver.recv(self.config.max_chunk_size) => {
682                    let messages = recv_result.map_err(EngineError::Transport)?;
683                    if messages.is_empty() {
684                        continue;
685                    }
686
687                    let tokens: Vec<R::Token> = messages.iter()
688                        .map(|m| m.token.clone())
689                        .collect();
690
691                    let raw: Vec<RawMessage> = messages.into_iter()
692                        .map(RawMessage::from)
693                        .collect();
694
695                    let results = self.process_raw(&raw, &transform);
696
697                    if let Err(e) = sink(results, tokens).await {
698                        tracing::error!(error = %e, "Sink failed (raw async)");
699                    }
700                }
701            }
702        }
703    }
704
705    /// Pause between chunks when memory pressure is detected.
706    ///
707    /// Uses `std::thread::sleep` (not tokio) because `process_mid_tier` and
708    /// `process_raw` are sync methods that run within rayon context. The pause
709    /// happens between chunks (cold path), not per message.
710    #[allow(clippy::unused_self)]
711    fn check_memory_pressure(&self) {
712        #[cfg(feature = "memory")]
713        if let Some(guard) = &self.memory_guard
714            && guard.under_pressure()
715        {
716            tracing::warn!(
717                pause_ms = self.config.memory_pressure_pause_ms,
718                "BatchEngine: memory pressure detected, pausing between chunks"
719            );
720            std::thread::sleep(std::time::Duration::from_millis(
721                self.config.memory_pressure_pause_ms,
722            ));
723        }
724    }
725}
726
727impl std::fmt::Debug for BatchEngine {
728    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729        let mut s = f.debug_struct("BatchEngine");
730        s.field("config", &self.config)
731            .field("pool_max_threads", &self.pool.max_threads())
732            .field("stats", &self.stats.snapshot())
733            .field("interner_len", &self.interner.len())
734            .field("filters", &self.filters);
735        #[cfg(feature = "memory")]
736        s.field("memory_guard", &self.memory_guard.is_some());
737        s.finish()
738    }
739}
740
// Unit tests for `BatchEngine`: synchronous mid-tier and raw processing,
// pre-route filtering, stats accounting, wiring helpers, and (behind the
// `transport-memory` feature) the async run loops driven by an in-memory
// transport.
#[cfg(test)]
mod engine_tests {
    use super::*;
    use bytes::Bytes;

    /// Wraps `payload` in a JSON-tagged [`RawMessage`] with no key, headers,
    /// timestamp, or commit token.
    ///
    /// Every hand-built test message goes through here so the metadata
    /// boilerplate is written exactly once.
    fn json_message(payload: Bytes) -> RawMessage {
        RawMessage {
            payload,
            key: None,
            headers: vec![],
            metadata: MessageMetadata {
                timestamp_ms: None,
                format: types::PayloadFormat::Json,
                commit_token: None,
            },
        }
    }

    /// Builds `n` messages whose payloads are `{"_table":"events","id":<i>}`
    /// for `i` in `0..n`.
    fn make_json_messages(n: usize) -> Vec<RawMessage> {
        (0..n)
            .map(|i| json_message(Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#))))
            .collect()
    }

    /// Engine built from `BatchProcessingConfig::default()`.
    fn default_engine() -> BatchEngine {
        BatchEngine::new(BatchProcessingConfig::default())
    }

    #[test]
    fn process_mid_tier_basic() {
        let engine = default_engine();
        let msgs = make_json_messages(100);

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
            Ok(pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("unknown")
                .to_string())
        });

        assert_eq!(results.len(), 100);
        assert!(results.iter().all(|r| r.is_ok()));
        assert_eq!(results[0].as_ref().unwrap(), "events");
    }

    #[test]
    fn process_mid_tier_parse_error() {
        let engine = default_engine();
        let mut msgs = make_json_messages(2);
        // Sandwich an invalid JSON payload between the two valid ones.
        msgs.insert(1, json_message(Bytes::from_static(b"not json {{{")));

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));

        // 2 successful + 1 error (DLQ by default), reported in input order.
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("parse error"));
        assert!(results[2].is_ok());
    }

    #[test]
    fn process_mid_tier_empty_batch() {
        let engine = default_engine();
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
        assert!(results.is_empty());
    }

    #[test]
    fn process_mid_tier_respects_chunk_size() {
        let config = BatchProcessingConfig {
            max_chunk_size: 50,
            ..Default::default()
        };
        let engine = BatchEngine::new(config);
        let msgs = make_json_messages(120);

        let results: Vec<Result<usize, String>> =
            engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));

        assert_eq!(results.len(), 120);
        assert!(results.iter().all(|r| r.is_ok()));
        // The batch spans 3 chunks (50 + 50 + 20); stats must still account
        // for every message, not just the last chunk.
        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 120);
        assert_eq!(snap.processed, 120);
        assert_eq!(snap.errors, 0);
    }

    #[test]
    fn stats_updated_after_processing() {
        let engine = default_engine();
        let msgs = make_json_messages(10);

        let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 10);
        assert_eq!(snap.processed, 10);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.filtered, 0);
    }

    #[test]
    fn process_raw_passthrough() {
        let engine = default_engine();
        let msgs = make_json_messages(50);

        let results: Vec<Result<usize, String>> =
            engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));

        assert_eq!(results.len(), 50);
        // The raw path hands each message to the closure untouched and keeps
        // input order, so each result is exactly its own payload's length.
        for (result, msg) in results.iter().zip(&msgs) {
            assert_eq!(result.as_ref().ok(), Some(&msg.payload.len()));
        }

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 50);
        assert_eq!(snap.processed, 50);
    }

    #[test]
    fn process_mid_tier_with_pre_route() {
        let config = BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
                field: "_table".to_string(),
                value: "poison".to_string(),
            }],
            ..Default::default()
        };
        let engine = BatchEngine::new(config);

        let mut msgs = make_json_messages(3);
        // Replace the middle message with one carrying the poison value.
        msgs[1] = json_message(Bytes::from(r#"{"_table":"poison","id":999}"#));

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
            Ok(pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("?")
                .to_string())
        });

        // 2 ok + 1 DLQ error
        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
        assert!(results[2].is_ok());

        let snap = engine.stats().snapshot();
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.errors, 1);
    }

    #[test]
    fn process_mid_tier_filtered_not_in_results() {
        let config = BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            }],
            ..Default::default()
        };
        let engine = BatchEngine::new(config);

        let mut msgs = make_json_messages(3);
        // Replace the middle message with one that has no `_table` field.
        msgs[1] = json_message(Bytes::from(r#"{"host":"web1"}"#));

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));

        // Filtered messages are removed -- only 2 results
        assert_eq!(results.len(), 2);
        assert!(results.iter().all(|r| r.is_ok()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.filtered, 1);
        assert_eq!(snap.received, 3);
    }

    #[test]
    fn from_cascade_creates_engine() {
        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
        assert_eq!(engine.config().max_chunk_size, 10_000);
    }

    #[test]
    fn accessors_return_expected_types() {
        let engine = default_engine();
        let _stats = engine.stats();
        let _pool = engine.pool();
        let _config = engine.config();
        assert_eq!(engine.stats().snapshot().received, 0);
    }

    #[test]
    fn auto_wire_does_not_panic() {
        let mut engine = default_engine();
        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
        engine.auto_wire(
            &mgr,
            #[cfg(feature = "memory")]
            None,
        );
        // Engine should still work after auto_wire
        let msgs = make_json_messages(5);
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
        assert_eq!(results.len(), 5);
    }

    #[test]
    fn debug_impl_works() {
        let engine = default_engine();
        let debug = format!("{engine:?}");
        assert!(debug.contains("BatchEngine"));
        // Every field the Debug impl emits unconditionally must show up.
        for field in ["config", "pool_max_threads", "stats", "interner_len", "filters"] {
            assert!(debug.contains(field), "Debug output missing `{field}`: {debug}");
        }
    }

    #[cfg(feature = "transport-memory")]
    mod async_engine_tests {
        use super::*;
        use std::sync::atomic::{AtomicU64, Ordering};
        use std::time::Duration;

        /// Ticker argument type for `run_async` / `run_raw_async`.
        ///
        /// The concrete fn-pointer type pins generics that a bare `None`
        /// could not infer.
        type NoTicker = Option<(Duration, fn() -> std::future::Ready<Result<(), EngineError>>)>;

        /// "No periodic ticker" argument for the async run loops.
        fn no_ticker() -> NoTicker {
            None
        }

        /// Spawns a task that cancels `token` after `delay`, bounding how long
        /// the run loop under test keeps polling the transport.
        ///
        /// Must be called from inside a Tokio runtime (it uses `tokio::spawn`).
        fn cancel_after(token: &tokio_util::sync::CancellationToken, delay: Duration) {
            let token = token.clone();
            tokio::spawn(async move {
                tokio::time::sleep(delay).await;
                token.cancel();
            });
        }

        /// JSON payload bytes of the form `{"_table":"<table>","id":<id>}`.
        fn json_payload(table: &str, id: usize) -> Vec<u8> {
            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
        }

        #[tokio::test]
        async fn run_async_processes_and_passes_tokens_to_sink() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            // Inject 5 messages
            for i in 0..5 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let sink_count = Arc::new(AtomicU64::new(0));
            let token_count = Arc::new(AtomicU64::new(0));
            let sink_count_clone = Arc::clone(&sink_count);
            let token_count_clone = Arc::clone(&token_count);

            cancel_after(&shutdown, Duration::from_millis(200));

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |pm: &mut ParsedMessage| -> Result<String, String> {
                        Ok(pm
                            .field("_table")
                            .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                            .unwrap_or("?")
                            .to_string())
                    },
                    |results, tokens| {
                        let sc = Arc::clone(&sink_count_clone);
                        let tc = Arc::clone(&token_count_clone);
                        async move {
                            sc.fetch_add(results.len() as u64, Ordering::Relaxed);
                            // A real sink would commit `tokens` here; the test
                            // only counts them.
                            tc.fetch_add(tokens.len() as u64, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(sink_count.load(Ordering::Relaxed), 5);
            assert_eq!(token_count.load(Ordering::Relaxed), 5);
        }

        #[tokio::test]
        async fn run_async_ticker_fires() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let tick_count = Arc::new(AtomicU64::new(0));
            let tick_count_clone = Arc::clone(&tick_count);

            // Run for 350ms: a 100ms ticker should fire at least twice even
            // with some scheduler jitter.
            cancel_after(&shutdown, Duration::from_millis(350));

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Ok(()) },
                    Some((Duration::from_millis(100), move || {
                        let tc = Arc::clone(&tick_count_clone);
                        async move {
                            tc.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(result.is_ok());
            let ticks = tick_count.load(Ordering::Relaxed);
            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
        }

        #[tokio::test]
        async fn run_raw_async_processes_without_parse() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..3 {
                transport
                    .inject(None, json_payload("logs", i))
                    .await
                    .unwrap();
            }
            // The raw path must see every injected byte, unparsed.
            let expected_bytes: u64 = (0..3).map(|i| json_payload("logs", i).len() as u64).sum();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let total_bytes = Arc::new(AtomicU64::new(0));
            let total_bytes_clone = Arc::clone(&total_bytes);

            cancel_after(&shutdown, Duration::from_millis(200));

            let result = engine
                .run_raw_async(
                    &transport,
                    shutdown,
                    |msg: &RawMessage| -> Result<usize, String> { Ok(msg.payload.len()) },
                    |results, _tokens| {
                        let tb = Arc::clone(&total_bytes_clone);
                        async move {
                            for len in results.iter().flatten() {
                                tb.fetch_add(*len as u64, Ordering::Relaxed);
                            }
                            Ok(())
                        }
                    },
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(total_bytes.load(Ordering::Relaxed), expected_bytes);
        }

        #[tokio::test]
        async fn run_async_sink_error_does_not_crash() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");

            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            cancel_after(&shutdown, Duration::from_millis(200));

            // Sink always errors -- engine should continue (not crash)
            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Err(EngineError::Sink("test sink error".into())) },
                    no_ticker(),
                )
                .await;

            // Should shut down cleanly (not propagate sink error)
            assert!(result.is_ok());
        }
    }
}