Skip to main content

varpulis_runtime/engine/
dispatch.rs

1//! Event dispatch: processing incoming events through stream pipelines.
2//!
3//! This module contains the `impl Engine` methods for event processing:
4//! `process()`, `process_shared()`, `process_inner()`, `process_batch()`,
5//! `process_batch_shared()`, `process_batch_sync()`, and their helpers.
6
7use std::collections::VecDeque;
8use std::sync::Arc;
9
10#[cfg(feature = "async-runtime")]
11use chrono::{DateTime, Utc};
12use rustc_hash::FxHashMap;
13use tracing::debug;
14
15use super::trace::TraceEntry;
16#[cfg(feature = "async-runtime")]
17use super::types::WindowType;
18use super::types::{RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction};
19use super::{evaluator, pipeline, Engine};
20use crate::event::{Event, SharedEvent};
21use crate::sequence::SequenceContext;
22
23impl Engine {
24    /// Process an incoming event (async-runtime only).
25    #[cfg(feature = "async-runtime")]
26    #[tracing::instrument(level = "trace", skip(self))]
27    pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
28        self.events_processed += 1;
29        self.process_inner(Arc::new(event)).await
30    }
31
32    /// Process a pre-wrapped SharedEvent (async-runtime only).
33    #[cfg(feature = "async-runtime")]
34    #[tracing::instrument(level = "trace", skip(self))]
35    pub async fn process_shared(
36        &mut self,
37        event: SharedEvent,
38    ) -> Result<(), super::error::EngineError> {
39        self.events_processed += 1;
40        self.process_inner(event).await
41    }
42
43    /// Internal processing logic shared by process() and process_shared() (async-runtime only).
44    #[cfg(feature = "async-runtime")]
45    #[tracing::instrument(level = "trace", skip(self))]
46    async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
47        // Record incoming event in Prometheus
48        if let Some(ref m) = self.metrics {
49            m.record_event(&event.event_type);
50        }
51
52        // Check for late data against the watermark
53        if let Some(ref tracker) = self.watermark_tracker {
54            if let Some(effective_wm) = tracker.effective_watermark() {
55                if event.timestamp < effective_wm {
56                    // Event is behind the watermark — check allowed lateness per stream
57                    let mut allowed = false;
58                    if let Some(stream_names) = self.router.get_routes(&event.event_type) {
59                        for sn in stream_names.iter() {
60                            if let Some(cfg) = self.late_data_configs.get(sn) {
61                                if event.timestamp >= effective_wm - cfg.allowed_lateness {
62                                    allowed = true;
63                                    break;
64                                }
65                            }
66                        }
67                    }
68                    if !allowed && !self.late_data_configs.is_empty() {
69                        // Route to side-output if configured, otherwise drop
70                        let mut routed = false;
71                        if let Some(stream_names) = self.router.get_routes(&event.event_type) {
72                            for sn in stream_names.iter() {
73                                if let Some(cfg) = self.late_data_configs.get(sn) {
74                                    if let Some(ref side_stream) = cfg.side_output_stream {
75                                        debug!(
76                                            "Routing late event to side-output '{}' type={} ts={}",
77                                            side_stream, event.event_type, event.timestamp
78                                        );
79                                        // Create a late-data event with metadata
80                                        let mut late_event = (*event).clone();
81                                        late_event.event_type = side_stream.clone().into();
82                                        self.send_output(late_event);
83                                        routed = true;
84                                        break;
85                                    }
86                                }
87                            }
88                        }
89                        if !routed {
90                            debug!(
91                                "Dropping late event type={} ts={} (watermark={})",
92                                event.event_type, event.timestamp, effective_wm
93                            );
94                        }
95                        return Ok(());
96                    }
97                }
98            }
99        }
100
101        // Process events with depth limit to prevent infinite loops
102        // Each event carries its depth level - use SharedEvent to avoid cloning
103        let mut pending_events: VecDeque<(SharedEvent, usize)> =
104            VecDeque::from([(event.clone(), 0)]);
105        const MAX_CHAIN_DEPTH: usize = 10;
106
107        // Observe event in watermark tracker (after processing to not block)
108        if let Some(ref mut tracker) = self.watermark_tracker {
109            tracker.observe_event(&event.event_type, event.timestamp);
110
111            if let Some(new_wm) = tracker.effective_watermark() {
112                if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
113                    self.last_applied_watermark = Some(new_wm);
114                    // Note: we don't call apply_watermark_to_windows here to avoid
115                    // double-mutable-borrow. The caller should periodically flush.
116                }
117            }
118        }
119
120        // Process events iteratively, feeding output to dependent streams
121        while let Some((current_event, depth)) = pending_events.pop_front() {
122            // Prevent infinite loops by limiting chain depth
123            if depth >= MAX_CHAIN_DEPTH {
124                debug!(
125                    "Max chain depth reached for event type: {}",
126                    current_event.event_type
127                );
128                continue;
129            }
130
131            // Collect stream names to avoid borrowing issues
132            // PERF: Arc<[String]> clone is O(1) - just atomic increment, not deep copy
133            let stream_names: Arc<[String]> = self
134                .router
135                .get_routes(&current_event.event_type)
136                .cloned()
137                .unwrap_or_else(|| Arc::from([]));
138
139            for stream_name in stream_names.iter() {
140                if let Some(stream) = self.streams.get_mut(stream_name) {
141                    // Record trace: event routed to stream
142                    if self.trace_collector.is_enabled() {
143                        self.trace_collector.record(TraceEntry::StreamMatched {
144                            stream_name: stream_name.clone(),
145                            event_type: current_event.event_type.to_string(),
146                        });
147                    }
148
149                    let start = std::time::Instant::now();
150                    let result = Self::process_stream_with_functions(
151                        stream,
152                        Arc::clone(&current_event),
153                        &self.functions,
154                        self.sinks.cache(),
155                    )
156                    .await?;
157
158                    // Record trace: pipeline result
159                    if self.trace_collector.is_enabled() {
160                        Self::record_trace_for_result(
161                            &mut self.trace_collector,
162                            stream_name,
163                            stream,
164                            &result,
165                        );
166                    }
167
168                    // Record per-stream processing metrics
169                    if let Some(ref m) = self.metrics {
170                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
171                    }
172
173                    // Check if we need to send output_events to the output channel.
174                    // This is true when there are no .emit() events AND the stream has
175                    // a .process() UDF or a .to() sink (so sink events appear in the
176                    // live event stream / WebSocket relay).
177                    let send_outputs = result.emitted_events.is_empty()
178                        && stream
179                            .operations
180                            .iter()
181                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
182
183                    // Send emitted events to output channel (non-blocking)
184                    // PERF: Use send_output_shared for zero-copy when using SharedEvent channel
185                    if self.output_channel.is_some() {
186                        for emitted in &result.emitted_events {
187                            self.output_events_emitted += 1;
188                            if let Some(ref m) = self.metrics {
189                                m.record_output_event("pipeline", &emitted.event_type);
190                            }
191                            self.send_output_shared(emitted);
192                        }
193                        if send_outputs {
194                            for output in &result.output_events {
195                                self.output_events_emitted += 1;
196                                if let Some(ref m) = self.metrics {
197                                    m.record_output_event("pipeline", &output.event_type);
198                                }
199                                self.send_output_shared(output);
200                            }
201                        }
202                    } else {
203                        // Benchmark mode: just count, don't send
204                        self.output_events_emitted += result.emitted_events.len() as u64;
205                        if send_outputs {
206                            self.output_events_emitted += result.output_events.len() as u64;
207                        }
208                    }
209
210                    // Count events sent to connector sinks via .to() operations.
211                    // Skip if already counted via output_events above (avoids double-counting).
212                    if !send_outputs {
213                        self.output_events_emitted += result.sink_events_sent;
214                    }
215
216                    // Queue output events for processing by dependent streams
217                    for output_event in result.output_events {
218                        pending_events.push_back((output_event, depth + 1));
219                    }
220                }
221            }
222        }
223
224        // Update active streams gauge
225        if let Some(ref m) = self.metrics {
226            m.set_stream_count(self.streams.len());
227        }
228
229        Ok(())
230    }
231
232    /// Process a batch of events for improved throughput (async-runtime only).
233    #[cfg(feature = "async-runtime")]
234    #[tracing::instrument(level = "trace", skip(self))]
235    pub async fn process_batch(
236        &mut self,
237        events: Vec<Event>,
238    ) -> Result<(), super::error::EngineError> {
239        if events.is_empty() {
240            return Ok(());
241        }
242
243        let batch_size = events.len();
244        self.events_processed += batch_size as u64;
245
246        // Update Prometheus metrics
247        if let Some(ref m) = self.metrics {
248            for event in &events {
249                m.record_event(&event.event_type);
250            }
251        }
252
253        // Pre-allocate pending events with capacity for batch + some derived events
254        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
255        let mut pending_events: VecDeque<(SharedEvent, usize)> =
256            VecDeque::with_capacity(batch_size + batch_size / 4);
257
258        // Convert all events to SharedEvents upfront
259        for event in events {
260            pending_events.push_back((Arc::new(event), 0));
261        }
262
263        const MAX_CHAIN_DEPTH: usize = 10;
264
265        // Collect emitted events to send in batch
266        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
267
268        // Process all events in FIFO order (critical for sequence patterns!)
269        while let Some((current_event, depth)) = pending_events.pop_front() {
270            if depth >= MAX_CHAIN_DEPTH {
271                debug!(
272                    "Max chain depth reached for event type: {}",
273                    current_event.event_type
274                );
275                continue;
276            }
277
278            // Get stream names (Arc clone is O(1))
279            let stream_names: Arc<[String]> = self
280                .router
281                .get_routes(&current_event.event_type)
282                .cloned()
283                .unwrap_or_else(|| Arc::from([]));
284
285            for stream_name in stream_names.iter() {
286                if let Some(stream) = self.streams.get_mut(stream_name) {
287                    // Record trace: event routed to stream
288                    if self.trace_collector.is_enabled() {
289                        self.trace_collector.record(TraceEntry::StreamMatched {
290                            stream_name: stream_name.clone(),
291                            event_type: current_event.event_type.to_string(),
292                        });
293                    }
294
295                    let start = std::time::Instant::now();
296                    let result = Self::process_stream_with_functions(
297                        stream,
298                        Arc::clone(&current_event),
299                        &self.functions,
300                        self.sinks.cache(),
301                    )
302                    .await?;
303
304                    // Record trace: pipeline result
305                    if self.trace_collector.is_enabled() {
306                        Self::record_trace_for_result(
307                            &mut self.trace_collector,
308                            stream_name,
309                            stream,
310                            &result,
311                        );
312                    }
313
314                    // Record per-stream processing in Prometheus
315                    if let Some(ref m) = self.metrics {
316                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
317                    }
318
319                    // Collect emitted events for batch sending
320                    self.output_events_emitted += result.emitted_events.len() as u64;
321                    let has_emitted = !result.emitted_events.is_empty();
322                    emitted_batch.extend(result.emitted_events);
323
324                    // If .process() or .to() was used but no .emit(), send output_events
325                    // to the output channel so they appear in the live event stream.
326                    let forward_outputs = !has_emitted
327                        && stream
328                            .operations
329                            .iter()
330                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
331                    if forward_outputs {
332                        self.output_events_emitted += result.output_events.len() as u64;
333                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
334                    }
335
336                    // Count sink events only when not already counted via forwarded outputs
337                    if !forward_outputs {
338                        self.output_events_emitted += result.sink_events_sent;
339                    }
340
341                    // Queue output events (push_back to maintain order)
342                    for output_event in result.output_events {
343                        pending_events.push_back((output_event, depth + 1));
344                    }
345                }
346            }
347        }
348
349        // Send all emitted events in batch (non-blocking to avoid async overhead)
350        // PERF: Use send_output_shared to avoid cloning in benchmark mode
351        for emitted in &emitted_batch {
352            self.send_output_shared(emitted);
353        }
354
355        // Update Prometheus output metrics
356        if let Some(ref m) = self.metrics {
357            for emitted in &emitted_batch {
358                m.record_output_event("pipeline", &emitted.event_type);
359            }
360            m.set_stream_count(self.streams.len());
361        }
362
363        Ok(())
364    }
365
366    /// Synchronous batch processing for maximum throughput.
367    /// Use this when no .to() sink operations are in the pipeline (e.g., benchmark mode).
368    /// Avoids async runtime overhead completely.
369    pub fn process_batch_sync(
370        &mut self,
371        events: Vec<Event>,
372    ) -> Result<(), super::error::EngineError> {
373        if events.is_empty() {
374            return Ok(());
375        }
376
377        let batch_size = events.len();
378        self.events_processed += batch_size as u64;
379
380        // Pre-allocate pending events with capacity for batch + some derived events
381        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
382        let mut pending_events: VecDeque<(SharedEvent, usize)> =
383            VecDeque::with_capacity(batch_size + batch_size / 4);
384
385        // Convert all events to SharedEvents upfront
386        for event in events {
387            pending_events.push_back((Arc::new(event), 0));
388        }
389
390        const MAX_CHAIN_DEPTH: usize = 10;
391
392        // Collect emitted events to send in batch
393        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
394
395        // Process all events in FIFO order (critical for sequence patterns!)
396        while let Some((current_event, depth)) = pending_events.pop_front() {
397            if depth >= MAX_CHAIN_DEPTH {
398                debug!(
399                    "Max chain depth reached for event type: {}",
400                    current_event.event_type
401                );
402                continue;
403            }
404
405            // Get stream names (Arc clone is O(1))
406            let stream_names: Arc<[String]> = self
407                .router
408                .get_routes(&current_event.event_type)
409                .cloned()
410                .unwrap_or_else(|| Arc::from([]));
411
412            for stream_name in stream_names.iter() {
413                if let Some(stream) = self.streams.get_mut(stream_name) {
414                    // Record trace: event routed to stream
415                    if self.trace_collector.is_enabled() {
416                        self.trace_collector.record(TraceEntry::StreamMatched {
417                            stream_name: stream_name.clone(),
418                            event_type: current_event.event_type.to_string(),
419                        });
420                    }
421
422                    // Skip output clone+rename when stream has no downstream routes
423                    let skip_rename = self.router.get_routes(stream_name).is_none();
424                    let result = Self::process_stream_sync(
425                        stream,
426                        Arc::clone(&current_event),
427                        &self.functions,
428                        skip_rename,
429                    )?;
430
431                    // Record trace: pipeline result
432                    if self.trace_collector.is_enabled() {
433                        Self::record_trace_for_result(
434                            &mut self.trace_collector,
435                            stream_name,
436                            stream,
437                            &result,
438                        );
439                    }
440
441                    // Collect emitted events for batch sending
442                    self.output_events_emitted += result.emitted_events.len() as u64;
443                    let has_emitted = !result.emitted_events.is_empty();
444                    emitted_batch.extend(result.emitted_events);
445
446                    // If .process() or .to() was used but no .emit(), send output_events
447                    // to the output channel so they appear in the live event stream.
448                    let forward_outputs = !has_emitted
449                        && stream
450                            .operations
451                            .iter()
452                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
453                    if forward_outputs {
454                        self.output_events_emitted += result.output_events.len() as u64;
455                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
456                    }
457
458                    // Count sink events only when not already counted via forwarded outputs
459                    if !forward_outputs {
460                        self.output_events_emitted += result.sink_events_sent;
461                    }
462
463                    // Queue output events (push_back to maintain order)
464                    for output_event in result.output_events {
465                        pending_events.push_back((output_event, depth + 1));
466                    }
467                }
468            }
469        }
470
471        // Send all emitted events in batch (non-blocking to avoid async overhead)
472        // PERF: Use send_output_shared to avoid cloning in benchmark mode
473        for emitted in &emitted_batch {
474            self.send_output_shared(emitted);
475        }
476
477        Ok(())
478    }
479
480    /// Synchronous stream processing - no async operations.
481    /// Skips .to() sink operations (which are the only async parts).
482    /// When `skip_output_rename` is true, output events skip the clone+rename step.
483    fn process_stream_sync(
484        stream: &mut StreamDefinition,
485        event: SharedEvent,
486        functions: &FxHashMap<String, UserFunction>,
487        skip_output_rename: bool,
488    ) -> Result<StreamProcessResult, super::error::EngineError> {
489        // For merge sources, check if the event passes the appropriate filter
490        if let RuntimeSource::Merge(ref sources) = stream.source {
491            let mut passes_filter = false;
492            for ms in sources {
493                if ms.event_type == *event.event_type {
494                    if let Some(ref filter) = ms.filter {
495                        let ctx = SequenceContext::new();
496                        if let Some(result) = evaluator::eval_expr_with_functions(
497                            filter,
498                            &event,
499                            &ctx,
500                            functions,
501                            &FxHashMap::default(),
502                        ) {
503                            if result.as_bool().unwrap_or(false) {
504                                passes_filter = true;
505                                break;
506                            }
507                        }
508                    } else {
509                        passes_filter = true;
510                        break;
511                    }
512                }
513            }
514            if !passes_filter {
515                return Ok(StreamProcessResult {
516                    emitted_events: vec![],
517                    output_events: vec![],
518                    sink_events_sent: 0,
519                });
520            }
521        }
522
523        // For join sources - return empty (join requires async in some paths)
524        if matches!(stream.source, RuntimeSource::Join(_)) {
525            return Ok(StreamProcessResult {
526                emitted_events: vec![],
527                output_events: vec![],
528                sink_events_sent: 0,
529            });
530        }
531
532        // Use synchronous pipeline execution
533        pipeline::execute_pipeline_sync(
534            stream,
535            vec![event],
536            0,
537            pipeline::SkipFlags::none(),
538            functions,
539            skip_output_rename,
540        )
541    }
542
543    /// Process a batch of pre-wrapped SharedEvents (async-runtime only).
544    #[cfg(feature = "async-runtime")]
545    #[tracing::instrument(level = "trace", skip(self))]
546    pub async fn process_batch_shared(
547        &mut self,
548        events: Vec<SharedEvent>,
549    ) -> Result<(), super::error::EngineError> {
550        if events.is_empty() {
551            return Ok(());
552        }
553
554        let batch_size = events.len();
555        self.events_processed += batch_size as u64;
556
557        // Update Prometheus metrics
558        if let Some(ref m) = self.metrics {
559            for event in &events {
560                m.record_event(&event.event_type);
561            }
562        }
563
564        // Use VecDeque so we can process in FIFO order (critical for sequence patterns!)
565        let mut pending_events: VecDeque<(SharedEvent, usize)> =
566            VecDeque::with_capacity(batch_size + batch_size / 4);
567
568        for event in events {
569            pending_events.push_back((event, 0));
570        }
571
572        const MAX_CHAIN_DEPTH: usize = 10;
573
574        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);
575
576        // Process all events in FIFO order
577        while let Some((current_event, depth)) = pending_events.pop_front() {
578            if depth >= MAX_CHAIN_DEPTH {
579                debug!(
580                    "Max chain depth reached for event type: {}",
581                    current_event.event_type
582                );
583                continue;
584            }
585
586            let stream_names: Arc<[String]> = self
587                .router
588                .get_routes(&current_event.event_type)
589                .cloned()
590                .unwrap_or_else(|| Arc::from([]));
591
592            for stream_name in stream_names.iter() {
593                if let Some(stream) = self.streams.get_mut(stream_name) {
594                    let start = std::time::Instant::now();
595                    let result = Self::process_stream_with_functions(
596                        stream,
597                        Arc::clone(&current_event),
598                        &self.functions,
599                        self.sinks.cache(),
600                    )
601                    .await?;
602
603                    if let Some(ref m) = self.metrics {
604                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
605                    }
606
607                    self.output_events_emitted += result.emitted_events.len() as u64;
608                    let has_emitted = !result.emitted_events.is_empty();
609                    emitted_batch.extend(result.emitted_events);
610
611                    // If .process() or .to() was used but no .emit(), send output_events
612                    // to the output channel so they appear in the live event stream.
613                    let forward_outputs = !has_emitted
614                        && stream
615                            .operations
616                            .iter()
617                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
618                    if forward_outputs {
619                        self.output_events_emitted += result.output_events.len() as u64;
620                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
621                    }
622
623                    // Count sink events only when not already counted via forwarded outputs
624                    if !forward_outputs {
625                        self.output_events_emitted += result.sink_events_sent;
626                    }
627
628                    for output_event in result.output_events {
629                        pending_events.push_back((output_event, depth + 1));
630                    }
631                }
632            }
633        }
634
635        // PERF: Use send_output_shared to avoid cloning in benchmark mode
636        for emitted in &emitted_batch {
637            self.send_output_shared(emitted);
638        }
639
640        // Update Prometheus output metrics
641        if let Some(ref m) = self.metrics {
642            for emitted in &emitted_batch {
643                m.record_output_event("pipeline", &emitted.event_type);
644            }
645            m.set_stream_count(self.streams.len());
646        }
647
648        Ok(())
649    }
650
651    #[cfg(feature = "async-runtime")]
652    async fn process_stream_with_functions(
653        stream: &mut StreamDefinition,
654        event: SharedEvent,
655        functions: &FxHashMap<String, UserFunction>,
656        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
657    ) -> Result<StreamProcessResult, super::error::EngineError> {
658        // For merge sources, check if the event passes the appropriate filter
659        if let RuntimeSource::Merge(ref sources) = stream.source {
660            let mut passes_filter = false;
661            let mut matched_source_name = None;
662            for ms in sources {
663                if ms.event_type == *event.event_type {
664                    if let Some(ref filter) = ms.filter {
665                        let ctx = SequenceContext::new();
666                        if let Some(result) = evaluator::eval_expr_with_functions(
667                            filter,
668                            &event,
669                            &ctx,
670                            functions,
671                            &FxHashMap::default(),
672                        ) {
673                            if result.as_bool().unwrap_or(false) {
674                                passes_filter = true;
675                                matched_source_name = Some(&ms.name);
676                                break;
677                            }
678                        }
679                    } else {
680                        // No filter means it passes
681                        passes_filter = true;
682                        matched_source_name = Some(&ms.name);
683                        break;
684                    }
685                }
686            }
687            if !passes_filter {
688                return Ok(StreamProcessResult {
689                    emitted_events: vec![],
690                    output_events: vec![],
691                    sink_events_sent: 0,
692                });
693            }
694            // Log which merge source matched (uses ms.name)
695            if let Some(source_name) = matched_source_name {
696                tracing::trace!("Event matched merge source: {}", source_name);
697            }
698        }
699
700        // For join sources, route through the JoinBuffer for correlation
701        if let RuntimeSource::Join(ref _sources) = stream.source {
702            if let Some(ref mut join_buffer) = stream.join_buffer {
703                // Determine which source this event came from using the event_type_to_source mapping
704                // This maps event types (e.g., "MarketATick") to source names (e.g., "MarketA")
705                let source_name = stream
706                    .event_type_to_source
707                    .get(&*event.event_type)
708                    .cloned()
709                    .unwrap_or_else(|| event.event_type.to_string());
710
711                tracing::debug!(
712                    "Join stream {}: Adding event from source '{}' (event_type: {})",
713                    stream.name,
714                    source_name,
715                    event.event_type
716                );
717
718                // Add event to join buffer and try to correlate (join still needs owned Event)
719                match join_buffer.add_event(&source_name, (*event).clone()) {
720                    Some(correlated_event) => {
721                        tracing::debug!(
722                            "Join stream {}: Correlated event with {} fields",
723                            stream.name,
724                            correlated_event.data.len()
725                        );
726                        // Continue processing with the correlated event
727                        return Self::process_join_result(
728                            stream,
729                            Arc::new(correlated_event),
730                            functions,
731                            sinks,
732                        )
733                        .await;
734                    }
735                    None => {
736                        // No correlation yet - need events from all sources
737                        tracing::debug!(
738                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
739                            stream.name,
740                            join_buffer.stats()
741                        );
742                        return Ok(StreamProcessResult {
743                            emitted_events: vec![],
744                            output_events: vec![],
745                            sink_events_sent: 0,
746                        });
747                    }
748                }
749            }
750            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
751            return Ok(StreamProcessResult {
752                emitted_events: vec![],
753                output_events: vec![],
754                sink_events_sent: 0,
755            });
756        }
757
758        // Delegate to unified pipeline execution
759        pipeline::execute_pipeline(
760            stream,
761            vec![event],
762            0,
763            pipeline::SkipFlags::none(),
764            functions,
765            sinks,
766        )
767        .await
768    }
769
770    /// Process a join result (async-runtime only).
771    #[cfg(feature = "async-runtime")]
772    async fn process_join_result(
773        stream: &mut StreamDefinition,
774        correlated_event: SharedEvent,
775        functions: &FxHashMap<String, UserFunction>,
776        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
777    ) -> Result<StreamProcessResult, super::error::EngineError> {
778        // Delegate to unified pipeline with join-specific skip flags
779        pipeline::execute_pipeline(
780            stream,
781            vec![correlated_event],
782            0,
783            pipeline::SkipFlags::for_join(),
784            functions,
785            sinks,
786        )
787        .await
788    }
789
790    // =========================================================================
791    // Session Window Sweep
792    // =========================================================================
793
794    /// Flush all expired session windows (async-runtime only).
795    #[cfg(feature = "async-runtime")]
796    #[tracing::instrument(skip(self))]
797    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
798        let now = chrono::Utc::now();
799        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
800
801        for stream_name in stream_names {
802            // Step 1: Find the window op index and collect expired events
803            let (window_idx, expired) = {
804                let stream = self.streams.get_mut(&stream_name).unwrap();
805                let mut result = Vec::new();
806                let mut found_idx = None;
807
808                for (idx, op) in stream.operations.iter_mut().enumerate() {
809                    if let RuntimeOp::Window(window) = op {
810                        match window {
811                            WindowType::Session(w) => {
812                                if let Some(events) = w.check_expired(now) {
813                                    result = events;
814                                }
815                                found_idx = Some(idx);
816                            }
817                            WindowType::PartitionedSession(w) => {
818                                for (_key, events) in w.check_expired(now) {
819                                    result.extend(events);
820                                }
821                                found_idx = Some(idx);
822                            }
823                            _ => {}
824                        }
825                        // Only process the first session window op per stream
826                        if found_idx.is_some() {
827                            break;
828                        }
829                    }
830                }
831                (found_idx, result)
832            };
833
834            if expired.is_empty() {
835                continue;
836            }
837
838            let window_idx = match window_idx {
839                Some(idx) => idx,
840                None => continue,
841            };
842
843            // Step 2: Process expired events through the post-window pipeline
844            let result = Self::process_post_window(
845                self.streams.get_mut(&stream_name).unwrap(),
846                expired,
847                window_idx,
848                &self.functions,
849                self.sinks.cache(),
850            )
851            .await?;
852
853            // Step 3: Send emitted events to output channel
854            for emitted in &result.emitted_events {
855                self.output_events_emitted += 1;
856                let owned = (**emitted).clone();
857                self.send_output(owned);
858            }
859        }
860
861        Ok(())
862    }
863
864    /// Process events through the pipeline operations after the window (async-runtime only).
865    #[cfg(feature = "async-runtime")]
866    async fn process_post_window(
867        stream: &mut StreamDefinition,
868        events: Vec<SharedEvent>,
869        window_idx: usize,
870        functions: &FxHashMap<String, UserFunction>,
871        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
872    ) -> Result<StreamProcessResult, super::error::EngineError> {
873        // Delegate to unified pipeline starting after the window with post-window skip flags
874        pipeline::execute_pipeline(
875            stream,
876            events,
877            window_idx + 1,
878            pipeline::SkipFlags::for_post_window(),
879            functions,
880            sinks,
881        )
882        .await
883    }
884
885    /// Apply a watermark advance to all windows (async-runtime only).
886    #[cfg(feature = "async-runtime")]
887    #[tracing::instrument(skip(self))]
888    pub(super) async fn apply_watermark_to_windows(
889        &mut self,
890        wm: DateTime<Utc>,
891    ) -> Result<(), super::error::EngineError> {
892        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
893
894        for stream_name in stream_names {
895            let (window_idx, expired) = {
896                let stream = self.streams.get_mut(&stream_name).unwrap();
897                let mut result = Vec::new();
898                let mut found_idx = None;
899
900                for (idx, op) in stream.operations.iter_mut().enumerate() {
901                    if let RuntimeOp::Window(window) = op {
902                        let events: Option<Vec<SharedEvent>> = match window {
903                            WindowType::Tumbling(w) => w.advance_watermark(wm),
904                            WindowType::Sliding(w) => w.advance_watermark(wm),
905                            WindowType::Session(w) => w.advance_watermark(wm),
906                            WindowType::PartitionedTumbling(w) => {
907                                let parts = w.advance_watermark(wm);
908                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
909                                if all.is_empty() {
910                                    None
911                                } else {
912                                    Some(all)
913                                }
914                            }
915                            WindowType::PartitionedSliding(w) => {
916                                let parts = w.advance_watermark(wm);
917                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
918                                if all.is_empty() {
919                                    None
920                                } else {
921                                    Some(all)
922                                }
923                            }
924                            WindowType::PartitionedSession(w) => {
925                                let parts = w.advance_watermark(wm);
926                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
927                                if all.is_empty() {
928                                    None
929                                } else {
930                                    Some(all)
931                                }
932                            }
933                            _ => None, // Count-based windows don't use watermarks
934                        };
935
936                        if let Some(evts) = events {
937                            result = evts;
938                            found_idx = Some(idx);
939                        }
940                        break;
941                    }
942                }
943                (found_idx, result)
944            };
945
946            if expired.is_empty() {
947                continue;
948            }
949
950            let window_idx = match window_idx {
951                Some(idx) => idx,
952                None => continue,
953            };
954
955            let result = Self::process_post_window(
956                self.streams.get_mut(&stream_name).unwrap(),
957                expired,
958                window_idx,
959                &self.functions,
960                self.sinks.cache(),
961            )
962            .await?;
963
964            for emitted in &result.emitted_events {
965                self.output_events_emitted += 1;
966                let owned = (**emitted).clone();
967                self.send_output(owned);
968            }
969        }
970
971        Ok(())
972    }
973
974    /// Record trace entries for a pipeline result.
975    ///
976    /// Inspects the stream's operations and the result to produce per-operator
977    /// trace entries and, for SASE streams, pattern-state entries.
978    fn record_trace_for_result(
979        trace: &mut super::trace::TraceCollector,
980        stream_name: &str,
981        stream: &StreamDefinition,
982        result: &StreamProcessResult,
983    ) {
984        // Record per-operator summary based on the stream's operations
985        for op in &stream.operations {
986            let op_name = op.summary_name();
987            match op {
988                RuntimeOp::WhereExpr(_) | RuntimeOp::WhereClosure(_) | RuntimeOp::Having(_) => {
989                    // For filter-type ops, report whether events survived
990                    let passed =
991                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
992                    trace.record(TraceEntry::OperatorResult {
993                        stream_name: stream_name.to_string(),
994                        op_name: op_name.to_string(),
995                        passed,
996                        detail: None,
997                    });
998                }
999                RuntimeOp::Aggregate(_) | RuntimeOp::PartitionedAggregate(_) => {
1000                    let passed =
1001                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
1002                    trace.record(TraceEntry::OperatorResult {
1003                        stream_name: stream_name.to_string(),
1004                        op_name: op_name.to_string(),
1005                        passed,
1006                        detail: None,
1007                    });
1008                }
1009                RuntimeOp::Window(_)
1010                | RuntimeOp::PartitionedWindow(_)
1011                | RuntimeOp::PartitionedSlidingCountWindow(_) => {
1012                    let passed =
1013                        !result.output_events.is_empty() || !result.emitted_events.is_empty();
1014                    trace.record(TraceEntry::OperatorResult {
1015                        stream_name: stream_name.to_string(),
1016                        op_name: op_name.to_string(),
1017                        passed,
1018                        detail: if passed {
1019                            None
1020                        } else {
1021                            Some("window not yet full".to_string())
1022                        },
1023                    });
1024                }
1025                _ => {}
1026            }
1027        }
1028
1029        // Record SASE pattern state
1030        if let Some(ref sase) = stream.sase_engine {
1031            let active_runs =
1032                sase.runs.len() + sase.partitioned_runs.values().map(Vec::len).sum::<usize>();
1033            let completed = result.emitted_events.len().max(result.output_events.len());
1034            trace.record(TraceEntry::PatternState {
1035                stream_name: stream_name.to_string(),
1036                active_runs,
1037                completed,
1038            });
1039        }
1040
1041        // Record emitted output events
1042        for emitted in &result.emitted_events {
1043            let fields: Vec<(String, String)> = emitted
1044                .data
1045                .iter()
1046                .map(|(k, v)| (k.to_string(), format!("{v}")))
1047                .collect();
1048            trace.record(TraceEntry::EventEmitted {
1049                stream_name: stream_name.to_string(),
1050                fields,
1051            });
1052        }
1053    }
1054}