1pub mod config;
10pub mod intern;
11pub mod metrics;
12pub mod parse;
13pub mod pre_route;
14pub mod types;
15
16pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
17pub use intern::FieldInterner;
18pub use types::{MessageMetadata, ParsedMessage, PreRouteResult, RawMessage};
19
/// Errors surfaced by the transport-driven run loops of [`BatchEngine`].
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// Receiving from the transport failed; produced in the run loops via
    /// `map_err(EngineError::Transport)` and convertible with `From`.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),

    /// A caller-supplied sink reported a failure with the given message.
    #[error("sink error: {0}")]
    Sink(String),

    /// Shutdown marker. The run loops in this module return `Ok(())` on
    /// cancellation rather than this variant.
    #[error("shutdown")]
    Shutdown,
}
36
37use std::sync::Arc;
38
39use rayon::prelude::*;
40
41use super::pool::AdaptiveWorkerPool;
42use super::stats::PipelineStats;
43
44use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field, filters_from_config};
45use self::types::PayloadFormat;
46use super::config::WorkerPoolConfig;
47
48pub struct BatchEngine {
63 config: BatchProcessingConfig,
64 pool: Arc<AdaptiveWorkerPool>,
65 stats: Arc<PipelineStats>,
66 interner: Arc<FieldInterner>,
67 filters: Vec<pre_route::PreRouteFilter>,
68 #[cfg(feature = "memory")]
69 memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
70}
71
72impl BatchEngine {
73 #[must_use]
78 pub fn new(config: BatchProcessingConfig) -> Self {
79 let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
80 Self::with_pool(pool, config)
81 }
82
83 #[must_use]
88 pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
89 let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
90 let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
91 let filters = filters_from_config(&config.pre_route_filters);
92 Self {
93 config,
94 pool,
95 stats: Arc::new(PipelineStats::new()),
96 interner,
97 filters,
98 #[cfg(feature = "memory")]
99 memory_guard: None,
100 }
101 }
102
103 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
109 let config = BatchProcessingConfig::from_cascade(key)?;
110 Ok(Self::new(config))
111 }
112
113 #[must_use]
115 pub fn stats(&self) -> &Arc<PipelineStats> {
116 &self.stats
117 }
118
119 #[must_use]
121 pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
122 &self.pool
123 }
124
125 #[must_use]
127 pub fn config(&self) -> &BatchProcessingConfig {
128 &self.config
129 }
130
131 pub fn auto_wire(
135 &mut self,
136 metrics_manager: &crate::metrics::MetricsManager,
137 #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
138 ) {
139 metrics::register(metrics_manager, &self.config);
140
141 #[cfg(feature = "memory")]
142 if let Some(guard) = memory_guard {
143 self.memory_guard = Some(Arc::clone(guard));
144 }
145 }
146
147 pub fn process_mid_tier<O, E, F>(
158 &self,
159 messages: &[RawMessage],
160 transform: F,
161 ) -> Vec<Result<O, E>>
162 where
163 O: Send,
164 E: Send + From<String>,
165 F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
166 {
167 if messages.is_empty() {
168 return Vec::new();
169 }
170
171 let chunk_size = if self.config.max_chunk_size == 0 {
172 messages.len()
173 } else {
174 self.config.max_chunk_size
175 };
176
177 let has_routing = self.config.routing_field.is_some();
178 let mut all_results = Vec::with_capacity(messages.len());
179
180 for chunk in messages.chunks(chunk_size) {
181 self.stats.add_received(chunk.len() as u64);
182
183 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
185 self.stats.add_bytes_received(chunk_bytes);
186
187 let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
190 Vec::with_capacity(chunk.len());
191
192 for msg in chunk {
193 if has_routing {
195 let field_name = self.config.routing_field.as_ref().expect("checked above");
196 let extraction = extract_routing_field(&msg.payload, field_name);
197 let outcome = apply_filters(&extraction, &self.filters);
198
199 match outcome {
200 PreRouteOutcome::Continue => {}
201 PreRouteOutcome::Filtered => {
202 self.stats.incr_filtered();
203 continue; }
205 PreRouteOutcome::Dlq(reason) => {
206 self.stats.incr_dlq();
207 self.stats.incr_errors();
208 parsed_msgs.push(Err(reason));
209 continue;
210 }
211 }
212 }
213
214 let format = match msg.metadata.format {
216 PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
217 other => other,
218 };
219
220 match parse::parse_payload(&msg.payload, format) {
221 Ok(value) => {
222 let extracted = self.interner.extract_known(&value);
223 parsed_msgs.push(Ok(ParsedMessage::Parsed {
224 value,
225 raw: msg.payload.clone(),
226 format,
227 key: msg.key.clone(),
228 headers: msg.headers.clone(),
229 metadata: msg.metadata.clone(),
230 extracted,
231 }));
232 }
233 Err(e) => {
234 self.stats.incr_errors();
235 match self.config.parse_error_action {
236 ParseErrorAction::Dlq => {
237 self.stats.incr_dlq();
238 parsed_msgs.push(Err(format!("parse error: {e}")));
239 }
240 ParseErrorAction::Skip => {
241 }
243 ParseErrorAction::FailBatch => {
244 parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
247 let results: Vec<Result<O, E>> = parsed_msgs
248 .into_iter()
249 .map(|r| match r {
250 Ok(_) => Err(E::from(
251 "batch failed due to parse error".to_string(),
252 )),
253 Err(reason) => Err(E::from(reason)),
254 })
255 .collect();
256 all_results.extend(results);
257 return all_results;
258 }
259 }
260 }
261 }
262 }
263
264 let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
267 parsed_msgs.into_iter().enumerate().collect();
268
269 let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
271 let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
272
273 for (idx, item) in indexed.drain(..) {
274 match item {
275 Ok(pm) => to_transform.push((idx, pm)),
276 Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
277 }
278 }
279
280 let transformed: Vec<(usize, Result<O, E>)> = self.pool.install(|| {
282 to_transform
283 .into_par_iter()
284 .map(|(idx, mut pm)| {
285 let result = transform(&mut pm);
286 (idx, result)
287 })
288 .collect()
289 });
290
291 chunk_results.extend(transformed);
292
293 chunk_results.sort_by_key(|(idx, _)| *idx);
295
296 let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
298 self.stats.add_processed(ok_count as u64);
299
300 all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
301
302 self.check_memory_pressure();
304 }
305
306 all_results
307 }
308
309 pub fn process_raw<O, E, F>(&self, messages: &[RawMessage], transform: F) -> Vec<Result<O, E>>
314 where
315 O: Send,
316 E: Send + From<String>,
317 F: Fn(&RawMessage) -> Result<O, E> + Sync,
318 {
319 if messages.is_empty() {
320 return Vec::new();
321 }
322
323 let chunk_size = if self.config.max_chunk_size == 0 {
324 messages.len()
325 } else {
326 self.config.max_chunk_size
327 };
328
329 let has_routing = self.config.routing_field.is_some();
330 let mut all_results = Vec::with_capacity(messages.len());
331
332 for chunk in messages.chunks(chunk_size) {
333 self.stats.add_received(chunk.len() as u64);
334
335 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
336 self.stats.add_bytes_received(chunk_bytes);
337
338 let to_process: Vec<&RawMessage> = if has_routing {
340 let field_name = self.config.routing_field.as_ref().expect("checked above");
341 let mut passed = Vec::with_capacity(chunk.len());
342 for msg in chunk {
343 let extraction = extract_routing_field(&msg.payload, field_name);
344 let outcome = apply_filters(&extraction, &self.filters);
345 match outcome {
346 PreRouteOutcome::Continue => passed.push(msg),
347 PreRouteOutcome::Filtered => {
348 self.stats.incr_filtered();
349 }
350 PreRouteOutcome::Dlq(reason) => {
351 self.stats.incr_dlq();
352 self.stats.incr_errors();
353 all_results.push(Err(E::from(reason)));
354 }
355 }
356 }
357 passed
358 } else {
359 chunk.iter().collect()
360 };
361
362 let results = self.pool.process_batch(&to_process, |msg| transform(msg));
364
365 let ok_count = results.iter().filter(|r| r.is_ok()).count();
366 self.stats.add_processed(ok_count as u64);
367
368 all_results.extend(results);
369
370 self.check_memory_pressure();
371 }
372
373 all_results
374 }
375
376 #[cfg(feature = "transport")]
389 pub async fn run<R, O, E, Transform, Sink>(
390 &self,
391 receiver: &R,
392 shutdown: tokio_util::sync::CancellationToken,
393 transform: Transform,
394 mut sink: Sink,
395 ) -> Result<(), EngineError>
396 where
397 R: crate::transport::TransportReceiver,
398 O: Send + 'static,
399 E: Send + From<String> + std::fmt::Display + 'static,
400 Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
401 Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
402 {
403 tracing::info!(
404 chunk_size = self.config.max_chunk_size,
405 routing_field = ?self.config.routing_field,
406 "BatchEngine starting"
407 );
408
409 loop {
410 tokio::select! {
411 biased;
412 () = shutdown.cancelled() => {
413 tracing::info!("BatchEngine shutting down");
414 return Ok(());
415 }
416 recv_result = receiver.recv(self.config.max_chunk_size) => {
417 let messages = recv_result.map_err(EngineError::Transport)?;
418 if messages.is_empty() {
419 continue;
420 }
421
422 let tokens: Vec<R::Token> = messages.iter()
424 .map(|m| m.token.clone())
425 .collect();
426
427 let raw: Vec<RawMessage> = messages.into_iter()
429 .map(RawMessage::from)
430 .collect();
431
432 let results = self.process_mid_tier(&raw, &transform);
434
435 if let Err(e) = sink(results) {
437 tracing::error!(error = %e, "Sink failed, skipping commit");
438 continue;
439 }
440
441 if let Err(e) = receiver.commit(&tokens).await {
442 tracing::error!(error = %e, "Commit failed");
443 }
444 }
445 }
446 }
447 }
448
449 #[cfg(feature = "transport")]
459 pub async fn run_raw<R, O, E, Transform, Sink>(
460 &self,
461 receiver: &R,
462 shutdown: tokio_util::sync::CancellationToken,
463 transform: Transform,
464 mut sink: Sink,
465 ) -> Result<(), EngineError>
466 where
467 R: crate::transport::TransportReceiver,
468 O: Send + 'static,
469 E: Send + From<String> + std::fmt::Display + 'static,
470 Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
471 Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
472 {
473 tracing::info!(
474 chunk_size = self.config.max_chunk_size,
475 "BatchEngine (raw) starting"
476 );
477
478 loop {
479 tokio::select! {
480 biased;
481 () = shutdown.cancelled() => {
482 tracing::info!("BatchEngine (raw) shutting down");
483 return Ok(());
484 }
485 recv_result = receiver.recv(self.config.max_chunk_size) => {
486 let messages = recv_result.map_err(EngineError::Transport)?;
487 if messages.is_empty() {
488 continue;
489 }
490
491 let tokens: Vec<R::Token> = messages.iter()
492 .map(|m| m.token.clone())
493 .collect();
494
495 let raw: Vec<RawMessage> = messages.into_iter()
496 .map(RawMessage::from)
497 .collect();
498
499 let results = self.process_raw(&raw, &transform);
500
501 if let Err(e) = sink(results) {
502 tracing::error!(error = %e, "Sink failed (raw), skipping commit");
503 continue;
504 }
505
506 if let Err(e) = receiver.commit(&tokens).await {
507 tracing::error!(error = %e, "Commit failed (raw)");
508 }
509 }
510 }
511 }
512 }
513
514 #[cfg(feature = "transport")]
534 pub async fn run_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
535 &self,
536 receiver: &R,
537 shutdown: tokio_util::sync::CancellationToken,
538 transform: Transform,
539 mut sink: Sink,
540 ticker: Option<(std::time::Duration, Ticker)>,
541 ) -> Result<(), EngineError>
542 where
543 R: crate::transport::TransportReceiver,
544 O: Send + 'static,
545 E: Send + From<String> + std::fmt::Display + 'static,
546 Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
547 Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
548 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
549 Ticker: FnMut() -> TickerFut,
550 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
551 {
552 tracing::info!(
553 chunk_size = self.config.max_chunk_size,
554 routing_field = ?self.config.routing_field,
555 ticker = ticker.is_some(),
556 "BatchEngine (async) starting"
557 );
558
559 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
561 let mut ticker_fn = ticker.map(|(_, f)| f);
562
563 if let Some(ref mut interval) = tick_interval {
565 interval.tick().await;
566 }
567
568 loop {
569 tokio::select! {
570 biased;
571
572 () = shutdown.cancelled() => {
573 tracing::info!("BatchEngine (async) shutting down");
574 return Ok(());
575 }
576
577 _ = async {
578 match tick_interval.as_mut() {
579 Some(interval) => interval.tick().await,
580 None => std::future::pending().await,
581 }
582 } => {
583 if let Some(ref mut f) = ticker_fn
584 && let Err(e) = f().await
585 {
586 tracing::error!(error = %e, "Ticker failed");
587 }
588 }
589
590 recv_result = receiver.recv(self.config.max_chunk_size) => {
591 let messages = recv_result.map_err(EngineError::Transport)?;
592 if messages.is_empty() {
593 continue;
594 }
595
596 let tokens: Vec<R::Token> = messages.iter()
598 .map(|m| m.token.clone())
599 .collect();
600
601 let raw: Vec<RawMessage> = messages.into_iter()
603 .map(RawMessage::from)
604 .collect();
605
606 let results = self.process_mid_tier(&raw, &transform);
608
609 if let Err(e) = sink(results, tokens).await {
611 tracing::error!(error = %e, "Sink failed (async)");
612 }
613 }
614 }
615 }
616 }
617
618 #[cfg(feature = "transport")]
628 pub async fn run_raw_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
629 &self,
630 receiver: &R,
631 shutdown: tokio_util::sync::CancellationToken,
632 transform: Transform,
633 mut sink: Sink,
634 ticker: Option<(std::time::Duration, Ticker)>,
635 ) -> Result<(), EngineError>
636 where
637 R: crate::transport::TransportReceiver,
638 O: Send + 'static,
639 E: Send + From<String> + std::fmt::Display + 'static,
640 Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
641 Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
642 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
643 Ticker: FnMut() -> TickerFut,
644 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
645 {
646 tracing::info!(
647 chunk_size = self.config.max_chunk_size,
648 ticker = ticker.is_some(),
649 "BatchEngine (raw async) starting"
650 );
651
652 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
653 let mut ticker_fn = ticker.map(|(_, f)| f);
654
655 if let Some(ref mut interval) = tick_interval {
656 interval.tick().await;
657 }
658
659 loop {
660 tokio::select! {
661 biased;
662
663 () = shutdown.cancelled() => {
664 tracing::info!("BatchEngine (raw async) shutting down");
665 return Ok(());
666 }
667
668 _ = async {
669 match tick_interval.as_mut() {
670 Some(interval) => interval.tick().await,
671 None => std::future::pending().await,
672 }
673 } => {
674 if let Some(ref mut f) = ticker_fn
675 && let Err(e) = f().await
676 {
677 tracing::error!(error = %e, "Ticker (raw) failed");
678 }
679 }
680
681 recv_result = receiver.recv(self.config.max_chunk_size) => {
682 let messages = recv_result.map_err(EngineError::Transport)?;
683 if messages.is_empty() {
684 continue;
685 }
686
687 let tokens: Vec<R::Token> = messages.iter()
688 .map(|m| m.token.clone())
689 .collect();
690
691 let raw: Vec<RawMessage> = messages.into_iter()
692 .map(RawMessage::from)
693 .collect();
694
695 let results = self.process_raw(&raw, &transform);
696
697 if let Err(e) = sink(results, tokens).await {
698 tracing::error!(error = %e, "Sink failed (raw async)");
699 }
700 }
701 }
702 }
703 }
704
705 #[allow(clippy::unused_self)]
711 fn check_memory_pressure(&self) {
712 #[cfg(feature = "memory")]
713 if let Some(guard) = &self.memory_guard
714 && guard.under_pressure()
715 {
716 tracing::warn!(
717 pause_ms = self.config.memory_pressure_pause_ms,
718 "BatchEngine: memory pressure detected, pausing between chunks"
719 );
720 std::thread::sleep(std::time::Duration::from_millis(
721 self.config.memory_pressure_pause_ms,
722 ));
723 }
724 }
725}
726
727impl std::fmt::Debug for BatchEngine {
728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 let mut s = f.debug_struct("BatchEngine");
730 s.field("config", &self.config)
731 .field("pool_max_threads", &self.pool.max_threads())
732 .field("stats", &self.stats.snapshot())
733 .field("interner_len", &self.interner.len())
734 .field("filters", &self.filters);
735 #[cfg(feature = "memory")]
736 s.field("memory_guard", &self.memory_guard.is_some());
737 s.finish()
738 }
739}
740
#[cfg(test)]
mod engine_tests {
    use super::*;
    use bytes::Bytes;

    /// Wraps `payload` in a JSON-format `RawMessage` with no key, headers,
    /// timestamp or commit token.
    fn json_raw(payload: Bytes) -> RawMessage {
        RawMessage {
            payload,
            key: None,
            headers: vec![],
            metadata: MessageMetadata {
                timestamp_ms: None,
                format: types::PayloadFormat::Json,
                commit_token: None,
            },
        }
    }

    /// Produces `n` messages shaped like `{"_table":"events","id":<i>}`.
    fn make_json_messages(n: usize) -> Vec<RawMessage> {
        let mut batch = Vec::with_capacity(n);
        for i in 0..n {
            let body = format!(r#"{{"_table":"events","id":{i}}}"#);
            batch.push(json_raw(Bytes::from(body)));
        }
        batch
    }

    /// Engine built from `BatchProcessingConfig::default()`.
    fn default_engine() -> BatchEngine {
        BatchEngine::new(BatchProcessingConfig::default())
    }

    #[test]
    fn process_mid_tier_basic() {
        let engine = default_engine();
        let batch = make_json_messages(100);

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&batch, |pm| {
            let table = pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("unknown");
            Ok(table.to_string())
        });

        assert_eq!(results.len(), 100);
        assert!(results.iter().all(Result::is_ok));
        assert_eq!(results[0].as_ref().unwrap(), "events");
    }

    #[test]
    fn process_mid_tier_parse_error() {
        let engine = default_engine();
        let mut batch = make_json_messages(2);
        // Corrupt payload sandwiched between two valid messages.
        batch.insert(1, json_raw(Bytes::from_static(b"not json {{{")));

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&batch, |pm| Ok(pm.raw_payload().len().to_string()));

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("parse error"));
        assert!(results[2].is_ok());
    }

    #[test]
    fn process_mid_tier_empty_batch() {
        let engine = default_engine();
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
        assert!(results.is_empty());
    }

    #[test]
    fn process_mid_tier_respects_chunk_size() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            max_chunk_size: 50,
            ..Default::default()
        });
        let batch = make_json_messages(120);

        let results: Vec<Result<usize, String>> =
            engine.process_mid_tier(&batch, |pm| Ok(pm.raw_payload().len()));

        assert_eq!(results.len(), 120);
        assert!(results.iter().all(Result::is_ok));
        assert_eq!(engine.stats().snapshot().received, 120);
    }

    #[test]
    fn stats_updated_after_processing() {
        let engine = default_engine();
        let batch = make_json_messages(10);

        let _results: Vec<Result<(), String>> = engine.process_mid_tier(&batch, |_| Ok(()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 10);
        assert_eq!(snap.processed, 10);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.filtered, 0);
    }

    #[test]
    fn process_raw_passthrough() {
        let engine = default_engine();
        let batch = make_json_messages(50);

        let results: Vec<Result<usize, String>> =
            engine.process_raw(&batch, |msg| Ok(msg.payload.len()));

        assert_eq!(results.len(), 50);
        assert!(results.iter().all(Result::is_ok));
        assert!(*results[0].as_ref().unwrap() > 0);

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 50);
        assert_eq!(snap.processed, 50);
    }

    #[test]
    fn process_mid_tier_with_pre_route() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
                field: "_table".to_string(),
                value: "poison".to_string(),
            }],
            ..Default::default()
        });

        let mut batch = make_json_messages(3);
        // The middle message carries the poison value and must be DLQ'd.
        batch[1] = json_raw(Bytes::from(r#"{"_table":"poison","id":999}"#));

        let results: Vec<Result<String, String>> = engine.process_mid_tier(&batch, |pm| {
            let table = pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("?");
            Ok(table.to_string())
        });

        assert_eq!(results.len(), 3);
        assert!(results[0].is_ok());
        assert!(results[1].is_err());
        assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
        assert!(results[2].is_ok());

        let snap = engine.stats().snapshot();
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.errors, 1);
    }

    #[test]
    fn process_mid_tier_filtered_not_in_results() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            }],
            ..Default::default()
        });

        let mut batch = make_json_messages(3);
        // No `_table` field, so the drop filter removes this message entirely.
        batch[1] = json_raw(Bytes::from(r#"{"host":"web1"}"#));

        let results: Vec<Result<String, String>> =
            engine.process_mid_tier(&batch, |_pm| Ok("ok".to_string()));

        assert_eq!(results.len(), 2);
        assert!(results.iter().all(Result::is_ok));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.filtered, 1);
        assert_eq!(snap.received, 3);
    }

    #[test]
    fn from_cascade_creates_engine() {
        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
        assert_eq!(engine.config().max_chunk_size, 10_000);
    }

    #[test]
    fn accessors_return_expected_types() {
        let engine = default_engine();
        let _stats = engine.stats();
        let _pool = engine.pool();
        let _config = engine.config();
        assert_eq!(engine.stats().snapshot().received, 0);
    }

    #[test]
    fn auto_wire_does_not_panic() {
        let mut engine = default_engine();
        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
        engine.auto_wire(
            &mgr,
            #[cfg(feature = "memory")]
            None,
        );

        let batch = make_json_messages(5);
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&batch, |_| Ok(()));
        assert_eq!(results.len(), 5);
    }

    #[test]
    fn debug_impl_works() {
        let rendered = format!("{:?}", default_engine());
        assert!(rendered.contains("BatchEngine"));
        assert!(rendered.contains("config"));
    }

    #[cfg(feature = "transport-memory")]
    mod async_engine_tests {
        use super::*;
        use std::sync::atomic::{AtomicU64, Ordering};

        /// JSON bytes of the form `{"_table":"<table>","id":<id>}`.
        fn json_payload(table: &str, id: usize) -> Vec<u8> {
            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
        }

        /// Memory-transport config with a short receive timeout so loops spin quickly.
        fn fast_recv_config() -> crate::transport::memory::MemoryConfig {
            crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            }
        }

        /// Spawns a task that cancels `token` after `delay_ms` milliseconds.
        fn cancel_after(token: tokio_util::sync::CancellationToken, delay_ms: u64) {
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
                token.cancel();
            });
        }

        /// Typed "no ticker" argument for the async run loops.
        fn no_ticker()
        -> Option<(std::time::Duration, fn() -> std::future::Ready<Result<(), EngineError>>)> {
            None
        }

        #[tokio::test]
        async fn run_async_processes_and_passes_tokens_to_sink() {
            let transport = crate::transport::memory::MemoryTransport::new(&fast_recv_config())
                .expect("memory transport with valid config must construct");
            for i in 0..5 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let sink_count = Arc::new(AtomicU64::new(0));
            let token_count = Arc::new(AtomicU64::new(0));
            let sink_count_clone = Arc::clone(&sink_count);
            let token_count_clone = Arc::clone(&token_count);

            cancel_after(shutdown.clone(), 200);

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |pm: &mut ParsedMessage| -> Result<String, String> {
                        let table = pm
                            .field("_table")
                            .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                            .unwrap_or("?");
                        Ok(table.to_string())
                    },
                    |results, tokens| {
                        let sc = Arc::clone(&sink_count_clone);
                        let tc = Arc::clone(&token_count_clone);
                        async move {
                            sc.fetch_add(results.len() as u64, Ordering::Relaxed);
                            tc.fetch_add(tokens.len() as u64, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(sink_count.load(Ordering::Relaxed), 5);
            assert_eq!(token_count.load(Ordering::Relaxed), 5);
        }

        #[tokio::test]
        async fn run_async_ticker_fires() {
            let transport = crate::transport::memory::MemoryTransport::new(&fast_recv_config())
                .expect("memory transport with valid config must construct");
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let tick_count = Arc::new(AtomicU64::new(0));
            let tick_count_clone = Arc::clone(&tick_count);

            cancel_after(shutdown.clone(), 350);

            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Ok(()) },
                    Some((std::time::Duration::from_millis(100), move || {
                        let tc = Arc::clone(&tick_count_clone);
                        async move {
                            tc.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(result.is_ok());
            let ticks = tick_count.load(Ordering::Relaxed);
            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
        }

        #[tokio::test]
        async fn run_raw_async_processes_without_parse() {
            let transport = crate::transport::memory::MemoryTransport::new(&fast_recv_config())
                .expect("memory transport with valid config must construct");
            for i in 0..3 {
                transport
                    .inject(None, json_payload("logs", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let total_bytes = Arc::new(AtomicU64::new(0));
            let total_bytes_clone = Arc::clone(&total_bytes);

            cancel_after(shutdown.clone(), 200);

            let result = engine
                .run_raw_async(
                    &transport,
                    shutdown,
                    |msg: &RawMessage| -> Result<usize, String> { Ok(msg.payload.len()) },
                    |results, _tokens| {
                        let tb = Arc::clone(&total_bytes_clone);
                        async move {
                            for len in results.iter().flatten() {
                                tb.fetch_add(*len as u64, Ordering::Relaxed);
                            }
                            Ok(())
                        }
                    },
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert!(total_bytes.load(Ordering::Relaxed) > 0);
        }

        #[tokio::test]
        async fn run_async_sink_error_does_not_crash() {
            let transport = crate::transport::memory::MemoryTransport::new(&fast_recv_config())
                .expect("memory transport with valid config must construct");

            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            cancel_after(shutdown.clone(), 200);

            // A failing sink must be logged, not propagated out of the loop.
            let result = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Err(EngineError::Sink("test sink error".into())) },
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
        }
    }
}