// varpulis_runtime/engine/dispatch.rs

1//! Event dispatch: processing incoming events through stream pipelines.
2//!
3//! This module contains the `impl Engine` methods for event processing:
4//! `process()`, `process_shared()`, `process_inner()`, `process_batch()`,
5//! `process_batch_shared()`, `process_batch_sync()`, and their helpers.
6
7use std::collections::VecDeque;
8use std::sync::Arc;
9
10use chrono::{DateTime, Utc};
11use rustc_hash::FxHashMap;
12use tracing::debug;
13
14use super::types::{
15    RuntimeOp, RuntimeSource, StreamDefinition, StreamProcessResult, UserFunction, WindowType,
16};
17use super::{evaluator, pipeline, Engine};
18use crate::event::{Event, SharedEvent};
19use crate::sequence::SequenceContext;
20
21impl Engine {
22    /// Process an incoming event
23    #[tracing::instrument(level = "trace", skip(self))]
24    pub async fn process(&mut self, event: Event) -> Result<(), super::error::EngineError> {
25        self.events_processed += 1;
26        self.process_inner(Arc::new(event)).await
27    }
28
29    /// Process a pre-wrapped SharedEvent (zero-copy path for context pipelines)
30    #[tracing::instrument(level = "trace", skip(self))]
31    pub async fn process_shared(
32        &mut self,
33        event: SharedEvent,
34    ) -> Result<(), super::error::EngineError> {
35        self.events_processed += 1;
36        self.process_inner(event).await
37    }
38
    /// Internal processing logic shared by `process()` and `process_shared()`.
    ///
    /// Stages, in order:
    /// 1. Record the incoming event in Prometheus (if metrics are enabled).
    /// 2. Late-data handling: if the event is behind the effective watermark
    ///    and no routed stream's `allowed_lateness` admits it, route it to a
    ///    configured side-output stream or drop it, then return early.
    /// 3. Observe the event in the watermark tracker (watermark advancement
    ///    is recorded here; window flushing is left to the caller).
    /// 4. Dispatch the event — and any derived output events — through the
    ///    matching stream pipelines, chaining up to `MAX_CHAIN_DEPTH` levels.
    #[tracing::instrument(level = "trace", skip(self))]
    async fn process_inner(&mut self, event: SharedEvent) -> Result<(), super::error::EngineError> {
        // Record incoming event in Prometheus
        if let Some(ref m) = self.metrics {
            m.record_event(&event.event_type);
        }

        // Check for late data against the watermark
        if let Some(ref tracker) = self.watermark_tracker {
            if let Some(effective_wm) = tracker.effective_watermark() {
                if event.timestamp < effective_wm {
                    // Event is behind the watermark — check allowed lateness per stream.
                    // The event is admitted if ANY routed stream's config allows it.
                    let mut allowed = false;
                    if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                        for sn in stream_names.iter() {
                            if let Some(cfg) = self.late_data_configs.get(sn) {
                                if event.timestamp >= effective_wm - cfg.allowed_lateness {
                                    allowed = true;
                                    break;
                                }
                            }
                        }
                    }
                    // NOTE(review): when `late_data_configs` is empty, late events
                    // are processed normally (no drop) — only streams that opted
                    // into late-data handling enforce the watermark here.
                    if !allowed && !self.late_data_configs.is_empty() {
                        // Route to side-output if configured, otherwise drop
                        let mut routed = false;
                        if let Some(stream_names) = self.router.get_routes(&event.event_type) {
                            for sn in stream_names.iter() {
                                if let Some(cfg) = self.late_data_configs.get(sn) {
                                    if let Some(ref side_stream) = cfg.side_output_stream {
                                        debug!(
                                            "Routing late event to side-output '{}' type={} ts={}",
                                            side_stream, event.event_type, event.timestamp
                                        );
                                        // Clone the event and retype it onto the
                                        // side-output stream before emitting.
                                        let mut late_event = (*event).clone();
                                        late_event.event_type = side_stream.clone().into();
                                        self.send_output(late_event);
                                        routed = true;
                                        break;
                                    }
                                }
                            }
                        }
                        if !routed {
                            debug!(
                                "Dropping late event type={} ts={} (watermark={})",
                                event.event_type, event.timestamp, effective_wm
                            );
                        }
                        // Late event handled (side-output or drop): stop here.
                        return Ok(());
                    }
                }
            }
        }

        // Process events with depth limit to prevent infinite loops.
        // Each queue entry carries (event, derivation depth); SharedEvent
        // keeps this clone O(1) (Arc refcount bump, no deep copy).
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::from([(event.clone(), 0)]);
        const MAX_CHAIN_DEPTH: usize = 10;

        // Observe the event in the watermark tracker and remember the newest
        // watermark. We deliberately do NOT flush windows here: doing so would
        // need a second mutable borrow of self, so the caller is expected to
        // flush periodically instead.
        if let Some(ref mut tracker) = self.watermark_tracker {
            tracker.observe_event(&event.event_type, event.timestamp);

            if let Some(new_wm) = tracker.effective_watermark() {
                if self.last_applied_watermark.is_none_or(|last| new_wm > last) {
                    self.last_applied_watermark = Some(new_wm);
                    // Note: we don't call apply_watermark_to_windows here to avoid
                    // double-mutable-borrow. The caller should periodically flush.
                }
            }
        }

        // Process events iteratively (FIFO), feeding each stream's output
        // events back into the queue so dependent streams see them.
        while let Some((current_event, depth)) = pending_events.pop_front() {
            // Prevent infinite loops by limiting chain depth
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Collect stream names to avoid borrowing issues
            // PERF: Arc<[String]> clone is O(1) - just atomic increment, not deep copy
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing metrics (wall-clock seconds)
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Check if we need to send output_events to the output channel.
                    // This is true when there are no .emit() events AND the stream has
                    // a .process() UDF or a .to() sink (so sink events appear in the
                    // live event stream / WebSocket relay).
                    let send_outputs = result.emitted_events.is_empty()
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));

                    // Send emitted events to output channel (non-blocking)
                    // PERF: Use send_output_shared for zero-copy when using SharedEvent channel
                    if self.output_channel.is_some() {
                        for emitted in &result.emitted_events {
                            self.output_events_emitted += 1;
                            if let Some(ref m) = self.metrics {
                                m.record_output_event("pipeline", &emitted.event_type);
                            }
                            self.send_output_shared(emitted);
                        }
                        if send_outputs {
                            for output in &result.output_events {
                                self.output_events_emitted += 1;
                                if let Some(ref m) = self.metrics {
                                    m.record_output_event("pipeline", &output.event_type);
                                }
                                self.send_output_shared(output);
                            }
                        }
                    } else {
                        // Benchmark mode (no output channel): just count, don't send
                        self.output_events_emitted += result.emitted_events.len() as u64;
                        if send_outputs {
                            self.output_events_emitted += result.output_events.len() as u64;
                        }
                    }

                    // Count events sent to connector sinks via .to() operations.
                    // Skip if already counted via output_events above (avoids double-counting).
                    if !send_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events for processing by dependent streams
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Update active streams gauge
        if let Some(ref m) = self.metrics {
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
208
    /// Process a batch of events for improved throughput.
    ///
    /// More efficient than calling `process()` repeatedly because it:
    /// - Pre-allocates SharedEvents in bulk
    /// - Collects emitted events and sends them in one pass at the end
    /// - Amortizes async overhead across the batch
    ///
    /// Unlike `process_inner`, this path performs no late-data/watermark
    /// handling; it only runs routing, pipelines, and metrics.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Update Prometheus metrics for every incoming event
        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // Pre-allocate pending events with capacity for batch + some derived events
        // (25% headroom is a heuristic for chained outputs).
        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        // Convert all events to SharedEvents upfront (one Arc allocation each)
        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        // Maximum derivation depth before an event chain is cut off.
        const MAX_CHAIN_DEPTH: usize = 10;

        // Collect emitted events to send in batch after the loop
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order (critical for sequence patterns!)
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Get stream names routed for this event type (Arc clone is O(1))
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing in Prometheus
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Collect emitted events for batch sending
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        // Arc::clone keeps these O(1); the originals are re-queued below.
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events (push_back to maintain FIFO order)
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Send all emitted events in batch (non-blocking to avoid async overhead)
        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        // Update Prometheus output metrics once, after the whole batch
        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
327
    /// Synchronous batch processing for maximum throughput.
    ///
    /// Use this when no `.to()` sink operations are in the pipeline (e.g.
    /// benchmark mode): it avoids async runtime overhead completely by
    /// delegating to `process_stream_sync`, which skips sink dispatch and
    /// returns empty results for join sources.
    pub fn process_batch_sync(
        &mut self,
        events: Vec<Event>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Pre-allocate pending events with capacity for batch + some derived events
        // (25% headroom for chained outputs).
        // Use VecDeque so we can process in FIFO order (push_back + pop_front)
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        // Convert all events to SharedEvents upfront
        for event in events {
            pending_events.push_back((Arc::new(event), 0));
        }

        // Maximum derivation depth before an event chain is cut off.
        const MAX_CHAIN_DEPTH: usize = 10;

        // Collect emitted events to send in batch after the loop
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order (critical for sequence patterns!)
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Get stream names routed for this event type (Arc clone is O(1))
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    // Skip output clone+rename when stream has no downstream routes
                    // (nothing will consume the renamed outputs).
                    let skip_rename = self.router.get_routes(stream_name).is_none();
                    let result = Self::process_stream_sync(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        skip_rename,
                    )?;

                    // Collect emitted events for batch sending
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        // Arc::clone keeps these O(1); the originals are re-queued below.
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events (push_back to maintain FIFO order)
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // Send all emitted events in batch (non-blocking to avoid async overhead)
        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        Ok(())
    }
423
424    /// Synchronous stream processing - no async operations.
425    /// Skips .to() sink operations (which are the only async parts).
426    /// When `skip_output_rename` is true, output events skip the clone+rename step.
427    fn process_stream_sync(
428        stream: &mut StreamDefinition,
429        event: SharedEvent,
430        functions: &FxHashMap<String, UserFunction>,
431        skip_output_rename: bool,
432    ) -> Result<StreamProcessResult, super::error::EngineError> {
433        // For merge sources, check if the event passes the appropriate filter
434        if let RuntimeSource::Merge(ref sources) = stream.source {
435            let mut passes_filter = false;
436            for ms in sources {
437                if ms.event_type == *event.event_type {
438                    if let Some(ref filter) = ms.filter {
439                        let ctx = SequenceContext::new();
440                        if let Some(result) = evaluator::eval_expr_with_functions(
441                            filter,
442                            &event,
443                            &ctx,
444                            functions,
445                            &FxHashMap::default(),
446                        ) {
447                            if result.as_bool().unwrap_or(false) {
448                                passes_filter = true;
449                                break;
450                            }
451                        }
452                    } else {
453                        passes_filter = true;
454                        break;
455                    }
456                }
457            }
458            if !passes_filter {
459                return Ok(StreamProcessResult {
460                    emitted_events: vec![],
461                    output_events: vec![],
462                    sink_events_sent: 0,
463                });
464            }
465        }
466
467        // For join sources - return empty (join requires async in some paths)
468        if matches!(stream.source, RuntimeSource::Join(_)) {
469            return Ok(StreamProcessResult {
470                emitted_events: vec![],
471                output_events: vec![],
472                sink_events_sent: 0,
473            });
474        }
475
476        // Use synchronous pipeline execution
477        pipeline::execute_pipeline_sync(
478            stream,
479            vec![event],
480            0,
481            pipeline::SkipFlags::none(),
482            functions,
483            skip_output_rename,
484        )
485    }
486
    /// Process a batch of pre-wrapped SharedEvents (zero-copy path for
    /// context pipelines).
    ///
    /// Same batching strategy as `process_batch`, but skips the per-event
    /// `Arc::new` since callers already hold `SharedEvent`s.
    #[tracing::instrument(level = "trace", skip(self))]
    pub async fn process_batch_shared(
        &mut self,
        events: Vec<SharedEvent>,
    ) -> Result<(), super::error::EngineError> {
        if events.is_empty() {
            return Ok(());
        }

        let batch_size = events.len();
        self.events_processed += batch_size as u64;

        // Update Prometheus metrics for every incoming event
        if let Some(ref m) = self.metrics {
            for event in &events {
                m.record_event(&event.event_type);
            }
        }

        // Use VecDeque so we can process in FIFO order (critical for sequence patterns!)
        // 25% headroom for derived events.
        let mut pending_events: VecDeque<(SharedEvent, usize)> =
            VecDeque::with_capacity(batch_size + batch_size / 4);

        for event in events {
            pending_events.push_back((event, 0));
        }

        // Maximum derivation depth before an event chain is cut off.
        const MAX_CHAIN_DEPTH: usize = 10;

        // Collect emitted events to send in batch after the loop
        let mut emitted_batch: Vec<SharedEvent> = Vec::with_capacity(batch_size / 10);

        // Process all events in FIFO order
        while let Some((current_event, depth)) = pending_events.pop_front() {
            if depth >= MAX_CHAIN_DEPTH {
                debug!(
                    "Max chain depth reached for event type: {}",
                    current_event.event_type
                );
                continue;
            }

            // Stream names routed for this event type (Arc clone is O(1))
            let stream_names: Arc<[String]> = self
                .router
                .get_routes(&current_event.event_type)
                .cloned()
                .unwrap_or_else(|| Arc::from([]));

            for stream_name in stream_names.iter() {
                if let Some(stream) = self.streams.get_mut(stream_name) {
                    let start = std::time::Instant::now();
                    let result = Self::process_stream_with_functions(
                        stream,
                        Arc::clone(&current_event),
                        &self.functions,
                        self.sinks.cache(),
                    )
                    .await?;

                    // Record per-stream processing time in Prometheus
                    if let Some(ref m) = self.metrics {
                        m.record_processing(stream_name, start.elapsed().as_secs_f64());
                    }

                    // Collect emitted events for batch sending
                    self.output_events_emitted += result.emitted_events.len() as u64;
                    let has_emitted = !result.emitted_events.is_empty();
                    emitted_batch.extend(result.emitted_events);

                    // If .process() or .to() was used but no .emit(), send output_events
                    // to the output channel so they appear in the live event stream.
                    let forward_outputs = !has_emitted
                        && stream
                            .operations
                            .iter()
                            .any(|op| matches!(op, RuntimeOp::Process(_) | RuntimeOp::To(_)));
                    if forward_outputs {
                        self.output_events_emitted += result.output_events.len() as u64;
                        // Arc::clone keeps these O(1); the originals are re-queued below.
                        emitted_batch.extend(result.output_events.iter().map(Arc::clone));
                    }

                    // Count sink events only when not already counted via forwarded outputs
                    if !forward_outputs {
                        self.output_events_emitted += result.sink_events_sent;
                    }

                    // Queue output events for dependent streams (FIFO)
                    for output_event in result.output_events {
                        pending_events.push_back((output_event, depth + 1));
                    }
                }
            }
        }

        // PERF: Use send_output_shared to avoid cloning in benchmark mode
        for emitted in &emitted_batch {
            self.send_output_shared(emitted);
        }

        // Update Prometheus output metrics once, after the whole batch
        if let Some(ref m) = self.metrics {
            for emitted in &emitted_batch {
                m.record_output_event("pipeline", &emitted.event_type);
            }
            m.set_stream_count(self.streams.len());
        }

        Ok(())
    }
593
    /// Run a single event through one stream's pipeline (async path).
    ///
    /// Handles the source-specific front-ends before delegating to the
    /// unified pipeline executor:
    /// - Merge sources: the event must match an arm's event type and pass
    ///   its filter (if any), otherwise an empty result is returned.
    /// - Join sources: the event is fed into the stream's `JoinBuffer`;
    ///   only a fully correlated event continues through the pipeline
    ///   (via `process_join_result`).
    async fn process_stream_with_functions(
        stream: &mut StreamDefinition,
        event: SharedEvent,
        functions: &FxHashMap<String, UserFunction>,
        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
    ) -> Result<StreamProcessResult, super::error::EngineError> {
        // For merge sources, check if the event passes the appropriate filter
        if let RuntimeSource::Merge(ref sources) = stream.source {
            let mut passes_filter = false;
            let mut matched_source_name = None;
            for ms in sources {
                if ms.event_type == *event.event_type {
                    if let Some(ref filter) = ms.filter {
                        // Filters are evaluated against a fresh (empty) sequence context.
                        let ctx = SequenceContext::new();
                        if let Some(result) = evaluator::eval_expr_with_functions(
                            filter,
                            &event,
                            &ctx,
                            functions,
                            &FxHashMap::default(),
                        ) {
                            if result.as_bool().unwrap_or(false) {
                                passes_filter = true;
                                matched_source_name = Some(&ms.name);
                                break;
                            }
                        }
                    } else {
                        // No filter means it passes
                        passes_filter = true;
                        matched_source_name = Some(&ms.name);
                        break;
                    }
                }
            }
            if !passes_filter {
                return Ok(StreamProcessResult {
                    emitted_events: vec![],
                    output_events: vec![],
                    sink_events_sent: 0,
                });
            }
            // Log which merge source matched (uses ms.name)
            if let Some(source_name) = matched_source_name {
                tracing::trace!("Event matched merge source: {}", source_name);
            }
        }

        // For join sources, route through the JoinBuffer for correlation
        if let RuntimeSource::Join(ref _sources) = stream.source {
            if let Some(ref mut join_buffer) = stream.join_buffer {
                // Determine which source this event came from using the event_type_to_source mapping
                // This maps event types (e.g., "MarketATick") to source names (e.g., "MarketA").
                // Falls back to the event type itself when no mapping exists.
                let source_name = stream
                    .event_type_to_source
                    .get(&*event.event_type)
                    .cloned()
                    .unwrap_or_else(|| event.event_type.to_string());

                tracing::debug!(
                    "Join stream {}: Adding event from source '{}' (event_type: {})",
                    stream.name,
                    source_name,
                    event.event_type
                );

                // Add event to join buffer and try to correlate (join still needs owned Event)
                match join_buffer.add_event(&source_name, (*event).clone()) {
                    Some(correlated_event) => {
                        tracing::debug!(
                            "Join stream {}: Correlated event with {} fields",
                            stream.name,
                            correlated_event.data.len()
                        );
                        // Continue processing with the correlated event,
                        // skipping join-specific handling on the second pass.
                        return Self::process_join_result(
                            stream,
                            Arc::new(correlated_event),
                            functions,
                            sinks,
                        )
                        .await;
                    }
                    None => {
                        // No correlation yet - need events from all sources
                        tracing::debug!(
                            "Join stream {}: No correlation yet, waiting for more events (buffer stats: {:?})",
                            stream.name,
                            join_buffer.stats()
                        );
                        return Ok(StreamProcessResult {
                            emitted_events: vec![],
                            output_events: vec![],
                            sink_events_sent: 0,
                        });
                    }
                }
            }
            // Misconfiguration: a join stream without a buffer cannot correlate.
            tracing::warn!("Join stream {} has no JoinBuffer configured", stream.name);
            return Ok(StreamProcessResult {
                emitted_events: vec![],
                output_events: vec![],
                sink_events_sent: 0,
            });
        }

        // Delegate to unified pipeline execution
        pipeline::execute_pipeline(
            stream,
            vec![event],
            0,
            pipeline::SkipFlags::none(),
            functions,
            sinks,
        )
        .await
    }
711
712    /// Process a join result through the stream operations (skipping join-specific handling)
713    async fn process_join_result(
714        stream: &mut StreamDefinition,
715        correlated_event: SharedEvent,
716        functions: &FxHashMap<String, UserFunction>,
717        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
718    ) -> Result<StreamProcessResult, super::error::EngineError> {
719        // Delegate to unified pipeline with join-specific skip flags
720        pipeline::execute_pipeline(
721            stream,
722            vec![correlated_event],
723            0,
724            pipeline::SkipFlags::for_join(),
725            functions,
726            sinks,
727        )
728        .await
729    }
730
731    // =========================================================================
732    // Session Window Sweep
733    // =========================================================================
734
735    /// Flush all expired session windows and process the resulting events
736    /// through the remaining pipeline stages (aggregate, having, select, emit, etc.).
737    #[tracing::instrument(skip(self))]
738    pub async fn flush_expired_sessions(&mut self) -> Result<(), super::error::EngineError> {
739        let now = chrono::Utc::now();
740        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
741
742        for stream_name in stream_names {
743            // Step 1: Find the window op index and collect expired events
744            let (window_idx, expired) = {
745                let stream = self.streams.get_mut(&stream_name).unwrap();
746                let mut result = Vec::new();
747                let mut found_idx = None;
748
749                for (idx, op) in stream.operations.iter_mut().enumerate() {
750                    if let RuntimeOp::Window(window) = op {
751                        match window {
752                            WindowType::Session(w) => {
753                                if let Some(events) = w.check_expired(now) {
754                                    result = events;
755                                }
756                                found_idx = Some(idx);
757                            }
758                            WindowType::PartitionedSession(w) => {
759                                for (_key, events) in w.check_expired(now) {
760                                    result.extend(events);
761                                }
762                                found_idx = Some(idx);
763                            }
764                            _ => {}
765                        }
766                        // Only process the first session window op per stream
767                        if found_idx.is_some() {
768                            break;
769                        }
770                    }
771                }
772                (found_idx, result)
773            };
774
775            if expired.is_empty() {
776                continue;
777            }
778
779            let window_idx = match window_idx {
780                Some(idx) => idx,
781                None => continue,
782            };
783
784            // Step 2: Process expired events through the post-window pipeline
785            let result = Self::process_post_window(
786                self.streams.get_mut(&stream_name).unwrap(),
787                expired,
788                window_idx,
789                &self.functions,
790                self.sinks.cache(),
791            )
792            .await?;
793
794            // Step 3: Send emitted events to output channel
795            for emitted in &result.emitted_events {
796                self.output_events_emitted += 1;
797                let owned = (**emitted).clone();
798                self.send_output(owned);
799            }
800        }
801
802        Ok(())
803    }
804
805    /// Process events through the pipeline operations that come after the window
806    /// at `window_idx`. This runs aggregate, having, select, emit, etc.
807    async fn process_post_window(
808        stream: &mut StreamDefinition,
809        events: Vec<SharedEvent>,
810        window_idx: usize,
811        functions: &FxHashMap<String, UserFunction>,
812        sinks: &FxHashMap<String, Arc<dyn crate::sink::Sink>>,
813    ) -> Result<StreamProcessResult, super::error::EngineError> {
814        // Delegate to unified pipeline starting after the window with post-window skip flags
815        pipeline::execute_pipeline(
816            stream,
817            events,
818            window_idx + 1,
819            pipeline::SkipFlags::for_post_window(),
820            functions,
821            sinks,
822        )
823        .await
824    }
825
826    /// Apply a watermark advance to all windows, triggering closure of expired windows.
827    #[tracing::instrument(skip(self))]
828    pub(super) async fn apply_watermark_to_windows(
829        &mut self,
830        wm: DateTime<Utc>,
831    ) -> Result<(), super::error::EngineError> {
832        let stream_names: Vec<String> = self.streams.keys().cloned().collect();
833
834        for stream_name in stream_names {
835            let (window_idx, expired) = {
836                let stream = self.streams.get_mut(&stream_name).unwrap();
837                let mut result = Vec::new();
838                let mut found_idx = None;
839
840                for (idx, op) in stream.operations.iter_mut().enumerate() {
841                    if let RuntimeOp::Window(window) = op {
842                        let events: Option<Vec<SharedEvent>> = match window {
843                            WindowType::Tumbling(w) => w.advance_watermark(wm),
844                            WindowType::Sliding(w) => w.advance_watermark(wm),
845                            WindowType::Session(w) => w.advance_watermark(wm),
846                            WindowType::PartitionedTumbling(w) => {
847                                let parts = w.advance_watermark(wm);
848                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
849                                if all.is_empty() {
850                                    None
851                                } else {
852                                    Some(all)
853                                }
854                            }
855                            WindowType::PartitionedSliding(w) => {
856                                let parts = w.advance_watermark(wm);
857                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
858                                if all.is_empty() {
859                                    None
860                                } else {
861                                    Some(all)
862                                }
863                            }
864                            WindowType::PartitionedSession(w) => {
865                                let parts = w.advance_watermark(wm);
866                                let all: Vec<_> = parts.into_iter().flat_map(|(_, e)| e).collect();
867                                if all.is_empty() {
868                                    None
869                                } else {
870                                    Some(all)
871                                }
872                            }
873                            _ => None, // Count-based windows don't use watermarks
874                        };
875
876                        if let Some(evts) = events {
877                            result = evts;
878                            found_idx = Some(idx);
879                        }
880                        break;
881                    }
882                }
883                (found_idx, result)
884            };
885
886            if expired.is_empty() {
887                continue;
888            }
889
890            let window_idx = match window_idx {
891                Some(idx) => idx,
892                None => continue,
893            };
894
895            let result = Self::process_post_window(
896                self.streams.get_mut(&stream_name).unwrap(),
897                expired,
898                window_idx,
899                &self.functions,
900                self.sinks.cache(),
901            )
902            .await?;
903
904            for emitted in &result.emitted_events {
905                self.output_events_emitted += 1;
906                let owned = (**emitted).clone();
907                self.send_output(owned);
908            }
909        }
910
911        Ok(())
912    }
913}