1use crate::event::{Event, SharedEvent};
8use crate::sequence::SequenceContext;
9use chrono::{DateTime, Utc};
10use rustc_hash::FxHashMap;
11use std::collections::VecDeque;
12use std::sync::Arc;
13use tracing::debug;
14
15use super::evaluator;
16use super::pipeline;
17use super::types::{
18 RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction, WindowType,
19};
20use super::Engine;
21
22impl Engine {
23 #[tracing::instrument(level = "trace", skip(self))]
25 pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
26 self.events_processed += 1;
27 self.process_inner(Arc::new(event)).await
28 }
29
30 #[tracing::instrument(level = "trace", skip(self))]
32 pub async fn process_shared(
33 &mut self,
34 event: SharedEvent,
35 ) -> Result<(), super::error::EngineError> {
36 self.events_processed += 1;
37 self.process_inner(event).await
38 }
39
40 #[tracing::instrument(level = "trace", skip(self))]
42 async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
43 if let Some(ref m) = self.metrics {
45 m.record_event(&event.event_type);
46 }
47
48 if let Some(ref tracker) = self.watermark_tracker {
50 if let Some(effective_wm) = tracker.effective_watermark() {
51 if event.timestamp < effective_wm {
52 let mut allowed = false;
54 if let Some(stream_names) = self.router.get_routes(&event.event_type) {
55 for sn in stream_names.iter() {
56 if let Some(cfg) = self.late_data_configs.get(sn) {
57 if event.timestamp >= effective_wm - cfg.allowed_lateness {
58 allowed = true;
59 break;
60 }
61 }
62 }
63 }
64 if !allowed && !self.late_data_configs.is_empty() {
65 let mut routed = false;
67 if let Some(stream_names) = self.router.get_routes(&event.event_type) {
68 for sn in stream_names.iter() {
69 if let Some(cfg) = self.late_data_configs.get(sn) {
70 if let Some(ref side_stream) = cfg.side_output_stream {
71 debug!(
72 "Routing late event to side-output '{}' type={} ts={}",
73 side_stream, event.event_type, event.timestamp
74 );
75 let mut late_event = (*event).clone();
77 late_event.event_type = side_stream.clone().into();
78 self.send_output(late_event);
79 routed = true;
80 break;
81 }
82 }
83 }
84 }
85 if !routed {
86 debug!(
87 "Dropping late event type={} ts={} (watermark={})",
88 event.event_type, event.timestamp, effective_wm
89 );
90 }
91 return Ok(());
92 }
93 }
94 }
95 }
96
97 let mut pending_events: VecDeque<(SharedEvent, usize)> =
100 VecDeque::from([(event.clone(), 0)]);
101 const MAX_CHAIN_DEPTH: usize = 10;
102
103 if let Some(ref mut tracker) = self.watermark_tracker {
105 tracker.observe_event(&event.event_type, event.timestamp);
106
107 if let Some(new_wm) = tracker.effective_watermark() {
108 if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
109 self.last_applied_watermark = Some(new_wm);
110 }
113 }
114 }
115
116 while let Some((current_event, depth)) = pending_events.pop_front() {
118 if depth >= MAX_CHAIN_DEPTH {
120 debug!(
121 "Max chain depth reached for event type: {}",
122 current_event.event_type
123 );
124 continue;
125 }
126
127 let stream_names: Arc<[String]> = self
130 .router
131 .get_routes(¤t_event.event_type)
132 .cloned()
133 .unwrap_or_else(|| Arc::from([]));
134
135 for stream_name in stream_names.iter() {
136 if let Some(stream) = self.streams.get_mut(stream_name) {
137 let start = std::time::Instant::now();
138 let result = Self::process_stream_with_functions(
139 stream,
140 Arc::clone(¤t_event),
141 &self.functions,
142 self.sinks.cache(),
143 )
144 .await?;
145
146 if let Some(ref m) = self.metrics {
148 m.record_processing(stream_name, start.elapsed().as_secs_f64());
149 }
150
151 let send_outputs = result.emitted_events.is_empty()
156 && stream
157 .operations
158 .iter()
159 .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
160
161 if self.output_channel.is_some() {
164 for emitted in &result.emitted_events {
165 self.output_events_emitted += 1;
166 if let Some(ref m) = self.metrics {
167 m.record_output_event("pipeline", &emitted.event_type);
168 }
169 self.send_output_shared(emitted);
170 }
171 if send_outputs {
172 for output in &result.output_events {
173 self.output_events_emitted += 1;
174 if let Some(ref m) = self.metrics {
175 m.record_output_event("pipeline", &output.event_type);
176 }
177 self.send_output_shared(output);
178 }
179 }
180 } else {
181 self.output_events_emitted += result.emitted_events.len() as u64;
183 if send_outputs {
184 self.output_events_emitted += result.output_events.len() as u64;
185 }
186 }
187
188 if !send_outputs {
191 self.output_events_emitted += result.sink_events_sent;
192 }
193
194 for output_event in result.output_events {
196 pending_events.push_back((output_event, depth + 1));
197 }
198 }
199 }
200 }
201
202 if let Some(ref m) = self.metrics {
204 m.set_stream_count(self.streams.len());
205 }
206
207 Ok(())
208 }
209
210 #[tracing::instrument(level = "trace", skip(self))]
216 pub async fn process_batch(
217 &mut self,
218 events: Vec<Event>,
219 ) -> Result<(), super::error::EngineError> {
220 if events.is_empty() {
221 return Ok(());
222 }
223
224 let batch_size = events.len();
225 self.events_processed += batch_size as u64;
226
227 if let Some(ref m) = self.metrics {
229 for event in &events {
230 m.record_event(&event.event_type);
231 }
232 }
233
234 let mut pending_events: VecDeque<(SharedEvent, usize)> =
237 VecDeque::with_capacity(batch_size + batch_size / 4);
238
239 for event in events {
241 pending_events.push_back((Arc::new(event), 0));
242 }
243
244 const MAX_CHAIN_DEPTH: usize = 10;
245
246 let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
248
249 while let Some((current_event, depth)) = pending_events.pop_front() {
251 if depth >= MAX_CHAIN_DEPTH {
252 debug!(
253 "Max chain depth reached for event type: {}",
254 current_event.event_type
255 );
256 continue;
257 }
258
259 let stream_names: Arc<[String]> = self
261 .router
262 .get_routes(¤t_event.event_type)
263 .cloned()
264 .unwrap_or_else(|| Arc::from([]));
265
266 for stream_name in stream_names.iter() {
267 if let Some(stream) = self.streams.get_mut(stream_name) {
268 let start = std::time::Instant::now();
269 let result = Self::process_stream_with_functions(
270 stream,
271 Arc::clone(¤t_event),
272 &self.functions,
273 self.sinks.cache(),
274 )
275 .await?;
276
277 if let Some(ref m) = self.metrics {
279 m.record_processing(stream_name, start.elapsed().as_secs_f64());
280 }
281
282 self.output_events_emitted += result.emitted_events.len() as u64;
284 let has_emitted = !result.emitted_events.is_empty();
285 emitted_batch.extend(result.emitted_events);
286
287 let forward_outputs = !has_emitted
290 && stream
291 .operations
292 .iter()
293 .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
294 if forward_outputs {
295 self.output_events_emitted += result.output_events.len() as u64;
296 emitted_batch.extend(result.output_events.iter().map(Arc::clone));
297 }
298
299 if !forward_outputs {
301 self.output_events_emitted += result.sink_events_sent;
302 }
303
304 for output_event in result.output_events {
306 pending_events.push_back((output_event, depth + 1));
307 }
308 }
309 }
310 }
311
312 for emitted in &emitted_batch {
315 self.send_output_shared(emitted);
316 }
317
318 if let Some(ref m) = self.metrics {
320 for emitted in &emitted_batch {
321 m.record_output_event("pipeline", &emitted.event_type);
322 }
323 m.set_stream_count(self.streams.len());
324 }
325
326 Ok(())
327 }
328
329 pub fn process_batch_sync(
333 &mut self,
334 events: Vec<Event>,
335 ) -> Result<(), super::error::EngineError> {
336 if events.is_empty() {
337 return Ok(());
338 }
339
340 let batch_size = events.len();
341 self.events_processed += batch_size as u64;
342
343 let mut pending_events: VecDeque<(SharedEvent, usize)> =
346 VecDeque::with_capacity(batch_size + batch_size / 4);
347
348 for event in events {
350 pending_events.push_back((Arc::new(event), 0));
351 }
352
353 const MAX_CHAIN_DEPTH: usize = 10;
354
355 let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
357
358 while let Some((current_event, depth)) = pending_events.pop_front() {
360 if depth >= MAX_CHAIN_DEPTH {
361 debug!(
362 "Max chain depth reached for event type: {}",
363 current_event.event_type
364 );
365 continue;
366 }
367
368 let stream_names: Arc<[String]> = self
370 .router
371 .get_routes(¤t_event.event_type)
372 .cloned()
373 .unwrap_or_else(|| Arc::from([]));
374
375 for stream_name in stream_names.iter() {
376 if let Some(stream) = self.streams.get_mut(stream_name) {
377 let skip_rename = self.router.get_routes(stream_name).is_none();
379 let result = Self::process_stream_sync(
380 stream,
381 Arc::clone(¤t_event),
382 &self.functions,
383 skip_rename,
384 )?;
385
386 self.output_events_emitted += result.emitted_events.len() as u64;
388 let has_emitted = !result.emitted_events.is_empty();
389 emitted_batch.extend(result.emitted_events);
390
391 let forward_outputs = !has_emitted
394 && stream
395 .operations
396 .iter()
397 .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
398 if forward_outputs {
399 self.output_events_emitted += result.output_events.len() as u64;
400 emitted_batch.extend(result.output_events.iter().map(Arc::clone));
401 }
402
403 if !forward_outputs {
405 self.output_events_emitted += result.sink_events_sent;
406 }
407
408 for output_event in result.output_events {
410 pending_events.push_back((output_event, depth + 1));
411 }
412 }
413 }
414 }
415
416 for emitted in &emitted_batch {
419 self.send_output_shared(emitted);
420 }
421
422 Ok(())
423 }
424
425 fn process_stream_sync(
429 stream: &mut StreamDefinition,
430 event: SharedEvent,
431 functions: &FxHashMap<String, UserFunction>,
432 skip_output_rename: bool,
433 ) -> Result<StreamProcessResult, super::error::EngineError> {
434 if let RuntimeSource::Merge(ref sources) = stream.source {
436 let mut passes_filter = false;
437 for ms in sources {
438 if ms.event_type == *event.event_type {
439 if let Some(ref filter) = ms.filter {
440 let ctx = SequenceContext::new();
441 if let Some(result) = evaluator::eval_expr_with_functions(
442 filter,
443 &event,
444 &ctx,
445 functions,
446 &FxHashMap::default(),
447 ) {
448 if result.as_bool().unwrap_or(false) {
449 passes_filter = true;
450 break;
451 }
452 }
453 } else {
454 passes_filter = true;
455 break;
456 }
457 }
458 }
459 if !passes_filter {
460 return Ok(StreamProcessResult {
461 emitted_events: vec![],
462 output_events: vec![],
463 sink_events_sent: 0,
464 });
465 }
466 }
467
468 if matches!(stream.source, RuntimeSource::Join(_)) {
470 return Ok(StreamProcessResult {
471 emitted_events: vec![],
472 output_events: vec![],
473 sink_events_sent: 0,
474 });
475 }
476
477 pipeline::execute_pipeline_sync(
479 stream,
480 vec![event],
481 0,
482 pipeline::SkipFlags::none(),
483 functions,
484 skip_output_rename,
485 )
486 }
487
488 #[tracing::instrument(level = "trace", skip(self))]
490 pub async fn process_batch_shared(
491 &mut self,
492 events: Vec<SharedEvent>,
493 ) -> Result<(), super::error::EngineError> {
494 if events.is_empty() {
495 return Ok(());
496 }
497
498 let batch_size = events.len();
499 self.events_processed += batch_size as u64;
500
501 if let Some(ref m) = self.metrics {
503 for event in &events {
504 m.record_event(&event.event_type);
505 }
506 }
507
508 let mut pending_events: VecDeque<(SharedEvent, usize)> =
510 VecDeque::with_capacity(batch_size + batch_size / 4);
511
512 for event in events {
513 pending_events.push_back((event, 0));
514 }
515
516 const MAX_CHAIN_DEPTH: usize = 10;
517
518 let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
519
520 while let Some((current_event, depth)) = pending_events.pop_front() {
522 if depth >= MAX_CHAIN_DEPTH {
523 debug!(
524 "Max chain depth reached for event type: {}",
525 current_event.event_type
526 );
527 continue;
528 }
529
530 let stream_names: Arc<[String]> = self
531 .router
532 .get_routes(¤t_event.event_type)
533 .cloned()
534 .unwrap_or_else(|| Arc::from([]));
535
536 for stream_name in stream_names.iter() {
537 if let Some(stream) = self.streams.get_mut(stream_name) {
538 let start = std::time::Instant::now();
539 let result = Self::process_stream_with_functions(
540 stream,
541 Arc::clone(¤t_event),
542 &self.functions,
543 self.sinks.cache(),
544 )
545 .await?;
546
547 if let Some(ref m) = self.metrics {
548 m.record_processing(stream_name, start.elapsed().as_secs_f64());
549 }
550
551 self.output_events_emitted += result.emitted_events.len() as u64;
552 let has_emitted = !result.emitted_events.is_empty();
553 emitted_batch.extend(result.emitted_events);
554
555 let forward_outputs = !has_emitted
558 && stream
559 .operations
560 .iter()
561 .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
562 if forward_outputs {
563 self.output_events_emitted += result.output_events.len() as u64;
564 emitted_batch.extend(result.output_events.iter().map(Arc::clone));
565 }
566
567 if !forward_outputs {
569 self.output_events_emitted += result.sink_events_sent;
570 }
571
572 for output_event in result.output_events {
573 pending_events.push_back((output_event, depth + 1));
574 }
575 }
576 }
577 }
578
579 for emitted in &emitted_batch {
581 self.send_output_shared(emitted);
582 }
583
584 if let Some(ref m) = self.metrics {
586 for emitted in &emitted_batch {
587 m.record_output_event("pipeline", &emitted.event_type);
588 }
589 m.set_stream_count(self.streams.len());
590 }
591
592 Ok(())
593 }
594
    /// Runs one event through a single stream, dispatching on the stream's
    /// source kind before invoking the pipeline:
    ///
    /// * `Merge` sources: the event must match one merge source's event type
    ///   and pass its filter (if any); otherwise an empty result is returned.
    /// * `Join` sources: the event is added to the join buffer; the pipeline
    ///   only runs once the buffer yields a correlated event (via
    ///   `process_join_result`).
    /// * All other sources: the pipeline runs directly on the event.
    async fn process_stream_with_functions(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            let mut matched_source_name = None;
            // Sources are checked in declaration order; the first match wins.
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        // Filters are evaluated with a fresh (empty) sequence
                        // context and no captured variables.
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                matched_source_name = Some(&ms.name);
                                break;
                            }
                        }
                    } else {
                        // No filter: a matching event type alone is enough.
                        passes_filter = true;
                        matched_source_name = Some(&ms.name);
                        break;
                    }
                }
            }
            if !passes_filter {
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
            if let Some(source_name) = matched_source_name {
                tracing::trace!("Event matched merge source: {}", source_name);
            }
        }

        if let RuntimeSource::Join(ref _sources) = stream.source {
            if let Some(ref mut join_buffer) = stream.join_buffer {
                // Map the event type back to the join-source name it belongs
                // to; falls back to the raw event type when no mapping exists.
                let source_name = stream
                    .event_type_to_source
                    .get(&*event.event_type)
                    .cloned()
                    .unwrap_or_else(|| event.event_type.to_string());

                tracing::debug!(
                    "Join stream {}: Adding event from source '{}' (event_type: {})",
                    stream.name,
                    source_name,
                    event.event_type
                );

                match join_buffer.add_event(&source_name, (*event).clone()) {
                    Some(correlated_event) => {
                        tracing::debug!(
                            "Join stream {}: Correlated event with {} fields",
                            stream.name,
                            correlated_event.data.len()
                        );
                        // A full correlation was produced: run it through the
                        // rest of the pipeline with join-specific skip flags.
                        return Self::process_join_result(
                            stream,
                            Arc::new(correlated_event),
                            functions,
                            sinks,
                        )
                        .await;
                    }
                    None => {
                        // Not all join sides have arrived yet; the event stays
                        // buffered and nothing is produced this round.
                        tracing::debug!(
                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
                            stream.name,
                            join_buffer.stats()
                        );
                        return Ok(StreamProcessResult {
                            emitted_events: vec![],
                            output_events: vec![],
                            sink_events_sent: 0,
                        });
                    }
                }
            }
            // A Join source without a buffer is a configuration error; warn and
            // produce nothing rather than failing the whole engine.
            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        pipeline::execute_pipeline(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            sinks,
        )
        .await
    }
712
713 async fn process_join_result(
715 stream: &mut StreamDefinition,
716 correlated_event: SharedEvent,
717 functions: &FxHashMap<String, UserFunction>,
718 sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
719 ) -> Result<StreamProcessResult, super::error::EngineError> {
720 pipeline::execute_pipeline(
722 stream,
723 vec![correlated_event],
724 0,
725 pipeline::SkipFlags::for_join(),
726 functions,
727 sinks,
728 )
729 .await
730 }
731
    /// Flushes session windows whose inactivity gap has elapsed (wall clock).
    ///
    /// For each stream, finds the first session-type window op, collects any
    /// expired session events, resumes the pipeline after that window, and
    /// forwards the emitted results to the output channel. Intended to be
    /// called periodically by the engine driver.
    #[tracing::instrument(skip(self))]
    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
        let now = chrono::Utc::now();
        // Collect names up front so `self.streams` is not borrowed across the
        // per-stream mutable accesses below.
        let stream_names: Vec<String> = self.streams.keys().cloned().collect();

        for stream_name in stream_names {
            let (window_idx, expired) = {
                // Key came from `self.streams` above, so the lookup cannot fail.
                let stream = self.streams.get_mut(&stream_name).unwrap();
                let mut result = Vec::new();
                let mut found_idx = None;

                for (idx, op) in stream.operations.iter_mut().enumerate() {
                    if let RuntimeOp::Window(window) = op {
                        match window {
                            WindowType::Session(w) => {
                                if let Some(events) = w.check_expired(now) {
                                    result = events;
                                }
                                found_idx = Some(idx);
                            }
                            WindowType::PartitionedSession(w) => {
                                // Flatten expirations across all partitions.
                                for (_key, events) in w.check_expired(now) {
                                    result.extend(events);
                                }
                                found_idx = Some(idx);
                            }
                            _ => {}
                        }
                        // Only the first session-type window per stream is
                        // flushed; non-session windows are skipped over.
                        if found_idx.is_some() {
                            break;
                        }
                    }
                }
                (found_idx, result)
            };

            if expired.is_empty() {
                continue;
            }

            let window_idx = match window_idx {
                Some(idx) => idx,
                None => continue,
            };

            // Resume the pipeline after the window op with the expired events.
            let result = Self::process_post_window(
                self.streams.get_mut(&stream_name).unwrap(),
                expired,
                window_idx,
                &self.functions,
                self.sinks.cache(),
            )
            .await?;

            for emitted in &result.emitted_events {
                self.output_events_emitted += 1;
                // `send_output` takes an owned Event, so clone out of the Arc.
                let owned = (**emitted).clone();
                self.send_output(owned);
            }
        }

        Ok(())
    }
805
806 async fn process_post_window(
809 stream: &mut StreamDefinition,
810 events: Vec<SharedEvent>,
811 window_idx: usize,
812 functions: &FxHashMap<String, UserFunction>,
813 sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
814 ) -> Result<StreamProcessResult, super::error::EngineError> {
815 pipeline::execute_pipeline(
817 stream,
818 events,
819 window_idx + 1,
820 pipeline::SkipFlags::for_post_window(),
821 functions,
822 sinks,
823 )
824 .await
825 }
826
    /// Advances event-time windows to watermark `wm` and processes anything
    /// the windows close as a result.
    ///
    /// For each stream, the first `Window` op encountered is advanced; any
    /// events the window releases continue down the remainder of the pipeline
    /// and are forwarded to the output channel.
    #[tracing::instrument(skip(self))]
    pub(super) async fn apply_watermark_to_windows(
        &mut self,
        wm: DateTime<Utc>,
    ) -> Result<(), super::error::EngineError> {
        // Collect names up front so `self.streams` is not borrowed across the
        // per-stream mutable accesses below.
        let stream_names: Vec<String> = self.streams.keys().cloned().collect();

        for stream_name in stream_names {
            let (window_idx, expired) = {
                // Key came from `self.streams` above, so the lookup cannot fail.
                let stream = self.streams.get_mut(&stream_name).unwrap();
                let mut result = Vec::new();
                let mut found_idx = None;

                for (idx, op) in stream.operations.iter_mut().enumerate() {
                    if let RuntimeOp::Window(window) = op {
                        let events: Option<Vec<SharedEvent>> = match window {
                            WindowType::Tumbling(w) => w.advance_watermark(wm),
                            WindowType::Sliding(w) => w.advance_watermark(wm),
                            WindowType::Session(w) => w.advance_watermark(wm),
                            WindowType::PartitionedTumbling(w) => {
                                // Flatten per-partition results; use None when
                                // empty so the no-op case is uniform with the
                                // unpartitioned window kinds above.
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            WindowType::PartitionedSliding(w) => {
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            WindowType::PartitionedSession(w) => {
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            _ => None, };

                        if let Some(evts) = events {
                            result = evts;
                            found_idx = Some(idx);
                        }
                        // NOTE(review): this breaks after the FIRST Window op
                        // even when it yields no events (e.g. a window type
                        // hitting the `_ => None` arm), so any later Window ops
                        // in the same stream are never advanced here — confirm
                        // this is intentional (flush_expired_sessions keeps
                        // scanning past non-matching windows instead).
                        break;
                    }
                }
                (found_idx, result)
            };

            if expired.is_empty() {
                continue;
            }

            let window_idx = match window_idx {
                Some(idx) => idx,
                None => continue,
            };

            // Resume the pipeline after the window op with the released events.
            let result = Self::process_post_window(
                self.streams.get_mut(&stream_name).unwrap(),
                expired,
                window_idx,
                &self.functions,
                self.sinks.cache(),
            )
            .await?;

            for emitted in &result.emitted_events {
                self.output_events_emitted += 1;
                // `send_output` takes an owned Event, so clone out of the Arc.
                let owned = (**emitted).clone();
                self.send_output(owned);
            }
        }

        Ok(())
    }
914}