1pub mod config;
10#[cfg(feature = "transport")]
11pub mod driver;
12pub mod intern;
13pub mod metrics;
14pub mod parse;
15pub mod pre_route;
16pub mod types;
17
18pub use config::{BatchProcessingConfig, ParseErrorAction, PreRouteFilterConfig};
19#[cfg(feature = "transport")]
20pub use driver::{CommitMode, ParsedBatch};
21pub use intern::FieldInterner;
22pub use types::{MessageMetadata, ParsedMessage, PreRouteResult};
23
/// Errors surfaced by the batch engine.
///
/// Only compiled when the `transport` feature is enabled.
#[cfg(feature = "transport")]
#[derive(Debug, thiserror::Error)]
pub enum EngineError {
    /// A transport-layer failure; converted automatically from
    /// `crate::TransportError` through the `#[from]` attribute.
    #[error("transport error: {0}")]
    Transport(#[from] crate::TransportError),
    /// A sink reported a failure; the payload is a human-readable description.
    #[error("sink error: {0}")]
    Sink(String),
    /// Shutdown marker.
    // NOTE(review): presumably returned when a run loop stops because of a
    // shutdown request -- confirm against the driver module.
    #[error("shutdown")]
    Shutdown,
    /// Inbound-filter DLQ entries were produced while the engine's policy is
    /// `FilterDlqPolicy::Reject`; the payload is the number of entries.
    #[error(
        "{0} inbound-filter DLQ entries were produced but no FilterDlqPolicy is \
         configured -- set a policy via BatchEngine::with_filter_dlq_policy \
         (Route to forward, or DiscardWithMetric to deliberately drop)"
    )]
    FilterDlqUnrouted(usize),
    /// Routing DLQ entries failed; the payload describes the failure.
    // NOTE(review): not constructed in this file -- presumably produced by
    // `FilterDlqPolicy::Route` callbacks or the driver; confirm.
    #[error("DLQ route failed: {0}")]
    DlqRouteFailed(String),
    /// A parse failure aborted the batch (`fail_batch` handling).
    // NOTE(review): not constructed in this file -- presumably raised by the
    // driver when `ParseErrorAction::FailBatch` is configured; confirm.
    #[error("parse failed (fail_batch): {0}")]
    ParseBatchFailed(String),
    /// `CommitMode::SinkManaged` was requested on a driver that does not
    /// support it (see the error message for the supported alternatives).
    #[error(
        "CommitMode::SinkManaged unsupported on the streaming/governed driver; \
         use CommitMode::Auto, or run_workbatch for sink-managed commits"
    )]
    SinkManagedUnsupported,
}
71
/// What the engine does with DLQ entries produced by inbound filters.
///
/// Consulted by `BatchEngine::route_dlq_entries` whenever a non-empty set of
/// entries needs handling. Defaults to [`FilterDlqPolicy::Reject`].
#[cfg(feature = "transport")]
#[derive(Clone, Default)]
pub enum FilterDlqPolicy {
    /// Refuse to proceed: routing fails with
    /// `EngineError::FilterDlqUnrouted(<entry count>)`.
    #[default]
    Reject,
    /// Drop the entries and report success. With the `metrics` feature the
    /// `dfe_engine_filter_dlq_discarded_total` counter is incremented by the
    /// number of dropped entries.
    DiscardWithMetric,
    /// Hand ownership of the entries to a caller-supplied callback; the
    /// callback's result becomes the routing result.
    Route(
        Arc<
            dyn Fn(Vec<crate::transport::filter::FilteredDlqEntry>) -> Result<(), EngineError>
                + Send
                + Sync,
        >,
    ),
}
106
#[cfg(feature = "transport")]
impl std::fmt::Debug for FilterDlqPolicy {
    /// Writes the variant name only. The `Route` callback is an opaque
    /// closure, so it is rendered as `Route(..)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Reject => "Reject",
            Self::DiscardWithMetric => "DiscardWithMetric",
            Self::Route(_) => "Route(..)",
        };
        f.write_str(label)
    }
}
117
118use std::sync::Arc;
119
120use super::pool::AdaptiveWorkerPool;
121use super::stats::PipelineStats;
122
123use self::pre_route::filters_from_config;
124#[cfg(feature = "transport")]
127use self::pre_route::{PreRouteOutcome, apply_filters, extract_routing_field};
128#[cfg(feature = "transport")]
129use self::types::PayloadFormat;
130use super::config::WorkerPoolConfig;
131
/// Batch processing engine: parses, pre-route filters and transforms batches
/// of transport records on a shared worker pool while tracking statistics.
pub struct BatchEngine {
    /// Processing configuration supplied at construction time.
    config: BatchProcessingConfig,
    /// Worker pool the transform step runs on.
    pool: Arc<AdaptiveWorkerPool>,
    /// Counters updated while processing (received, processed, errors, ...).
    stats: Arc<PipelineStats>,
    /// Field interner seeded with `config.known_fields`.
    interner: Arc<FieldInterner>,
    /// Pre-route filters built from `config.pre_route_filters`.
    filters: Vec<pre_route::PreRouteFilter>,
    /// Optional memory guard; `None` until wired in via `auto_wire`.
    #[cfg(feature = "memory")]
    memory_guard: Option<Arc<crate::memory::MemoryGuard>>,
    /// Policy applied to inbound-filter DLQ entries (defaults to `Reject`).
    #[cfg(feature = "transport")]
    filter_dlq_policy: FilterDlqPolicy,
    /// Optional byte budget controller; when present the engine reports
    /// itself as self-regulated.
    #[cfg(feature = "governor")]
    byte_budget: Option<Arc<crate::governor::ByteBudgetController>>,
}
176
177impl BatchEngine {
178 #[must_use]
183 pub fn new(config: BatchProcessingConfig) -> Self {
184 let pool = Arc::new(AdaptiveWorkerPool::new(WorkerPoolConfig::default()));
185 Self::with_pool(pool, config)
186 }
187
188 #[must_use]
193 pub fn with_pool(pool: Arc<AdaptiveWorkerPool>, config: BatchProcessingConfig) -> Self {
194 let known_refs: Vec<&str> = config.known_fields.iter().map(String::as_str).collect();
195 let interner = Arc::new(FieldInterner::with_known_fields(&known_refs));
196 let filters = filters_from_config(&config.pre_route_filters);
197 Self {
198 config,
199 pool,
200 stats: Arc::new(PipelineStats::new()),
201 interner,
202 filters,
203 #[cfg(feature = "memory")]
204 memory_guard: None,
205 #[cfg(feature = "transport")]
206 filter_dlq_policy: FilterDlqPolicy::default(),
207 #[cfg(feature = "governor")]
208 byte_budget: None,
209 }
210 }
211
    /// Installs a byte budget controller, replacing any previous one.
    ///
    /// After this call `is_self_regulated` returns `true`.
    #[cfg(feature = "governor")]
    pub fn set_byte_budget(&mut self, budget: Arc<crate::governor::ByteBudgetController>) {
        self.byte_budget = Some(budget);
    }
222
    /// Returns `true` when a byte budget controller has been installed.
    #[cfg(feature = "governor")]
    #[must_use]
    pub fn is_self_regulated(&self) -> bool {
        self.byte_budget.is_some()
    }
231
    /// Builder-style setter: replaces the policy applied to inbound-filter
    /// DLQ entries and returns the engine.
    #[cfg(feature = "transport")]
    #[must_use]
    pub fn with_filter_dlq_policy(mut self, policy: FilterDlqPolicy) -> Self {
        self.filter_dlq_policy = policy;
        self
    }
243
244 pub fn from_cascade(key: &str) -> Result<Self, crate::config::ConfigError> {
250 let config = BatchProcessingConfig::from_cascade(key)?;
251 Ok(Self::new(config))
252 }
253
    /// Returns the shared pipeline statistics handle.
    #[must_use]
    pub fn stats(&self) -> &Arc<PipelineStats> {
        &self.stats
    }
259
    /// Returns the worker pool this engine runs transforms on.
    #[must_use]
    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
        &self.pool
    }
265
    /// Returns the processing configuration the engine was built with.
    #[must_use]
    pub fn config(&self) -> &BatchProcessingConfig {
        &self.config
    }
271
272 pub fn auto_wire(
276 &mut self,
277 metrics_manager: &crate::metrics::MetricsManager,
278 #[cfg(feature = "memory")] memory_guard: Option<&Arc<crate::memory::MemoryGuard>>,
279 ) {
280 metrics::register(metrics_manager, &self.config);
281
282 #[cfg(feature = "memory")]
283 if let Some(guard) = memory_guard {
284 self.memory_guard = Some(Arc::clone(guard));
285 }
286 }
287
    /// Test-only hook: attaches `guard` directly, bypassing `auto_wire`.
    #[cfg(all(test, feature = "memory"))]
    pub(crate) fn set_memory_guard_for_test(&mut self, guard: Arc<crate::memory::MemoryGuard>) {
        self.memory_guard = Some(guard);
    }
294
295 #[cfg(feature = "transport")]
306 pub fn process_mid_tier<O, E, F>(
307 &self,
308 messages: &[crate::transport::Record],
309 transform: F,
310 ) -> Vec<Result<O, E>>
311 where
312 O: Send,
313 E: Send + From<String>,
314 F: Fn(&mut ParsedMessage) -> Result<O, E> + Sync,
315 {
316 if messages.is_empty() {
317 return Vec::new();
318 }
319
320 let chunk_size = if self.config.max_chunk_size == 0 {
321 messages.len()
322 } else {
323 self.config.max_chunk_size
324 };
325
326 let has_routing = self.config.routing_field.is_some();
327 let mut all_results = Vec::with_capacity(messages.len());
328
329 for chunk in messages.chunks(chunk_size) {
330 self.stats.add_received(chunk.len() as u64);
331
332 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
334 self.stats.add_bytes_received(chunk_bytes);
335
336 let mut parsed_msgs: Vec<Result<ParsedMessage, String>> =
339 Vec::with_capacity(chunk.len());
340
341 for msg in chunk {
342 if has_routing {
344 let field_name = self.config.routing_field.as_ref().expect("checked above");
345 let extraction = extract_routing_field(&msg.payload, field_name);
346 let outcome = apply_filters(&extraction, &self.filters);
347
348 match outcome {
349 PreRouteOutcome::Continue => {}
350 PreRouteOutcome::Filtered => {
351 self.stats.incr_filtered();
352 continue; }
354 PreRouteOutcome::Dlq(reason) => {
355 self.stats.incr_dlq();
356 self.stats.incr_errors();
357 parsed_msgs.push(Err(reason));
358 continue;
359 }
360 }
361 }
362
363 let format: PayloadFormat = match PayloadFormat::from(msg.metadata.format) {
366 PayloadFormat::Auto => PayloadFormat::detect(&msg.payload),
367 other => other,
368 };
369
370 match parse::parse_payload(&msg.payload, format) {
371 Ok(value) => {
372 let extracted = self.interner.extract_known(&value);
373 let metadata = MessageMetadata {
376 timestamp_ms: msg.metadata.timestamp_ms,
377 format,
378 };
379 parsed_msgs.push(Ok(ParsedMessage {
380 value,
381 raw: msg.payload.clone(),
382 format,
383 key: msg.key.clone(),
384 headers: msg.headers.clone(),
385 metadata,
386 extracted,
387 }));
388 }
389 Err(e) => {
390 self.stats.incr_errors();
391 match self.config.parse_error_action {
392 ParseErrorAction::Dlq => {
393 self.stats.incr_dlq();
394 parsed_msgs.push(Err(format!("parse error: {e}")));
395 }
396 ParseErrorAction::Skip => {
397 }
399 ParseErrorAction::FailBatch => {
400 parsed_msgs.push(Err(format!("parse error (fail_batch): {e}")));
403 let results: Vec<Result<O, E>> = parsed_msgs
404 .into_iter()
405 .map(|r| match r {
406 Ok(_) => Err(E::from(
407 "batch failed due to parse error".to_string(),
408 )),
409 Err(reason) => Err(E::from(reason)),
410 })
411 .collect();
412 all_results.extend(results);
413 return all_results;
414 }
415 }
416 }
417 }
418 }
419
420 let mut indexed: Vec<(usize, Result<ParsedMessage, String>)> =
423 parsed_msgs.into_iter().enumerate().collect();
424
425 let mut chunk_results: Vec<(usize, Result<O, E>)> = Vec::with_capacity(indexed.len());
427 let mut to_transform: Vec<(usize, ParsedMessage)> = Vec::with_capacity(indexed.len());
428
429 for (idx, item) in indexed.drain(..) {
430 match item {
431 Ok(pm) => to_transform.push((idx, pm)),
432 Err(reason) => chunk_results.push((idx, Err(E::from(reason)))),
433 }
434 }
435
436 let transformed: Vec<(usize, Result<O, E>)> =
440 self.pool.map_owned(to_transform, |(idx, mut pm)| {
441 let result = transform(&mut pm);
442 (idx, result)
443 });
444
445 chunk_results.extend(transformed);
446
447 chunk_results.sort_by_key(|(idx, _)| *idx);
449
450 let ok_count = chunk_results.iter().filter(|(_, r)| r.is_ok()).count();
452 self.stats.add_processed(ok_count as u64);
453
454 all_results.extend(chunk_results.into_iter().map(|(_, r)| r));
455 }
456
457 all_results
458 }
459
460 #[cfg(feature = "transport")]
466 pub fn process_raw<O, E, F>(
467 &self,
468 messages: &[crate::transport::Record],
469 transform: F,
470 ) -> Vec<Result<O, E>>
471 where
472 O: Send,
473 E: Send + From<String>,
474 F: Fn(&crate::transport::Record) -> Result<O, E> + Sync,
475 {
476 if messages.is_empty() {
477 return Vec::new();
478 }
479
480 let chunk_size = if self.config.max_chunk_size == 0 {
481 messages.len()
482 } else {
483 self.config.max_chunk_size
484 };
485
486 let has_routing = self.config.routing_field.is_some();
487 let mut all_results = Vec::with_capacity(messages.len());
488
489 for chunk in messages.chunks(chunk_size) {
490 self.stats.add_received(chunk.len() as u64);
491
492 let chunk_bytes: u64 = chunk.iter().map(|m| m.payload.len() as u64).sum();
493 self.stats.add_bytes_received(chunk_bytes);
494
495 let to_process: Vec<&crate::transport::Record> = if has_routing {
497 let field_name = self.config.routing_field.as_ref().expect("checked above");
498 let mut passed = Vec::with_capacity(chunk.len());
499 for msg in chunk {
500 let extraction = extract_routing_field(&msg.payload, field_name);
501 let outcome = apply_filters(&extraction, &self.filters);
502 match outcome {
503 PreRouteOutcome::Continue => passed.push(msg),
504 PreRouteOutcome::Filtered => {
505 self.stats.incr_filtered();
506 }
507 PreRouteOutcome::Dlq(reason) => {
508 self.stats.incr_dlq();
509 self.stats.incr_errors();
510 all_results.push(Err(E::from(reason)));
511 }
512 }
513 }
514 passed
515 } else {
516 chunk.iter().collect()
517 };
518
519 let results = self.pool.process_batch(&to_process, |msg| transform(msg));
521
522 let ok_count = results.iter().filter(|r| r.is_ok()).count();
523 self.stats.add_processed(ok_count as u64);
524
525 all_results.extend(results);
526 }
527
528 all_results
529 }
530
531 #[cfg(feature = "transport")]
546 fn apply_workbatch_dlq_policy<T: crate::transport::CommitToken>(
547 &self,
548 mut batch: crate::transport::WorkBatch<T>,
549 ) -> Result<crate::transport::WorkBatch<T>, EngineError> {
550 if !batch.dlq_entries.is_empty() {
551 let entries = std::mem::take(&mut batch.dlq_entries);
552 self.route_dlq_entries(entries)?;
553 }
554 Ok(batch)
555 }
556
557 #[cfg(feature = "transport")]
576 pub(crate) fn route_dlq_entries(
577 &self,
578 entries: Vec<crate::transport::filter::FilteredDlqEntry>,
579 ) -> Result<(), EngineError> {
580 if entries.is_empty() {
581 return Ok(());
582 }
583 match &self.filter_dlq_policy {
584 FilterDlqPolicy::Reject => Err(EngineError::FilterDlqUnrouted(entries.len())),
585 FilterDlqPolicy::DiscardWithMetric => {
586 #[cfg(feature = "metrics")]
587 ::metrics::counter!("dfe_engine_filter_dlq_discarded_total")
588 .increment(entries.len() as u64);
589 Ok(())
590 }
591 FilterDlqPolicy::Route(sink) => sink(entries),
592 }
593 }
594}
595
/// Guard object tying a number of in-flight bytes to a `MemoryGuard`:
/// dropping the lease calls `MemoryGuard::release` with the stored byte count.
#[cfg(feature = "memory")]
#[must_use = "the lease must be held for the lifetime of the in-flight block; \
              dropping it immediately releases the reservation and corrupts the \
              in-flight byte accounting"]
pub(crate) struct IngressLease<'a> {
    /// Guard the bytes are released against on drop.
    guard: &'a crate::memory::MemoryGuard,
    /// Number of bytes released on drop.
    bytes: u64,
}
611
#[cfg(feature = "memory")]
impl<'a> IngressLease<'a> {
    /// Wraps `guard` and `bytes` into a lease.
    ///
    /// This constructor only stores its arguments; it does not touch the
    /// guard's accounting.
    // NOTE(review): presumably the caller has already accounted `bytes`
    // against `guard` before constructing the lease -- confirm at call sites.
    fn new(guard: &'a crate::memory::MemoryGuard, bytes: u64) -> Self {
        Self { guard, bytes }
    }
}
621
#[cfg(feature = "memory")]
impl Drop for IngressLease<'_> {
    /// Releases the leased byte count back to the guard.
    fn drop(&mut self) {
        self.guard.release(self.bytes);
    }
}
628
629impl std::fmt::Debug for BatchEngine {
630 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
631 let mut s = f.debug_struct("BatchEngine");
632 s.field("config", &self.config)
633 .field("pool_max_threads", &self.pool.max_threads())
634 .field("stats", &self.stats.snapshot())
635 .field("interner_len", &self.interner.len())
636 .field("filters", &self.filters);
637 #[cfg(feature = "memory")]
638 s.field("memory_guard", &self.memory_guard.is_some());
639 #[cfg(feature = "transport")]
640 s.field("filter_dlq_policy", &self.filter_dlq_policy);
641 #[cfg(feature = "governor")]
642 s.field("self_regulated", &self.byte_budget.is_some());
643 s.finish()
644 }
645}
646
647#[cfg(all(test, feature = "transport"))]
648mod engine_tests {
649 use super::*;
650 use crate::transport::{PayloadFormat as TPayloadFormat, Record, RecordMeta};
651 use bytes::Bytes;
652
653 fn make_json_messages(n: usize) -> Vec<Record> {
654 (0..n)
655 .map(|i| Record {
656 payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
657 key: None,
658 headers: vec![],
659 metadata: RecordMeta {
660 timestamp_ms: None,
661 format: TPayloadFormat::Json,
662 },
663 })
664 .collect()
665 }
666
    /// Engine built from `BatchProcessingConfig::default()`.
    fn default_engine() -> BatchEngine {
        BatchEngine::new(BatchProcessingConfig::default())
    }
670
    /// Exercises all three `FilterDlqPolicy` variants through
    /// `apply_workbatch_dlq_policy`.
    #[cfg(feature = "transport")]
    #[test]
    fn filter_dlq_policy_routes_discards_or_rejects() {
        use crate::transport::WorkBatch;
        use crate::transport::filter::FilteredDlqEntry;
        use std::sync::Arc as StdArc;
        use std::sync::atomic::{AtomicUsize, Ordering};

        // Minimal commit token local to this test.
        #[derive(Clone, Debug)]
        struct TestTok;
        impl std::fmt::Display for TestTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_str("test")
            }
        }
        impl crate::transport::CommitToken for TestTok {}

        // Factory for a single placeholder DLQ entry.
        let entry = || FilteredDlqEntry {
            payload: b"x".to_vec(),
            key: None,
            reason: "r".to_string(),
        };
        // Batch with no records and `n` DLQ entries.
        let batch_with = |n: usize| {
            WorkBatch::<TestTok>::from_records(vec![])
                .with_dlq_entries((0..n).map(|_| entry()).collect())
        };

        // Default policy (Reject): entries present -> error carrying the count.
        let eng = default_engine();
        assert!(matches!(
            eng.apply_workbatch_dlq_policy(batch_with(1)),
            Err(EngineError::FilterDlqUnrouted(1))
        ));
        // Reject with no entries is a no-op success.
        let passed = eng
            .apply_workbatch_dlq_policy(WorkBatch::<TestTok>::from_records(vec![]))
            .expect("no entries -> ok");
        assert!(passed.dlq_entries.is_empty());

        // DiscardWithMetric: entries are dropped and the batch passes.
        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::DiscardWithMetric);
        let passed = eng
            .apply_workbatch_dlq_policy(batch_with(1))
            .expect("discard -> ok");
        assert!(
            passed.dlq_entries.is_empty(),
            "entries consumed after routing"
        );

        // Route: the callback receives every entry; count them via an atomic.
        let seen = StdArc::new(AtomicUsize::new(0));
        let s = StdArc::clone(&seen);
        let eng = default_engine().with_filter_dlq_policy(FilterDlqPolicy::Route(StdArc::new(
            move |e: Vec<FilteredDlqEntry>| {
                s.fetch_add(e.len(), Ordering::Relaxed);
                Ok(())
            },
        )));
        let passed = eng
            .apply_workbatch_dlq_policy(batch_with(2))
            .expect("route -> ok");
        assert!(passed.dlq_entries.is_empty());
        assert_eq!(
            seen.load(Ordering::Relaxed),
            2,
            "Route sink received all entries"
        );
    }
740
741 #[cfg(all(feature = "memory", feature = "transport"))]
744 fn make_record_batch(n: usize) -> crate::transport::WorkBatch<TestTok> {
745 use crate::transport::{PayloadFormat, Record, RecordMeta};
746 let records = (0..n)
747 .map(|i| Record {
748 payload: Bytes::from(format!(r#"{{"_table":"events","id":{i}}}"#)),
749 key: None,
750 headers: vec![],
751 metadata: RecordMeta {
752 timestamp_ms: None,
753 format: PayloadFormat::Json,
754 },
755 })
756 .collect();
757 crate::transport::WorkBatch::from_records(records)
758 }
759
    /// Minimal commit token used by the memory-lease tests.
    #[cfg(all(feature = "memory", feature = "transport"))]
    #[derive(Debug, Clone)]
    struct TestTok;
    /// Displays as the fixed string `test`.
    #[cfg(all(feature = "memory", feature = "transport"))]
    impl std::fmt::Display for TestTok {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str("test")
        }
    }
    /// Empty impl: `TestTok` only needs to satisfy the `CommitToken` bound.
    #[cfg(all(feature = "memory", feature = "transport"))]
    impl crate::transport::CommitToken for TestTok {}
772
773 #[cfg(all(feature = "memory", feature = "transport"))]
774 #[test]
775 fn ingress_lease_accounts_and_releases() {
776 use crate::memory::{MemoryGuard, MemoryGuardConfig};
777
778 let mut engine = default_engine();
779 let guard = Arc::new(MemoryGuard::new(MemoryGuardConfig {
780 limit_bytes: 1024 * 1024,
781 ..Default::default()
782 }));
783 engine.memory_guard = Some(Arc::clone(&guard));
784
785 let batch = make_record_batch(10);
786 let expected = batch.total_payload_bytes() as u64;
787 assert_eq!(guard.current_bytes(), 0, "starts at zero");
788
789 {
790 let _lease = engine.lease_ingress_batch(&batch).expect("guard present");
791 assert_eq!(
792 guard.current_bytes(),
793 expected,
794 "bytes accounted while lease held"
795 );
796 }
797 assert_eq!(guard.current_bytes(), 0, "bytes released on drop");
799 }
800
801 #[cfg(all(feature = "memory", feature = "transport"))]
802 #[test]
803 fn ingress_lease_none_without_guard() {
804 let engine = default_engine();
805 let batch = make_record_batch(5);
806 assert!(
807 engine.lease_ingress_batch(&batch).is_none(),
808 "no lease when no guard wired"
809 );
810 }
811
812 #[test]
813 fn process_mid_tier_basic() {
814 let engine = default_engine();
815 let msgs = make_json_messages(100);
816
817 let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
818 Ok(pm
819 .field("_table")
820 .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
821 .unwrap_or("unknown")
822 .to_string())
823 });
824
825 assert_eq!(results.len(), 100);
826 assert!(results.iter().all(|r| r.is_ok()));
827 assert_eq!(results[0].as_ref().unwrap(), "events");
828 }
829
830 #[test]
831 fn process_mid_tier_parse_error() {
832 let engine = default_engine();
833 let mut msgs = make_json_messages(2);
834 msgs.insert(
836 1,
837 Record {
838 payload: Bytes::from_static(b"not json {{{"),
839 key: None,
840 headers: vec![],
841 metadata: RecordMeta {
842 timestamp_ms: None,
843 format: TPayloadFormat::Json,
844 },
845 },
846 );
847
848 let results: Vec<Result<String, String>> =
849 engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len().to_string()));
850
851 assert_eq!(results.len(), 3);
853 assert!(results[0].is_ok());
854 assert!(results[1].is_err());
855 assert!(results[1].as_ref().unwrap_err().contains("parse error"));
856 assert!(results[2].is_ok());
857 }
858
859 #[test]
860 fn process_mid_tier_empty_batch() {
861 let engine = default_engine();
862 let results: Vec<Result<(), String>> = engine.process_mid_tier(&[], |_| Ok(()));
863 assert!(results.is_empty());
864 }
865
866 #[test]
867 fn process_mid_tier_respects_chunk_size() {
868 let config = BatchProcessingConfig {
869 max_chunk_size: 50,
870 ..Default::default()
871 };
872 let engine = BatchEngine::new(config);
873 let msgs = make_json_messages(120);
874
875 let results: Vec<Result<usize, String>> =
876 engine.process_mid_tier(&msgs, |pm| Ok(pm.raw_payload().len()));
877
878 assert_eq!(results.len(), 120);
879 assert!(results.iter().all(|r| r.is_ok()));
880 let snap = engine.stats().snapshot();
882 assert_eq!(snap.received, 120);
883 }
884
885 #[test]
886 fn stats_updated_after_processing() {
887 let engine = default_engine();
888 let msgs = make_json_messages(10);
889
890 let _results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
891
892 let snap = engine.stats().snapshot();
893 assert_eq!(snap.received, 10);
894 assert_eq!(snap.processed, 10);
895 assert_eq!(snap.errors, 0);
896 assert_eq!(snap.filtered, 0);
897 }
898
899 #[test]
900 fn process_raw_passthrough() {
901 let engine = default_engine();
902 let msgs = make_json_messages(50);
903
904 let results: Vec<Result<usize, String>> =
905 engine.process_raw(&msgs, |msg| Ok(msg.payload.len()));
906
907 assert_eq!(results.len(), 50);
908 assert!(results.iter().all(|r| r.is_ok()));
909 assert!(results[0].as_ref().unwrap() > &0);
911
912 let snap = engine.stats().snapshot();
913 assert_eq!(snap.received, 50);
914 assert_eq!(snap.processed, 50);
915 }
916
917 #[test]
918 fn process_mid_tier_with_pre_route() {
919 let config = BatchProcessingConfig {
920 routing_field: Some("_table".to_string()),
921 pre_route_filters: vec![config::PreRouteFilterConfig::DlqFieldValue {
922 field: "_table".to_string(),
923 value: "poison".to_string(),
924 }],
925 ..Default::default()
926 };
927 let engine = BatchEngine::new(config);
928
929 let mut msgs = make_json_messages(3);
930 msgs[1] = Record {
932 payload: Bytes::from(r#"{"_table":"poison","id":999}"#),
933 key: None,
934 headers: vec![],
935 metadata: RecordMeta {
936 timestamp_ms: None,
937 format: TPayloadFormat::Json,
938 },
939 };
940
941 let results: Vec<Result<String, String>> = engine.process_mid_tier(&msgs, |pm| {
942 Ok(pm
943 .field("_table")
944 .and_then(|v| sonic_rs::JsonValueTrait::as_str(v))
945 .unwrap_or("?")
946 .to_string())
947 });
948
949 assert_eq!(results.len(), 3);
951 assert!(results[0].is_ok());
952 assert!(results[1].is_err());
953 assert!(results[1].as_ref().unwrap_err().contains("DLQ"));
954 assert!(results[2].is_ok());
955
956 let snap = engine.stats().snapshot();
957 assert_eq!(snap.dlq, 1);
958 assert_eq!(snap.errors, 1);
959 }
960
961 #[test]
962 fn process_mid_tier_filtered_not_in_results() {
963 let config = BatchProcessingConfig {
964 routing_field: Some("_table".to_string()),
965 pre_route_filters: vec![config::PreRouteFilterConfig::DropFieldMissing {
966 field: "_table".to_string(),
967 }],
968 ..Default::default()
969 };
970 let engine = BatchEngine::new(config);
971
972 let mut msgs = make_json_messages(3);
973 msgs[1] = Record {
975 payload: Bytes::from(r#"{"host":"web1"}"#),
976 key: None,
977 headers: vec![],
978 metadata: RecordMeta {
979 timestamp_ms: None,
980 format: TPayloadFormat::Json,
981 },
982 };
983
984 let results: Vec<Result<String, String>> =
985 engine.process_mid_tier(&msgs, |_pm| Ok("ok".to_string()));
986
987 assert_eq!(results.len(), 2);
989 assert!(results.iter().all(|r| r.is_ok()));
990
991 let snap = engine.stats().snapshot();
992 assert_eq!(snap.filtered, 1);
993 assert_eq!(snap.received, 3);
994 }
995
996 #[test]
997 fn from_cascade_creates_engine() {
998 let engine = BatchEngine::from_cascade("batch_processing").unwrap();
999 assert_eq!(engine.config().max_chunk_size, 10_000);
1000 }
1001
    /// Smoke test: the accessors are callable on a fresh engine and the
    /// received counter starts at zero.
    #[test]
    fn accessors_return_expected_types() {
        let engine = default_engine();
        let _stats = engine.stats();
        let _pool = engine.pool();
        let _config = engine.config();
        assert_eq!(engine.stats().snapshot().received, 0);
    }
1010
    /// `auto_wire` can be called with a test metrics manager (and no memory
    /// guard) and the engine still processes records afterwards.
    #[test]
    fn auto_wire_does_not_panic() {
        let mut engine = default_engine();
        let mgr = crate::metrics::MetricsManager::new_for_test("test_auto_wire");
        engine.auto_wire(
            &mgr,
            // The guard argument only exists when the `memory` feature is on.
            #[cfg(feature = "memory")]
            None,
        );
        let msgs = make_json_messages(5);
        // Processing must still work after wiring.
        let results: Vec<Result<(), String>> = engine.process_mid_tier(&msgs, |_| Ok(()));
        assert_eq!(results.len(), 5);
    }
1025
1026 #[test]
1027 fn debug_impl_works() {
1028 let engine = default_engine();
1029 let debug = format!("{engine:?}");
1030 assert!(debug.contains("BatchEngine"));
1031 assert!(debug.contains("config"));
1032 }
1033
1034 #[cfg(feature = "transport-memory")]
1039 mod driver_engine_tests {
1040 use super::*;
1041 use crate::transport::WorkBatch;
1042 use crate::worker::engine::CommitMode;
1043 use std::sync::atomic::{AtomicU64, Ordering};
1044
1045 fn json_payload(table: &str, id: usize) -> Vec<u8> {
1046 format!(r#"{{"_table":"{table}","id":{id}}}"#).into_bytes()
1047 }
1048
        /// Typed `None` for the ticker argument of the `run_workbatch*`
        /// calls, so tests that need no ticker still give the compiler a
        /// concrete callback type.
        // The return type is inherently verbose; the lint is silenced rather
        // than introducing a type alias for a single test helper.
        #[allow(clippy::type_complexity)]
        fn no_ticker() -> Option<(
            std::time::Duration,
            fn() -> std::future::Ready<Result<(), EngineError>>,
        )> {
            None
        }
1057
        /// Spawns a background task that cancels `shutdown` after `ms`
        /// milliseconds. Must be called from within a Tokio runtime.
        fn cancel_after(shutdown: tokio_util::sync::CancellationToken, ms: u64) {
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_millis(ms)).await;
                shutdown.cancel();
            });
        }
1064
        /// Five injected records reach the sink together with five commit
        /// tokens when running `run_workbatch` in `CommitMode::SinkManaged`.
        #[tokio::test]
        async fn run_workbatch_processes_and_passes_tokens_to_sink() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..5 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // Stop the run loop after 200 ms so the test terminates.
            cancel_after(shutdown.clone(), 200);

            // Totals observed by the sink across all delivered batches.
            let record_count = Arc::new(AtomicU64::new(0));
            let token_count = Arc::new(AtomicU64::new(0));
            let rc = Arc::clone(&record_count);
            let tc = Arc::clone(&token_count);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    // Identity transform: pass the batch through untouched.
                    |batch| Ok(batch),
                    |out: &WorkBatch<_>| {
                        let rc = Arc::clone(&rc);
                        let tc = Arc::clone(&tc);
                        // Read the counts before the future is built so it
                        // does not borrow `out`.
                        let records = out.records.len();
                        let tokens = out.commit_tokens.len();
                        async move {
                            rc.fetch_add(records as u64, Ordering::Relaxed);
                            tc.fetch_add(tokens as u64, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    // NOTE(review): presumably leaves committing to the sink,
                    // which is why the tokens are expected in the batch --
                    // confirm against the driver.
                    CommitMode::SinkManaged,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(record_count.load(Ordering::Relaxed), 5);
            assert_eq!(token_count.load(Ordering::Relaxed), 5);
        }
1115
        /// With a 100 ms ticker and a 350 ms run, the ticker callback runs at
        /// least twice even though no records are injected.
        #[tokio::test]
        async fn run_workbatch_ticker_fires() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // Run long enough for several 100 ms ticks.
            cancel_after(shutdown.clone(), 350);

            let tick_count = Arc::new(AtomicU64::new(0));
            let tick_count_clone = Arc::clone(&tick_count);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    |_out: &WorkBatch<_>| async { Ok(()) },
                    CommitMode::Auto,
                    // Ticker: every 100 ms bump the shared counter.
                    Some((std::time::Duration::from_millis(100), move || {
                        let tc = Arc::clone(&tick_count_clone);
                        async move {
                            tc.fetch_add(1, Ordering::Relaxed);
                            Ok(())
                        }
                    })),
                )
                .await;

            assert!(result.is_ok());
            let ticks = tick_count.load(Ordering::Relaxed);
            // Lower bound only: exact tick counts depend on scheduling.
            assert!(ticks >= 2, "Expected at least 2 ticks, got {ticks}");
        }
1152
        /// An identity transform delivers the raw payload bytes to the sink.
        #[tokio::test]
        async fn run_workbatch_passthrough_without_parse() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..3 {
                transport
                    .inject(None, json_payload("logs", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // Stop the run loop after 200 ms so the test terminates.
            cancel_after(shutdown.clone(), 200);

            // Sum of payload sizes seen by the sink.
            let total_bytes = Arc::new(AtomicU64::new(0));
            let total_bytes_clone = Arc::clone(&total_bytes);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    // Identity transform: the records are never parsed here.
                    |batch| Ok(batch),
                    |out: &WorkBatch<_>| {
                        let tb = Arc::clone(&total_bytes_clone);
                        // Compute the sum up front so the future does not
                        // borrow `out`.
                        let sum: u64 = out.records.iter().map(|r| r.payload.len() as u64).sum();
                        async move {
                            tb.fetch_add(sum, Ordering::Relaxed);
                            Ok(())
                        }
                    },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert!(total_bytes.load(Ordering::Relaxed) > 0);
        }
1197
        /// `run_workbatch_parsed` hands the transform parsed messages whose
        /// `_table` field can be read through an interned field handle.
        #[tokio::test]
        async fn run_workbatch_parsed_reads_field() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            for i in 0..4 {
                transport
                    .inject(None, json_payload("events", i))
                    .await
                    .unwrap();
            }

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // Stop the run loop after 200 ms so the test terminates.
            cancel_after(shutdown.clone(), 200);

            // Number of parsed messages whose `_table` equals "events".
            let hits = Arc::new(AtomicU64::new(0));
            let hc = Arc::clone(&hits);

            let result = engine
                .run_workbatch_parsed(
                    &transport,
                    shutdown,
                    move |pb| {
                        // Intern the field name once per batch, then look it
                        // up on every parsed message.
                        let field = pb.intern("_table");
                        let mut local = 0u64;
                        for parsed in &pb.parsed {
                            if parsed.field_str(&field) == Some("events") {
                                local += 1;
                            }
                        }
                        hc.fetch_add(local, Ordering::Relaxed);
                        // Rebuild a plain work batch for the sink, carrying
                        // the DLQ entries along.
                        Ok(WorkBatch::new(pb.records, pb.commit_tokens)
                            .with_dlq_entries(pb.dlq_entries))
                    },
                    |_out: &WorkBatch<_>| async { Ok(()) },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(result.is_ok());
            assert_eq!(hits.load(Ordering::Relaxed), 4);
        }
1247
        /// A failing sink makes `run_workbatch` return `EngineError::Sink`
        /// instead of panicking.
        #[tokio::test]
        async fn run_workbatch_sink_error_does_not_crash() {
            let config = crate::transport::memory::MemoryConfig {
                recv_timeout_ms: 50,
                ..Default::default()
            };
            let transport = crate::transport::memory::MemoryTransport::new(&config)
                .expect("memory transport with valid config must construct");
            transport
                .inject(None, json_payload("events", 0))
                .await
                .unwrap();

            let engine = default_engine();
            let shutdown = tokio_util::sync::CancellationToken::new();
            // Safety net: cancel after 200 ms in case the error does not end
            // the run on its own.
            cancel_after(shutdown.clone(), 200);

            let result = engine
                .run_workbatch(
                    &transport,
                    shutdown,
                    |batch| Ok(batch),
                    // Sink that always fails.
                    |_out: &WorkBatch<_>| async {
                        Err(EngineError::Sink("test sink error".into()))
                    },
                    CommitMode::Auto,
                    no_ticker(),
                )
                .await;

            assert!(
                matches!(result, Err(EngineError::Sink(_))),
                "sink error returns terminally without crashing: {result:?}"
            );
        }
1286 }
1287}