1use std::collections::VecDeque;
8use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use rustc_hash::FxHashMap;
12use tracing::debug;
13
14use super::types::{
15 RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction, WindowType,
16};
17use super::{evaluator, pipeline, Engine};
18use crate::event::{Event, SharedEvent};
19use crate::sequence::SequenceContext;
20
21impl Engine {
22 #[tracing::instrument(level = "trace", skip(self))]
24 pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
25 self.events_processed += 1;
26 self.process_inner(Arc::new(event)).await
27 }
28
29 #[tracing::instrument(level = "trace", skip(self))]
31 pub async fn process_shared(
32 &mut self,
33 event: SharedEvent,
34 ) -> Result<(), super::error::EngineError> {
35 self.events_processed += 1;
36 self.process_inner(event).await
37 }
38
    /// Core single-event path shared by `process` and `process_shared`.
    ///
    /// Order of operations:
    /// 1. Record the event in metrics (if enabled).
    /// 2. Apply late-data policy against the current effective watermark:
    ///    an event older than the watermark is admitted (within some route's
    ///    allowed lateness), diverted to a configured side-output stream, or
    ///    dropped.
    /// 3. Observe the event for watermark progression.
    /// 4. Route the event — and any events produced by the streams it
    ///    triggers — through every matching stream, up to a bounded chain
    ///    depth.
    #[tracing::instrument(level = "trace", skip(self))]
    async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
        if let Some(ref m) = self.metrics {
            m.record_event(&event.event_type);
        }

        // --- Late-data handling ---
        if let Some(ref tracker) = self.watermark_tracker {
            if let Some(effective_wm) = tracker.effective_watermark() {
                if event.timestamp < effective_wm {
                    // Admitted if ANY route's config tolerates this lateness.
                    let mut allowed = false;
                    if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                        for sn in stream_names.iter() {
                            if let Some(cfg) = self.late_data_configs.get(sn) {
                                if event.timestamp >= effective_wm - cfg.allowed_lateness {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                    }
                    // Only enforce drop/side-output when at least one late-data
                    // config exists; with none configured, late events pass
                    // through untouched.
                    if !allowed && !self.late_data_configs.is_empty() {
                        let mut routed = false;
                        if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                            for sn in stream_names.iter() {
                                if let Some(cfg) = self.late_data_configs.get(sn) {
                                    if let Some(ref side_stream) = cfg.side_output_stream {
                                        debug!(
                                            "Routing late event to side-output '{}' type={} ts={}",
                                            side_stream, event.event_type, event.timestamp
                                        );
                                        // Re-badge a clone of the event with the
                                        // side-output stream's type and emit it.
                                        let mut late_event = (*event).clone();
                                        late_event.event_type = side_stream.clone().into();
                                        self.send_output(late_event);
                                        routed = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if !routed {
                            debug!(
                                "Dropping late event type={} ts={} (watermark={})",
                                event.event_type, event.timestamp, effective_wm
                            );
                        }
                        // Late event fully handled (routed or dropped).
                        return Ok(());
                    }
                }
            }
        }

        // Work queue of (event, chain depth); events produced by streams
        // re-enter here one level deeper.
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::from([(event.clone(), 0)]);
        const MAX_CHAIN_DEPTH: usize = 10;

        // Observe the event for watermark progression and remember the newest
        // effective watermark for later window application.
        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.observe_event(&event.event_type, event.timestamp);

            if let Some(new_wm) = tracker.effective_watermark() {
                if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
                    self.last_applied_watermark = Some(new_wm);
                }
            }
        }

        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Guard against unbounded stream -> stream event chains.
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Forward pipeline output_events to the output channel only
                    // when nothing was explicitly emitted AND the stream
                    // contains a Process/To operation.
                    let send_outputs = result.emitted_events.is_empty()
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));

                    if self.output_channel.is_some() {
                        for emitted in &result.emitted_events {
                            self.output_events_emitted += 1;
                            if let Some(ref m) = self.metrics {
                                m.record_output_event("pipeline", &emitted.event_type);
                            }
                            self.send_output_shared(emitted);
                        }
                        if send_outputs {
                            for output in &result.output_events {
                                self.output_events_emitted += 1;
                                if let Some(ref m) = self.metrics {
                                    m.record_output_event("pipeline", &output.event_type);
                                }
                                self.send_output_shared(output);
                            }
                        }
                    } else {
                        // No output channel attached: still account for what
                        // would have been delivered.
                        self.output_events_emitted += result.emitted_events.len() as u64;
                        if send_outputs {
                            self.output_events_emitted += result.output_events.len() as u64;
                        }
                    }

                    if !send_outputs {
                        // Outputs went to sinks instead; count those sends.
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Output events may trigger other streams: enqueue them
                    // one chain level deeper.
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        if let Some(ref m) = self.metrics {
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
208
    /// Processes a batch of owned events.
    ///
    /// Same routing/chaining semantics as the single-event path, but queue
    /// capacity is pre-sized for the batch and all outputs are buffered in
    /// `emitted_batch`, then flushed (and metrics recorded) once after the
    /// entire batch has drained.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // ~25% head-room for chain-generated events.
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        // Outputs buffered here and delivered after the loop.
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Guard against unbounded stream -> stream event chains.
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // Forward output_events only when nothing was explicitly
                    // emitted AND the stream contains a Process/To operation.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        // Outputs went to sinks instead; count those sends.
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Output events may trigger other streams: enqueue them
                    // one chain level deeper.
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Flush buffered outputs once per batch.
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
327
    /// Synchronous batch path: same routing/chaining shape as
    /// `process_batch`, but runs each stream through the sync pipeline
    /// (`process_stream_sync`) and records no per-event metrics.
    pub fn process_batch_sync(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // ~25% head-room for chain-generated events.
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        // Outputs buffered here and delivered after the loop.
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Guard against unbounded stream -> stream event chains.
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    // Output renaming can be skipped when the router has no
                    // routes registered under this stream's own name
                    // (presumably nothing downstream consumes the renamed
                    // events — confirm against router semantics).
                    let skip_rename = self.router.get_routes(stream_name).is_none();
                    let result = Self::process_stream_sync(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        skip_rename,
                    )?;

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // Forward output_events only when nothing was explicitly
                    // emitted AND the stream contains a Process/To operation.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        // Outputs went to sinks instead; count those sends.
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Output events may trigger other streams: enqueue them
                    // one chain level deeper.
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Flush buffered outputs once per batch.
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        Ok(())
    }
423
424 fn process_stream_sync(
428 stream: &mut StreamDefinition,
429 event: SharedEvent,
430 functions: &FxHashMap<String, UserFunction>,
431 skip_output_rename: bool,
432 ) -> Result<StreamProcessResult, super::error::EngineError> {
433 if let RuntimeSource::Merge(ref sources) = stream.source {
435 let mut passes_filter = false;
436 for ms in sources {
437 if ms.event_type == *event.event_type {
438 if let Some(ref filter) = ms.filter {
439 let ctx = SequenceContext::new();
440 if let Some(result) = evaluator::eval_expr_with_functions(
441 filter,
442 &event,
443 &ctx,
444 functions,
445 &FxHashMap::default(),
446 ) {
447 if result.as_bool().unwrap_or(false) {
448 passes_filter = true;
449 break;
450 }
451 }
452 } else {
453 passes_filter = true;
454 break;
455 }
456 }
457 }
458 if !passes_filter {
459 return Ok(StreamProcessResult {
460 emitted_events: vec![],
461 output_events: vec![],
462 sink_events_sent: 0,
463 });
464 }
465 }
466
467 if matches!(stream.source, RuntimeSource::Join(_)) {
469 return Ok(StreamProcessResult {
470 emitted_events: vec![],
471 output_events: vec![],
472 sink_events_sent: 0,
473 });
474 }
475
476 pipeline::execute_pipeline_sync(
478 stream,
479 vec![event],
480 0,
481 pipeline::SkipFlags::none(),
482 functions,
483 skip_output_rename,
484 )
485 }
486
    /// Processes a batch of already-shared (`Arc`-wrapped) events.
    ///
    /// Identical semantics to `process_batch`, minus the `Arc::new` wrap per
    /// event: outputs are buffered in `emitted_batch` and flushed (with
    /// metrics) once after the whole batch drains.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch_shared(
        &mut self,
        events: Vec<SharedEvent>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // ~25% head-room for chain-generated events.
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((event, 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        // Outputs buffered here and delivered after the loop.
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Guard against unbounded stream -> stream event chains.
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // Forward output_events only when nothing was explicitly
                    // emitted AND the stream contains a Process/To operation.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        // Outputs went to sinks instead; count those sends.
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Output events may trigger other streams: enqueue them
                    // one chain level deeper.
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Flush buffered outputs once per batch.
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
593
    /// Runs one event through a single stream definition (async path).
    ///
    /// Merge sources gate the event on a per-source type match plus optional
    /// filter; join sources buffer the event until a correlated event is
    /// produced; all other sources feed the pipeline directly.
    async fn process_stream_with_functions(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        // --- Merge-source gating ---
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            let mut matched_source_name = None;
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                matched_source_name = Some(&ms.name);
                                break;
                            }
                        }
                    } else {
                        // No filter: a type match alone is enough.
                        passes_filter = true;
                        matched_source_name = Some(&ms.name);
                        break;
                    }
                }
            }
            if !passes_filter {
                // Event matches no merge source: nothing to process.
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
            if let Some(source_name) = matched_source_name {
                tracing::trace!("Event matched merge source: {}", source_name);
            }
        }

        // --- Join-source buffering/correlation ---
        if let RuntimeSource::Join(ref _sources) = stream.source {
            if let Some(ref mut join_buffer) = stream.join_buffer {
                // Map the event type back to the join-source name it belongs
                // to; fall back to the raw event type when unmapped.
                let source_name = stream
                    .event_type_to_source
                    .get(&*event.event_type)
                    .cloned()
                    .unwrap_or_else(|| event.event_type.to_string());

                tracing::debug!(
                    "Join stream {}: Adding event from source '{}' (event_type: {})",
                    stream.name,
                    source_name,
                    event.event_type
                );

                match join_buffer.add_event(&source_name, (*event).clone()) {
                    Some(correlated_event) => {
                        // Correlation complete: run the merged event through
                        // the pipeline with join-specific skips.
                        tracing::debug!(
                            "Join stream {}: Correlated event with {} fields",
                            stream.name,
                            correlated_event.data.len()
                        );
                        return Self::process_join_result(
                            stream,
                            Arc::new(correlated_event),
                            functions,
                            sinks,
                        )
                        .await;
                    }
                    None => {
                        // Not enough events yet; the buffer holds this one.
                        tracing::debug!(
                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
                            stream.name,
                            join_buffer.stats()
                        );
                        return Ok(StreamProcessResult {
                            emitted_events: vec![],
                            output_events: vec![],
                            sink_events_sent: 0,
                        });
                    }
                }
            }
            // Misconfiguration: a join source requires a JoinBuffer.
            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        // Regular source: run the full pipeline from the first operation.
        pipeline::execute_pipeline(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            sinks,
        )
        .await
    }
711
712 async fn process_join_result(
714 stream: &mut StreamDefinition,
715 correlated_event: SharedEvent,
716 functions: &FxHashMap<String, UserFunction>,
717 sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
718 ) -> Result<StreamProcessResult, super::error::EngineError> {
719 pipeline::execute_pipeline(
721 stream,
722 vec![correlated_event],
723 0,
724 pipeline::SkipFlags::for_join(),
725 functions,
726 sinks,
727 )
728 .await
729 }
730
    /// Flushes session windows whose inactivity timeout has elapsed.
    ///
    /// For each stream, finds the first session-style window operation,
    /// drains its expired events (across all partitions for partitioned
    /// sessions), runs them through the remainder of the pipeline, and
    /// emits whatever results.
    #[tracing::instrument(skip(self))]
    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
        let now = chrono::Utc::now();
        // Snapshot the names so `self.streams` can be re-borrowed mutably
        // inside the loop.
        let stream_names: Vec<String> = self.streams.keys().cloned().collect();

        for stream_name in stream_names {
            // Scope the mutable borrow of the stream to the expiry check.
            let (window_idx, expired) = {
                let stream = self.streams.get_mut(&stream_name).unwrap();
                let mut result = Vec::new();
                let mut found_idx = None;

                for (idx, op) in stream.operations.iter_mut().enumerate() {
                    if let RuntimeOp::Window(window) = op {
                        match window {
                            WindowType::Session(w) => {
                                if let Some(events) = w.check_expired(now) {
                                    result = events;
                                }
                                found_idx = Some(idx);
                            }
                            WindowType::PartitionedSession(w) => {
                                // Gather expirations from every partition.
                                for (_key, events) in w.check_expired(now) {
                                    result.extend(events);
                                }
                                found_idx = Some(idx);
                            }
                            // Non-session windows are not flushed here.
                            _ => {}
                        }
                        // Stop at the first session-style window found;
                        // later operations form the post-window pipeline.
                        if found_idx.is_some() {
                            break;
                        }
                    }
                }
                (found_idx, result)
            };

            if expired.is_empty() {
                continue;
            }

            let window_idx = match window_idx {
                Some(idx) => idx,
                None => continue,
            };

            // Resume the pipeline after the window stage with the expired set.
            let result = Self::process_post_window(
                self.streams.get_mut(&stream_name).unwrap(),
                expired,
                window_idx,
                &self.functions,
                self.sinks.cache(),
            )
            .await?;

            for emitted in &result.emitted_events {
                self.output_events_emitted += 1;
                // send_output takes an owned Event; clone out of the Arc.
                let owned = (**emitted).clone();
                self.send_output(owned);
            }
        }

        Ok(())
    }
804
805 async fn process_post_window(
808 stream: &mut StreamDefinition,
809 events: Vec<SharedEvent>,
810 window_idx: usize,
811 functions: &FxHashMap<String, UserFunction>,
812 sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
813 ) -> Result<StreamProcessResult, super::error::EngineError> {
814 pipeline::execute_pipeline(
816 stream,
817 events,
818 window_idx + 1,
819 pipeline::SkipFlags::for_post_window(),
820 functions,
821 sinks,
822 )
823 .await
824 }
825
826 #[tracing::instrument(skip(self))]
828 pub(super) async fn apply_watermark_to_windows(
829 &mut self,
830 wm: DateTime<Utc>,
831 ) -> Result<(), super::error::EngineError> {
832 let stream_names: Vec<String> = self.streams.keys().cloned().collect();
833
834 for stream_name in stream_names {
835 let (window_idx, expired) = {
836 let stream = self.streams.get_mut(&stream_name).unwrap();
837 let mut result = Vec::new();
838 let mut found_idx = None;
839
840 for (idx, op) in stream.operations.iter_mut().enumerate() {
841 if let RuntimeOp::Window(window) = op {
842 let events: Option<Vec<SharedEvent>> = match window {
843 WindowType::Tumbling(w) => w.advance_watermark(wm),
844 WindowType::Sliding(w) => w.advance_watermark(wm),
845 WindowType::Session(w) => w.advance_watermark(wm),
846 WindowType::PartitionedTumbling(w) => {
847 let parts = w.advance_watermark(wm);
848 let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
849 if all.is_empty() {
850 None
851 } else {
852 Some(all)
853 }
854 }
855 WindowType::PartitionedSliding(w) => {
856 let parts = w.advance_watermark(wm);
857 let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
858 if all.is_empty() {
859 None
860 } else {
861 Some(all)
862 }
863 }
864 WindowType::PartitionedSession(w) => {
865 let parts = w.advance_watermark(wm);
866 let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
867 if all.is_empty() {
868 None
869 } else {
870 Some(all)
871 }
872 }
873 _ => None, };
875
876 if let Some(evts) = events {
877 result = evts;
878 found_idx = Some(idx);
879 }
880 break;
881 }
882 }
883 (found_idx, result)
884 };
885
886 if expired.is_empty() {
887 continue;
888 }
889
890 let window_idx = match window_idx {
891 Some(idx) => idx,
892 None => continue,
893 };
894
895 let result = Self::process_post_window(
896 self.streams.get_mut(&stream_name).unwrap(),
897 expired,
898 window_idx,
899 &self.functions,
900 self.sinks.cache(),
901 )
902 .await?;
903
904 for emitted in &result.emitted_events {
905 self.output_events_emitted += 1;
906 let owned = (**emitted).clone();
907 self.send_output(owned);
908 }
909 }
910
911 Ok(())
912 }
913}