1pub mod config;
10pub mod intern;
11pub mod metrics;
12pub mod parse;
13pub mod pre_route;
14pub mod types;
15
16pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
17pub use intern::FieldInterner;
18pub use types::{MessageMetadata, ParsedMessage, PreRouteResult, RawMessage};
19
/// Failures surfaced by the transport-driven `BatchEngine::run*` loops.
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// The transport failed while receiving a batch.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),

    /// A sink callback rejected its results; the payload is a free-form message.
    #[error("sink error: {0}")]
    Sink(String),

    /// Shutdown marker. The engine code in this module does not construct it
    /// itself (cancellation makes the loops return `Ok(())`).
    #[error("shutdown")]
    Shutdown,

    /// The transport returned this many inbound-filter DLQ entries while the
    /// engine's `FilterDlqPolicy` was `Reject`.
    #[error(
        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
         (Route to forward, or DiscardWithMetric to deliberately drop)"
    )]
    FilterDlqUnrouted(usize),
}
45
/// Decides what `BatchEngine` does with inbound-filter DLQ entries that the
/// transport attaches to a received batch.
#[cfg(feature = "transport")]
#[derive(Clone, Default)]
pub enum FilterDlqPolicy {
    /// Default. Any DLQ entry makes `apply_filter_dlq_policy` fail with
    /// `EngineError::FilterDlqUnrouted`, which stops the `run*` loops.
    #[default]
    Reject,
    /// Drop the entries. When the `metrics` feature is enabled the
    /// `dfe_engine_filter_dlq_discarded_total` counter is incremented by the
    /// number dropped.
    DiscardWithMetric,
    /// Hand every batch's entries to the callback.
    Route(Arc<dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) + Send + Sync>),
}
67
/// Prints the variant name; the `Route` callback is opaque and rendered as `..`.
#[cfg(feature = "transport")]
impl std::fmt::Debug for FilterDlqPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Route(_) => "Route(..)",
            Self::DiscardWithMetric => "DiscardWithMetric",
            Self::Reject => "Reject",
        };
        f.write_str(label)
    }
}
78
79use std::sync::Arc;
80
81use super::pool::AdaptiveWorkerPool;
82use super::stats::PipelineStats;
83
84use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field, filters_from_config};
85use self::types::PayloadFormat;
86use super::config::WorkerPoolConfig;
87
88pub struct BatchEngine {
103 config: BatchProcessingConfig,
104 pool: Arc<AdaptiveWorkerPool>,
105 stats: Arc<PipelineStats>,
106 interner: Arc<FieldInterner>,
107 filters: Vec<pre_route::PreRouteFilter>,
108 #[cfg(feature = "memory")]
109 memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
110 #[cfg(feature = "transport")]
113 filter_dlq_policy: FilterDlqPolicy,
114}
115
116impl BatchEngine {
117 #[must_use]
122 pub fn new(config: BatchProcessingConfig) -> Self {
123 let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
124 Self::with_pool(pool, config)
125 }
126
127 #[must_use]
132 pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
133 let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
134 let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
135 let filters = filters_from_config(&config.pre_route_filters);
136 Self {
137 config,
138 pool,
139 stats: Arc::new(PipelineStats::new()),
140 interner,
141 filters,
142 #[cfg(feature = "memory")]
143 memory_guard: None,
144 #[cfg(feature = "transport")]
145 filter_dlq_policy: FilterDlqPolicy::default(),
146 }
147 }
148
149 #[cfg(feature = "transport")]
155 #[must_use]
156 pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
157 self.filter_dlq_policy = policy;
158 self
159 }
160
161 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
167 let config = BatchProcessingConfig::from_cascade(key)?;
168 Ok(Self::new(config))
169 }
170
171 #[must_use]
173 pub fn stats(&self) -> &Arc<PipelineStats> {
174 &self.stats
175 }
176
177 #[must_use]
179 pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
180 &self.pool
181 }
182
183 #[must_use]
185 pub fn config(&self) -> &BatchProcessingConfig {
186 &self.config
187 }
188
189 pub fn auto_wire(
193 &mut self,
194 metrics_manager: &crate::metrics::MetricsManager,
195 #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
196 ) {
197 metrics::register(metrics_manager, &self.config);
198
199 #[cfg(feature = "memory")]
200 if let Some(guard) = memory_guard {
201 self.memory_guard = Some(Arc::clone(guard));
202 }
203 }
204
205 pub fn process_mid_tier<O, E, F>(
216 &self,
217 messages: &[RawMessage],
218 transform: F,
219 ) -> Vec<Result<O, E>>
220 where
221 O: Send,
222 E: Send + From<String>,
223 F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
224 {
225 if messages.is_empty() {
226 return Vec::new();
227 }
228
229 let chunk_size = if self.config.max_chunk_size == 0 {
230 messages.len()
231 } else {
232 self.config.max_chunk_size
233 };
234
235 let has_routing = self.config.routing_field.is_some();
236 let mut all_results = Vec::with_capacity(messages.len());
237
238 for chunk in messages.chunks(chunk_size) {
239 self.stats.add_received(chunk.len() as u64);
240
241 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
243 self.stats.add_bytes_received(chunk_bytes);
244
245 let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
248 Vec::with_capacity(chunk.len());
249
250 for msg in chunk {
251 if has_routing {
253 let field_name = self.config.routing_field.as_ref().expect("checked above");
254 let extraction = extract_routing_field(&msg.payload, field_name);
255 let outcome = apply_filters(&extraction, &self.filters);
256
257 match outcome {
258 PreRouteOutcome::Continue => {}
259 PreRouteOutcome::Filtered => {
260 self.stats.incr_filtered();
261 continue; }
263 PreRouteOutcome::Dlq(reason) => {
264 self.stats.incr_dlq();
265 self.stats.incr_errors();
266 parsed_msgs.push(Err(reason));
267 continue;
268 }
269 }
270 }
271
272 let format = match msg.metadata.format {
274 PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
275 other => other,
276 };
277
278 match parse::parse_payload(&msg.payload, format) {
279 Ok(value) => {
280 let extracted = self.interner.extract_known(&value);
281 parsed_msgs.push(Ok(ParsedMessage::Parsed {
282 value,
283 raw: msg.payload.clone(),
284 format,
285 key: msg.key.clone(),
286 headers: msg.headers.clone(),
287 metadata: msg.metadata.clone(),
288 extracted,
289 }));
290 }
291 Err(e) => {
292 self.stats.incr_errors();
293 match self.config.parse_error_action {
294 ParseErrorAction::Dlq => {
295 self.stats.incr_dlq();
296 parsed_msgs.push(Err(format!("parse error: {e}")));
297 }
298 ParseErrorAction::Skip => {
299 }
301 ParseErrorAction::FailBatch => {
302 parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
305 let results: Vec<Result<O, E>> = parsed_msgs
306 .into_iter()
307 .map(|r| match r {
308 Ok(_) => Err(E::from(
309 "batch failed due to parse error".to_string(),
310 )),
311 Err(reason) => Err(E::from(reason)),
312 })
313 .collect();
314 all_results.extend(results);
315 return all_results;
316 }
317 }
318 }
319 }
320 }
321
322 let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
325 parsed_msgs.into_iter().enumerate().collect();
326
327 let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
329 let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
330
331 for (idx, item) in indexed.drain(..) {
332 match item {
333 Ok(pm) => to_transform.push((idx, pm)),
334 Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
335 }
336 }
337
338 let transformed: Vec<(usize, Result<O, E>)> =
342 self.pool.map_owned(to_transform, |(idx, mut pm)| {
343 let result = transform(&mut pm);
344 (idx, result)
345 });
346
347 chunk_results.extend(transformed);
348
349 chunk_results.sort_by_key(|(idx, _)| *idx);
351
352 let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
354 self.stats.add_processed(ok_count as u64);
355
356 all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
357
358 self.check_memory_pressure();
360 }
361
362 all_results
363 }
364
365 pub fn process_raw<O, E, F>(&self, messages: &[RawMessage], transform: F) -> Vec<Result<O, E>>
370 where
371 O: Send,
372 E: Send + From<String>,
373 F: Fn(&RawMessage) -> Result<O, E> + Sync,
374 {
375 if messages.is_empty() {
376 return Vec::new();
377 }
378
379 let chunk_size = if self.config.max_chunk_size == 0 {
380 messages.len()
381 } else {
382 self.config.max_chunk_size
383 };
384
385 let has_routing = self.config.routing_field.is_some();
386 let mut all_results = Vec::with_capacity(messages.len());
387
388 for chunk in messages.chunks(chunk_size) {
389 self.stats.add_received(chunk.len() as u64);
390
391 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
392 self.stats.add_bytes_received(chunk_bytes);
393
394 let to_process: Vec<&RawMessage> = if has_routing {
396 let field_name = self.config.routing_field.as_ref().expect("checked above");
397 let mut passed = Vec::with_capacity(chunk.len());
398 for msg in chunk {
399 let extraction = extract_routing_field(&msg.payload, field_name);
400 let outcome = apply_filters(&extraction, &self.filters);
401 match outcome {
402 PreRouteOutcome::Continue => passed.push(msg),
403 PreRouteOutcome::Filtered => {
404 self.stats.incr_filtered();
405 }
406 PreRouteOutcome::Dlq(reason) => {
407 self.stats.incr_dlq();
408 self.stats.incr_errors();
409 all_results.push(Err(E::from(reason)));
410 }
411 }
412 }
413 passed
414 } else {
415 chunk.iter().collect()
416 };
417
418 let results = self.pool.process_batch(&to_process, |msg| transform(msg));
420
421 let ok_count = results.iter().filter(|r| r.is_ok()).count();
422 self.stats.add_processed(ok_count as u64);
423
424 all_results.extend(results);
425
426 self.check_memory_pressure();
427 }
428
429 all_results
430 }
431
432 #[cfg(feature = "transport")]
445 pub async fn run<R, O, E, Transform, Sink>(
446 &self,
447 receiver: &R,
448 shutdown: tokio_util::sync::CancellationToken,
449 transform: Transform,
450 mut sink: Sink,
451 ) -> Result<(), EngineError>
452 where
453 R: crate::transport::TransportReceiver,
454 O: Send + 'static,
455 E: Send + From<String> + std::fmt::Display + 'static,
456 Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
457 Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
458 {
459 tracing::info!(
460 chunk_size = self.config.max_chunk_size,
461 routing_field = ?self.config.routing_field,
462 "BatchEngine starting"
463 );
464
465 loop {
466 tokio::select! {
467 biased;
468 () = shutdown.cancelled() => {
469 tracing::info!("BatchEngine shutting down");
470 return Ok(());
471 }
472 recv_result = receiver.recv(self.config.max_chunk_size) => {
473 let batch = recv_result.map_err(EngineError::Transport)?;
474 let messages = self.apply_filter_dlq_policy(batch)?;
477 if messages.is_empty() {
478 continue;
479 }
480
481 let tokens: Vec<R::Token> = messages.iter()
483 .map(|m| m.token.clone())
484 .collect();
485
486 let raw: Vec<RawMessage> = messages.into_iter()
488 .map(RawMessage::from)
489 .collect();
490
491 #[cfg(feature = "memory")]
494 let _ingress_lease = self.lease_ingress(&raw);
495
496 let results = self.process_mid_tier(&raw, &transform);
498
499 if let Err(e) = sink(results) {
501 tracing::error!(error = %e, "Sink failed, skipping commit");
502 continue;
503 }
504
505 if let Err(e) = receiver.commit(&tokens).await {
506 tracing::error!(error = %e, "Commit failed");
507 }
508 }
509 }
510 }
511 }
512
513 #[cfg(feature = "transport")]
523 pub async fn run_raw<R, O, E, Transform, Sink>(
524 &self,
525 receiver: &R,
526 shutdown: tokio_util::sync::CancellationToken,
527 transform: Transform,
528 mut sink: Sink,
529 ) -> Result<(), EngineError>
530 where
531 R: crate::transport::TransportReceiver,
532 O: Send + 'static,
533 E: Send + From<String> + std::fmt::Display + 'static,
534 Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
535 Sink: FnMut(Vec<Result<O, E>>) -> Result<(), EngineError>,
536 {
537 tracing::info!(
538 chunk_size = self.config.max_chunk_size,
539 "BatchEngine (raw) starting"
540 );
541
542 loop {
543 tokio::select! {
544 biased;
545 () = shutdown.cancelled() => {
546 tracing::info!("BatchEngine (raw) shutting down");
547 return Ok(());
548 }
549 recv_result = receiver.recv(self.config.max_chunk_size) => {
550 let batch = recv_result.map_err(EngineError::Transport)?;
551 let messages = self.apply_filter_dlq_policy(batch)?;
554 if messages.is_empty() {
555 continue;
556 }
557
558 let tokens: Vec<R::Token> = messages.iter()
559 .map(|m| m.token.clone())
560 .collect();
561
562 let raw: Vec<RawMessage> = messages.into_iter()
563 .map(RawMessage::from)
564 .collect();
565
566 #[cfg(feature = "memory")]
569 let _ingress_lease = self.lease_ingress(&raw);
570
571 let results = self.process_raw(&raw, &transform);
572
573 if let Err(e) = sink(results) {
574 tracing::error!(error = %e, "Sink failed (raw), skipping commit");
575 continue;
576 }
577
578 if let Err(e) = receiver.commit(&tokens).await {
579 tracing::error!(error = %e, "Commit failed (raw)");
580 }
581 }
582 }
583 }
584 }
585
586 #[cfg(feature = "transport")]
606 pub async fn run_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
607 &self,
608 receiver: &R,
609 shutdown: tokio_util::sync::CancellationToken,
610 transform: Transform,
611 mut sink: Sink,
612 ticker: Option<(std::time::Duration, Ticker)>,
613 ) -> Result<(), EngineError>
614 where
615 R: crate::transport::TransportReceiver,
616 O: Send + 'static,
617 E: Send + From<String> + std::fmt::Display + 'static,
618 Transform: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
619 Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
620 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
621 Ticker: FnMut() -> TickerFut,
622 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
623 {
624 tracing::info!(
625 chunk_size = self.config.max_chunk_size,
626 routing_field = ?self.config.routing_field,
627 ticker = ticker.is_some(),
628 "BatchEngine (async) starting"
629 );
630
631 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
633 let mut ticker_fn = ticker.map(|(_, f)| f);
634
635 if let Some(ref mut interval) = tick_interval {
637 interval.tick().await;
638 }
639
640 loop {
641 tokio::select! {
642 biased;
643
644 () = shutdown.cancelled() => {
645 tracing::info!("BatchEngine (async) shutting down");
646 return Ok(());
647 }
648
649 _ = async {
650 match tick_interval.as_mut() {
651 Some(interval) => interval.tick().await,
652 None => std::future::pending().await,
653 }
654 } => {
655 if let Some(ref mut f) = ticker_fn
656 && let Err(e) = f().await
657 {
658 tracing::error!(error = %e, "Ticker failed");
659 }
660 }
661
662 recv_result = receiver.recv(self.config.max_chunk_size) => {
663 let batch = recv_result.map_err(EngineError::Transport)?;
664 let messages = self.apply_filter_dlq_policy(batch)?;
667 if messages.is_empty() {
668 continue;
669 }
670
671 let tokens: Vec<R::Token> = messages.iter()
673 .map(|m| m.token.clone())
674 .collect();
675
676 let raw: Vec<RawMessage> = messages.into_iter()
678 .map(RawMessage::from)
679 .collect();
680
681 #[cfg(feature = "memory")]
684 let _ingress_lease = self.lease_ingress(&raw);
685
686 let results = self.process_mid_tier(&raw, &transform);
688
689 if let Err(e) = sink(results, tokens).await {
691 tracing::error!(error = %e, "Sink failed (async)");
692 }
693 }
694 }
695 }
696 }
697
698 #[cfg(feature = "transport")]
708 pub async fn run_raw_async<R, O, E, Transform, Sink, SinkFut, Ticker, TickerFut>(
709 &self,
710 receiver: &R,
711 shutdown: tokio_util::sync::CancellationToken,
712 transform: Transform,
713 mut sink: Sink,
714 ticker: Option<(std::time::Duration, Ticker)>,
715 ) -> Result<(), EngineError>
716 where
717 R: crate::transport::TransportReceiver,
718 O: Send + 'static,
719 E: Send + From<String> + std::fmt::Display + 'static,
720 Transform: Fn(&RawMessage) -> Result<O, E> + Sync,
721 Sink: FnMut(Vec<Result<O, E>>, Vec<R::Token>) -> SinkFut,
722 SinkFut: std::future::Future<Output = Result<(), EngineError>>,
723 Ticker: FnMut() -> TickerFut,
724 TickerFut: std::future::Future<Output = Result<(), EngineError>>,
725 {
726 tracing::info!(
727 chunk_size = self.config.max_chunk_size,
728 ticker = ticker.is_some(),
729 "BatchEngine (raw async) starting"
730 );
731
732 let mut tick_interval = ticker.as_ref().map(|(d, _)| tokio::time::interval(*d));
733 let mut ticker_fn = ticker.map(|(_, f)| f);
734
735 if let Some(ref mut interval) = tick_interval {
736 interval.tick().await;
737 }
738
739 loop {
740 tokio::select! {
741 biased;
742
743 () = shutdown.cancelled() => {
744 tracing::info!("BatchEngine (raw async) shutting down");
745 return Ok(());
746 }
747
748 _ = async {
749 match tick_interval.as_mut() {
750 Some(interval) => interval.tick().await,
751 None => std::future::pending().await,
752 }
753 } => {
754 if let Some(ref mut f) = ticker_fn
755 && let Err(e) = f().await
756 {
757 tracing::error!(error = %e, "Ticker (raw) failed");
758 }
759 }
760
761 recv_result = receiver.recv(self.config.max_chunk_size) => {
762 let batch = recv_result.map_err(EngineError::Transport)?;
763 let messages = self.apply_filter_dlq_policy(batch)?;
766 if messages.is_empty() {
767 continue;
768 }
769
770 let tokens: Vec<R::Token> = messages.iter()
771 .map(|m| m.token.clone())
772 .collect();
773
774 let raw: Vec<RawMessage> = messages.into_iter()
775 .map(RawMessage::from)
776 .collect();
777
778 #[cfg(feature = "memory")]
781 let _ingress_lease = self.lease_ingress(&raw);
782
783 let results = self.process_raw(&raw, &transform);
784
785 if let Err(e) = sink(results, tokens).await {
786 tracing::error!(error = %e, "Sink failed (raw async)");
787 }
788 }
789 }
790 }
791 }
792
793 #[cfg(feature = "memory")]
809 fn lease_ingress(&self, raw: &[RawMessage]) -> Option<IngressLease<'_>> {
810 let guard = self.memory_guard.as_ref()?;
811 let bytes: u64 = raw.iter().map(|m| m.payload.len() as u64).sum();
812 guard.add_bytes(bytes);
813 Some(IngressLease { guard, bytes })
814 }
815
816 #[cfg(feature = "transport")]
825 fn apply_filter_dlq_policy<T: crate::transport::CommitToken>(
826 &self,
827 batch: crate::transport::RecvBatch<T>,
828 ) -> Result<Vec<crate::transport::Message<T>>, EngineError> {
829 if !batch.dlq_entries.is_empty() {
830 match &self.filter_dlq_policy {
831 FilterDlqPolicy::Reject => {
832 return Err(EngineError::FilterDlqUnrouted(batch.dlq_entries.len()));
833 }
834 FilterDlqPolicy::DiscardWithMetric => {
835 #[cfg(feature = "metrics")]
836 ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
837 .increment(batch.dlq_entries.len() as u64);
838 }
839 FilterDlqPolicy::Route(sink) => sink(batch.dlq_entries),
840 }
841 }
842 Ok(batch.messages)
843 }
844
845 #[allow(clippy::unused_self)]
851 fn check_memory_pressure(&self) {
852 #[cfg(feature = "memory")]
853 if let Some(guard) = &self.memory_guard
854 && guard.under_pressure()
855 {
856 tracing::warn!(
857 pause_ms = self.config.memory_pressure_pause_ms,
858 "BatchEngine: memory pressure detected, pausing between chunks"
859 );
860 std::thread::sleep(std::time::Duration::from_millis(
861 self.config.memory_pressure_pause_ms,
862 ));
863 }
864 }
865}
866
/// Accounting handle for one ingress batch.
///
/// `BatchEngine::lease_ingress` charges the batch's bytes to `guard` when it
/// creates the lease. The `Drop` impl releases the same amount.
#[cfg(feature = "memory")]
struct IngressLease<'a> {
    /// Guard the bytes were charged against.
    guard: &'a crate::memory::MemoryGuard,
    /// Total payload bytes charged for the batch.
    bytes: u64,
}
878
/// Returns the leased byte count to the memory guard.
#[cfg(feature = "memory")]
impl Drop for IngressLease<'_> {
    fn drop(&mut self) {
        let Self { guard, bytes } = self;
        guard.release(*bytes);
    }
}
885
886impl std::fmt::Debug for BatchEngine {
887 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
888 let mut s = f.debug_struct("BatchEngine");
889 s.field("config", &self.config)
890 .field("pool_max_threads", &self.pool.max_threads())
891 .field("stats", &self.stats.snapshot())
892 .field("interner_len", &self.interner.len())
893 .field("filters", &self.filters);
894 #[cfg(feature = "memory")]
895 s.field("memory_guard", &self.memory_guard.is_some());
896 #[cfg(feature = "transport")]
897 s.field("filter_dlq_policy", &self.filter_dlq_policy);
898 s.finish()
899 }
900}
901
#[cfg(test)]
mod engine_tests {
    use super::*;
    use bytes::Bytes;

    /// Wraps `payload` in a JSON-tagged `RawMessage` with no key, headers,
    /// timestamp or commit token.
    fn json_message(payload: Bytes) -> RawMessage {
        RawMessage {
            payload,
            key: None,
            headers: vec![],
            metadata: MessageMetadata {
                timestamp_ms: None,
                format: types::PayloadFormat::Json,
                commit_token: None,
            },
        }
    }

    /// Builds `n` messages shaped like `{"_table":"events","id":<i>}`.
    fn make_json_messages(n: usize) -> Vec<RawMessage> {
        (0..n)
            .map(|i| json_message(Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#))))
            .collect()
    }

    fn default_engine() -> BatchEngine {
        BatchEngine::new(BatchProcessingConfig::default())
    }

    #[cfg(feature = "transport")]
    #[test]
    fn filter_dlq_policy_routes_discards_or_rejects() {
        use crate::transport::RecvBatch;
        use crate::transport::filter::FilteredDlqEntry;
        use std::sync::Arc as StdArc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Minimal commit token for building batches by hand.
        #[derive(Clone, Debug)]
        struct TestTok;
        impl std::fmt::Display for TestTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str("test")
            }
        }
        impl crate::transport::CommitToken for TestTok {}

        // A batch with no deliverable messages and `n` filter-DLQ entries.
        let dlq_only = |n: usize| RecvBatch::<TestTok> {
            messages: vec![],
            dlq_entries: (0..n)
                .map(|_| FilteredDlqEntry {
                    payload: b"x".to_vec(),
                    key: None,
                    reason: "r".to_string(),
                })
                .collect(),
        };

        // Default policy (Reject): DLQ entries are refused, clean batches pass.
        let engine = default_engine();
        assert!(matches!(
            engine.apply_filter_dlq_policy(dlq_only(1)),
            Err(EngineError::FilterDlqUnrouted(1))
        ));
        assert!(
            engine
                .apply_filter_dlq_policy(RecvBatch::<TestTok>::from_messages(vec![]))
                .is_ok()
        );

        // DiscardWithMetric drops the entries without failing.
        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
        assert!(engine.apply_filter_dlq_policy(dlq_only(1)).is_ok());

        // Route hands every entry to the callback.
        let routed = StdArc::new(AtomicUsize::new(0));
        let routed_in_sink = StdArc::clone(&routed);
        let engine = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
            move |entries: Vec<FilteredDlqEntry>| {
                routed_in_sink.fetch_add(entries.len(), Ordering::Relaxed);
            },
        )));
        assert!(engine.apply_filter_dlq_policy(dlq_only(2)).is_ok());
        assert_eq!(
            routed.load(Ordering::Relaxed),
            2,
            "Route sink received all entries"
        );
    }

    #[cfg(feature = "memory")]
    #[test]
    fn ingress_lease_accounts_and_releases() {
        use crate::memory::{MemoryGuard, MemoryGuardConfig};

        let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
            limit_bytes: 1024 * 1024,
            ..Default::default()
        }));
        let mut engine = default_engine();
        engine.memory_guard = Some(Arc::clone(&guard));

        let batch = make_json_messages(10);
        let charged: u64 = batch.iter().map(|m| m.payload.len() as u64).sum();
        assert_eq!(guard.current_bytes(), 0, "starts at zero");

        {
            let _lease = engine.lease_ingress(&batch).expect("guard present");
            assert_eq!(
                guard.current_bytes(),
                charged,
                "bytes accounted while lease held"
            );
        }
        assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
    }

    #[cfg(feature = "memory")]
    #[test]
    fn ingress_lease_none_without_guard() {
        let engine = default_engine();
        let batch = make_json_messages(5);
        assert!(
            engine.lease_ingress(&batch).is_none(),
            "no lease when no guard wired"
        );
    }

    #[test]
    fn process_mid_tier_basic() {
        let engine = default_engine();
        let batch = make_json_messages(100);

        let out: Vec<Result<String, String>> = engine.process_mid_tier(&batch, |pm| {
            let table = pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("unknown");
            Ok(table.to_string())
        });

        assert_eq!(out.len(), 100);
        assert!(out.iter().all(Result::is_ok));
        assert_eq!(out[0].as_ref().unwrap(), "events");
    }

    #[test]
    fn process_mid_tier_parse_error() {
        let engine = default_engine();
        let mut batch = make_json_messages(2);
        // Malformed payload in the middle of the batch.
        batch.insert(1, json_message(Bytes::from_static(b"not json {{{")));

        let out: Vec<Result<String, String>> =
            engine.process_mid_tier(&batch, |pm| Ok(pm.raw_payload().len().to_string()));

        assert_eq!(out.len(), 3);
        assert!(out[0].is_ok());
        assert!(out[1].is_err());
        assert!(out[1].as_ref().unwrap_err().contains("parse error"));
        assert!(out[2].is_ok());
    }

    #[test]
    fn process_mid_tier_empty_batch() {
        let engine = default_engine();
        let out: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
        assert!(out.is_empty());
    }

    #[test]
    fn process_mid_tier_respects_chunk_size() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            max_chunk_size: 50,
            ..Default::default()
        });
        let batch = make_json_messages(120);

        let out: Vec<Result<usize, String>> =
            engine.process_mid_tier(&batch, |pm| Ok(pm.raw_payload().len()));

        assert_eq!(out.len(), 120);
        assert!(out.iter().all(Result::is_ok));
        // 120 messages in chunks of 50 -> every message counted exactly once.
        assert_eq!(engine.stats().snapshot().received, 120);
    }

    #[test]
    fn stats_updated_after_processing() {
        let engine = default_engine();
        let batch = make_json_messages(10);

        let _out: Vec<Result<(), String>> = engine.process_mid_tier(&batch, |_| Ok(()));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 10);
        assert_eq!(snap.processed, 10);
        assert_eq!(snap.errors, 0);
        assert_eq!(snap.filtered, 0);
    }

    #[test]
    fn process_raw_passthrough() {
        let engine = default_engine();
        let batch = make_json_messages(50);

        let out: Vec<Result<usize, String>> =
            engine.process_raw(&batch, |msg| Ok(msg.payload.len()));

        assert_eq!(out.len(), 50);
        assert!(out.iter().all(Result::is_ok));
        assert!(out[0].as_ref().unwrap() > &0);

        let snap = engine.stats().snapshot();
        assert_eq!(snap.received, 50);
        assert_eq!(snap.processed, 50);
    }

    #[test]
    fn process_mid_tier_with_pre_route() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
                field: "_table".to_string(),
                value: "poison".to_string(),
            }],
            ..Default::default()
        });

        let mut batch = make_json_messages(3);
        // The middle message matches the DLQ filter.
        batch[1] = json_message(Bytes::from(r#"{"_table":"poison","id":999}"#));

        let out: Vec<Result<String, String>> = engine.process_mid_tier(&batch, |pm| {
            let table = pm
                .field("_table")
                .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                .unwrap_or("?");
            Ok(table.to_string())
        });

        assert_eq!(out.len(), 3);
        assert!(out[0].is_ok());
        assert!(out[1].is_err());
        assert!(out[1].as_ref().unwrap_err().contains("DLQ"));
        assert!(out[2].is_ok());

        let snap = engine.stats().snapshot();
        assert_eq!(snap.dlq, 1);
        assert_eq!(snap.errors, 1);
    }

    #[test]
    fn process_mid_tier_filtered_not_in_results() {
        let engine = BatchEngine::new(BatchProcessingConfig {
            routing_field: Some("_table".to_string()),
            pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
                field: "_table".to_string(),
            }],
            ..Default::default()
        });

        let mut batch = make_json_messages(3);
        // The middle message lacks `_table` and is dropped by the filter.
        batch[1] = json_message(Bytes::from(r#"{"host":"web1"}"#));

        let out: Vec<Result<String, String>> =
            engine.process_mid_tier(&batch, |_pm| Ok("ok".to_string()));

        assert_eq!(out.len(), 2);
        assert!(out.iter().all(Result::is_ok));

        let snap = engine.stats().snapshot();
        assert_eq!(snap.filtered, 1);
        assert_eq!(snap.received, 3);
    }

    #[test]
    fn from_cascade_creates_engine() {
        let engine = BatchEngine::from_cascade("batch_processing").unwrap();
        assert_eq!(engine.config().max_chunk_size, 10_000);
    }

    #[test]
    fn accessors_return_expected_types() {
        let engine = default_engine();
        let _ = (engine.stats(), engine.pool(), engine.config());
        assert_eq!(engine.stats().snapshot().received, 0);
    }

    #[test]
    fn auto_wire_does_not_panic() {
        let manager = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
        let mut engine = default_engine();
        engine.auto_wire(
            &manager,
            #[cfg(feature = "memory")]
            None,
        );

        let batch = make_json_messages(5);
        let out: Vec<Result<(), String>> = engine.process_mid_tier(&batch, |_| Ok(()));
        assert_eq!(out.len(), 5);
    }

    #[test]
    fn debug_impl_works() {
        let rendered = format!("{:?}", default_engine());
        assert!(rendered.contains("BatchEngine"));
        assert!(rendered.contains("config"));
    }

    #[cfg(feature = "transport-memory")]
    mod async_engine_tests {
        use super::*;
        use std::sync::atomic::{AtomicU64, Ordering};

        /// Ticker type used to spell `None` when a test runs without a ticker.
        type NoTicker = fn() -> std::future::Ready<Result<(), EngineError>>;

        fn json_payload(table: &str, id: usize) -> Vec<u8> {
            format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
        }

        /// In-memory transport with a 50 ms receive timeout.
        fn memory_transport() -> crate::transport::memory::MemoryTransport {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct")
        }

        /// Cancels `token` from a background task after `ms` milliseconds.
        fn cancel_after(token: &tokio_util::sync::CancellationToken, ms: u64) {
            let token = token.clone();
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
                token.cancel();
            });
        }

        #[tokio::test]
        async fn run_async_processes_and_passes_tokens_to_sink() {
            let transport = memory_transport();
            for id in 0..5 {
                transport
                    .inject(None, json_payload("events", id))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let results_seen = Arc::new(AtomicU64::new(0));
            let tokens_seen = Arc::new(AtomicU64::new(0));
            let results_in_sink = Arc::clone(&results_seen);
            let tokens_in_sink = Arc::clone(&tokens_seen);

            cancel_after(&shutdown, 200);

            let outcome = engine
                .run_async(
                    &transport,
                    shutdown,
                    |pm: &mut ParsedMessage| -> Result<String, String> {
                        let table = pm
                            .field("_table")
                            .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
                            .unwrap_or("?");
                        Ok(table.to_string())
                    },
                    |results, tokens| {
                        let result_counter = Arc::clone(&results_in_sink);
                        let token_counter = Arc::clone(&tokens_in_sink);
                        async move {
                            result_counter.fetch_add(results.len() as u64, Ordering::Relaxed);
                            token_counter.fetch_add(tokens.len() as u64, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    None::<(std::time::Duration, NoTicker)>,
                )
                .await;

            assert!(outcome.is_ok());
            assert_eq!(results_seen.load(Ordering::Relaxed), 5);
            assert_eq!(tokens_seen.load(Ordering::Relaxed), 5);
        }

        #[tokio::test]
        async fn run_async_ticker_fires() {
            let transport = memory_transport();
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let ticks = Arc::new(AtomicU64::new(0));
            let ticks_in_ticker = Arc::clone(&ticks);

            cancel_after(&shutdown, 350);

            let outcome = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Ok(()) },
                    Some((std::time::Duration::from_millis(100), move || {
                        let counter = Arc::clone(&ticks_in_ticker);
                        async move {
                            counter.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(outcome.is_ok());
            let fired = ticks.load(Ordering::Relaxed);
            assert!(fired >= 2, "Expected at least 2 ticks, got {fired}");
        }

        #[tokio::test]
        async fn run_raw_async_processes_without_parse() {
            let transport = memory_transport();
            for id in 0..3 {
                transport
                    .inject(None, json_payload("logs", id))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            let byte_total = Arc::new(AtomicU64::new(0));
            let byte_total_in_sink = Arc::clone(&byte_total);

            cancel_after(&shutdown, 200);

            let outcome = engine
                .run_raw_async(
                    &transport,
                    shutdown,
                    |msg: &RawMessage| -> Result<usize, String> { Ok(msg.payload.len()) },
                    |results, _tokens| {
                        let counter = Arc::clone(&byte_total_in_sink);
                        async move {
                            for len in results.iter().flatten() {
                                counter.fetch_add(*len as u64, Ordering::Relaxed);
                            }
                            Ok(())
                        }
                    },
                    None::<(std::time::Duration, NoTicker)>,
                )
                .await;

            assert!(outcome.is_ok());
            assert!(byte_total.load(Ordering::Relaxed) > 0);
        }

        #[tokio::test]
        async fn run_async_sink_error_does_not_crash() {
            let transport = memory_transport();
            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();

            cancel_after(&shutdown, 200);

            // A failing sink is logged; the loop keeps running until shutdown.
            let outcome = engine
                .run_async(
                    &transport,
                    shutdown,
                    |_pm: &mut ParsedMessage| -> Result<(), String> { Ok(()) },
                    |_results, _tokens| async { Err(EngineError::Sink("test sink error".into())) },
                    None::<(std::time::Duration, NoTicker)>,
                )
                .await;

            assert!(outcome.is_ok());
        }
    }
}