use std::collections::VecDeque;
use std::sync::Arc;

#[cfg(feature = "async-runtime")]
use chrono::{DateTime, Utc};
use rustc_hash::FxHashMap;
use tracing::debug;

use super::trace::TraceEntry;
#[cfg(feature = "async-runtime")]
use super::types::WindowType;
use super::types::{RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction};
use super::{evaluator, pipeline, Engine};
use crate::event::{Event, SharedEvent};
use crate::sequence::SequenceContext;

impl Engine {
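    /// Processes a single owned event by wrapping it in an `Arc` and running it
    /// through the shared single-event path.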
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
        self.events_processed += 1;
        self.process_inner(Arc::new(event)).await
    }

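    /// Processes an event that is already reference-counted, avoiding an extra clone.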
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_shared(
        &mut self,
        event: SharedEvent,
    ) -> Result<(), super::error::EngineError> {
        self.events_processed += 1;
        self.process_inner(event).await
    }

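    /// Core single-event path: applies the late-data policy against the current
    /// watermark, routes the event to matching streams, and re-queues derived
    /// output events up to a bounded chain depth.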
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(level = "trace", skip(self))]
    async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
        if let Some(ref m) = self.metrics {
            m.record_event(&event.event_type);
        }

        // Late-data handling: events behind the effective watermark are kept if they fall
        // within a stream's allowed lateness, routed to a configured side-output stream,
        // or dropped.
        if let Some(ref tracker) = self.watermark_tracker {
            if let Some(effective_wm) = tracker.effective_watermark() {
                if event.timestamp < effective_wm {
                    let mut allowed = false;
                    if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                        for sn in stream_names.iter() {
                            if let Some(cfg) = self.late_data_configs.get(sn) {
                                if event.timestamp >= effective_wm - cfg.allowed_lateness {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                    }
                    if !allowed && !self.late_data_configs.is_empty() {
                        let mut routed = false;
                        if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                            for sn in stream_names.iter() {
                                if let Some(cfg) = self.late_data_configs.get(sn) {
                                    if let Some(ref side_stream) = cfg.side_output_stream {
                                        debug!(
                                            "Routing late event to side-output '{}' type={} ts={}",
                                            side_stream, event.event_type, event.timestamp
                                        );
                                        let mut late_event = (*event).clone();
                                        late_event.event_type = side_stream.clone().into();
                                        self.send_output(late_event);
                                        routed = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if !routed {
                            debug!(
                                "Dropping late event type={} ts={} (watermark={})",
                                event.event_type, event.timestamp, effective_wm
                            );
                        }
                        return Ok(());
                    }
                }
            }
        }

        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::from([(event.clone(), 0)]);
        const MAX_CHAIN_DEPTH: usize = 10;

        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.observe_event(&event.event_type, event.timestamp);

            if let Some(new_wm) = tracker.effective_watermark() {
                if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
                    self.last_applied_watermark = Some(new_wm);
                }
            }
        }

        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    if self.trace_collector.is_enabled() {
                        self.trace_collector.record(TraceEntry::StreamMatched {
                            stream_name: stream_name.clone(),
                            event_type: current_event.event_type.to_string(),
                        });
                    }

                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if self.trace_collector.is_enabled() {
                        Self::record_trace_for_result(
                            &mut self.trace_collector,
                            stream_name,
                            stream,
                            &result,
                        );
                    }

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Forward pipeline outputs only when nothing was explicitly emitted
                    // and the stream contains a Process/To operator.
                    let send_outputs = result.emitted_events.is_empty()
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));

                    if self.output_channel.is_some() {
                        for emitted in &result.emitted_events {
                            self.output_events_emitted += 1;
                            if let Some(ref m) = self.metrics {
                                m.record_output_event("pipeline", &emitted.event_type);
                            }
                            self.send_output_shared(emitted);
                        }
                        if send_outputs {
                            for output in &result.output_events {
                                self.output_events_emitted += 1;
                                if let Some(ref m) = self.metrics {
                                    m.record_output_event("pipeline", &output.event_type);
                                }
                                self.send_output_shared(output);
                            }
                        }
                    } else {
                        self.output_events_emitted += result.emitted_events.len() as u64;
                        if send_outputs {
                            self.output_events_emitted += result.output_events.len() as u64;
                        }
                    }

                    // Events delivered directly to sinks still count toward the emitted total.
                    if !send_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        if let Some(ref m) = self.metrics {
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }

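    /// Processes a batch of owned events, buffering emitted events so outputs are
    /// sent once after the whole batch has been routed.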
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    if self.trace_collector.is_enabled() {
                        self.trace_collector.record(TraceEntry::StreamMatched {
                            stream_name: stream_name.clone(),
                            event_type: current_event.event_type.to_string(),
                        });
                    }

                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if self.trace_collector.is_enabled() {
                        Self::record_trace_for_result(
                            &mut self.trace_collector,
                            stream_name,
                            stream,
                            &result,
                        );
                    }

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }

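    /// Synchronous counterpart of `process_batch`; uses the sync pipeline and does
    /// not drive sinks or join correlation.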
    pub fn process_batch_sync(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    if self.trace_collector.is_enabled() {
                        self.trace_collector.record(TraceEntry::StreamMatched {
                            stream_name: stream_name.clone(),
                            event_type: current_event.event_type.to_string(),
                        });
                    }

                    let skip_rename = self.router.get_routes(stream_name).is_none();
                    let result = Self::process_stream_sync(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        skip_rename,
                    )?;

                    if self.trace_collector.is_enabled() {
                        Self::record_trace_for_result(
                            &mut self.trace_collector,
                            stream_name,
                            stream,
                            &result,
                        );
                    }

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        Ok(())
    }

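    /// Runs one event through a stream's sync pipeline after applying merge-source
    /// filters; join-sourced streams produce no result on this path.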
    fn process_stream_sync(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        skip_output_rename: bool,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                break;
                            }
                        }
                    } else {
                        passes_filter = true;
                        break;
                    }
                }
            }
            if !passes_filter {
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
        }

        // Join-sourced streams are only correlated on the async path; the sync path
        // returns an empty result for them.
        if matches!(stream.source, RuntimeSource::Join(_)) {
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        pipeline::execute_pipeline_sync(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            skip_output_rename,
        )
    }

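    /// Batch processing for events that are already `Arc`-wrapped; mirrors
    /// `process_batch` but without per-stream trace recording.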
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch_shared(
        &mut self,
        events: Vec<SharedEvent>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((event, 0));
        }

        const MAX_CHAIN_DEPTH: usize = 10;

        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }

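    /// Runs one event through a stream's async pipeline: applies merge-source filters,
    /// feeds join-sourced streams through their join buffer, and otherwise executes
    /// the operator pipeline directly.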
    #[cfg(feature = "async-runtime")]
    async fn process_stream_with_functions(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            let mut matched_source_name = None;
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                matched_source_name = Some(&ms.name);
                                break;
                            }
                        }
                    } else {
                        passes_filter = true;
                        matched_source_name = Some(&ms.name);
                        break;
                    }
                }
            }
            if !passes_filter {
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
            if let Some(source_name) = matched_source_name {
                tracing::trace!("Event matched merge source: {}", source_name);
            }
        }

        if let RuntimeSource::Join(ref _sources) = stream.source {
            if let Some(ref mut join_buffer) = stream.join_buffer {
                let source_name = stream
                    .event_type_to_source
                    .get(&*event.event_type)
                    .cloned()
                    .unwrap_or_else(|| event.event_type.to_string());

                tracing::debug!(
                    "Join stream {}: Adding event from source '{}' (event_type: {})",
                    stream.name,
                    source_name,
                    event.event_type
                );

                match join_buffer.add_event(&source_name, (*event).clone()) {
                    Some(correlated_event) => {
                        tracing::debug!(
                            "Join stream {}: Correlated event with {} fields",
                            stream.name,
                            correlated_event.data.len()
                        );
                        return Self::process_join_result(
                            stream,
                            Arc::new(correlated_event),
                            functions,
                            sinks,
                        )
                        .await;
                    }
                    None => {
                        tracing::debug!(
                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
                            stream.name,
                            join_buffer.stats()
                        );
                        return Ok(StreamProcessResult {
                            emitted_events: vec![],
                            output_events: vec![],
                            sink_events_sent: 0,
                        });
                    }
                }
            }
            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        pipeline::execute_pipeline(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            sinks,
        )
        .await
    }

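    /// Pushes a correlated join event through the pipeline with join-specific skip flags.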
    #[cfg(feature = "async-runtime")]
    async fn process_join_result(
        stream: &mut StreamDefinition,
        correlated_event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        pipeline::execute_pipeline(
            stream,
            vec![correlated_event],
            0,
            pipeline::SkipFlags::for_join(),
            functions,
            sinks,
        )
        .await
    }

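    /// Flushes session windows whose gap has elapsed by wall-clock time and forwards
    /// any expired events through the remainder of the pipeline.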
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(skip(self))]
    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
        let now = chrono::Utc::now();
        let stream_names: Vec<String> = self.streams.keys().cloned().collect();

        for stream_name in stream_names {
            let (window_idx, expired) = {
                let stream = self.streams.get_mut(&stream_name).unwrap();
                let mut result = Vec::new();
                let mut found_idx = None;

                for (idx, op) in stream.operations.iter_mut().enumerate() {
                    if let RuntimeOp::Window(window) = op {
                        match window {
                            WindowType::Session(w) => {
                                if let Some(events) = w.check_expired(now) {
                                    result = events;
                                }
                                found_idx = Some(idx);
                            }
                            WindowType::PartitionedSession(w) => {
                                for (_key, events) in w.check_expired(now) {
                                    result.extend(events);
                                }
                                found_idx = Some(idx);
                            }
                            _ => {}
                        }
                        if found_idx.is_some() {
                            break;
                        }
                    }
                }
                (found_idx, result)
            };

            if expired.is_empty() {
                continue;
            }

            let window_idx = match window_idx {
                Some(idx) => idx,
                None => continue,
            };

            let result = Self::process_post_window(
                self.streams.get_mut(&stream_name).unwrap(),
                expired,
                window_idx,
                &self.functions,
                self.sinks.cache(),
            )
            .await?;

            for emitted in &result.emitted_events {
                self.output_events_emitted += 1;
                let owned = (**emitted).clone();
                self.send_output(owned);
            }
        }

        Ok(())
    }

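    /// Resumes pipeline execution for window results, starting at the operator that
    /// follows the window.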
    #[cfg(feature = "async-runtime")]
    async fn process_post_window(
        stream: &mut StreamDefinition,
        events: Vec<SharedEvent>,
        window_idx: usize,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        pipeline::execute_pipeline(
            stream,
            events,
            window_idx + 1,
            pipeline::SkipFlags::for_post_window(),
            functions,
            sinks,
        )
        .await
    }

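    /// Advances each stream's first window operator to the given watermark and
    /// processes whatever events the window releases.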
    #[cfg(feature = "async-runtime")]
    #[tracing::instrument(skip(self))]
    pub(super) async fn apply_watermark_to_windows(
        &mut self,
        wm: DateTime<Utc>,
    ) -> Result<(), super::error::EngineError> {
        let stream_names: Vec<String> = self.streams.keys().cloned().collect();

        for stream_name in stream_names {
            let (window_idx, expired) = {
                let stream = self.streams.get_mut(&stream_name).unwrap();
                let mut result = Vec::new();
                let mut found_idx = None;

                // Only the first window operator in the pipeline is advanced per stream.
                for (idx, op) in stream.operations.iter_mut().enumerate() {
                    if let RuntimeOp::Window(window) = op {
                        let events: Option<Vec<SharedEvent>> = match window {
                            WindowType::Tumbling(w) => w.advance_watermark(wm),
                            WindowType::Sliding(w) => w.advance_watermark(wm),
                            WindowType::Session(w) => w.advance_watermark(wm),
                            WindowType::PartitionedTumbling(w) => {
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            WindowType::PartitionedSliding(w) => {
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            WindowType::PartitionedSession(w) => {
                                let parts = w.advance_watermark(wm);
                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
                                if all.is_empty() {
                                    None
                                } else {
                                    Some(all)
                                }
                            }
                            _ => None,
                        };

                        if let Some(evts) = events {
                            result = evts;
                            found_idx = Some(idx);
                        }
                        break;
                    }
                }
                (found_idx, result)
            };

            if expired.is_empty() {
                continue;
            }

            let window_idx = match window_idx {
                Some(idx) => idx,
                None => continue,
            };

            let result = Self::process_post_window(
                self.streams.get_mut(&stream_name).unwrap(),
                expired,
                window_idx,
                &self.functions,
                self.sinks.cache(),
            )
            .await?;

            for emitted in &result.emitted_events {
                self.output_events_emitted += 1;
                let owned = (**emitted).clone();
                self.send_output(owned);
            }
        }

        Ok(())
    }

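    /// Records trace entries (operator pass/fail, pattern state, emitted events) for a
    /// single stream-processing result.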
    fn record_trace_for_result(
        trace: &mut super::trace::TraceCollector,
        stream_name: &str,
        stream: &StreamDefinition,
        result: &StreamProcessResult,
    ) {
        for op in &stream.operations {
            let op_name = op.summary_name();
            match op {
                RuntimeOp::WhereExpr(_) | RuntimeOp::WhereClosure(_) | RuntimeOp::Having(_) => {
                    let passed =
                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
                    trace.record(TraceEntry::OperatorResult {
                        stream_name: stream_name.to_string(),
                        op_name: op_name.to_string(),
                        passed,
                        detail: None,
                    });
                }
                RuntimeOp::Aggregate(_) | RuntimeOp::PartitionedAggregate(_) => {
                    let passed =
                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
                    trace.record(TraceEntry::OperatorResult {
                        stream_name: stream_name.to_string(),
                        op_name: op_name.to_string(),
                        passed,
                        detail: None,
                    });
                }
                RuntimeOp::Window(_)
                | RuntimeOp::PartitionedWindow(_)
                | RuntimeOp::PartitionedSlidingCountWindow(_) => {
                    let passed =
                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
                    trace.record(TraceEntry::OperatorResult {
                        stream_name: stream_name.to_string(),
                        op_name: op_name.to_string(),
                        passed,
                        detail: if passed {
                            None
                        } else {
                            Some("window not yet full".to_string())
                        },
                    });
                }
                _ => {}
            }
        }

        if let Some(ref sase) = stream.sase_engine {
            let active_runs =
                sase.runs.len() + sase.partitioned_runs.values().map(Vec::len).sum::<usize>();
            let completed = result.emitted_events.len().max(result.output_events.len());
            trace.record(TraceEntry::PatternState {
                stream_name: stream_name.to_string(),
                active_runs,
                completed,
            });
        }

        for emitted in &result.emitted_events {
            let fields: Vec<(String, String)> = emitted
                .data
                .iter()
                .map(|(k, v)| (k.to_string(), format!("{v}")))
                .collect();
            trace.record(TraceEntry::EventEmitted {
                stream_name: stream_name.to_string(),
                fields,
            });
        }
    }
}