Skip to main content

oxirs_stream/
window_algebra.rs

1//! # Stream Windowing Algebra
2//!
3//! Provides a composable windowing algebra for stream processing with support for
4//! tumbling, sliding, session, and count-based windows. Integrates with the
5//! existing watermark infrastructure for late-data handling and eviction.
6//!
7//! ## Features
8//!
9//! - **Tumbling windows**: Fixed-size, non-overlapping time intervals
10//! - **Sliding windows**: Fixed-size intervals that advance by a configurable slide
11//! - **Session windows**: Gap-based windows that merge when events arrive within a timeout
12//! - **Count-based windows**: Windows that trigger after N events
13//! - **Watermark-based eviction**: Automatically close and evict windows past the watermark
14//! - **Late data handling**: Configurable policies for events arriving after window close
15//! - **Window aggregation**: Composable fold/reduce over window contents
16
17use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{BTreeMap, HashMap, VecDeque};
20use std::time::Duration;
21
22// ─────────────────────────────────────────────
23// Window types
24// ─────────────────────────────────────────────
25
26/// The kind of window to apply.
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub enum WindowKind {
29    /// Fixed-size, non-overlapping time window.
30    Tumbling {
31        /// Window size.
32        size: Duration,
33    },
34    /// Fixed-size window that advances by `slide`.
35    Sliding {
36        /// Window size.
37        size: Duration,
38        /// Slide interval.
39        slide: Duration,
40    },
41    /// Gap-based window: a new window opens when no events arrive within `gap`.
42    Session {
43        /// Inactivity gap that closes the current session.
44        gap: Duration,
45    },
46    /// Trigger after receiving `count` events.
47    Count {
48        /// Number of events per window.
49        count: usize,
50    },
51}
52
53/// Policy for handling events that arrive after the window has been closed.
54#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
55pub enum LatePolicy {
56    /// Silently drop late events.
57    #[default]
58    Drop,
59    /// Accept into a side output.
60    SideOutput,
61    /// Reopen the window (allowed lateness).
62    AllowedLateness {
63        /// How long past the window end to still accept events.
64        lateness: Duration,
65    },
66}
67
68/// Configuration for the window algebra operator.
69#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct WindowAlgebraConfig {
71    /// The kind of window.
72    pub kind: WindowKind,
73    /// Policy for late-arriving events.
74    pub late_policy: LatePolicy,
75    /// Maximum number of open windows before forced eviction of the oldest.
76    pub max_open_windows: usize,
77    /// Whether to emit partial results on eviction.
78    pub emit_on_evict: bool,
79}
80
81impl Default for WindowAlgebraConfig {
82    fn default() -> Self {
83        Self {
84            kind: WindowKind::Tumbling {
85                size: Duration::from_secs(60),
86            },
87            late_policy: LatePolicy::default(),
88            max_open_windows: 10_000,
89            emit_on_evict: true,
90        }
91    }
92}
93
94// ─────────────────────────────────────────────
95// Window identifier and pane
96// ─────────────────────────────────────────────
97
98/// Identifies a window instance.
99#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
100pub struct WindowId {
101    /// Start of window (millis since epoch) or sequence number.
102    pub start: i64,
103    /// End of window (millis since epoch) or sequence number.
104    pub end: i64,
105    /// Optional key for keyed windows.
106    pub key: Option<String>,
107}
108
109impl WindowId {
110    /// Create a time-range window id.
111    pub fn time_range(start_ms: i64, end_ms: i64) -> Self {
112        Self {
113            start: start_ms,
114            end: end_ms,
115            key: None,
116        }
117    }
118
119    /// Create a keyed time-range window id.
120    pub fn keyed(start_ms: i64, end_ms: i64, key: impl Into<String>) -> Self {
121        Self {
122            start: start_ms,
123            end: end_ms,
124            key: Some(key.into()),
125        }
126    }
127
128    /// Duration of this window in milliseconds.
129    pub fn duration_ms(&self) -> i64 {
130        self.end - self.start
131    }
132
133    /// Whether `ts` falls within this window.
134    pub fn contains(&self, ts: i64) -> bool {
135        ts >= self.start && ts < self.end
136    }
137}
138
139/// A windowed event with its event-time timestamp.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct WindowEvent<T: Clone> {
142    /// The event payload.
143    pub value: T,
144    /// Event-time timestamp in milliseconds since epoch.
145    pub timestamp_ms: i64,
146    /// Ingestion time.
147    pub ingestion_time: DateTime<Utc>,
148}
149
150impl<T: Clone> WindowEvent<T> {
151    pub fn new(value: T, timestamp_ms: i64) -> Self {
152        Self {
153            value,
154            timestamp_ms,
155            ingestion_time: Utc::now(),
156        }
157    }
158}
159
160/// State of a single window pane.
161#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162pub enum WindowState {
163    /// Actively collecting events.
164    Open,
165    /// Closed but within allowed-lateness period.
166    Closing,
167    /// Fully closed and results emitted.
168    Closed,
169}
170
171/// A single window pane containing events.
172#[derive(Debug, Clone)]
173pub struct WindowPane<T: Clone> {
174    /// Window identifier.
175    pub id: WindowId,
176    /// Events in this pane.
177    pub events: Vec<WindowEvent<T>>,
178    /// Current state.
179    pub state: WindowState,
180    /// When the pane was created.
181    pub created_at: DateTime<Utc>,
182    /// Allowed lateness deadline (if applicable).
183    pub lateness_deadline_ms: Option<i64>,
184}
185
186impl<T: Clone> WindowPane<T> {
187    fn new(id: WindowId) -> Self {
188        Self {
189            id,
190            events: Vec::new(),
191            state: WindowState::Open,
192            created_at: Utc::now(),
193            lateness_deadline_ms: None,
194        }
195    }
196
197    fn new_with_lateness(id: WindowId, lateness_ms: i64) -> Self {
198        let deadline = id.end + lateness_ms;
199        Self {
200            id,
201            events: Vec::new(),
202            state: WindowState::Open,
203            created_at: Utc::now(),
204            lateness_deadline_ms: Some(deadline),
205        }
206    }
207
208    /// Number of events in this pane.
209    pub fn len(&self) -> usize {
210        self.events.len()
211    }
212
213    /// Whether this pane is empty.
214    pub fn is_empty(&self) -> bool {
215        self.events.is_empty()
216    }
217
218    /// Minimum event timestamp in this pane.
219    pub fn min_timestamp(&self) -> Option<i64> {
220        self.events.iter().map(|e| e.timestamp_ms).min()
221    }
222
223    /// Maximum event timestamp in this pane.
224    pub fn max_timestamp(&self) -> Option<i64> {
225        self.events.iter().map(|e| e.timestamp_ms).max()
226    }
227}
228
229/// Emitted when a window fires (triggers).
230#[derive(Debug, Clone)]
231pub struct WindowOutput<T: Clone> {
232    /// The window that fired.
233    pub window_id: WindowId,
234    /// Events in the window.
235    pub events: Vec<WindowEvent<T>>,
236    /// Whether this is a partial result (from eviction).
237    pub is_partial: bool,
238    /// Number of late events that were dropped.
239    pub late_dropped: usize,
240    /// Number of late events accepted into side output.
241    pub late_side_output: usize,
242}
243
244/// Statistics for the window algebra operator.
245#[derive(Debug, Clone, Default, Serialize, Deserialize)]
246pub struct WindowAlgebraStats {
247    /// Total events processed.
248    pub total_events: u64,
249    /// Total windows opened.
250    pub windows_opened: u64,
251    /// Total windows closed.
252    pub windows_closed: u64,
253    /// Total windows evicted.
254    pub windows_evicted: u64,
255    /// Total late events dropped.
256    pub late_events_dropped: u64,
257    /// Total late events sent to side output.
258    pub late_events_side_output: u64,
259    /// Total late events accepted within allowed lateness.
260    pub late_events_accepted: u64,
261    /// Currently open windows.
262    pub open_windows: u64,
263}
264
265// ─────────────────────────────────────────────
266// WindowAlgebra operator
267// ─────────────────────────────────────────────
268
269/// The main stream windowing algebra operator.
270///
271/// Generic over the event payload type `T`.
272pub struct WindowAlgebra<T: Clone> {
273    config: WindowAlgebraConfig,
274    /// Open window panes keyed by WindowId.
275    panes: BTreeMap<WindowId, WindowPane<T>>,
276    /// Side output for late events.
277    side_output: VecDeque<WindowEvent<T>>,
278    /// Current watermark (millis).
279    watermark_ms: i64,
280    /// Statistics.
281    stats: WindowAlgebraStats,
282    // For count windows: buffer keyed by key
283    count_buffers: HashMap<Option<String>, Vec<WindowEvent<T>>>,
284    // Monotonic sequence for count window ids
285    count_seq: i64,
286    // For session windows: last event time per key
287    session_last_event: HashMap<Option<String>, i64>,
288}
289
290impl<T: Clone> WindowAlgebra<T> {
291    /// Create a new windowing operator with the given configuration.
292    pub fn new(config: WindowAlgebraConfig) -> Self {
293        Self {
294            config,
295            panes: BTreeMap::new(),
296            side_output: VecDeque::new(),
297            watermark_ms: i64::MIN,
298            stats: WindowAlgebraStats::default(),
299            count_buffers: HashMap::new(),
300            count_seq: 0,
301            session_last_event: HashMap::new(),
302        }
303    }
304
305    /// Create a tumbling window operator.
306    pub fn tumbling(size: Duration) -> Self {
307        Self::new(WindowAlgebraConfig {
308            kind: WindowKind::Tumbling { size },
309            ..Default::default()
310        })
311    }
312
313    /// Create a sliding window operator.
314    pub fn sliding(size: Duration, slide: Duration) -> Self {
315        Self::new(WindowAlgebraConfig {
316            kind: WindowKind::Sliding { size, slide },
317            ..Default::default()
318        })
319    }
320
321    /// Create a session window operator.
322    pub fn session(gap: Duration) -> Self {
323        Self::new(WindowAlgebraConfig {
324            kind: WindowKind::Session { gap },
325            ..Default::default()
326        })
327    }
328
329    /// Create a count-based window operator.
330    pub fn count(count: usize) -> Self {
331        Self::new(WindowAlgebraConfig {
332            kind: WindowKind::Count { count },
333            ..Default::default()
334        })
335    }
336
337    /// Set the late data policy.
338    pub fn with_late_policy(mut self, policy: LatePolicy) -> Self {
339        self.config.late_policy = policy;
340        self
341    }
342
343    /// Set the maximum number of open windows.
344    pub fn with_max_open_windows(mut self, max: usize) -> Self {
345        self.config.max_open_windows = max;
346        self
347    }
348
349    /// Get current statistics.
350    pub fn stats(&self) -> &WindowAlgebraStats {
351        &self.stats
352    }
353
354    /// Get current watermark in milliseconds.
355    pub fn watermark_ms(&self) -> i64 {
356        self.watermark_ms
357    }
358
359    /// Get the number of currently open panes.
360    pub fn open_pane_count(&self) -> usize {
361        self.panes
362            .values()
363            .filter(|p| p.state == WindowState::Open || p.state == WindowState::Closing)
364            .count()
365    }
366
367    /// Get the side output (late events).
368    pub fn drain_side_output(&mut self) -> Vec<WindowEvent<T>> {
369        self.side_output.drain(..).collect()
370    }
371
372    /// Advance the watermark and return any windows that should be closed.
373    pub fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<WindowOutput<T>> {
374        if watermark_ms <= self.watermark_ms {
375            return Vec::new();
376        }
377        self.watermark_ms = watermark_ms;
378        self.close_expired_windows()
379    }
380
381    /// Ingest a single event. Returns any triggered window outputs.
382    pub fn ingest(&mut self, event: WindowEvent<T>) -> Vec<WindowOutput<T>> {
383        self.stats.total_events += 1;
384
385        match &self.config.kind {
386            WindowKind::Tumbling { size } => self.ingest_tumbling(event, *size),
387            WindowKind::Sliding { size, slide } => self.ingest_sliding(event, *size, *slide),
388            WindowKind::Session { gap } => self.ingest_session(event, *gap),
389            WindowKind::Count { count } => self.ingest_count(event, *count),
390        }
391    }
392
393    /// Ingest a batch of events.
394    pub fn ingest_batch(&mut self, events: Vec<WindowEvent<T>>) -> Vec<WindowOutput<T>> {
395        let mut outputs = Vec::new();
396        for event in events {
397            outputs.extend(self.ingest(event));
398        }
399        outputs
400    }
401
402    // ─── Tumbling ────────────────────────────────────────
403
404    fn ingest_tumbling(&mut self, event: WindowEvent<T>, size: Duration) -> Vec<WindowOutput<T>> {
405        let size_ms = size.as_millis() as i64;
406        if size_ms == 0 {
407            return Vec::new();
408        }
409        let window_start = (event.timestamp_ms / size_ms) * size_ms;
410        let window_end = window_start + size_ms;
411        let wid = WindowId::time_range(window_start, window_end);
412
413        self.assign_to_window(event, wid)
414    }
415
416    // ─── Sliding ─────────────────────────────────────────
417
418    fn ingest_sliding(
419        &mut self,
420        event: WindowEvent<T>,
421        size: Duration,
422        slide: Duration,
423    ) -> Vec<WindowOutput<T>> {
424        let size_ms = size.as_millis() as i64;
425        let slide_ms = slide.as_millis() as i64;
426        if slide_ms == 0 || size_ms == 0 {
427            return Vec::new();
428        }
429
430        // An event belongs to all windows whose [start, start+size) contains it
431        let ts = event.timestamp_ms;
432        // The earliest window start that could contain ts
433        let latest_start = (ts / slide_ms) * slide_ms;
434        let earliest_start = latest_start - size_ms + slide_ms;
435
436        let mut outputs = Vec::new();
437        let mut start = earliest_start;
438        while start <= latest_start {
439            let end = start + size_ms;
440            if ts >= start && ts < end {
441                let wid = WindowId::time_range(start, end);
442                outputs.extend(self.assign_to_window(event.clone(), wid));
443            }
444            start += slide_ms;
445        }
446        outputs
447    }
448
449    // ─── Session ─────────────────────────────────────────
450
451    fn ingest_session(&mut self, event: WindowEvent<T>, gap: Duration) -> Vec<WindowOutput<T>> {
452        let gap_ms = gap.as_millis() as i64;
453        let ts = event.timestamp_ms;
454        let key: Option<String> = None; // non-keyed session
455
456        let mut outputs = Vec::new();
457
458        if let Some(&last_ts) = self.session_last_event.get(&key) {
459            if ts - last_ts > gap_ms {
460                // Gap exceeded: close the current session window
461                outputs.extend(self.close_session_windows(&key));
462            }
463        }
464
465        // Find or create the active session window for this key.
466        // If an existing session is extended, migrate its pane to the new key.
467        let active_wid = self.find_or_extend_active_session(&key, ts, gap_ms);
468        self.session_last_event.insert(key.clone(), ts);
469
470        outputs.extend(self.assign_to_window(event, active_wid));
471        outputs
472    }
473
474    fn find_or_extend_active_session(
475        &mut self,
476        key: &Option<String>,
477        ts: i64,
478        gap_ms: i64,
479    ) -> WindowId {
480        // Look for an open session window whose key matches
481        let existing_wid = self
482            .panes
483            .iter()
484            .find(|(wid, pane)| {
485                wid.key == *key && pane.state == WindowState::Open && ts >= wid.start
486            })
487            .map(|(wid, _)| wid.clone());
488
489        if let Some(old_wid) = existing_wid {
490            let new_end = ts + gap_ms;
491            if new_end == old_wid.end {
492                // No change needed, reuse existing window id
493                return old_wid;
494            }
495            // Build the extended window id
496            let new_wid = WindowId {
497                start: old_wid.start,
498                end: new_end,
499                key: key.clone(),
500            };
501            // Migrate the pane from old key to new key so events accumulate
502            // in a single pane rather than creating duplicates.
503            if let Some(mut pane) = self.panes.remove(&old_wid) {
504                pane.id = new_wid.clone();
505                self.panes.insert(new_wid.clone(), pane);
506            }
507            new_wid
508        } else {
509            // No active session, create new
510            WindowId {
511                start: ts,
512                end: ts + gap_ms,
513                key: key.clone(),
514            }
515        }
516    }
517
518    fn close_session_windows(&mut self, key: &Option<String>) -> Vec<WindowOutput<T>> {
519        let mut outputs = Vec::new();
520        let wids_to_close: Vec<WindowId> = self
521            .panes
522            .keys()
523            .filter(|wid| wid.key == *key)
524            .cloned()
525            .collect();
526
527        for wid in wids_to_close {
528            if let Some(mut pane) = self.panes.remove(&wid) {
529                pane.state = WindowState::Closed;
530                self.stats.windows_closed += 1;
531                self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
532                outputs.push(WindowOutput {
533                    window_id: wid,
534                    events: pane.events,
535                    is_partial: false,
536                    late_dropped: 0,
537                    late_side_output: 0,
538                });
539            }
540        }
541        outputs
542    }
543
544    // ─── Count-based ─────────────────────────────────────
545
546    fn ingest_count(&mut self, event: WindowEvent<T>, count: usize) -> Vec<WindowOutput<T>> {
547        let key: Option<String> = None;
548        let buf = self.count_buffers.entry(key.clone()).or_default();
549        buf.push(event);
550
551        let mut outputs = Vec::new();
552        while buf.len() >= count {
553            let window_events: Vec<_> = buf.drain(..count).collect();
554            let seq = self.count_seq;
555            self.count_seq += 1;
556            let wid = WindowId {
557                start: seq * count as i64,
558                end: (seq + 1) * count as i64,
559                key: key.clone(),
560            };
561            self.stats.windows_opened += 1;
562            self.stats.windows_closed += 1;
563            outputs.push(WindowOutput {
564                window_id: wid,
565                events: window_events,
566                is_partial: false,
567                late_dropped: 0,
568                late_side_output: 0,
569            });
570        }
571        outputs
572    }
573
574    // ─── Common assignment ───────────────────────────────
575
576    fn assign_to_window(&mut self, event: WindowEvent<T>, wid: WindowId) -> Vec<WindowOutput<T>> {
577        let mut outputs = Vec::new();
578
579        // Check if window is already closed
580        if let Some(pane) = self.panes.get(&wid) {
581            match pane.state {
582                WindowState::Closed => {
583                    return self.handle_late_event(event);
584                }
585                WindowState::Closing => {
586                    // Check allowed lateness
587                    if let Some(deadline) = pane.lateness_deadline_ms {
588                        if event.timestamp_ms > deadline {
589                            return self.handle_late_event(event);
590                        }
591                    }
592                    // Fall through: accept event
593                }
594                WindowState::Open => {
595                    // Normal case
596                }
597            }
598        }
599
600        // Check if event is before watermark and window doesn't exist yet
601        if event.timestamp_ms < self.watermark_ms && !self.panes.contains_key(&wid) {
602            return self.handle_late_event(event);
603        }
604
605        // Get or create pane
606        if !self.panes.contains_key(&wid) {
607            let pane = match self.config.late_policy {
608                LatePolicy::AllowedLateness { lateness } => {
609                    WindowPane::new_with_lateness(wid.clone(), lateness.as_millis() as i64)
610                }
611                _ => WindowPane::new(wid.clone()),
612            };
613            self.panes.insert(wid.clone(), pane);
614            self.stats.windows_opened += 1;
615            self.stats.open_windows += 1;
616        }
617
618        if let Some(pane) = self.panes.get_mut(&wid) {
619            pane.events.push(event);
620        }
621
622        // Enforce max open windows
623        outputs.extend(self.enforce_max_open_windows());
624
625        outputs
626    }
627
628    fn handle_late_event(&mut self, event: WindowEvent<T>) -> Vec<WindowOutput<T>> {
629        match self.config.late_policy {
630            LatePolicy::Drop => {
631                self.stats.late_events_dropped += 1;
632            }
633            LatePolicy::SideOutput => {
634                self.stats.late_events_side_output += 1;
635                self.side_output.push_back(event);
636            }
637            LatePolicy::AllowedLateness { .. } => {
638                // If we're here, it's past the allowed lateness too
639                self.stats.late_events_dropped += 1;
640            }
641        }
642        Vec::new()
643    }
644
645    fn close_expired_windows(&mut self) -> Vec<WindowOutput<T>> {
646        let mut outputs = Vec::new();
647        let wm = self.watermark_ms;
648
649        let expired: Vec<WindowId> = self
650            .panes
651            .iter()
652            .filter(|(wid, pane)| {
653                if pane.state == WindowState::Closed {
654                    return false;
655                }
656                match pane.lateness_deadline_ms {
657                    Some(deadline) => wm >= deadline,
658                    None => wm >= wid.end,
659                }
660            })
661            .map(|(wid, _)| wid.clone())
662            .collect();
663
664        for wid in expired {
665            if let Some(mut pane) = self.panes.remove(&wid) {
666                pane.state = WindowState::Closed;
667                self.stats.windows_closed += 1;
668                self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
669                outputs.push(WindowOutput {
670                    window_id: wid,
671                    events: pane.events,
672                    is_partial: false,
673                    late_dropped: 0,
674                    late_side_output: 0,
675                });
676            }
677        }
678
679        outputs
680    }
681
682    fn enforce_max_open_windows(&mut self) -> Vec<WindowOutput<T>> {
683        let mut outputs = Vec::new();
684        while self.panes.len() > self.config.max_open_windows {
685            // Evict oldest window (smallest start)
686            if let Some(wid) = self.panes.keys().next().cloned() {
687                if let Some(mut pane) = self.panes.remove(&wid) {
688                    pane.state = WindowState::Closed;
689                    self.stats.windows_evicted += 1;
690                    self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
691                    if self.config.emit_on_evict {
692                        outputs.push(WindowOutput {
693                            window_id: wid,
694                            events: pane.events,
695                            is_partial: true,
696                            late_dropped: 0,
697                            late_side_output: 0,
698                        });
699                    }
700                }
701            } else {
702                break;
703            }
704        }
705        outputs
706    }
707
708    /// Flush all open windows, emitting their contents.
709    pub fn flush(&mut self) -> Vec<WindowOutput<T>> {
710        let mut outputs = Vec::new();
711
712        // Also flush count buffers
713        for (key, buf) in self.count_buffers.drain() {
714            if !buf.is_empty() {
715                let seq = self.count_seq;
716                self.count_seq += 1;
717                let wid = WindowId {
718                    start: seq * 1000,
719                    end: (seq + 1) * 1000,
720                    key,
721                };
722                outputs.push(WindowOutput {
723                    window_id: wid,
724                    events: buf,
725                    is_partial: true,
726                    late_dropped: 0,
727                    late_side_output: 0,
728                });
729            }
730        }
731
732        let wids: Vec<_> = self.panes.keys().cloned().collect();
733        for wid in wids {
734            if let Some(mut pane) = self.panes.remove(&wid) {
735                pane.state = WindowState::Closed;
736                self.stats.windows_closed += 1;
737                self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
738                outputs.push(WindowOutput {
739                    window_id: wid,
740                    events: pane.events,
741                    is_partial: true,
742                    late_dropped: 0,
743                    late_side_output: 0,
744                });
745            }
746        }
747        outputs
748    }
749
750    /// Aggregate window contents using a fold function.
751    pub fn aggregate<A, F>(&self, window_id: &WindowId, init: A, fold: F) -> Option<A>
752    where
753        F: Fn(A, &T) -> A,
754    {
755        self.panes.get(window_id).map(|pane| {
756            pane.events
757                .iter()
758                .fold(init, |acc, evt| fold(acc, &evt.value))
759        })
760    }
761}
762
763// ─────────────────────────────────────────────
764// Window Assigner (helper to compute window assignments)
765// ─────────────────────────────────────────────
766
767/// Computes which tumbling windows a given timestamp belongs to.
768pub fn tumbling_window_for(ts_ms: i64, size: Duration) -> WindowId {
769    let size_ms = size.as_millis() as i64;
770    if size_ms == 0 {
771        return WindowId::time_range(ts_ms, ts_ms);
772    }
773    let start = (ts_ms / size_ms) * size_ms;
774    WindowId::time_range(start, start + size_ms)
775}
776
777/// Computes which sliding windows a given timestamp belongs to.
778pub fn sliding_windows_for(ts_ms: i64, size: Duration, slide: Duration) -> Vec<WindowId> {
779    let size_ms = size.as_millis() as i64;
780    let slide_ms = slide.as_millis() as i64;
781    if slide_ms == 0 || size_ms == 0 {
782        return Vec::new();
783    }
784    let latest_start = (ts_ms / slide_ms) * slide_ms;
785    let earliest_start = latest_start - size_ms + slide_ms;
786
787    let mut windows = Vec::new();
788    let mut start = earliest_start;
789    while start <= latest_start {
790        let end = start + size_ms;
791        if ts_ms >= start && ts_ms < end {
792            windows.push(WindowId::time_range(start, end));
793        }
794        start += slide_ms;
795    }
796    windows
797}
798
799// ─────────────────────────────────────────────
800// Tests
801// ─────────────────────────────────────────────
802
803#[cfg(test)]
804mod tests {
805    use super::*;
806
807    // Helper to create events at given timestamps
808    fn events(timestamps: &[i64]) -> Vec<WindowEvent<i64>> {
809        timestamps
810            .iter()
811            .map(|&ts| WindowEvent::new(ts, ts))
812            .collect()
813    }
814
815    fn event_at(ts: i64) -> WindowEvent<i64> {
816        WindowEvent::new(ts, ts)
817    }
818
819    // ═══ WindowId tests ═══════════════════════════════════
820
821    #[test]
822    fn test_window_id_time_range() {
823        let wid = WindowId::time_range(1000, 2000);
824        assert_eq!(wid.start, 1000);
825        assert_eq!(wid.end, 2000);
826        assert!(wid.key.is_none());
827    }
828
829    #[test]
830    fn test_window_id_keyed() {
831        let wid = WindowId::keyed(0, 100, "sensor-1");
832        assert_eq!(wid.key, Some("sensor-1".to_string()));
833    }
834
835    #[test]
836    fn test_window_id_duration() {
837        let wid = WindowId::time_range(1000, 2000);
838        assert_eq!(wid.duration_ms(), 1000);
839    }
840
841    #[test]
842    fn test_window_id_contains() {
843        let wid = WindowId::time_range(1000, 2000);
844        assert!(wid.contains(1000));
845        assert!(wid.contains(1500));
846        assert!(wid.contains(1999));
847        assert!(!wid.contains(2000)); // exclusive end
848        assert!(!wid.contains(999));
849    }
850
851    // ═══ WindowEvent tests ═══════════════════════════════
852
853    #[test]
854    fn test_window_event_creation() {
855        let evt = WindowEvent::new(42, 1000);
856        assert_eq!(evt.value, 42);
857        assert_eq!(evt.timestamp_ms, 1000);
858    }
859
860    // ═══ WindowPane tests ═══════════════════════════════
861
862    #[test]
863    fn test_window_pane_empty() {
864        let pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
865        assert!(pane.is_empty());
866        assert_eq!(pane.len(), 0);
867        assert_eq!(pane.state, WindowState::Open);
868    }
869
870    #[test]
871    fn test_window_pane_min_max_timestamp() {
872        let mut pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
873        pane.events.push(WindowEvent::new(1, 500));
874        pane.events.push(WindowEvent::new(2, 100));
875        pane.events.push(WindowEvent::new(3, 900));
876        assert_eq!(pane.min_timestamp(), Some(100));
877        assert_eq!(pane.max_timestamp(), Some(900));
878    }
879
880    #[test]
881    fn test_window_pane_no_timestamps() {
882        let pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
883        assert_eq!(pane.min_timestamp(), None);
884        assert_eq!(pane.max_timestamp(), None);
885    }
886
887    // ═══ Tumbling window tests ═══════════════════════════
888
889    #[test]
890    fn test_tumbling_basic() {
891        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
892        // Events at 0, 5, 9 should all be in [0, 10000)
893        for ts in [0, 5000, 9999] {
894            wa.ingest(event_at(ts));
895        }
896        assert_eq!(wa.stats().total_events, 3);
897        assert_eq!(wa.open_pane_count(), 1);
898    }
899
900    #[test]
901    fn test_tumbling_multiple_windows() {
902        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
903        // Window [0..10000)
904        wa.ingest(event_at(0));
905        wa.ingest(event_at(5000));
906        // Window [10000..20000)
907        wa.ingest(event_at(10000));
908        wa.ingest(event_at(15000));
909        assert_eq!(wa.open_pane_count(), 2);
910    }
911
912    #[test]
913    fn test_tumbling_watermark_closes_window() {
914        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
915        wa.ingest(event_at(0));
916        wa.ingest(event_at(5000));
917
918        let outputs = wa.advance_watermark(10000);
919        assert_eq!(outputs.len(), 1);
920        assert_eq!(outputs[0].window_id.start, 0);
921        assert_eq!(outputs[0].window_id.end, 10000);
922        assert_eq!(outputs[0].events.len(), 2);
923        assert!(!outputs[0].is_partial);
924    }
925
926    #[test]
927    fn test_tumbling_watermark_no_close_if_below() {
928        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
929        wa.ingest(event_at(0));
930        let outputs = wa.advance_watermark(5000);
931        assert!(outputs.is_empty());
932    }
933
934    #[test]
935    fn test_tumbling_late_event_dropped() {
936        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
937        wa.advance_watermark(20000);
938        wa.ingest(event_at(5000)); // late
939        assert_eq!(wa.stats().late_events_dropped, 1);
940    }
941
942    #[test]
943    fn test_tumbling_late_event_side_output() {
944        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
945            .with_late_policy(LatePolicy::SideOutput);
946        wa.advance_watermark(20000);
947        wa.ingest(event_at(5000)); // late
948        assert_eq!(wa.stats().late_events_side_output, 1);
949        let side = wa.drain_side_output();
950        assert_eq!(side.len(), 1);
951        assert_eq!(side[0].timestamp_ms, 5000);
952    }
953
954    #[test]
955    fn test_tumbling_flush() {
956        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
957        wa.ingest(event_at(0));
958        wa.ingest(event_at(5000));
959        wa.ingest(event_at(15000));
960
961        let outputs = wa.flush();
962        assert_eq!(outputs.len(), 2);
963        assert!(outputs.iter().all(|o| o.is_partial));
964    }
965
966    // ═══ Sliding window tests ════════════════════════════
967
968    #[test]
969    fn test_sliding_basic() {
970        let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
971        wa.ingest(event_at(7500));
972        // Event at 7500 with size=10s, slide=5s should be in windows starting at 0 and 5000
973        assert!(wa.open_pane_count() >= 1);
974    }
975
976    #[test]
977    fn test_sliding_event_in_multiple_windows() {
978        let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
979        wa.ingest(event_at(6000));
980        // Event at 6000ms with size=10000ms, slide=5000ms:
981        // Window [0, 10000) contains it, Window [5000, 15000) contains it
982        assert_eq!(wa.open_pane_count(), 2);
983    }
984
985    #[test]
986    fn test_sliding_watermark_closes_old_windows() {
987        let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
988        wa.ingest(event_at(3000));
989        wa.ingest(event_at(6000));
990        let outputs = wa.advance_watermark(10000);
991        // Window [0, 10000) should be closed
992        assert!(!outputs.is_empty());
993    }
994
995    #[test]
996    fn test_sliding_window_helper() {
997        let windows = sliding_windows_for(7500, Duration::from_secs(10), Duration::from_secs(5));
998        assert!(!windows.is_empty());
999        assert!(windows.iter().all(|w| w.contains(7500)));
1000    }
1001
1002    // ═══ Session window tests ════════════════════════════
1003
1004    #[test]
1005    fn test_session_basic() {
1006        let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1007        wa.ingest(event_at(1000));
1008        wa.ingest(event_at(3000));
1009        wa.ingest(event_at(4000));
1010        // All within 5s gap, should be one session
1011        assert_eq!(wa.open_pane_count(), 1);
1012    }
1013
1014    #[test]
1015    fn test_session_gap_closes_window() {
1016        let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1017        wa.ingest(event_at(1000));
1018        wa.ingest(event_at(3000));
1019        // Gap > 5000ms, should close previous session
1020        let outputs = wa.ingest(event_at(10000));
1021        assert!(!outputs.is_empty());
1022    }
1023
1024    #[test]
1025    fn test_session_multiple_sessions() {
1026        let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1027        wa.ingest(event_at(1000));
1028        wa.ingest(event_at(3000));
1029        let out1 = wa.ingest(event_at(20000)); // gap > 5s
1030                                               // First session closed
1031        assert!(!out1.is_empty());
1032        wa.ingest(event_at(22000));
1033        let out2 = wa.ingest(event_at(40000)); // gap > 5s
1034        assert!(!out2.is_empty());
1035    }
1036
1037    // ═══ Count window tests ══════════════════════════════
1038
1039    #[test]
1040    fn test_count_basic() {
1041        let mut wa = WindowAlgebra::<i64>::count(3);
1042        let out1 = wa.ingest(event_at(1));
1043        assert!(out1.is_empty());
1044        let out2 = wa.ingest(event_at(2));
1045        assert!(out2.is_empty());
1046        let out3 = wa.ingest(event_at(3));
1047        assert_eq!(out3.len(), 1);
1048        assert_eq!(out3[0].events.len(), 3);
1049    }
1050
1051    #[test]
1052    fn test_count_multiple_triggers() {
1053        let mut wa = WindowAlgebra::<i64>::count(2);
1054        let evts = events(&[1, 2, 3, 4, 5, 6]);
1055        let outputs = wa.ingest_batch(evts);
1056        assert_eq!(outputs.len(), 3);
1057    }
1058
1059    #[test]
1060    fn test_count_partial_flush() {
1061        let mut wa = WindowAlgebra::<i64>::count(3);
1062        wa.ingest(event_at(1));
1063        wa.ingest(event_at(2));
1064        let outputs = wa.flush();
1065        assert_eq!(outputs.len(), 1);
1066        assert!(outputs[0].is_partial);
1067        assert_eq!(outputs[0].events.len(), 2);
1068    }
1069
1070    // ═══ Late data policy tests ══════════════════════════
1071
1072    #[test]
1073    fn test_late_policy_drop() {
1074        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
1075            .with_late_policy(LatePolicy::Drop);
1076        wa.advance_watermark(30000);
1077        wa.ingest(event_at(5000));
1078        assert_eq!(wa.stats().late_events_dropped, 1);
1079        assert_eq!(wa.drain_side_output().len(), 0);
1080    }
1081
1082    #[test]
1083    fn test_late_policy_side_output() {
1084        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
1085            .with_late_policy(LatePolicy::SideOutput);
1086        wa.advance_watermark(30000);
1087        wa.ingest(event_at(5000));
1088        wa.ingest(event_at(8000));
1089        assert_eq!(wa.stats().late_events_side_output, 2);
1090        let side = wa.drain_side_output();
1091        assert_eq!(side.len(), 2);
1092    }
1093
1094    #[test]
1095    fn test_late_policy_allowed_lateness() {
1096        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10)).with_late_policy(
1097            LatePolicy::AllowedLateness {
1098                lateness: Duration::from_secs(5),
1099            },
1100        );
1101        // Create window [0..10000), event in it
1102        wa.ingest(event_at(5000));
1103        // Advance watermark just past window end but within lateness
1104        // Events before watermark that have no existing window -> late
1105        wa.advance_watermark(30000);
1106        wa.ingest(event_at(2000)); // late, past allowed lateness too
1107        assert_eq!(wa.stats().late_events_dropped, 1);
1108    }
1109
1110    // ═══ Max open windows / eviction tests ═══════════════
1111
1112    #[test]
1113    fn test_max_open_windows_eviction() {
1114        let mut wa =
1115            WindowAlgebra::<i64>::tumbling(Duration::from_secs(1)).with_max_open_windows(3);
1116        // Create 4 windows
1117        wa.ingest(event_at(0));
1118        wa.ingest(event_at(1000));
1119        wa.ingest(event_at(2000));
1120        let outputs = wa.ingest(event_at(3000));
1121        // Should evict the oldest
1122        assert!(wa.stats().windows_evicted >= 1 || !outputs.is_empty());
1123    }
1124
1125    #[test]
1126    fn test_evicted_window_emits_partial() {
1127        let mut wa =
1128            WindowAlgebra::<i64>::tumbling(Duration::from_secs(1)).with_max_open_windows(2);
1129        wa.ingest(event_at(0));
1130        wa.ingest(event_at(1000));
1131        let outputs = wa.ingest(event_at(2000));
1132        let partial = outputs.iter().filter(|o| o.is_partial).count();
1133        assert!(partial >= 1);
1134    }
1135
1136    // ═══ Statistics tests ════════════════════════════════
1137
1138    #[test]
1139    fn test_stats_total_events() {
1140        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1141        for i in 0..10 {
1142            wa.ingest(event_at(i * 100));
1143        }
1144        assert_eq!(wa.stats().total_events, 10);
1145    }
1146
1147    #[test]
1148    fn test_stats_windows_opened() {
1149        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1150        wa.ingest(event_at(0));
1151        wa.ingest(event_at(10000));
1152        wa.ingest(event_at(20000));
1153        assert_eq!(wa.stats().windows_opened, 3);
1154    }
1155
1156    #[test]
1157    fn test_stats_windows_closed() {
1158        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1159        wa.ingest(event_at(0));
1160        wa.ingest(event_at(10000));
1161        wa.advance_watermark(20000);
1162        assert_eq!(wa.stats().windows_closed, 2);
1163    }
1164
1165    // ═══ Aggregation tests ═══════════════════════════════
1166
1167    #[test]
1168    fn test_aggregate_sum() {
1169        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1170        wa.ingest(event_at(100));
1171        wa.ingest(event_at(200));
1172        wa.ingest(event_at(300));
1173
1174        let wid = WindowId::time_range(0, 10000);
1175        let sum = wa.aggregate(&wid, 0i64, |acc, &val| acc + val);
1176        assert_eq!(sum, Some(600));
1177    }
1178
1179    #[test]
1180    fn test_aggregate_count() {
1181        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1182        wa.ingest(event_at(100));
1183        wa.ingest(event_at(200));
1184
1185        let wid = WindowId::time_range(0, 10000);
1186        let count = wa.aggregate(&wid, 0usize, |acc, _| acc + 1);
1187        assert_eq!(count, Some(2));
1188    }
1189
1190    #[test]
1191    fn test_aggregate_nonexistent_window() {
1192        let wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1193        let wid = WindowId::time_range(0, 10000);
1194        let result = wa.aggregate(&wid, 0, |acc, _: &i64| acc + 1);
1195        assert!(result.is_none());
1196    }
1197
1198    // ═══ Helper function tests ═══════════════════════════
1199
1200    #[test]
1201    fn test_tumbling_window_for_helper() {
1202        let wid = tumbling_window_for(7500, Duration::from_secs(10));
1203        assert_eq!(wid.start, 0);
1204        assert_eq!(wid.end, 10000);
1205    }
1206
1207    #[test]
1208    fn test_tumbling_window_for_exact_boundary() {
1209        let wid = tumbling_window_for(10000, Duration::from_secs(10));
1210        assert_eq!(wid.start, 10000);
1211        assert_eq!(wid.end, 20000);
1212    }
1213
1214    #[test]
1215    fn test_sliding_windows_for_helper() {
1216        let windows = sliding_windows_for(12000, Duration::from_secs(10), Duration::from_secs(5));
1217        // 12000 should be in [5000, 15000) and [10000, 20000)
1218        assert!(!windows.is_empty());
1219        for w in &windows {
1220            assert!(w.contains(12000));
1221        }
1222    }
1223
1224    // ═══ Batch ingest tests ══════════════════════════════
1225
1226    #[test]
1227    fn test_batch_ingest() {
1228        let mut wa = WindowAlgebra::<i64>::count(3);
1229        let evts = events(&[1, 2, 3, 4, 5, 6, 7, 8, 9]);
1230        let outputs = wa.ingest_batch(evts);
1231        assert_eq!(outputs.len(), 3);
1232        assert!(outputs.iter().all(|o| o.events.len() == 3));
1233    }
1234
1235    // ═══ WindowAlgebraConfig tests ═══════════════════════
1236
1237    #[test]
1238    fn test_default_config() {
1239        let config = WindowAlgebraConfig::default();
1240        assert_eq!(
1241            config.kind,
1242            WindowKind::Tumbling {
1243                size: Duration::from_secs(60)
1244            }
1245        );
1246        assert_eq!(config.late_policy, LatePolicy::Drop);
1247        assert_eq!(config.max_open_windows, 10_000);
1248        assert!(config.emit_on_evict);
1249    }
1250
1251    #[test]
1252    fn test_custom_config() {
1253        let config = WindowAlgebraConfig {
1254            kind: WindowKind::Sliding {
1255                size: Duration::from_secs(30),
1256                slide: Duration::from_secs(10),
1257            },
1258            late_policy: LatePolicy::SideOutput,
1259            max_open_windows: 500,
1260            emit_on_evict: false,
1261        };
1262        assert_eq!(config.max_open_windows, 500);
1263        assert!(!config.emit_on_evict);
1264    }
1265
1266    // ═══ WindowState tests ═══════════════════════════════
1267
1268    #[test]
1269    fn test_window_state_variants() {
1270        assert_eq!(WindowState::Open, WindowState::Open);
1271        assert_ne!(WindowState::Open, WindowState::Closed);
1272        assert_ne!(WindowState::Closing, WindowState::Closed);
1273    }
1274
1275    // ═══ Edge case tests ═════════════════════════════════
1276
1277    #[test]
1278    fn test_watermark_no_regression() {
1279        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1280        wa.advance_watermark(10000);
1281        let outputs = wa.advance_watermark(5000); // regression
1282        assert!(outputs.is_empty());
1283        assert_eq!(wa.watermark_ms(), 10000);
1284    }
1285
1286    #[test]
1287    fn test_empty_flush() {
1288        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1289        let outputs = wa.flush();
1290        assert!(outputs.is_empty());
1291    }
1292
1293    #[test]
1294    fn test_double_watermark_advance() {
1295        let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1296        wa.ingest(event_at(5000));
1297        let out1 = wa.advance_watermark(10000);
1298        assert_eq!(out1.len(), 1);
1299        let out2 = wa.advance_watermark(10000);
1300        assert!(out2.is_empty()); // already closed
1301    }
1302}