1pub mod config;
10#[cfg(feature = "transport")]
11pub mod driver;
12pub mod intern;
13pub mod metrics;
14pub mod parse;
15pub mod pre_route;
16pub mod types;
17
18pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
19#[cfg(feature = "transport")]
20pub use driver::{CommitMode, ParsedBatch};
21pub use intern::FieldInterner;
22pub use types::{MessageMetadata, ParsedMessage, PreRouteResult};
23
/// Errors reported by the batch engine (only compiled with the `transport` feature).
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// A [`crate::TransportError`]; the `#[from]` attribute lets `?` convert it automatically.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),

    /// A sink failure, carried as a free-form message.
    #[error("sink error: {0}")]
    Sink(String),

    /// Rendered as `shutdown`.
    // NOTE(review): presumably signals that processing stopped because of a shutdown
    // request -- confirm against the driver code.
    #[error("shutdown")]
    Shutdown,

    /// Inbound-filter DLQ entries were produced while the policy is
    /// [`FilterDlqPolicy::Reject`]; the payload is the number of unrouted entries.
    #[error(
        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
         (Route to forward, or DiscardWithMetric to deliberately drop)"
    )]
    FilterDlqUnrouted(usize),

    /// Routing entries to the DLQ failed; carries a description of the failure.
    #[error("DLQ route failed: {0}")]
    DlqRouteFailed(String),

    /// A parse failure that fails the whole batch (`fail_batch` behaviour).
    #[error("parse failed (fail_batch): {0}")]
    ParseBatchFailed(String),
}
63
/// What [`BatchEngine::route_dlq_entries`] does with DLQ entries produced by inbound filters.
#[cfg(feature = "transport")]
#[derive(Clone, Default)]
pub enum FilterDlqPolicy {
    /// Default policy: any non-empty set of entries is reported as
    /// [`EngineError::FilterDlqUnrouted`].
    #[default]
    Reject,

    /// Drop the entries; with the `metrics` feature enabled a discard counter is incremented.
    DiscardWithMetric,

    /// Hand the entries to a caller-supplied sink and return whatever the sink returns.
    Route(
        Arc<
            dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) -> Result<(), EngineError>
                + Send
                + Sync,
        >,
    ),
}
98
/// Manual `Debug`: the `Route` sink is a trait-object closure and cannot be derived,
/// so it is rendered as the placeholder `Route(..)`.
#[cfg(feature = "transport")]
impl std::fmt::Debug for FilterDlqPolicy {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Reject => "Reject",
            Self::DiscardWithMetric => "DiscardWithMetric",
            Self::Route(_) => "Route(..)",
        };
        f.write_str(label)
    }
}
109
110use std::sync::Arc;
111
112use super::pool::AdaptiveWorkerPool;
113use super::stats::PipelineStats;
114
115use self::pre_route::filters_from_config;
116#[cfg(feature = "transport")]
119use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field};
120#[cfg(feature = "transport")]
121use self::types::PayloadFormat;
122use super::config::WorkerPoolConfig;
123
124pub struct BatchEngine {
148 config: BatchProcessingConfig,
149 pool: Arc<AdaptiveWorkerPool>,
150 stats: Arc<PipelineStats>,
151 interner: Arc<FieldInterner>,
152 filters: Vec<pre_route::PreRouteFilter>,
153 #[cfg(feature = "memory")]
154 memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
155 #[cfg(feature = "transport")]
158 filter_dlq_policy: FilterDlqPolicy,
159 #[cfg(feature = "governor")]
166 byte_budget: Option<Arc<crate::governor::ByteBudgetController>>,
167}
168
169impl BatchEngine {
170 #[must_use]
175 pub fn new(config: BatchProcessingConfig) -> Self {
176 let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
177 Self::with_pool(pool, config)
178 }
179
180 #[must_use]
185 pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
186 let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
187 let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
188 let filters = filters_from_config(&config.pre_route_filters);
189 Self {
190 config,
191 pool,
192 stats: Arc::new(PipelineStats::new()),
193 interner,
194 filters,
195 #[cfg(feature = "memory")]
196 memory_guard: None,
197 #[cfg(feature = "transport")]
198 filter_dlq_policy: FilterDlqPolicy::default(),
199 #[cfg(feature = "governor")]
200 byte_budget: None,
201 }
202 }
203
204 #[cfg(feature = "governor")]
211 pub fn set_byte_budget(&mut self, budget: Arc<crate::governor::ByteBudgetController>) {
212 self.byte_budget = Some(budget);
213 }
214
215 #[cfg(feature = "governor")]
219 #[must_use]
220 pub fn is_self_regulated(&self) -> bool {
221 self.byte_budget.is_some()
222 }
223
224 #[cfg(feature = "transport")]
230 #[must_use]
231 pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
232 self.filter_dlq_policy = policy;
233 self
234 }
235
236 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
242 let config = BatchProcessingConfig::from_cascade(key)?;
243 Ok(Self::new(config))
244 }
245
246 #[must_use]
248 pub fn stats(&self) -> &Arc<PipelineStats> {
249 &self.stats
250 }
251
252 #[must_use]
254 pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
255 &self.pool
256 }
257
258 #[must_use]
260 pub fn config(&self) -> &BatchProcessingConfig {
261 &self.config
262 }
263
264 pub fn auto_wire(
268 &mut self,
269 metrics_manager: &crate::metrics::MetricsManager,
270 #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
271 ) {
272 metrics::register(metrics_manager, &self.config);
273
274 #[cfg(feature = "memory")]
275 if let Some(guard) = memory_guard {
276 self.memory_guard = Some(Arc::clone(guard));
277 }
278 }
279
280 #[cfg(all(test, feature = "memory"))]
283 pub(crate) fn set_memory_guard_for_test(&mut self, guard: Arc<crate::memory::MemoryGuard>) {
284 self.memory_guard = Some(guard);
285 }
286
287 #[cfg(feature = "transport")]
298 pub fn process_mid_tier<O, E, F>(
299 &self,
300 messages: &[crate::transport::Record],
301 transform: F,
302 ) -> Vec<Result<O, E>>
303 where
304 O: Send,
305 E: Send + From<String>,
306 F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
307 {
308 if messages.is_empty() {
309 return Vec::new();
310 }
311
312 let chunk_size = if self.config.max_chunk_size == 0 {
313 messages.len()
314 } else {
315 self.config.max_chunk_size
316 };
317
318 let has_routing = self.config.routing_field.is_some();
319 let mut all_results = Vec::with_capacity(messages.len());
320
321 for chunk in messages.chunks(chunk_size) {
322 self.stats.add_received(chunk.len() as u64);
323
324 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
326 self.stats.add_bytes_received(chunk_bytes);
327
328 let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
331 Vec::with_capacity(chunk.len());
332
333 for msg in chunk {
334 if has_routing {
336 let field_name = self.config.routing_field.as_ref().expect("checked above");
337 let extraction = extract_routing_field(&msg.payload, field_name);
338 let outcome = apply_filters(&extraction, &self.filters);
339
340 match outcome {
341 PreRouteOutcome::Continue => {}
342 PreRouteOutcome::Filtered => {
343 self.stats.incr_filtered();
344 continue; }
346 PreRouteOutcome::Dlq(reason) => {
347 self.stats.incr_dlq();
348 self.stats.incr_errors();
349 parsed_msgs.push(Err(reason));
350 continue;
351 }
352 }
353 }
354
355 let format: PayloadFormat = match PayloadFormat::from(msg.metadata.format) {
358 PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
359 other => other,
360 };
361
362 match parse::parse_payload(&msg.payload, format) {
363 Ok(value) => {
364 let extracted = self.interner.extract_known(&value);
365 let metadata = MessageMetadata {
368 timestamp_ms: msg.metadata.timestamp_ms,
369 format,
370 };
371 parsed_msgs.push(Ok(ParsedMessage::Parsed {
372 value,
373 raw: msg.payload.clone(),
374 format,
375 key: msg.key.clone(),
376 headers: msg.headers.clone(),
377 metadata,
378 extracted,
379 }));
380 }
381 Err(e) => {
382 self.stats.incr_errors();
383 match self.config.parse_error_action {
384 ParseErrorAction::Dlq => {
385 self.stats.incr_dlq();
386 parsed_msgs.push(Err(format!("parse error: {e}")));
387 }
388 ParseErrorAction::Skip => {
389 }
391 ParseErrorAction::FailBatch => {
392 parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
395 let results: Vec<Result<O, E>> = parsed_msgs
396 .into_iter()
397 .map(|r| match r {
398 Ok(_) => Err(E::from(
399 "batch failed due to parse error".to_string(),
400 )),
401 Err(reason) => Err(E::from(reason)),
402 })
403 .collect();
404 all_results.extend(results);
405 return all_results;
406 }
407 }
408 }
409 }
410 }
411
412 let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
415 parsed_msgs.into_iter().enumerate().collect();
416
417 let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
419 let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
420
421 for (idx, item) in indexed.drain(..) {
422 match item {
423 Ok(pm) => to_transform.push((idx, pm)),
424 Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
425 }
426 }
427
428 let transformed: Vec<(usize, Result<O, E>)> =
432 self.pool.map_owned(to_transform, |(idx, mut pm)| {
433 let result = transform(&mut pm);
434 (idx, result)
435 });
436
437 chunk_results.extend(transformed);
438
439 chunk_results.sort_by_key(|(idx, _)| *idx);
441
442 let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
444 self.stats.add_processed(ok_count as u64);
445
446 all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
447 }
448
449 all_results
450 }
451
452 #[cfg(feature = "transport")]
458 pub fn process_raw<O, E, F>(
459 &self,
460 messages: &[crate::transport::Record],
461 transform: F,
462 ) -> Vec<Result<O, E>>
463 where
464 O: Send,
465 E: Send + From<String>,
466 F: Fn(&crate::transport::Record) -> Result<O, E> + Sync,
467 {
468 if messages.is_empty() {
469 return Vec::new();
470 }
471
472 let chunk_size = if self.config.max_chunk_size == 0 {
473 messages.len()
474 } else {
475 self.config.max_chunk_size
476 };
477
478 let has_routing = self.config.routing_field.is_some();
479 let mut all_results = Vec::with_capacity(messages.len());
480
481 for chunk in messages.chunks(chunk_size) {
482 self.stats.add_received(chunk.len() as u64);
483
484 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
485 self.stats.add_bytes_received(chunk_bytes);
486
487 let to_process: Vec<&crate::transport::Record> = if has_routing {
489 let field_name = self.config.routing_field.as_ref().expect("checked above");
490 let mut passed = Vec::with_capacity(chunk.len());
491 for msg in chunk {
492 let extraction = extract_routing_field(&msg.payload, field_name);
493 let outcome = apply_filters(&extraction, &self.filters);
494 match outcome {
495 PreRouteOutcome::Continue => passed.push(msg),
496 PreRouteOutcome::Filtered => {
497 self.stats.incr_filtered();
498 }
499 PreRouteOutcome::Dlq(reason) => {
500 self.stats.incr_dlq();
501 self.stats.incr_errors();
502 all_results.push(Err(E::from(reason)));
503 }
504 }
505 }
506 passed
507 } else {
508 chunk.iter().collect()
509 };
510
511 let results = self.pool.process_batch(&to_process, |msg| transform(msg));
513
514 let ok_count = results.iter().filter(|r| r.is_ok()).count();
515 self.stats.add_processed(ok_count as u64);
516
517 all_results.extend(results);
518 }
519
520 all_results
521 }
522
523 #[cfg(feature = "transport")]
538 fn apply_workbatch_dlq_policy<T: crate::transport::CommitToken>(
539 &self,
540 mut batch: crate::transport::WorkBatch<T>,
541 ) -> Result<crate::transport::WorkBatch<T>, EngineError> {
542 if !batch.dlq_entries.is_empty() {
543 let entries = std::mem::take(&mut batch.dlq_entries);
544 self.route_dlq_entries(entries)?;
545 }
546 Ok(batch)
547 }
548
549 #[cfg(feature = "transport")]
568 pub(crate) fn route_dlq_entries(
569 &self,
570 entries: Vec<crate::transport::filter::FilteredDlqEntry>,
571 ) -> Result<(), EngineError> {
572 if entries.is_empty() {
573 return Ok(());
574 }
575 match &self.filter_dlq_policy {
576 FilterDlqPolicy::Reject => Err(EngineError::FilterDlqUnrouted(entries.len())),
577 FilterDlqPolicy::DiscardWithMetric => {
578 #[cfg(feature = "metrics")]
579 ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
580 .increment(entries.len() as u64);
581 Ok(())
582 }
583 FilterDlqPolicy::Route(sink) => sink(entries),
584 }
585 }
586}
587
/// Scope guard tying a byte count to a [`crate::memory::MemoryGuard`]: when the lease
/// is dropped, `bytes` is released back to the guard.
#[cfg(feature = "memory")]
pub(crate) struct IngressLease<'a> {
    /// Guard that receives the `release` call on drop.
    guard: &'a crate::memory::MemoryGuard,
    /// Number of bytes released on drop.
    bytes: u64,
}
600
#[cfg(feature = "memory")]
impl<'a> IngressLease<'a> {
    /// Wraps `guard` and `bytes` into a lease.
    ///
    /// This constructor only stores its arguments; it does not itself account the
    /// bytes against the guard.
    // NOTE(review): presumably the caller has already accounted `bytes` against the
    // guard before creating the lease -- confirm at the call site.
    fn new(guard: &'a crate::memory::MemoryGuard, bytes: u64) -> Self {
        IngressLease { guard, bytes }
    }
}
610
#[cfg(feature = "memory")]
impl Drop for IngressLease<'_> {
    /// Gives the leased byte count back to the memory guard.
    fn drop(&mut self) {
        let leased = self.bytes;
        self.guard.release(leased);
    }
}
617
618impl std::fmt::Debug for BatchEngine {
619 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620 let mut s = f.debug_struct("BatchEngine");
621 s.field("config", &self.config)
622 .field("pool_max_threads", &self.pool.max_threads())
623 .field("stats", &self.stats.snapshot())
624 .field("interner_len", &self.interner.len())
625 .field("filters", &self.filters);
626 #[cfg(feature = "memory")]
627 s.field("memory_guard", &self.memory_guard.is_some());
628 #[cfg(feature = "transport")]
629 s.field("filter_dlq_policy", &self.filter_dlq_policy);
630 #[cfg(feature = "governor")]
631 s.field("self_regulated", &self.byte_budget.is_some());
632 s.finish()
633 }
634}
635
636#[cfg(all(test, feature = "transport"))]
637mod engine_tests {
638 use super::*;
639 use crate::transport::{PayloadFormat as TPayloadFormat, Record, RecordMeta};
640 use bytes::Bytes;
641
642 fn make_json_messages(n: usize) -> Vec<Record> {
643 (0..n)
644 .map(|i| Record {
645 payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
646 key: None,
647 headers: vec![],
648 metadata: RecordMeta {
649 timestamp_ms: None,
650 format: TPayloadFormat::Json,
651 },
652 })
653 .collect()
654 }
655
656 fn default_engine() -> BatchEngine {
657 BatchEngine::new(BatchProcessingConfig::default())
658 }
659
660 #[cfg(feature = "transport")]
661 #[test]
662 fn filter_dlq_policy_routes_discards_or_rejects() {
663 use crate::transport::WorkBatch;
664 use crate::transport::filter::FilteredDlqEntry;
665 use std::sync::Arc as StdArc;
666 use std::sync::atomic::{AtomicUsize, Ordering};
667
668 #[derive(Clone, Debug)]
670 struct TestTok;
671 impl std::fmt::Display for TestTok {
672 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673 f.write_str("test")
674 }
675 }
676 impl crate::transport::CommitToken for TestTok {}
677
678 let entry = || FilteredDlqEntry {
679 payload: b"x".to_vec(),
680 key: None,
681 reason: "r".to_string(),
682 };
683 let batch_with = |n: usize| {
684 WorkBatch::<TestTok>::from_records(vec![])
685 .with_dlq_entries((0..n).map(|_| entry()).collect())
686 };
687
688 let eng = default_engine();
690 assert!(matches!(
691 eng.apply_workbatch_dlq_policy(batch_with(1)),
692 Err(EngineError::FilterDlqUnrouted(1))
693 ));
694 let passed = eng
696 .apply_workbatch_dlq_policy(WorkBatch::<TestTok>::from_records(vec![]))
697 .expect("no entries -> ok");
698 assert!(passed.dlq_entries.is_empty());
699
700 let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
702 let passed = eng
703 .apply_workbatch_dlq_policy(batch_with(1))
704 .expect("discard -> ok");
705 assert!(
706 passed.dlq_entries.is_empty(),
707 "entries consumed after routing"
708 );
709
710 let seen = StdArc::new(AtomicUsize::new(0));
712 let s = StdArc::clone(&seen);
713 let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
714 move |e: Vec<FilteredDlqEntry>| {
715 s.fetch_add(e.len(), Ordering::Relaxed);
716 Ok(())
717 },
718 )));
719 let passed = eng
720 .apply_workbatch_dlq_policy(batch_with(2))
721 .expect("route -> ok");
722 assert!(passed.dlq_entries.is_empty());
723 assert_eq!(
724 seen.load(Ordering::Relaxed),
725 2,
726 "Route sink received all entries"
727 );
728 }
729
730 #[cfg(all(feature = "memory", feature = "transport"))]
733 fn make_record_batch(n: usize) -> crate::transport::WorkBatch<TestTok> {
734 use crate::transport::{PayloadFormat, Record, RecordMeta};
735 let records = (0..n)
736 .map(|i| Record {
737 payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
738 key: None,
739 headers: vec![],
740 metadata: RecordMeta {
741 timestamp_ms: None,
742 format: PayloadFormat::Json,
743 },
744 })
745 .collect();
746 crate::transport::WorkBatch::from_records(records)
747 }
748
749 #[cfg(all(feature = "memory", feature = "transport"))]
751 #[derive(Debug, Clone)]
752 struct TestTok;
753 #[cfg(all(feature = "memory", feature = "transport"))]
754 impl std::fmt::Display for TestTok {
755 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756 f.write_str("test")
757 }
758 }
759 #[cfg(all(feature = "memory", feature = "transport"))]
760 impl crate::transport::CommitToken for TestTok {}
761
762 #[cfg(all(feature = "memory", feature = "transport"))]
763 #[test]
764 fn ingress_lease_accounts_and_releases() {
765 use crate::memory::{MemoryGuard, MemoryGuardConfig};
766
767 let mut engine = default_engine();
768 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
769 limit_bytes: 1024 * 1024,
770 ..Default::default()
771 }));
772 engine.memory_guard = Some(Arc::clone(&guard));
773
774 let batch = make_record_batch(10);
775 let expected = batch.total_payload_bytes() as u64;
776 assert_eq!(guard.current_bytes(), 0, "starts at zero");
777
778 {
779 let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
780 assert_eq!(
781 guard.current_bytes(),
782 expected,
783 "bytes accounted while lease held"
784 );
785 }
786 assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
788 }
789
790 #[cfg(all(feature = "memory", feature = "transport"))]
791 #[test]
792 fn ingress_lease_none_without_guard() {
793 let engine = default_engine();
794 let batch = make_record_batch(5);
795 assert!(
796 engine.lease_ingress_batch(&batch).is_none(),
797 "no lease when no guard wired"
798 );
799 }
800
801 #[test]
802 fn process_mid_tier_basic() {
803 let engine = default_engine();
804 let msgs = make_json_messages(100);
805
806 let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
807 Ok(pm
808 .field("_table")
809 .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
810 .unwrap_or("unknown")
811 .to_string())
812 });
813
814 assert_eq!(results.len(), 100);
815 assert!(results.iter().all(|r| r.is_ok()));
816 assert_eq!(results[0].as_ref().unwrap(), "events");
817 }
818
819 #[test]
820 fn process_mid_tier_parse_error() {
821 let engine = default_engine();
822 let mut msgs = make_json_messages(2);
823 msgs.insert(
825 1,
826 Record {
827 payload: Bytes::from_static(b"not json {{{"),
828 key: None,
829 headers: vec![],
830 metadata: RecordMeta {
831 timestamp_ms: None,
832 format: TPayloadFormat::Json,
833 },
834 },
835 );
836
837 let results: Vec<Result<String, String>> =
838 engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));
839
840 assert_eq!(results.len(), 3);
842 assert!(results[0].is_ok());
843 assert!(results[1].is_err());
844 assert!(results[1].as_ref().unwrap_err().contains("parse error"));
845 assert!(results[2].is_ok());
846 }
847
848 #[test]
849 fn process_mid_tier_empty_batch() {
850 let engine = default_engine();
851 let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
852 assert!(results.is_empty());
853 }
854
855 #[test]
856 fn process_mid_tier_respects_chunk_size() {
857 let config = BatchProcessingConfig {
858 max_chunk_size: 50,
859 ..Default::default()
860 };
861 let engine = BatchEngine::new(config);
862 let msgs = make_json_messages(120);
863
864 let results: Vec<Result<usize, String>> =
865 engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));
866
867 assert_eq!(results.len(), 120);
868 assert!(results.iter().all(|r| r.is_ok()));
869 let snap = engine.stats().snapshot();
871 assert_eq!(snap.received, 120);
872 }
873
874 #[test]
875 fn stats_updated_after_processing() {
876 let engine = default_engine();
877 let msgs = make_json_messages(10);
878
879 let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
880
881 let snap = engine.stats().snapshot();
882 assert_eq!(snap.received, 10);
883 assert_eq!(snap.processed, 10);
884 assert_eq!(snap.errors, 0);
885 assert_eq!(snap.filtered, 0);
886 }
887
888 #[test]
889 fn process_raw_passthrough() {
890 let engine = default_engine();
891 let msgs = make_json_messages(50);
892
893 let results: Vec<Result<usize, String>> =
894 engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));
895
896 assert_eq!(results.len(), 50);
897 assert!(results.iter().all(|r| r.is_ok()));
898 assert!(results[0].as_ref().unwrap() > &0);
900
901 let snap = engine.stats().snapshot();
902 assert_eq!(snap.received, 50);
903 assert_eq!(snap.processed, 50);
904 }
905
906 #[test]
907 fn process_mid_tier_with_pre_route() {
908 let config = BatchProcessingConfig {
909 routing_field: Some("_table".to_string()),
910 pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
911 field: "_table".to_string(),
912 value: "poison".to_string(),
913 }],
914 ..Default::default()
915 };
916 let engine = BatchEngine::new(config);
917
918 let mut msgs = make_json_messages(3);
919 msgs[1] = Record {
921 payload: Bytes::from(r#"{"_table":"poison","id":999}"#),
922 key: None,
923 headers: vec![],
924 metadata: RecordMeta {
925 timestamp_ms: None,
926 format: TPayloadFormat::Json,
927 },
928 };
929
930 let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
931 Ok(pm
932 .field("_table")
933 .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
934 .unwrap_or("?")
935 .to_string())
936 });
937
938 assert_eq!(results.len(), 3);
940 assert!(results[0].is_ok());
941 assert!(results[1].is_err());
942 assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
943 assert!(results[2].is_ok());
944
945 let snap = engine.stats().snapshot();
946 assert_eq!(snap.dlq, 1);
947 assert_eq!(snap.errors, 1);
948 }
949
950 #[test]
951 fn process_mid_tier_filtered_not_in_results() {
952 let config = BatchProcessingConfig {
953 routing_field: Some("_table".to_string()),
954 pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
955 field: "_table".to_string(),
956 }],
957 ..Default::default()
958 };
959 let engine = BatchEngine::new(config);
960
961 let mut msgs = make_json_messages(3);
962 msgs[1] = Record {
964 payload: Bytes::from(r#"{"host":"web1"}"#),
965 key: None,
966 headers: vec![],
967 metadata: RecordMeta {
968 timestamp_ms: None,
969 format: TPayloadFormat::Json,
970 },
971 };
972
973 let results: Vec<Result<String, String>> =
974 engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));
975
976 assert_eq!(results.len(), 2);
978 assert!(results.iter().all(|r| r.is_ok()));
979
980 let snap = engine.stats().snapshot();
981 assert_eq!(snap.filtered, 1);
982 assert_eq!(snap.received, 3);
983 }
984
985 #[test]
986 fn from_cascade_creates_engine() {
987 let engine = BatchEngine::from_cascade("batch_processing").unwrap();
988 assert_eq!(engine.config().max_chunk_size, 10_000);
989 }
990
991 #[test]
992 fn accessors_return_expected_types() {
993 let engine = default_engine();
994 let _stats = engine.stats();
995 let _pool = engine.pool();
996 let _config = engine.config();
997 assert_eq!(engine.stats().snapshot().received, 0);
998 }
999
1000 #[test]
1001 fn auto_wire_does_not_panic() {
1002 let mut engine = default_engine();
1003 let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
1004 engine.auto_wire(
1005 &mgr,
1006 #[cfg(feature = "memory")]
1007 None,
1008 );
1009 let msgs = make_json_messages(5);
1011 let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
1012 assert_eq!(results.len(), 5);
1013 }
1014
1015 #[test]
1016 fn debug_impl_works() {
1017 let engine = default_engine();
1018 let debug = format!("{engine:?}");
1019 assert!(debug.contains("BatchEngine"));
1020 assert!(debug.contains("config"));
1021 }
1022
1023 #[cfg(feature = "transport-memory")]
1028 mod driver_engine_tests {
1029 use super::*;
1030 use crate::transport::WorkBatch;
1031 use crate::worker::engine::CommitMode;
1032 use std::sync::atomic::{AtomicU64, Ordering};
1033
1034 fn json_payload(table: &str, id: usize) -> Vec<u8> {
1035 format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
1036 }
1037
1038 #[allow(clippy::type_complexity)]
1040 fn no_ticker() -> Option<(
1041 std::time::Duration,
1042 fn() -> std::future::Ready<Result<(), EngineError>>,
1043 )> {
1044 None
1045 }
1046
1047 fn cancel_after(shutdown: tokio_util::sync::CancellationToken, ms: u64) {
1048 tokio::spawn(async move {
1049 tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
1050 shutdown.cancel();
1051 });
1052 }
1053
1054 #[tokio::test]
1055 async fn run_workbatch_processes_and_passes_tokens_to_sink() {
1056 let config = crate::transport::memory::MemoryConfig {
1057 recv_timeout_ms: 50,
1058 ..Default::default()
1059 };
1060 let transport = crate::transport::memory::MemoryTransport::new(&config)
1061 .expect("memory transport with valid config must construct");
1062 for i in 0..5 {
1063 transport
1064 .inject(None, json_payload("events", i))
1065 .await
1066 .unwrap();
1067 }
1068
1069 let engine = default_engine();
1070 let shutdown = tokio_util::sync::CancellationToken::new();
1071 cancel_after(shutdown.clone(), 200);
1072
1073 let record_count = Arc::new(AtomicU64::new(0));
1074 let token_count = Arc::new(AtomicU64::new(0));
1075 let rc = Arc::clone(&record_count);
1076 let tc = Arc::clone(&token_count);
1077
1078 let result = engine
1079 .run_workbatch(
1080 &transport,
1081 shutdown,
1082 |batch| Ok(batch),
1083 |out: &WorkBatch<_>| {
1084 let rc = Arc::clone(&rc);
1085 let tc = Arc::clone(&tc);
1086 let records = out.records.len();
1087 let tokens = out.commit_tokens.len();
1088 async move {
1089 rc.fetch_add(records as u64, Ordering::Relaxed);
1090 tc.fetch_add(tokens as u64, Ordering::Relaxed);
1091 Ok(())
1092 }
1093 },
1094 CommitMode::SinkManaged,
1096 no_ticker(),
1097 )
1098 .await;
1099
1100 assert!(result.is_ok());
1101 assert_eq!(record_count.load(Ordering::Relaxed), 5);
1102 assert_eq!(token_count.load(Ordering::Relaxed), 5);
1103 }
1104
1105 #[tokio::test]
1106 async fn run_workbatch_ticker_fires() {
1107 let config = crate::transport::memory::MemoryConfig {
1108 recv_timeout_ms: 50,
1109 ..Default::default()
1110 };
1111 let transport = crate::transport::memory::MemoryTransport::new(&config)
1112 .expect("memory transport with valid config must construct");
1113 let engine = default_engine();
1114 let shutdown = tokio_util::sync::CancellationToken::new();
1115 cancel_after(shutdown.clone(), 350);
1116
1117 let tick_count = Arc::new(AtomicU64::new(0));
1118 let tick_count_clone = Arc::clone(&tick_count);
1119
1120 let result = engine
1121 .run_workbatch(
1122 &transport,
1123 shutdown,
1124 |batch| Ok(batch),
1125 |_out: &WorkBatch<_>| async { Ok(()) },
1126 CommitMode::Auto,
1127 Some((std::time::Duration::from_millis(100), move || {
1128 let tc = Arc::clone(&tick_count_clone);
1129 async move {
1130 tc.fetch_add(1, Ordering::Relaxed);
1131 Ok(())
1132 }
1133 })),
1134 )
1135 .await;
1136
1137 assert!(result.is_ok());
1138 let ticks = tick_count.load(Ordering::Relaxed);
1139 assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
1140 }
1141
1142 #[tokio::test]
1143 async fn run_workbatch_passthrough_without_parse() {
1144 let config = crate::transport::memory::MemoryConfig {
1145 recv_timeout_ms: 50,
1146 ..Default::default()
1147 };
1148 let transport = crate::transport::memory::MemoryTransport::new(&config)
1149 .expect("memory transport with valid config must construct");
1150 for i in 0..3 {
1151 transport
1152 .inject(None, json_payload("logs", i))
1153 .await
1154 .unwrap();
1155 }
1156
1157 let engine = default_engine();
1158 let shutdown = tokio_util::sync::CancellationToken::new();
1159 cancel_after(shutdown.clone(), 200);
1160
1161 let total_bytes = Arc::new(AtomicU64::new(0));
1162 let total_bytes_clone = Arc::clone(&total_bytes);
1163
1164 let result = engine
1166 .run_workbatch(
1167 &transport,
1168 shutdown,
1169 |batch| Ok(batch),
1170 |out: &WorkBatch<_>| {
1171 let tb = Arc::clone(&total_bytes_clone);
1172 let sum: u64 = out.records.iter().map(|r| r.payload.len() as u64).sum();
1173 async move {
1174 tb.fetch_add(sum, Ordering::Relaxed);
1175 Ok(())
1176 }
1177 },
1178 CommitMode::Auto,
1179 no_ticker(),
1180 )
1181 .await;
1182
1183 assert!(result.is_ok());
1184 assert!(total_bytes.load(Ordering::Relaxed) > 0);
1185 }
1186
1187 #[tokio::test]
1188 async fn run_workbatch_parsed_reads_field() {
1189 let config = crate::transport::memory::MemoryConfig {
1192 recv_timeout_ms: 50,
1193 ..Default::default()
1194 };
1195 let transport = crate::transport::memory::MemoryTransport::new(&config)
1196 .expect("memory transport with valid config must construct");
1197 for i in 0..4 {
1198 transport
1199 .inject(None, json_payload("events", i))
1200 .await
1201 .unwrap();
1202 }
1203
1204 let engine = default_engine();
1205 let shutdown = tokio_util::sync::CancellationToken::new();
1206 cancel_after(shutdown.clone(), 200);
1207
1208 let hits = Arc::new(AtomicU64::new(0));
1209 let hc = Arc::clone(&hits);
1210
1211 let result = engine
1212 .run_workbatch_parsed(
1213 &transport,
1214 shutdown,
1215 move |pb| {
1216 let field = pb.intern("_table");
1217 let mut local = 0u64;
1218 for parsed in &pb.parsed {
1219 if parsed.field_str(&field) == Some("events") {
1220 local += 1;
1221 }
1222 }
1223 hc.fetch_add(local, Ordering::Relaxed);
1224 Ok(WorkBatch::new(pb.records, pb.commit_tokens)
1225 .with_dlq_entries(pb.dlq_entries))
1226 },
1227 |_out: &WorkBatch<_>| async { Ok(()) },
1228 CommitMode::Auto,
1229 no_ticker(),
1230 )
1231 .await;
1232
1233 assert!(result.is_ok());
1234 assert_eq!(hits.load(Ordering::Relaxed), 4);
1235 }
1236
1237 #[tokio::test]
1238 async fn run_workbatch_sink_error_does_not_crash() {
1239 let config = crate::transport::memory::MemoryConfig {
1240 recv_timeout_ms: 50,
1241 ..Default::default()
1242 };
1243 let transport = crate::transport::memory::MemoryTransport::new(&config)
1244 .expect("memory transport with valid config must construct");
1245 transport
1246 .inject(None, json_payload("events", 0))
1247 .await
1248 .unwrap();
1249
1250 let engine = default_engine();
1251 let shutdown = tokio_util::sync::CancellationToken::new();
1252 cancel_after(shutdown.clone(), 200);
1253
1254 let result = engine
1258 .run_workbatch(
1259 &transport,
1260 shutdown,
1261 |batch| Ok(batch),
1262 |_out: &WorkBatch<_>| async {
1263 Err(EngineError::Sink("test sink error".into()))
1264 },
1265 CommitMode::Auto,
1266 no_ticker(),
1267 )
1268 .await;
1269
1270 assert!(
1271 matches!(result, Err(EngineError::Sink(_))),
1272 "sink error returns terminally without crashing: {result:?}"
1273 );
1274 }
1275 }
1276}