// varpulis_runtime/engine/dispatch.rs

//! Event dispatch: processing incoming events through stream pipelines.
//!
//! This module contains the `impl Engine` methods for event processing:
//! `process()`, `process_shared()`, `process_inner()`, `process_batch()`,
//! `process_batch_shared()`, `process_batch_sync()`, and their helpers.

7use crate::event::{Event, SharedEvent};
8use crate::sequence::SequenceContext;
9use chrono::{DateTime, Utc};
10use rustc_hash::FxHashMap;
11use std::collections::VecDeque;
12use std::sync::Arc;
13use tracing::debug;
14
15use super::evaluator;
16use super::pipeline;
17use super::types::{
18    RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction, WindowType,
19};
20use super::Engine;
21
22impl Engine {
23    /// Process an incoming event
24    #[tracing::instrument(level = "trace", skip(self))]
25    pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
26        self.events_processed += 1;
27        self.process_inner(Arc::new(event)).await
28    }
29
    /// Process a pre-wrapped SharedEvent (zero-copy path for context pipelines)
    ///
    /// Identical to [`process`](Self::process) except the caller supplies an
    /// already-`Arc`-wrapped event, so no additional allocation is performed.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_shared(
        &mut self,
        event: SharedEvent,
    ) -> Result<(), super::error::EngineError> {
        // Count the event before dispatching so failures are still accounted for.
        self.events_processed += 1;
        self.process_inner(event).await
    }
39
    /// Internal processing logic shared by `process()` and `process_shared()`.
    ///
    /// Steps, in order:
    /// 1. Record the incoming event in Prometheus (when metrics are enabled).
    /// 2. Late-data handling: an event behind the effective watermark is kept
    ///    if any routed stream's allowed lateness covers it; otherwise it is
    ///    routed to a configured side-output stream or dropped.
    /// 3. Observe the event in the watermark tracker and remember the newest
    ///    watermark (window flushing itself is deferred to the caller).
    /// 4. Dispatch the event to every routed stream; output events produced by
    ///    a stream are re-queued FIFO for dependent streams, bounded by
    ///    `MAX_CHAIN_DEPTH` to prevent infinite loops.
    #[tracing::instrument(level = "trace", skip(self))]
    async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
        // Record incoming event in Prometheus
        if let Some(ref m) = self.metrics {
            m.record_event(&event.event_type);
        }

        // Check for late data against the watermark
        if let Some(ref tracker) = self.watermark_tracker {
            if let Some(effective_wm) = tracker.effective_watermark() {
                if event.timestamp < effective_wm {
                    // Event is behind the watermark — check allowed lateness per stream
                    let mut allowed = false;
                    if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                        for sn in stream_names.iter() {
                            if let Some(cfg) = self.late_data_configs.get(sn) {
                                // Accept if any routed stream tolerates this much lateness.
                                if event.timestamp >= effective_wm - cfg.allowed_lateness {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                    }
                    // When no late-data configs exist at all, late events fall
                    // through and are processed normally.
                    if !allowed && !self.late_data_configs.is_empty() {
                        // Route to side-output if configured, otherwise drop
                        let mut routed = false;
                        if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                            for sn in stream_names.iter() {
                                if let Some(cfg) = self.late_data_configs.get(sn) {
                                    if let Some(ref side_stream) = cfg.side_output_stream {
                                        debug!(
                                            "Routing late event to side-output '{}' type={} ts={}",
                                            side_stream, event.event_type, event.timestamp
                                        );
                                        // Create a late-data event with metadata
                                        let mut late_event = (*event).clone();
                                        late_event.event_type = side_stream.clone().into();
                                        self.send_output(late_event);
                                        routed = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if !routed {
                            debug!(
                                "Dropping late event type={} ts={} (watermark={})",
                                event.event_type, event.timestamp, effective_wm
                            );
                        }
                        // Late event handled (side-output or drop): stop here.
                        return Ok(());
                    }
                }
            }
        }

        // Process events with depth limit to prevent infinite loops
        // Each event carries its depth level - use SharedEvent to avoid cloning
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::from([(event.clone(), 0)]);
        const MAX_CHAIN_DEPTH: usize = 10;

        // Observe event in watermark tracker before dispatch; window flushing
        // is deferred to the caller (see note below).
        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.observe_event(&event.event_type, event.timestamp);

            if let Some(new_wm) = tracker.effective_watermark() {
                if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
                    self.last_applied_watermark = Some(new_wm);
                    // Note: we don't call apply_watermark_to_windows here to avoid
                    // double-mutable-borrow. The caller should periodically flush.
                }
            }
        }

        // Process events iteratively, feeding output to dependent streams
        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Prevent infinite loops by limiting chain depth
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Collect stream names to avoid borrowing issues
            // PERF: Arc<[String]> clone is O(1) - just atomic increment, not deep copy
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing metrics
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Check if we need to send output_events to the output channel.
                    // This is true when there are no .emit() events AND the stream has
                    // a .process() UDF or a .to() sink (so sink events appear in the
                    // live event stream / WebSocket relay).
                    let send_outputs = result.emitted_events.is_empty()
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));

                    // Send emitted events to output channel (non-blocking)
                    // PERF: Use send_output_shared for zero-copy when using SharedEvent channel
                    if self.output_channel.is_some() {
                        for emitted in &result.emitted_events {
                            self.output_events_emitted += 1;
                            if let Some(ref m) = self.metrics {
                                m.record_output_event("pipeline", &emitted.event_type);
                            }
                            self.send_output_shared(emitted);
                        }
                        if send_outputs {
                            for output in &result.output_events {
                                self.output_events_emitted += 1;
                                if let Some(ref m) = self.metrics {
                                    m.record_output_event("pipeline", &output.event_type);
                                }
                                self.send_output_shared(output);
                            }
                        }
                    } else {
                        // Benchmark mode: just count, don't send
                        self.output_events_emitted += result.emitted_events.len() as u64;
                        if send_outputs {
                            self.output_events_emitted += result.output_events.len() as u64;
                        }
                    }

                    // Count events sent to connector sinks via .to() operations.
                    // Skip if already counted via output_events above (avoids double-counting).
                    if !send_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events for processing by dependent streams
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Update active streams gauge
        if let Some(ref m) = self.metrics {
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
209
    /// Process a batch of events for improved throughput.
    /// More efficient than calling process() repeatedly because:
    /// - Pre-allocates SharedEvents in bulk
    /// - Collects output events and sends in batches
    /// - Amortizes async overhead
    ///
    /// NOTE(review): unlike `process_inner`, this path performs no late-data /
    /// watermark handling; events are dispatched directly to routed streams.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        // Fast exit for an empty batch.
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Update Prometheus metrics
        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // Pre-allocate pending events with capacity for batch + some derived events
        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        // Convert all events to SharedEvents upfront
        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        // Derived-event chains deeper than this are cut off (loop protection).
        const MAX_CHAIN_DEPTH: usize = 10;

        // Collect emitted events to send in batch
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order (critical for sequence patterns!)
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Get stream names (Arc clone is O(1))
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing in Prometheus
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Collect emitted events for batch sending
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events (push_back to maintain order)
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Send all emitted events in batch (non-blocking to avoid async overhead)
        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        // Update Prometheus output metrics
        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
328
    /// Synchronous batch processing for maximum throughput.
    /// Use this when no .to() sink operations are in the pipeline (e.g., benchmark mode).
    /// Avoids async runtime overhead completely.
    ///
    /// Mirrors `process_batch` but routes through `process_stream_sync`, which
    /// skips the async-only paths (sinks, join correlation). No Prometheus
    /// metrics are recorded on this path.
    pub fn process_batch_sync(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        // Fast exit for an empty batch.
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Pre-allocate pending events with capacity for batch + some derived events
        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        // Convert all events to SharedEvents upfront
        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        // Derived-event chains deeper than this are cut off (loop protection).
        const MAX_CHAIN_DEPTH: usize = 10;

        // Collect emitted events to send in batch
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order (critical for sequence patterns!)
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Get stream names (Arc clone is O(1))
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    // Skip output clone+rename when stream has no downstream routes
                    let skip_rename = self.router.get_routes(stream_name).is_none();
                    let result = Self::process_stream_sync(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        skip_rename,
                    )?;

                    // Collect emitted events for batch sending
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events (push_back to maintain order)
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Send all emitted events in batch (non-blocking to avoid async overhead)
        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        Ok(())
    }
424
425    /// Synchronous stream processing - no async operations.
426    /// Skips .to() sink operations (which are the only async parts).
427    /// When `skip_output_rename` is true, output events skip the clone+rename step.
428    fn process_stream_sync(
429        stream: &mut StreamDefinition,
430        event: SharedEvent,
431        functions: &FxHashMap<String, UserFunction>,
432        skip_output_rename: bool,
433    ) -> Result<StreamProcessResult, super::error::EngineError> {
434        // For merge sources, check if the event passes the appropriate filter
435        if let RuntimeSource::Merge(ref sources) = stream.source {
436            let mut passes_filter = false;
437            for ms in sources {
438                if ms.event_type == *event.event_type {
439                    if let Some(ref filter) = ms.filter {
440                        let ctx = SequenceContext::new();
441                        if let Some(result) = evaluator::eval_expr_with_functions(
442                            filter,
443                            &event,
444                            &ctx,
445                            functions,
446                            &FxHashMap::default(),
447                        ) {
448                            if result.as_bool().unwrap_or(false) {
449                                passes_filter = true;
450                                break;
451                            }
452                        }
453                    } else {
454                        passes_filter = true;
455                        break;
456                    }
457                }
458            }
459            if !passes_filter {
460                return Ok(StreamProcessResult {
461                    emitted_events: vec![],
462                    output_events: vec![],
463                    sink_events_sent: 0,
464                });
465            }
466        }
467
468        // For join sources - return empty (join requires async in some paths)
469        if matches!(stream.source, RuntimeSource::Join(_)) {
470            return Ok(StreamProcessResult {
471                emitted_events: vec![],
472                output_events: vec![],
473                sink_events_sent: 0,
474            });
475        }
476
477        // Use synchronous pipeline execution
478        pipeline::execute_pipeline_sync(
479            stream,
480            vec![event],
481            0,
482            pipeline::SkipFlags::none(),
483            functions,
484            skip_output_rename,
485        )
486    }
487
    /// Process a batch of pre-wrapped SharedEvents (zero-copy path for context pipelines)
    ///
    /// Same dispatch logic as `process_batch`, but the caller supplies events
    /// already wrapped in `Arc`, so no per-event allocation is needed.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch_shared(
        &mut self,
        events: Vec<SharedEvent>,
    ) -> Result<(), super::error::EngineError> {
        // Fast exit for an empty batch.
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Update Prometheus metrics
        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // Use VecDeque so we can process in FIFO order (critical for sequence patterns!)
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((event, 0));
        }

        // Derived-event chains deeper than this are cut off (loop protection).
        const MAX_CHAIN_DEPTH: usize = 10;

        // Emitted events are accumulated and sent once at the end.
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Routed stream names (Arc clone is O(1)).
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing time.
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Collect emitted events for batch sending.
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events for dependent streams (FIFO).
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        // Update Prometheus output metrics
        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
594
    /// Core per-stream dispatch used by all async processing paths.
    ///
    /// Performs source-specific admission before running the pipeline:
    /// - `Merge` sources: the event must match a merge input's event type and
    ///   pass that input's filter (if any); otherwise an empty result is returned.
    /// - `Join` sources: the event is added to the stream's `JoinBuffer`; only
    ///   a successful correlation continues into the pipeline (via
    ///   `process_join_result`). A join stream without a buffer logs a warning
    ///   and produces an empty result.
    /// - All other sources go straight to `pipeline::execute_pipeline`.
    async fn process_stream_with_functions(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        // For merge sources, check if the event passes the appropriate filter
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            let mut matched_source_name = None;
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                matched_source_name = Some(&ms.name);
                                break;
                            }
                        }
                    } else {
                        // No filter means it passes
                        passes_filter = true;
                        matched_source_name = Some(&ms.name);
                        break;
                    }
                }
            }
            if !passes_filter {
                // Filtered out: nothing emitted, nothing forwarded downstream.
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
            // Log which merge source matched (uses ms.name)
            if let Some(source_name) = matched_source_name {
                tracing::trace!("Event matched merge source: {}", source_name);
            }
        }

        // For join sources, route through the JoinBuffer for correlation
        if let RuntimeSource::Join(ref _sources) = stream.source {
            if let Some(ref mut join_buffer) = stream.join_buffer {
                // Determine which source this event came from using the event_type_to_source mapping
                // This maps event types (e.g., "MarketATick") to source names (e.g., "MarketA")
                let source_name = stream
                    .event_type_to_source
                    .get(&*event.event_type)
                    .cloned()
                    .unwrap_or_else(|| event.event_type.to_string());

                tracing::debug!(
                    "Join stream {}: Adding event from source '{}' (event_type: {})",
                    stream.name,
                    source_name,
                    event.event_type
                );

                // Add event to join buffer and try to correlate (join still needs owned Event)
                match join_buffer.add_event(&source_name, (*event).clone()) {
                    Some(correlated_event) => {
                        tracing::debug!(
                            "Join stream {}: Correlated event with {} fields",
                            stream.name,
                            correlated_event.data.len()
                        );
                        // Continue processing with the correlated event
                        return Self::process_join_result(
                            stream,
                            Arc::new(correlated_event),
                            functions,
                            sinks,
                        )
                        .await;
                    }
                    None => {
                        // No correlation yet - need events from all sources
                        tracing::debug!(
                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
                            stream.name,
                            join_buffer.stats()
                        );
                        return Ok(StreamProcessResult {
                            emitted_events: vec![],
                            output_events: vec![],
                            sink_events_sent: 0,
                        });
                    }
                }
            }
            // Misconfiguration: a join stream should always carry a buffer.
            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        // Delegate to unified pipeline execution
        pipeline::execute_pipeline(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            sinks,
        )
        .await
    }
712
713    /// Process a join result through the stream operations (skipping join-specific handling)
714    async fn process_join_result(
715        stream: &mut StreamDefinition,
716        correlated_event: SharedEvent,
717        functions: &FxHashMap<String, UserFunction>,
718        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
719    ) -> Result<StreamProcessResult, super::error::EngineError> {
720        // Delegate to unified pipeline with join-specific skip flags
721        pipeline::execute_pipeline(
722            stream,
723            vec![correlated_event],
724            0,
725            pipeline::SkipFlags::for_join(),
726            functions,
727            sinks,
728        )
729        .await
730    }
731
732    // =========================================================================
733    // Session Window Sweep
734    // =========================================================================
735
736    /// Flush all expired session windows and process the resulting events
737    /// through the remaining pipeline stages (aggregate, having, select, emit, etc.).
738    #[tracing::instrument(skip(self))]
739    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
740        let now = chrono::Utc::now();
741        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
742
743        for stream_name in stream_names {
744            // Step 1: Find the window op index and collect expired events
745            let (window_idx, expired) = {
746                let stream = self.streams.get_mut(&stream_name).unwrap();
747                let mut result = Vec::new();
748                let mut found_idx = None;
749
750                for (idx, op) in stream.operations.iter_mut().enumerate() {
751                    if let RuntimeOp::Window(window) = op {
752                        match window {
753                            WindowType::Session(w) => {
754                                if let Some(events) = w.check_expired(now) {
755                                    result = events;
756                                }
757                                found_idx = Some(idx);
758                            }
759                            WindowType::PartitionedSession(w) => {
760                                for (_key, events) in w.check_expired(now) {
761                                    result.extend(events);
762                                }
763                                found_idx = Some(idx);
764                            }
765                            _ => {}
766                        }
767                        // Only process the first session window op per stream
768                        if found_idx.is_some() {
769                            break;
770                        }
771                    }
772                }
773                (found_idx, result)
774            };
775
776            if expired.is_empty() {
777                continue;
778            }
779
780            let window_idx = match window_idx {
781                Some(idx) => idx,
782                None => continue,
783            };
784
785            // Step 2: Process expired events through the post-window pipeline
786            let result = Self::process_post_window(
787                self.streams.get_mut(&stream_name).unwrap(),
788                expired,
789                window_idx,
790                &self.functions,
791                self.sinks.cache(),
792            )
793            .await?;
794
795            // Step 3: Send emitted events to output channel
796            for emitted in &result.emitted_events {
797                self.output_events_emitted += 1;
798                let owned = (**emitted).clone();
799                self.send_output(owned);
800            }
801        }
802
803        Ok(())
804    }
805
806    /// Process events through the pipeline operations that come after the window
807    /// at `window_idx`. This runs aggregate, having, select, emit, etc.
808    async fn process_post_window(
809        stream: &mut StreamDefinition,
810        events: Vec<SharedEvent>,
811        window_idx: usize,
812        functions: &FxHashMap<String, UserFunction>,
813        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
814    ) -> Result<StreamProcessResult, super::error::EngineError> {
815        // Delegate to unified pipeline starting after the window with post-window skip flags
816        pipeline::execute_pipeline(
817            stream,
818            events,
819            window_idx + 1,
820            pipeline::SkipFlags::for_post_window(),
821            functions,
822            sinks,
823        )
824        .await
825    }
826
827    /// Apply a watermark advance to all windows, triggering closure of expired windows.
828    #[tracing::instrument(skip(self))]
829    pub(super) async fn apply_watermark_to_windows(
830        &mut self,
831        wm: DateTime<Utc>,
832    ) -> Result<(), super::error::EngineError> {
833        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
834
835        for stream_name in stream_names {
836            let (window_idx, expired) = {
837                let stream = self.streams.get_mut(&stream_name).unwrap();
838                let mut result = Vec::new();
839                let mut found_idx = None;
840
841                for (idx, op) in stream.operations.iter_mut().enumerate() {
842                    if let RuntimeOp::Window(window) = op {
843                        let events: Option<Vec<SharedEvent>> = match window {
844                            WindowType::Tumbling(w) => w.advance_watermark(wm),
845                            WindowType::Sliding(w) => w.advance_watermark(wm),
846                            WindowType::Session(w) => w.advance_watermark(wm),
847                            WindowType::PartitionedTumbling(w) => {
848                                let parts = w.advance_watermark(wm);
849                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
850                                if all.is_empty() {
851                                    None
852                                } else {
853                                    Some(all)
854                                }
855                            }
856                            WindowType::PartitionedSliding(w) => {
857                                let parts = w.advance_watermark(wm);
858                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
859                                if all.is_empty() {
860                                    None
861                                } else {
862                                    Some(all)
863                                }
864                            }
865                            WindowType::PartitionedSession(w) => {
866                                let parts = w.advance_watermark(wm);
867                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
868                                if all.is_empty() {
869                                    None
870                                } else {
871                                    Some(all)
872                                }
873                            }
874                            _ => None, // Count-based windows don't use watermarks
875                        };
876
877                        if let Some(evts) = events {
878                            result = evts;
879                            found_idx = Some(idx);
880                        }
881                        break;
882                    }
883                }
884                (found_idx, result)
885            };
886
887            if expired.is_empty() {
888                continue;
889            }
890
891            let window_idx = match window_idx {
892                Some(idx) => idx,
893                None => continue,
894            };
895
896            let result = Self::process_post_window(
897                self.streams.get_mut(&stream_name).unwrap(),
898                expired,
899                window_idx,
900                &self.functions,
901                self.sinks.cache(),
902            )
903            .await?;
904
905            for emitted in &result.emitted_events {
906                self.output_events_emitted += 1;
907                let owned = (**emitted).clone();
908                self.send_output(owned);
909            }
910        }
911
912        Ok(())
913    }
914}