Skip to main content

varpulis_runtime/
window.rs

//! Window implementations for stream processing
//!
//! Includes:
//! - Time-based windows (tumbling, sliding)
//! - Count-based windows (tumbling, sliding)
//! - Session windows (gap-based)
//! - Partitioned windows
//! - Delay buffers (rstream equivalent)
8
9use std::collections::VecDeque;
10use std::sync::Arc;
11
12use chrono::{DateTime, Duration, Utc};
13use rustc_hash::FxHashMap;
14
15use crate::columnar::{ColumnarAccess, ColumnarBuffer};
16use crate::event::{Event, SharedEvent};
17use crate::persistence::{PartitionedWindowCheckpoint, SerializableEvent, WindowCheckpoint};
18
19/// A tumbling window that collects events over a fixed duration.
20///
21/// Supports both row-oriented and columnar access patterns:
22/// - `flush_shared()`: Row-oriented access for backward compatibility
23/// - `flush_columnar()`: Columnar access for SIMD-optimized aggregations
24#[derive(Debug)]
25pub struct TumblingWindow {
26    duration: Duration,
27    /// Columnar storage for events (pub(crate) for checkpoint access)
28    pub(crate) columnar: ColumnarBuffer,
29    /// Window start time (pub(crate) for checkpoint access)
30    pub(crate) window_start: Option<DateTime<Utc>>,
31}
32
33impl TumblingWindow {
34    pub fn new(duration: Duration) -> Self {
35        Self {
36            duration,
37            columnar: ColumnarBuffer::new(),
38            window_start: None,
39        }
40    }
41
42    /// Add a shared event to the window, returning completed window if triggered.
43    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
44        let event_time = event.timestamp;
45
46        // Initialize window start on first event
47        if self.window_start.is_none() {
48            self.window_start = Some(event_time);
49        }
50
51        // Safe: we just set it above if it was None
52        let window_start = self.window_start?;
53        let window_end = window_start + self.duration;
54
55        if event_time >= window_end {
56            // Window is complete, emit and start new window
57            let completed = self.columnar.take_all();
58            self.window_start = Some(event_time);
59            self.columnar.push(event);
60            Some(completed)
61        } else {
62            self.columnar.push(event);
63            None
64        }
65    }
66
67    /// Add an event to the window (wraps in Arc).
68    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
69        self.add_shared(Arc::new(event))
70            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
71    }
72
73    /// Flush all events as SharedEvent references.
74    pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
75        self.columnar.take_all()
76    }
77
78    /// Flush and return the columnar buffer for SIMD-optimized aggregations.
79    ///
80    /// The returned buffer can be used with `Aggregator::apply_columnar()`.
81    pub fn flush_columnar(&mut self) -> ColumnarBuffer {
82        self.window_start = None;
83        std::mem::take(&mut self.columnar)
84    }
85
86    /// Flush all events (clones for backward compatibility).
87    pub fn flush(&mut self) -> Vec<Event> {
88        self.flush_shared()
89            .into_iter()
90            .map(|e| (*e).clone())
91            .collect()
92    }
93
94    /// Get the number of events currently in the window.
95    pub const fn len(&self) -> usize {
96        self.columnar.len()
97    }
98
99    /// Check if the window is empty.
100    pub const fn is_empty(&self) -> bool {
101        self.columnar.is_empty()
102    }
103
104    /// Create a checkpoint of the current window state.
105    pub fn checkpoint(&self) -> WindowCheckpoint {
106        WindowCheckpoint {
107            events: self
108                .columnar
109                .events()
110                .iter()
111                .map(|e| SerializableEvent::from(e.as_ref()))
112                .collect(),
113            window_start_ms: self.window_start.map(|t| t.timestamp_millis()),
114            last_emit_ms: None,
115            partitions: std::collections::HashMap::new(),
116        }
117    }
118
119    /// Restore window state from a checkpoint.
120    pub fn restore(&mut self, cp: &WindowCheckpoint) {
121        let events: Vec<SharedEvent> = cp
122            .events
123            .iter()
124            .map(|se| Arc::new(Event::from(se.clone())))
125            .collect();
126        self.columnar = ColumnarBuffer::from_events(events);
127        self.window_start = cp.window_start_ms.and_then(DateTime::from_timestamp_millis);
128    }
129
130    /// Advance watermark — close window if watermark passes window end.
131    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
132        if let Some(start) = self.window_start {
133            if wm >= start + self.duration && !self.columnar.is_empty() {
134                let completed = self.columnar.take_all();
135                self.window_start = Some(wm);
136                return Some(completed);
137            }
138        }
139        None
140    }
141}
142
143impl ColumnarAccess for TumblingWindow {
144    fn columnar(&self) -> &ColumnarBuffer {
145        &self.columnar
146    }
147
148    fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
149        &mut self.columnar
150    }
151}
152
153/// A sliding window that maintains overlapping windows
154#[derive(Debug)]
155pub struct SlidingWindow {
156    window_size: Duration,
157    slide_interval: Duration,
158    events: VecDeque<SharedEvent>,
159    last_emit: Option<DateTime<Utc>>,
160}
161
162impl SlidingWindow {
163    pub const fn new(window_size: Duration, slide_interval: Duration) -> Self {
164        Self {
165            window_size,
166            slide_interval,
167            events: VecDeque::new(),
168            last_emit: None,
169        }
170    }
171
172    /// Add a shared event, returning window contents if slide interval reached.
173    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
174        let event_time = event.timestamp;
175        self.events.push_back(event);
176
177        // Remove old events outside window using binary search + drain
178        // This is O(log n + k) where k is expired events, vs O(k) pop_front loops
179        let cutoff = event_time - self.window_size;
180        let expired_count = self
181            .events
182            .iter()
183            .position(|e| e.timestamp >= cutoff)
184            .unwrap_or(self.events.len());
185
186        if expired_count > 0 {
187            // Drain all expired events in one operation
188            self.events.drain(0..expired_count);
189        }
190
191        // Check if we should emit based on slide interval
192        let should_emit = match self.last_emit {
193            None => true,
194            Some(last) => event_time >= last + self.slide_interval,
195        };
196
197        should_emit.then(|| {
198            self.last_emit = Some(event_time);
199            self.events.iter().map(Arc::clone).collect()
200        })
201    }
202
203    /// Add an event (wraps in Arc).
204    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
205        self.add_shared(Arc::new(event))
206            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
207    }
208
209    /// Get current window contents as shared references.
210    pub fn current_shared(&self) -> Vec<SharedEvent> {
211        self.events.iter().map(Arc::clone).collect()
212    }
213
214    /// Get current window contents (clones for backward compatibility).
215    pub fn current(&self) -> Vec<Event> {
216        self.events.iter().map(|e| (**e).clone()).collect()
217    }
218
219    /// Create a checkpoint of the current window state.
220    pub fn checkpoint(&self) -> WindowCheckpoint {
221        WindowCheckpoint {
222            events: self
223                .events
224                .iter()
225                .map(|e| SerializableEvent::from(e.as_ref()))
226                .collect(),
227            window_start_ms: None,
228            last_emit_ms: self.last_emit.map(|t| t.timestamp_millis()),
229            partitions: std::collections::HashMap::new(),
230        }
231    }
232
233    /// Restore window state from a checkpoint.
234    pub fn restore(&mut self, cp: &WindowCheckpoint) {
235        self.events = cp
236            .events
237            .iter()
238            .map(|se| Arc::new(Event::from(se.clone())))
239            .collect::<Vec<_>>()
240            .into();
241        self.last_emit = cp.last_emit_ms.and_then(DateTime::from_timestamp_millis);
242    }
243
244    /// Advance watermark — emit window if slide interval has passed relative to watermark.
245    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
246        // Remove expired events
247        let cutoff = wm - self.window_size;
248        let expired_count = self
249            .events
250            .iter()
251            .position(|e| e.timestamp >= cutoff)
252            .unwrap_or(self.events.len());
253        if expired_count > 0 {
254            self.events.drain(0..expired_count);
255        }
256
257        let should_emit = match self.last_emit {
258            None => !self.events.is_empty(),
259            Some(last) => wm >= last + self.slide_interval && !self.events.is_empty(),
260        };
261
262        should_emit.then(|| {
263            self.last_emit = Some(wm);
264            self.events.iter().map(Arc::clone).collect()
265        })
266    }
267}
268
269/// A count-based window that emits after collecting N events.
270///
271/// Supports both row-oriented and columnar access patterns:
272/// - `flush_shared()`: Row-oriented access for backward compatibility
273/// - `flush_columnar()`: Columnar access for SIMD-optimized aggregations
274#[derive(Debug)]
275pub struct CountWindow {
276    count: usize,
277    columnar: ColumnarBuffer,
278}
279
280impl CountWindow {
281    pub fn new(count: usize) -> Self {
282        Self {
283            count,
284            columnar: ColumnarBuffer::with_capacity(count),
285        }
286    }
287
288    /// Add a shared event, returning completed window if count reached.
289    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
290        self.columnar.push(event);
291
292        (self.columnar.len() >= self.count).then(|| self.columnar.take_all())
293    }
294
295    /// Add an event (wraps in Arc).
296    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
297        self.add_shared(Arc::new(event))
298            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
299    }
300
301    /// Flush all events as SharedEvent references.
302    pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
303        self.columnar.take_all()
304    }
305
306    /// Flush and return the columnar buffer for SIMD-optimized aggregations.
307    pub fn flush_columnar(&mut self) -> ColumnarBuffer {
308        std::mem::take(&mut self.columnar)
309    }
310
311    /// Flush all events (clones for backward compatibility).
312    pub fn flush(&mut self) -> Vec<Event> {
313        self.flush_shared()
314            .into_iter()
315            .map(|e| (*e).clone())
316            .collect()
317    }
318
319    /// Get current count of events in buffer (for debugging)
320    pub const fn current_count(&self) -> usize {
321        self.columnar.len()
322    }
323
324    /// Create a checkpoint of the current window state.
325    pub fn checkpoint(&self) -> WindowCheckpoint {
326        WindowCheckpoint {
327            events: self
328                .columnar
329                .events()
330                .iter()
331                .map(|e| SerializableEvent::from(e.as_ref()))
332                .collect(),
333            window_start_ms: None,
334            last_emit_ms: None,
335            partitions: std::collections::HashMap::new(),
336        }
337    }
338
339    /// Restore window state from a checkpoint.
340    pub fn restore(&mut self, cp: &WindowCheckpoint) {
341        let events: Vec<SharedEvent> = cp
342            .events
343            .iter()
344            .map(|se| Arc::new(Event::from(se.clone())))
345            .collect();
346        self.columnar = ColumnarBuffer::from_events(events);
347    }
348}
349
350impl ColumnarAccess for CountWindow {
351    fn columnar(&self) -> &ColumnarBuffer {
352        &self.columnar
353    }
354
355    fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
356        &mut self.columnar
357    }
358}
359
360/// A sliding count window that maintains overlapping windows
361#[derive(Debug)]
362pub struct SlidingCountWindow {
363    window_size: usize,
364    slide_size: usize,
365    events: VecDeque<SharedEvent>,
366    events_since_emit: usize,
367}
368
369impl SlidingCountWindow {
370    pub fn new(window_size: usize, slide_size: usize) -> Self {
371        Self {
372            window_size,
373            slide_size,
374            events: VecDeque::with_capacity(window_size),
375            events_since_emit: 0,
376        }
377    }
378
379    /// Add a shared event, returning window contents if slide interval reached.
380    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
381        self.events.push_back(event);
382        self.events_since_emit += 1;
383
384        // Remove old events if window is overfull - drain excess in one operation
385        let overflow = self.events.len().saturating_sub(self.window_size);
386        if overflow > 0 {
387            self.events.drain(0..overflow);
388        }
389
390        // Emit if we have enough events and slide interval reached
391        (self.events.len() >= self.window_size && self.events_since_emit >= self.slide_size).then(
392            || {
393                self.events_since_emit = 0;
394                self.events.iter().map(Arc::clone).collect()
395            },
396        )
397    }
398
399    /// Add an event (wraps in Arc).
400    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
401        self.add_shared(Arc::new(event))
402            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
403    }
404
405    /// Get current count of events in buffer (for debugging)
406    pub fn current_count(&self) -> usize {
407        self.events.len()
408    }
409
410    /// Create a checkpoint of the current window state.
411    pub fn checkpoint(&self) -> WindowCheckpoint {
412        WindowCheckpoint {
413            events: self
414                .events
415                .iter()
416                .map(|e| SerializableEvent::from(e.as_ref()))
417                .collect(),
418            window_start_ms: None,
419            last_emit_ms: None,
420            partitions: std::collections::HashMap::new(),
421        }
422    }
423
424    /// Restore window state from a checkpoint.
425    pub fn restore(&mut self, cp: &WindowCheckpoint) {
426        self.events = cp
427            .events
428            .iter()
429            .map(|se| Arc::new(Event::from(se.clone())))
430            .collect::<Vec<_>>()
431            .into();
432        self.events_since_emit = 0;
433    }
434}
435
436/// A session window that groups events separated by gaps of inactivity.
437///
438/// Events are accumulated into a session. When a new event arrives after a gap
439/// exceeding the configured duration, the current session is closed (emitted)
440/// and a new session begins with the incoming event.
441///
442/// Supports both row-oriented and columnar access patterns:
443/// - `flush_shared()`: Row-oriented access for backward compatibility
444/// - `flush_columnar()`: Columnar access for SIMD-optimized aggregations
445#[derive(Debug)]
446pub struct SessionWindow {
447    gap: Duration,
448    /// Columnar storage for events (pub(crate) for checkpoint access)
449    pub(crate) columnar: ColumnarBuffer,
450    /// Last event time (pub(crate) for checkpoint access)
451    pub(crate) last_event_time: Option<DateTime<Utc>>,
452}
453
454impl SessionWindow {
455    pub fn new(gap: Duration) -> Self {
456        Self {
457            gap,
458            columnar: ColumnarBuffer::new(),
459            last_event_time: None,
460        }
461    }
462
463    /// Add a shared event to the session window.
464    /// Returns the completed session if the gap was exceeded.
465    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
466        let event_time = event.timestamp;
467        if let Some(last_time) = self.last_event_time {
468            if event_time - last_time > self.gap {
469                // Gap exceeded — close current session, start new one
470                let completed = self.columnar.take_all();
471                self.columnar.push(event);
472                self.last_event_time = Some(event_time);
473                return Some(completed);
474            }
475        }
476        // Within session gap (or first event)
477        self.columnar.push(event);
478        self.last_event_time = Some(event_time);
479        None
480    }
481
482    /// Add an event (wraps in Arc).
483    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
484        self.add_shared(Arc::new(event))
485            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
486    }
487
488    /// Check if this session has expired (last event time + gap < now).
489    /// Returns accumulated events if expired, None otherwise.
490    pub fn check_expired(&mut self, now: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
491        if let Some(last) = self.last_event_time {
492            if now - last > self.gap {
493                return Some(self.flush_shared());
494            }
495        }
496        None
497    }
498
499    /// Get the configured gap duration.
500    pub const fn gap(&self) -> Duration {
501        self.gap
502    }
503
504    /// Flush all events as SharedEvent references.
505    pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
506        self.last_event_time = None;
507        self.columnar.take_all()
508    }
509
510    /// Flush and return the columnar buffer for SIMD-optimized aggregations.
511    pub fn flush_columnar(&mut self) -> ColumnarBuffer {
512        self.last_event_time = None;
513        std::mem::take(&mut self.columnar)
514    }
515
516    /// Flush all events (clones for backward compatibility).
517    pub fn flush(&mut self) -> Vec<Event> {
518        self.flush_shared()
519            .into_iter()
520            .map(|e| (*e).clone())
521            .collect()
522    }
523
524    /// Create a checkpoint of the current session state.
525    pub fn checkpoint(&self) -> WindowCheckpoint {
526        WindowCheckpoint {
527            events: self
528                .columnar
529                .events()
530                .iter()
531                .map(|e| SerializableEvent::from(e.as_ref()))
532                .collect(),
533            window_start_ms: self.last_event_time.map(|t| t.timestamp_millis()),
534            last_emit_ms: None,
535            partitions: std::collections::HashMap::new(),
536        }
537    }
538
539    /// Restore session state from a checkpoint.
540    pub fn restore(&mut self, cp: &WindowCheckpoint) {
541        let events: Vec<SharedEvent> = cp
542            .events
543            .iter()
544            .map(|se| Arc::new(Event::from(se.clone())))
545            .collect();
546        self.columnar = ColumnarBuffer::from_events(events);
547        self.last_event_time = cp.window_start_ms.and_then(DateTime::from_timestamp_millis);
548    }
549
550    /// Advance watermark — close session if watermark exceeds last_event_time + gap.
551    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
552        if let Some(last) = self.last_event_time {
553            if wm >= last + self.gap && !self.columnar.is_empty() {
554                return Some(self.flush_shared());
555            }
556        }
557        None
558    }
559}
560
561impl ColumnarAccess for SessionWindow {
562    fn columnar(&self) -> &ColumnarBuffer {
563        &self.columnar
564    }
565
566    fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
567        &mut self.columnar
568    }
569}
570
571/// A partitioned session window that maintains separate sessions per partition key
572#[derive(Debug)]
573pub struct PartitionedSessionWindow {
574    partition_key: String,
575    gap: Duration,
576    windows: FxHashMap<String, SessionWindow>,
577}
578
579impl PartitionedSessionWindow {
580    pub fn new(partition_key: String, gap: Duration) -> Self {
581        Self {
582            partition_key,
583            gap,
584            windows: FxHashMap::default(),
585        }
586    }
587
588    /// Add a shared event to the appropriate partition session.
589    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
590        let key = event.get(&self.partition_key).map_or_else(
591            || "default".to_string(),
592            |v| v.to_partition_key().into_owned(),
593        );
594
595        let window = self
596            .windows
597            .entry(key)
598            .or_insert_with(|| SessionWindow::new(self.gap));
599
600        window.add_shared(event)
601    }
602
603    /// Add an event (wraps in Arc).
604    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
605        self.add_shared(Arc::new(event))
606            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
607    }
608
609    /// Check all partitions for expired sessions.
610    /// Returns a list of (partition_key, events) for each expired session,
611    /// and removes those partitions from the map.
612    pub fn check_expired(&mut self, now: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
613        let mut expired = Vec::with_capacity(self.windows.len());
614        let mut to_remove = Vec::new();
615        for (key, window) in &mut self.windows {
616            if let Some(events) = window.check_expired(now) {
617                if !events.is_empty() {
618                    expired.push((key.clone(), events));
619                }
620                to_remove.push(key.clone());
621            }
622        }
623        for key in to_remove {
624            self.windows.remove(&key);
625        }
626        expired
627    }
628
629    /// Get the configured gap duration.
630    pub const fn gap(&self) -> Duration {
631        self.gap
632    }
633
634    /// Flush all partition sessions as SharedEvent references.
635    pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
636        let mut all_events = Vec::new();
637        for window in self.windows.values_mut() {
638            all_events.extend(window.flush_shared());
639        }
640        all_events
641    }
642
643    /// Flush all partition sessions (clones for backward compatibility).
644    pub fn flush(&mut self) -> Vec<Event> {
645        self.flush_shared()
646            .into_iter()
647            .map(|e| (*e).clone())
648            .collect()
649    }
650
651    /// Create a checkpoint of all partition sessions.
652    pub fn checkpoint(&self) -> WindowCheckpoint {
653        let partitions = self
654            .windows
655            .iter()
656            .map(|(key, window)| {
657                (
658                    key.clone(),
659                    PartitionedWindowCheckpoint {
660                        events: window
661                            .columnar
662                            .events()
663                            .iter()
664                            .map(|e| SerializableEvent::from(e.as_ref()))
665                            .collect(),
666                        window_start_ms: window.last_event_time.map(|t| t.timestamp_millis()),
667                    },
668                )
669            })
670            .collect();
671
672        WindowCheckpoint {
673            events: Vec::new(),
674            window_start_ms: None,
675            last_emit_ms: None,
676            partitions,
677        }
678    }
679
680    /// Restore partition sessions from a checkpoint.
681    pub fn restore(&mut self, cp: &WindowCheckpoint) {
682        self.windows.clear();
683        for (key, pcp) in &cp.partitions {
684            let mut window = SessionWindow::new(self.gap);
685            let events: Vec<SharedEvent> = pcp
686                .events
687                .iter()
688                .map(|se| Arc::new(Event::from(se.clone())))
689                .collect();
690            window.columnar = ColumnarBuffer::from_events(events);
691            window.last_event_time = pcp
692                .window_start_ms
693                .and_then(DateTime::from_timestamp_millis);
694            self.windows.insert(key.clone(), window);
695        }
696    }
697
698    /// Advance watermark — close expired sessions across all partitions.
699    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
700        let mut expired = Vec::with_capacity(self.windows.len());
701        let mut to_remove = Vec::new();
702        for (key, window) in &mut self.windows {
703            if let Some(events) = window.advance_watermark(wm) {
704                if !events.is_empty() {
705                    expired.push((key.clone(), events));
706                }
707                to_remove.push(key.clone());
708            }
709        }
710        for key in to_remove {
711            self.windows.remove(&key);
712        }
713        expired
714    }
715}
716
717/// A partitioned tumbling window that maintains separate windows per partition key
718#[derive(Debug)]
719pub struct PartitionedTumblingWindow {
720    partition_key: String,
721    duration: Duration,
722    windows: FxHashMap<String, TumblingWindow>,
723}
724
725impl PartitionedTumblingWindow {
726    pub fn new(partition_key: String, duration: Duration) -> Self {
727        Self {
728            partition_key,
729            duration,
730            windows: FxHashMap::default(),
731        }
732    }
733
734    /// Add a shared event to the appropriate partition window.
735    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
736        let key = event.get(&self.partition_key).map_or_else(
737            || "default".to_string(),
738            |v| v.to_partition_key().into_owned(),
739        );
740
741        let window = self
742            .windows
743            .entry(key)
744            .or_insert_with(|| TumblingWindow::new(self.duration));
745
746        window.add_shared(event)
747    }
748
749    /// Add an event (wraps in Arc).
750    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
751        self.add_shared(Arc::new(event))
752            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
753    }
754
755    /// Flush all partition windows as SharedEvent references.
756    pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
757        let mut all_events = Vec::new();
758        for window in self.windows.values_mut() {
759            all_events.extend(window.flush_shared());
760        }
761        all_events
762    }
763
764    /// Flush all partition windows (clones for backward compatibility).
765    pub fn flush(&mut self) -> Vec<Event> {
766        self.flush_shared()
767            .into_iter()
768            .map(|e| (*e).clone())
769            .collect()
770    }
771
772    /// Create a checkpoint of all partition windows.
773    pub fn checkpoint(&self) -> WindowCheckpoint {
774        let partitions = self
775            .windows
776            .iter()
777            .map(|(key, window)| {
778                (
779                    key.clone(),
780                    PartitionedWindowCheckpoint {
781                        events: window
782                            .columnar
783                            .events()
784                            .iter()
785                            .map(|e| SerializableEvent::from(e.as_ref()))
786                            .collect(),
787                        window_start_ms: window.window_start.map(|t| t.timestamp_millis()),
788                    },
789                )
790            })
791            .collect();
792
793        WindowCheckpoint {
794            events: Vec::new(),
795            window_start_ms: None,
796            last_emit_ms: None,
797            partitions,
798        }
799    }
800
801    /// Restore partition windows from a checkpoint.
802    pub fn restore(&mut self, cp: &WindowCheckpoint) {
803        self.windows.clear();
804        for (key, pcp) in &cp.partitions {
805            let mut window = TumblingWindow::new(self.duration);
806            let events: Vec<SharedEvent> = pcp
807                .events
808                .iter()
809                .map(|se| Arc::new(Event::from(se.clone())))
810                .collect();
811            window.columnar = ColumnarBuffer::from_events(events);
812            window.window_start = pcp
813                .window_start_ms
814                .and_then(DateTime::from_timestamp_millis);
815            self.windows.insert(key.clone(), window);
816        }
817    }
818
819    /// Advance watermark across all partitions.
820    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
821        let mut results = Vec::new();
822        for (key, window) in &mut self.windows {
823            if let Some(events) = window.advance_watermark(wm) {
824                results.push((key.clone(), events));
825            }
826        }
827        results
828    }
829}
830
831/// A partitioned sliding window that maintains separate windows per partition key
832#[derive(Debug)]
833pub struct PartitionedSlidingWindow {
834    partition_key: String,
835    window_size: Duration,
836    slide_interval: Duration,
837    windows: FxHashMap<String, SlidingWindow>,
838}
839
840impl PartitionedSlidingWindow {
841    pub fn new(partition_key: String, window_size: Duration, slide_interval: Duration) -> Self {
842        Self {
843            partition_key,
844            window_size,
845            slide_interval,
846            windows: FxHashMap::default(),
847        }
848    }
849
850    /// Add a shared event to the appropriate partition window.
851    pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
852        let key = event.get(&self.partition_key).map_or_else(
853            || "default".to_string(),
854            |v| v.to_partition_key().into_owned(),
855        );
856
857        let window = self
858            .windows
859            .entry(key)
860            .or_insert_with(|| SlidingWindow::new(self.window_size, self.slide_interval));
861
862        window.add_shared(event)
863    }
864
865    /// Add an event (wraps in Arc).
866    pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
867        self.add_shared(Arc::new(event))
868            .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
869    }
870
871    /// Get all current events from all partitions as shared references.
872    pub fn current_all_shared(&self) -> Vec<SharedEvent> {
873        let mut all_events = Vec::new();
874        for window in self.windows.values() {
875            all_events.extend(window.current_shared());
876        }
877        all_events
878    }
879
880    /// Get all current events from all partitions (clones for backward compatibility).
881    pub fn current_all(&self) -> Vec<Event> {
882        self.current_all_shared()
883            .into_iter()
884            .map(|e| (*e).clone())
885            .collect()
886    }
887
888    /// Create a checkpoint of all partition windows.
889    pub fn checkpoint(&self) -> WindowCheckpoint {
890        let partitions = self
891            .windows
892            .iter()
893            .map(|(key, window)| {
894                (
895                    key.clone(),
896                    PartitionedWindowCheckpoint {
897                        events: window
898                            .events
899                            .iter()
900                            .map(|e| SerializableEvent::from(e.as_ref()))
901                            .collect(),
902                        window_start_ms: window.last_emit.map(|t| t.timestamp_millis()),
903                    },
904                )
905            })
906            .collect();
907
908        WindowCheckpoint {
909            events: Vec::new(),
910            window_start_ms: None,
911            last_emit_ms: None,
912            partitions,
913        }
914    }
915
916    /// Restore partition windows from a checkpoint.
917    pub fn restore(&mut self, cp: &WindowCheckpoint) {
918        self.windows.clear();
919        for (key, pcp) in &cp.partitions {
920            let mut window = SlidingWindow::new(self.window_size, self.slide_interval);
921            window.events = pcp
922                .events
923                .iter()
924                .map(|se| Arc::new(Event::from(se.clone())))
925                .collect::<Vec<_>>()
926                .into();
927            window.last_emit = pcp
928                .window_start_ms
929                .and_then(DateTime::from_timestamp_millis);
930            self.windows.insert(key.clone(), window);
931        }
932    }
933
934    /// Advance watermark across all partitions.
935    pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
936        let mut results = Vec::new();
937        for (key, window) in &mut self.windows {
938            if let Some(events) = window.advance_watermark(wm) {
939                results.push((key.clone(), events));
940            }
941        }
942        results
943    }
944}
945
946// =============================================================================
947// DELAY BUFFER (rstream equivalent)
948// =============================================================================
949
/// A delay buffer that outputs items as they are pushed out by newer items.
///
/// This is equivalent to Apama's `rstream` operator:
/// - `retain 1 select rstream a` delays output by 1 item
/// - When a new item arrives, the oldest item is output
///
/// Use cases:
/// - Compare current value with previous value
/// - Detect changes between consecutive aggregations
/// - Implement "previous" reference in expressions
///
/// # Example
/// ```
/// # use varpulis_runtime::window::DelayBuffer;
/// let mut delay = DelayBuffer::new(1);
///
/// // First item: nothing output (buffer filling)
/// assert_eq!(delay.push(10), None);
///
/// // Second item: first item output
/// assert_eq!(delay.push(20), Some(10));
///
/// // Third item: second item output
/// assert_eq!(delay.push(30), Some(20));
/// ```
#[derive(Debug, Clone)]
pub struct DelayBuffer<T> {
    /// Retained items, oldest at the front and newest at the back.
    buffer: VecDeque<T>,
    /// Number of items retained between pushes; always at least 1.
    delay: usize,
}

// No bound on `T`: none of the operations below clone an item.
impl<T> DelayBuffer<T> {
    /// Create a new delay buffer with specified delay count.
    ///
    /// A delay of 1 means the previous item is output when a new item arrives.
    /// A delay of 0 is treated as 1.
    ///
    /// Storage is reserved up front for `delay + 1` items, but only up to a fixed
    /// cap: a very large `delay` must neither overflow the capacity computation nor
    /// reserve memory that may never be used. The deque grows on demand beyond that.
    pub fn new(delay: usize) -> Self {
        /// Upper bound on the number of slots reserved eagerly.
        const MAX_PREALLOCATED_SLOTS: usize = 1024;

        let delay = delay.max(1); // Minimum delay of 1
        // `push` briefly holds `delay + 1` items before evicting the oldest one.
        let slots = delay.saturating_add(1).min(MAX_PREALLOCATED_SLOTS);

        Self {
            buffer: VecDeque::with_capacity(slots),
            delay,
        }
    }

    /// Push a new item and potentially output a delayed item.
    ///
    /// Returns `Some(item)` with the oldest item once more than `delay` items are
    /// held, or `None` while the buffer is still filling up.
    pub fn push(&mut self, item: T) -> Option<T> {
        self.buffer.push_back(item);

        if self.buffer.len() > self.delay {
            self.buffer.pop_front()
        } else {
            None
        }
    }

    /// Get the current item (most recent) without removing it.
    pub fn current(&self) -> Option<&T> {
        self.buffer.back()
    }

    /// Get the previous item (one before current) without removing it.
    ///
    /// Returns `None` while fewer than two items are retained. With a delay of 1
    /// the buffer never retains more than one item after a push, so this is always
    /// `None` in that configuration; use the value returned by [`Self::push`].
    pub fn previous(&self) -> Option<&T> {
        let newest = self.buffer.len().checked_sub(1)?;
        let before_newest = newest.checked_sub(1)?;
        self.buffer.get(before_newest)
    }

    /// Get the oldest item in the buffer (the one that would be output next).
    pub fn oldest(&self) -> Option<&T> {
        self.buffer.front()
    }

    /// Check if the buffer is full (ready to output items).
    ///
    /// True once `delay` items are held, i.e. the next push will output an item.
    pub fn is_ready(&self) -> bool {
        self.buffer.len() >= self.delay
    }

    /// Get the current number of items in the buffer.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// Check if the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Clear the buffer, discarding all retained items.
    pub fn clear(&mut self) {
        self.buffer.clear();
    }

    /// Flush all items from the buffer, returning them in order (oldest first).
    pub fn flush(&mut self) -> Vec<T> {
        self.buffer.drain(..).collect()
    }
}
1050
/// A specialized delay buffer for comparing current vs previous values.
///
/// This is optimized for the common pattern:
/// ```text
/// // Apama:
/// // from a in averages from p in (from a in averages retain 1 select rstream a)
/// // where a > p + threshold
///
/// // Varpulis:
/// let mut tracker = PreviousValueTracker::new();
/// tracker.update(current_avg);
/// if let Some(prev) = tracker.previous() {
///     if current > prev + threshold { ... }
/// }
/// ```
#[derive(Debug, Clone)]
pub struct PreviousValueTracker<T> {
    /// Value passed to the most recent `update`, if any.
    current: Option<T>,
    /// Value that was current before the most recent `update`, if any.
    previous: Option<T>,
}

// Written by hand rather than derived: an empty tracker does not need `T: Default`.
impl<T> Default for PreviousValueTracker<T> {
    fn default() -> Self {
        Self::new()
    }
}

// No bound on `T`: values are only moved and borrowed, never cloned.
impl<T> PreviousValueTracker<T> {
    /// Create a new, empty tracker.
    pub const fn new() -> Self {
        Self {
            current: None,
            previous: None,
        }
    }

    /// Update with a new value, shifting current to previous.
    ///
    /// The value that was previous before this call is dropped.
    pub fn update(&mut self, value: T) {
        self.previous = self.current.replace(value);
    }

    /// Get the current value.
    pub const fn current(&self) -> Option<&T> {
        self.current.as_ref()
    }

    /// Get the previous value.
    pub const fn previous(&self) -> Option<&T> {
        self.previous.as_ref()
    }

    /// Check if we have both current and previous values (ready for comparison).
    pub const fn has_both(&self) -> bool {
        self.current.is_some() && self.previous.is_some()
    }

    /// Get both values as a `(current, previous)` tuple if both are available.
    pub const fn get_pair(&self) -> Option<(&T, &T)> {
        match (&self.current, &self.previous) {
            (Some(curr), Some(prev)) => Some((curr, prev)),
            _ => None,
        }
    }

    /// Reset the tracker, forgetting both values.
    pub fn reset(&mut self) {
        self.current = None;
        self.previous = None;
    }
}
1116
1117/// A partitioned delay buffer that maintains separate buffers per partition key.
1118#[derive(Debug)]
1119pub struct PartitionedDelayBuffer<T> {
1120    delay: usize,
1121    buffers: FxHashMap<String, DelayBuffer<T>>,
1122}
1123
1124impl<T: Clone> PartitionedDelayBuffer<T> {
1125    /// Create a new partitioned delay buffer.
1126    pub fn new(delay: usize) -> Self {
1127        Self {
1128            delay,
1129            buffers: FxHashMap::default(),
1130        }
1131    }
1132
1133    /// Push an item for a specific partition.
1134    pub fn push(&mut self, key: &str, item: T) -> Option<T> {
1135        let buffer = self
1136            .buffers
1137            .entry(key.to_string())
1138            .or_insert_with(|| DelayBuffer::new(self.delay));
1139        buffer.push(item)
1140    }
1141
1142    /// Get the current value for a partition.
1143    pub fn current(&self, key: &str) -> Option<&T> {
1144        self.buffers.get(key).and_then(|b| b.current())
1145    }
1146
1147    /// Get the previous value for a partition.
1148    pub fn previous(&self, key: &str) -> Option<&T> {
1149        self.buffers.get(key).and_then(|b| b.previous())
1150    }
1151
1152    /// Check if a partition buffer is ready.
1153    pub fn is_ready(&self, key: &str) -> bool {
1154        self.buffers.get(key).is_some_and(|b| b.is_ready())
1155    }
1156}
1157
1158/// A partitioned previous value tracker.
1159#[derive(Debug)]
1160pub struct PartitionedPreviousValueTracker<T> {
1161    trackers: FxHashMap<String, PreviousValueTracker<T>>,
1162}
1163
1164impl<T: Clone> Default for PartitionedPreviousValueTracker<T> {
1165    fn default() -> Self {
1166        Self::new()
1167    }
1168}
1169
1170impl<T: Clone> PartitionedPreviousValueTracker<T> {
1171    /// Create a new partitioned tracker.
1172    pub fn new() -> Self {
1173        Self {
1174            trackers: FxHashMap::default(),
1175        }
1176    }
1177
1178    /// Update the value for a specific partition.
1179    pub fn update(&mut self, key: &str, value: T) {
1180        let tracker = self
1181            .trackers
1182            .entry(key.to_string())
1183            .or_insert_with(PreviousValueTracker::new);
1184        tracker.update(value);
1185    }
1186
1187    /// Get the current value for a partition.
1188    pub fn current(&self, key: &str) -> Option<&T> {
1189        self.trackers.get(key).and_then(|t| t.current())
1190    }
1191
1192    /// Get the previous value for a partition.
1193    pub fn previous(&self, key: &str) -> Option<&T> {
1194        self.trackers.get(key).and_then(|t| t.previous())
1195    }
1196
1197    /// Check if a partition has both values.
1198    pub fn has_both(&self, key: &str) -> bool {
1199        self.trackers.get(key).is_some_and(|t| t.has_both())
1200    }
1201
1202    /// Get both values for a partition.
1203    pub fn get_pair(&self, key: &str) -> Option<(&T, &T)> {
1204        self.trackers.get(key).and_then(|t| t.get_pair())
1205    }
1206}
1207
1208// =============================================================================
1209// Incremental Sliding Window with Pre-computed Aggregates
1210// =============================================================================
1211
1212/// A sliding window optimized for aggregations with O(1) updates
1213///
1214/// Instead of recomputing aggregations over all events on each emit,
1215/// this window maintains running totals that are updated incrementally:
1216/// - Adding an event: O(1) update to aggregates
1217/// - Removing expired events: O(k) where k is expired count
1218/// - Getting aggregates: O(1)
1219///
1220/// This is ~10-100x faster than recomputing for large windows.
1221#[derive(Debug)]
1222pub struct IncrementalSlidingWindow {
1223    window_size: Duration,
1224    slide_interval: Duration,
1225    /// Events with pre-extracted field value for the tracked field
1226    events: VecDeque<(SharedEvent, Option<f64>)>,
1227    last_emit: Option<DateTime<Utc>>,
1228    /// Field name to track for aggregations
1229    field: String,
1230    /// Running aggregates
1231    sum: f64,
1232    count: usize,
1233    /// For min/max, we need to track all values (can't incrementally update)
1234    /// Using the SIMD incremental tracker
1235    minmax: crate::simd::IncrementalMinMax,
1236}
1237
1238impl IncrementalSlidingWindow {
1239    /// Create a new incremental sliding window
1240    pub fn new(window_size: Duration, slide_interval: Duration, field: impl Into<String>) -> Self {
1241        Self {
1242            window_size,
1243            slide_interval,
1244            events: VecDeque::new(),
1245            last_emit: None,
1246            field: field.into(),
1247            sum: 0.0,
1248            count: 0,
1249            minmax: crate::simd::IncrementalMinMax::new(),
1250        }
1251    }
1252
1253    /// Add an event, returning aggregates if slide interval reached
1254    pub fn add(&mut self, event: SharedEvent) -> Option<IncrementalAggregates> {
1255        let event_time = event.timestamp;
1256
1257        // Extract field value once
1258        let value = event.get_float(&self.field);
1259
1260        // Update running aggregates
1261        if let Some(v) = value {
1262            if !v.is_nan() {
1263                self.sum += v;
1264                self.count += 1;
1265                self.minmax.add(v);
1266            }
1267        }
1268
1269        self.events.push_back((event, value));
1270
1271        // Remove old events
1272        let cutoff = event_time - self.window_size;
1273        while let Some((front_event, front_value)) = self.events.front() {
1274            if front_event.timestamp >= cutoff {
1275                break;
1276            }
1277            // Update aggregates for removed event
1278            if let Some(v) = front_value {
1279                if !v.is_nan() {
1280                    self.sum -= v;
1281                    self.count = self.count.saturating_sub(1);
1282                    self.minmax.remove(*v);
1283                }
1284            }
1285            self.events.pop_front();
1286        }
1287
1288        // Check if we should emit
1289        let should_emit = match self.last_emit {
1290            None => true,
1291            Some(last) => event_time >= last + self.slide_interval,
1292        };
1293
1294        should_emit.then(|| {
1295            self.last_emit = Some(event_time);
1296            IncrementalAggregates {
1297                sum: self.sum,
1298                count: self.count,
1299                avg: if self.count > 0 {
1300                    Some(self.sum / self.count as f64)
1301                } else {
1302                    None
1303                },
1304                min: self.minmax.min(),
1305                max: self.minmax.max(),
1306                event_count: self.events.len(),
1307            }
1308        })
1309    }
1310
1311    /// Get current aggregates without emitting
1312    pub fn current_aggregates(&mut self) -> IncrementalAggregates {
1313        IncrementalAggregates {
1314            sum: self.sum,
1315            count: self.count,
1316            avg: if self.count > 0 {
1317                Some(self.sum / self.count as f64)
1318            } else {
1319                None
1320            },
1321            min: self.minmax.min(),
1322            max: self.minmax.max(),
1323            event_count: self.events.len(),
1324        }
1325    }
1326
1327    /// Get events in the current window (for patterns that need event access)
1328    pub fn events(&self) -> impl Iterator<Item = &SharedEvent> {
1329        self.events.iter().map(|(e, _)| e)
1330    }
1331
1332    /// Reset the window
1333    pub fn reset(&mut self) {
1334        self.events.clear();
1335        self.last_emit = None;
1336        self.sum = 0.0;
1337        self.count = 0;
1338        self.minmax.reset();
1339    }
1340}
1341
/// Snapshot of the running aggregates kept by an incremental window.
///
/// All numeric aggregates cover only the events whose tracked field yielded a
/// non-NaN value; `event_count` covers every event in the window.
#[derive(Debug, Clone)]
pub struct IncrementalAggregates {
    /// Sum of the tracked values.
    pub sum: f64,
    /// Number of tracked values that contributed to `sum`.
    pub count: usize,
    /// `sum / count`, or `None` when `count` is zero.
    pub avg: Option<f64>,
    /// Smallest tracked value as reported by the min/max tracker, if any.
    pub min: Option<f64>,
    /// Largest tracked value as reported by the min/max tracker, if any.
    pub max: Option<f64>,
    /// Number of events in window (including those with null/NaN values)
    pub event_count: usize,
}
1353
1354#[cfg(test)]
1355mod tests {
1356    use super::*;
1357
#[test]
fn test_tumbling_window() {
    let start = Utc::now();
    let at = |secs: i64| Event::new("Test").with_timestamp(start + Duration::seconds(secs));
    let mut tumbling = TumblingWindow::new(Duration::seconds(5));

    // Offsets 0s, 1s and 2s all stay inside the first 5-second window.
    for offset in 0..3 {
        assert!(tumbling.add(at(offset)).is_none());
    }

    // An event at 6s lies past the window end, so the first window is handed back.
    let emitted = tumbling.add(at(6));
    assert!(emitted.is_some());
    assert_eq!(emitted.unwrap().len(), 3);
}
1375
#[test]
fn test_sliding_window() {
    let origin = Utc::now();
    let stamped = |secs: i64| Event::new("Test").with_timestamp(origin + Duration::seconds(secs));
    let mut sliding = SlidingWindow::new(Duration::seconds(10), Duration::seconds(2));

    // The very first event is expected to emit.
    assert!(sliding.add(stamped(0)).is_some());

    // Only 1s after the previous emit: less than the 2s slide, so nothing comes out.
    assert!(sliding.add(stamped(1)).is_none());

    // 3s after the previous emit the slide has elapsed; all three events are
    // still within the 10s window.
    let emitted = sliding.add(stamped(3));
    assert!(emitted.is_some());
    assert_eq!(emitted.unwrap().len(), 3);
}
1395
#[test]
fn test_count_window() {
    let mut counting = CountWindow::new(3);

    // The first two events only fill the window.
    for (filled, value) in [(1, 1i64), (2, 2i64)] {
        let event = Event::new("Test").with_field("value", value);
        assert!(counting.add(event).is_none());
        assert_eq!(counting.current_count(), filled);
    }

    // The third event completes the window, which is emitted and left empty.
    let emitted = counting.add(Event::new("Test").with_field("value", 3i64));
    assert!(emitted.is_some());
    assert_eq!(emitted.unwrap().len(), 3);
    assert_eq!(counting.current_count(), 0);
}
1416
#[test]
fn test_count_window_flush() {
    let mut counting = CountWindow::new(5);

    // Three events are not enough to complete a window of five.
    for value in 0..3i64 {
        counting.add(Event::new("Test").with_field("value", value));
    }

    // Flushing hands back the partial window and empties it.
    let drained = counting.flush();
    assert_eq!(drained.len(), 3);
    assert_eq!(counting.current_count(), 0);
}
1432
#[test]
fn test_sliding_count_window() {
    let mut sliding = SlidingCountWindow::new(5, 2);
    let numbered = |value: i64| Event::new("Test").with_field("value", value);

    // Events 0..=3 only fill the window of five.
    for value in 0..4 {
        assert!(
            sliding.add(numbered(value)).is_none(),
            "Should not emit before window is full"
        );
    }

    // The fifth event fills it and triggers the first emit.
    let first = sliding.add(numbered(4));
    assert!(first.is_some(), "Should emit when window is full");
    assert_eq!(first.unwrap().len(), 5);

    // One further event is only half of the slide size of two.
    assert!(sliding.add(numbered(5)).is_none());

    // The second one completes the slide; the window still reports five events
    // because the oldest ones were dropped.
    let second = sliding.add(numbered(6));
    assert!(second.is_some());
    assert_eq!(second.unwrap().len(), 5);
}
1462
#[test]
fn test_tumbling_window_flush() {
    let origin = Utc::now();
    let mut tumbling = TumblingWindow::new(Duration::seconds(10));

    // Three events one second apart, none of them completing the 10s window.
    for offset in (0..3).map(Duration::seconds) {
        tumbling.add(Event::new("Test").with_timestamp(origin + offset));
    }

    // Flushing hands back everything collected so far.
    let drained = tumbling.flush();
    assert_eq!(drained.len(), 3);
}
1478
#[test]
fn test_sliding_window_current() {
    let origin = Utc::now();
    let mut sliding = SlidingWindow::new(Duration::seconds(10), Duration::seconds(1));

    // Three events at 0s, 1s and 2s, all well inside the 10s window.
    for offset in 0..3 {
        sliding.add(Event::new("Test").with_timestamp(origin + Duration::seconds(offset)));
    }

    // `current()` reports every event still held by the window.
    let retained = sliding.current();
    assert_eq!(retained.len(), 3);
}
1493
#[test]
fn test_sliding_window_expiry() {
    let origin = Utc::now();
    let mut sliding = SlidingWindow::new(Duration::seconds(5), Duration::seconds(1));

    // An early event ...
    sliding.add(Event::new("Test").with_timestamp(origin));

    // ... followed by one 10s later, twice the window size, which should push
    // the early event out.
    let late = Event::new("Test").with_timestamp(origin + Duration::seconds(10));
    sliding.add(late);

    // Only the late event is expected to remain.
    assert_eq!(sliding.current().len(), 1);
}
1510
#[test]
fn test_partitioned_tumbling_window() {
    let origin = Utc::now();
    let regional = |region: &'static str, secs: i64| {
        Event::new("Test")
            .with_timestamp(origin + Duration::seconds(secs))
            .with_field("region", region)
    };
    let mut partitioned =
        PartitionedTumblingWindow::new("region".to_string(), Duration::seconds(5));

    // One event per region, both at the start time.
    partitioned.add(regional("east", 0));
    partitioned.add(regional("west", 0));

    // A later "east" event at 6s completes the "east" window only.
    let emitted = partitioned.add(regional("east", 6));
    assert!(emitted.is_some());
    // The result must contain just the single earlier "east" event.
    assert_eq!(emitted.unwrap().len(), 1);
}
1537
#[test]
fn test_partitioned_sliding_window() {
    let origin = Utc::now();
    let mut partitioned = PartitionedSlidingWindow::new(
        "region".to_string(),
        Duration::seconds(10),
        Duration::seconds(2),
    );

    // First "east" event: its partition emits a window holding just that event.
    let east = Event::new("Test")
        .with_timestamp(origin)
        .with_field("region", "east");
    let east_emit = partitioned.add(east);
    assert!(east_emit.is_some());
    assert_eq!(east_emit.unwrap().len(), 1);

    // A "west" event 1s later is inside east's slide interval, but it is the
    // first event of its own partition, so it emits as well.
    let west = Event::new("Test")
        .with_timestamp(origin + Duration::seconds(1))
        .with_field("region", "west");
    assert!(partitioned.add(west).is_some());
}
1563
#[test]
fn test_count_window_multiple_completions() {
    let mut counting = CountWindow::new(2);

    // Two consecutive batches of two events: each batch completes one window.
    for batch in [1i64, 2i64] {
        counting.add(Event::new("Test").with_field("batch", batch));

        let emitted = counting.add(Event::new("Test").with_field("batch", batch));
        assert!(emitted.is_some());
        assert_eq!(emitted.unwrap().len(), 2);
    }
}
1580
1581    // ==========================================================================
1582    // DelayBuffer Tests (rstream equivalent)
1583    // ==========================================================================
1584
#[test]
fn test_delay_buffer_basic() {
    let mut buffer = DelayBuffer::<i32>::new(1);

    // The first push only fills the buffer.
    assert_eq!(buffer.push(10), None);
    assert!(!buffer.is_empty());
    assert_eq!(buffer.len(), 1);

    // From then on every push releases the item pushed just before it, and the
    // buffer keeps holding exactly one item.
    for (incoming, delayed) in [(20, 10), (30, 20)] {
        assert_eq!(buffer.push(incoming), Some(delayed));
        assert_eq!(buffer.len(), 1);
    }
}
1602
#[test]
fn test_delay_buffer_delay_2() {
    let mut buffer = DelayBuffer::<i32>::new(2);

    // With a delay of two, the first two pushes only fill the buffer.
    for filler in [10, 20] {
        assert_eq!(buffer.push(filler), None);
    }
    assert_eq!(buffer.len(), 2);

    // Each later push releases the item that arrived two pushes earlier.
    for (incoming, delayed) in [(30, 10), (40, 20)] {
        assert_eq!(buffer.push(incoming), Some(delayed));
    }
}
1618
#[test]
fn test_delay_buffer_current_previous() {
    let mut buffer = DelayBuffer::<i32>::new(2);
    for item in [10, 20] {
        buffer.push(item);
    }

    // Newest item, the one before it, and the one that would be released next.
    assert_eq!(buffer.current(), Some(&20));
    assert_eq!(buffer.previous(), Some(&10));
    assert_eq!(buffer.oldest(), Some(&10));
}
1630
#[test]
fn test_delay_buffer_is_ready() {
    let mut buffer = DelayBuffer::<i32>::new(3);

    // Not ready while empty or while holding fewer than three items.
    assert!(!buffer.is_ready());
    for item in 1..=2 {
        buffer.push(item);
        assert!(!buffer.is_ready());
    }

    // The third item makes it ready.
    buffer.push(3);
    assert!(buffer.is_ready());
}
1643
#[test]
fn test_delay_buffer_flush() {
    let mut buffer = DelayBuffer::<i32>::new(2);

    // The third push releases 10, leaving 20 and 30 behind.
    for item in [10, 20, 30] {
        buffer.push(item);
    }

    // Flushing returns the remaining items oldest-first and empties the buffer.
    assert_eq!(buffer.flush(), vec![20, 30]);
    assert!(buffer.is_empty());
}
1656
#[test]
fn test_delay_buffer_clear() {
    let mut buffer = DelayBuffer::<i32>::new(2);
    for item in [10, 20] {
        buffer.push(item);
    }

    // Clearing discards everything that was retained.
    buffer.clear();
    assert!(buffer.is_empty());
    assert_eq!(buffer.len(), 0);
}
1668
#[test]
fn test_delay_buffer_with_events() {
    let mut buffer = DelayBuffer::<Event>::new(1);

    let first = Event::new("Test").with_field("value", 100i64);
    let second = Event::new("Test").with_field("value", 200i64);

    // The first event is held back; pushing the second one releases it.
    assert!(buffer.push(first).is_none());
    let released = buffer.push(second);
    assert!(released.is_some());
    assert_eq!(released.unwrap().get_int("value"), Some(100));
}
1681
1682    // ==========================================================================
1683    // PreviousValueTracker Tests
1684    // ==========================================================================
1685
#[test]
fn test_previous_value_tracker_basic() {
    let mut tracker = PreviousValueTracker::<f64>::new();

    // A fresh tracker holds nothing.
    assert_eq!(tracker.current(), None);
    assert_eq!(tracker.previous(), None);
    assert!(!tracker.has_both());

    // One update fills only the current slot.
    tracker.update(10.0);
    assert_eq!(tracker.current(), Some(&10.0));
    assert_eq!(tracker.previous(), None);
    assert!(!tracker.has_both());

    // A second update shifts the old current value into the previous slot.
    tracker.update(20.0);
    assert_eq!(
        (tracker.current(), tracker.previous()),
        (Some(&20.0), Some(&10.0))
    );
    assert!(tracker.has_both());

    // A third update shifts again, dropping the oldest value.
    tracker.update(30.0);
    assert_eq!(
        (tracker.current(), tracker.previous()),
        (Some(&30.0), Some(&20.0))
    );
}
1712
#[test]
fn test_previous_value_tracker_get_pair() {
    let mut tracker = PreviousValueTracker::<i32>::new();

    // Neither an empty tracker nor one with a single value yields a pair.
    assert_eq!(tracker.get_pair(), None);
    tracker.update(1);
    assert_eq!(tracker.get_pair(), None);

    // After the second update the pair is (current, previous).
    tracker.update(2);
    assert_eq!(tracker.get_pair(), Some((&2, &1)));
}
1730
#[test]
fn test_previous_value_tracker_reset() {
    let mut tracker = PreviousValueTracker::<i32>::new();
    for value in [1, 2] {
        tracker.update(value);
    }
    assert!(tracker.has_both());

    // Resetting forgets both values.
    tracker.reset();
    assert!(!tracker.has_both());
    assert_eq!(tracker.current(), None);
    assert_eq!(tracker.previous(), None);
}
1744
#[test]
fn test_previous_value_tracker_threshold_comparison() {
    let mut tracker: PreviousValueTracker<f64> = PreviousValueTracker::new();
    let threshold = 5.0;

    tracker.update(100.0);
    tracker.update(107.0);

    // Fail loudly if no pair is available: wrapping the assertion in `if let`
    // would let the test pass without checking anything.
    let (curr, prev) = tracker
        .get_pair()
        .expect("two updates must yield a (current, previous) pair");

    let diff = curr - prev;
    assert!(diff > threshold, "Should detect change > threshold");
}
1758
1759    // ==========================================================================
1760    // PartitionedDelayBuffer Tests
1761    // ==========================================================================
1762
#[test]
fn test_partitioned_delay_buffer() {
    let mut partitions = PartitionedDelayBuffer::<i32>::new(1);

    // Partitions "a" and "b" fill and release independently of each other.
    for (key, first, second) in [("a", 10, 20), ("b", 100, 200)] {
        assert_eq!(partitions.push(key, first), None);
        assert_eq!(partitions.push(key, second), Some(first));
    }

    // Each partition reports its own most recent item.
    assert_eq!(partitions.current("a"), Some(&20));
    assert_eq!(partitions.current("b"), Some(&200));
}
1779
#[test]
fn test_partitioned_delay_buffer_previous() {
    let mut partitions = PartitionedDelayBuffer::<i32>::new(2);
    for item in [1, 2] {
        partitions.push("x", item);
    }

    assert_eq!(partitions.current("x"), Some(&2));
    assert_eq!(partitions.previous("x"), Some(&1));
    assert!(partitions.is_ready("x"));
    // A partition that never received an item is not ready.
    assert!(!partitions.is_ready("y"));
}
1792
1793    // ==========================================================================
1794    // PartitionedPreviousValueTracker Tests
1795    // ==========================================================================
1796
#[test]
fn test_partitioned_previous_tracker() {
    let mut tracker = PartitionedPreviousValueTracker::<f64>::new();

    // (symbol, first value, second value): IBM first, then MSFT.
    let feeds = [("IBM", 100.0, 105.0), ("MSFT", 200.0, 198.0)];
    for (symbol, older, newer) in feeds {
        tracker.update(symbol, older);
        tracker.update(symbol, newer);
    }

    // Every symbol keeps its own (current, previous) pair.
    for (symbol, older, newer) in feeds {
        assert!(tracker.has_both(symbol));
        let (curr, prev) = tracker.get_pair(symbol).unwrap();
        assert_eq!(*curr, newer);
        assert_eq!(*prev, older);
    }

    // A symbol that was never updated has no pair.
    assert!(!tracker.has_both("AAPL"));
}
1825
1826    #[test]
1827    fn test_partitioned_previous_tracker_avg_change_detection() {
1828        // Simulate Apama streams example:
1829        // Alert when average price changes by more than THRESHOLD
1830        let mut tracker: PartitionedPreviousValueTracker<f64> =
1831            PartitionedPreviousValueTracker::new();
1832        let threshold = 1.0;
1833
1834        // Simulate averages arriving over time
1835        let averages = vec![
1836            ("ibm", 100.0),
1837            ("msft", 50.0),
1838            ("ibm", 100.5), // small change
1839            ("msft", 52.5), // big change > threshold
1840            ("ibm", 102.5), // big change > threshold
1841        ];
1842
1843        let mut alerts = Vec::new();
1844
1845        for (symbol, avg) in averages {
1846            tracker.update(symbol, avg);
1847
1848            if let Some((curr, prev)) = tracker.get_pair(symbol) {
1849                let diff = (curr - prev).abs();
1850                if diff > threshold {
1851                    alerts.push((symbol.to_string(), diff));
1852                }
1853            }
1854        }
1855
1856        assert_eq!(alerts.len(), 2);
1857        assert_eq!(alerts[0].0, "msft");
1858        assert!((alerts[0].1 - 2.5).abs() < 0.001);
1859        assert_eq!(alerts[1].0, "ibm");
1860        assert!((alerts[1].1 - 2.0).abs() < 0.001);
1861    }
1862
1863    #[test]
1864    fn test_incremental_sliding_window_basic() {
1865        let mut window =
1866            IncrementalSlidingWindow::new(Duration::seconds(5), Duration::seconds(1), "price");
1867        let base_time = Utc::now();
1868
1869        // Add events with prices
1870        for i in 0..5 {
1871            let event = Event::new("Trade")
1872                .with_timestamp(base_time + Duration::milliseconds(i * 500))
1873                .with_field("price", varpulis_core::Value::Float(100.0 + i as f64));
1874            let shared = Arc::new(event);
1875            let _ = window.add(shared);
1876        }
1877
1878        // Get aggregates
1879        let agg = window.current_aggregates();
1880        assert_eq!(agg.count, 5);
1881        assert_eq!(agg.sum, 510.0); // 100 + 101 + 102 + 103 + 104
1882        assert!((agg.avg.unwrap() - 102.0).abs() < 0.001);
1883        assert_eq!(agg.min, Some(100.0));
1884        assert_eq!(agg.max, Some(104.0));
1885    }
1886
1887    #[test]
1888    fn test_incremental_sliding_window_expiry() {
1889        let mut window =
1890            IncrementalSlidingWindow::new(Duration::seconds(2), Duration::seconds(1), "value");
1891        let base_time = Utc::now();
1892
1893        // Add 3 events at t=0, t=1, t=2
1894        for i in 0..3 {
1895            let event = Event::new("Metric")
1896                .with_timestamp(base_time + Duration::seconds(i))
1897                .with_field("value", varpulis_core::Value::Float(10.0 * (i + 1) as f64));
1898            window.add(Arc::new(event));
1899        }
1900
1901        // At t=2, window contains t=0,1,2 (all within 2s)
1902        let agg = window.current_aggregates();
1903        assert_eq!(agg.count, 3);
1904        assert_eq!(agg.sum, 60.0); // 10 + 20 + 30
1905
1906        // Add event at t=3 - should expire t=0
1907        let event = Event::new("Metric")
1908            .with_timestamp(base_time + Duration::seconds(3))
1909            .with_field("value", varpulis_core::Value::Float(40.0));
1910        window.add(Arc::new(event));
1911
1912        let agg = window.current_aggregates();
1913        assert_eq!(agg.count, 3); // t=1,2,3
1914        assert_eq!(agg.sum, 90.0); // 20 + 30 + 40 (10 expired)
1915        assert_eq!(agg.min, Some(20.0)); // 10 was removed
1916    }
1917
1918    #[test]
1919    fn test_incremental_sliding_window_emit() {
1920        let mut window =
1921            IncrementalSlidingWindow::new(Duration::seconds(5), Duration::seconds(2), "value");
1922        let base_time = Utc::now();
1923
1924        // First event should emit
1925        let event = Event::new("Test")
1926            .with_timestamp(base_time)
1927            .with_field("value", varpulis_core::Value::Float(100.0));
1928        let result = window.add(Arc::new(event));
1929        assert!(result.is_some());
1930
1931        // Event at t=1 should not emit (slide interval is 2s)
1932        let event = Event::new("Test")
1933            .with_timestamp(base_time + Duration::seconds(1))
1934            .with_field("value", varpulis_core::Value::Float(200.0));
1935        let result = window.add(Arc::new(event));
1936        assert!(result.is_none());
1937
1938        // Event at t=2 should emit
1939        let event = Event::new("Test")
1940            .with_timestamp(base_time + Duration::seconds(2))
1941            .with_field("value", varpulis_core::Value::Float(300.0));
1942        let result = window.add(Arc::new(event));
1943        assert!(result.is_some());
1944
1945        let agg = result.unwrap();
1946        assert_eq!(agg.count, 3);
1947        assert_eq!(agg.sum, 600.0);
1948    }
1949
1950    // ==========================================================================
1951    // SessionWindow Tests
1952    // ==========================================================================
1953
1954    #[test]
1955    fn test_session_window_basic() {
1956        let mut window = SessionWindow::new(Duration::seconds(5));
1957        let base_time = Utc::now();
1958
1959        // Events within 5s gap — all in same session
1960        assert!(window
1961            .add(Event::new("Test").with_timestamp(base_time))
1962            .is_none());
1963        assert!(window
1964            .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)))
1965            .is_none());
1966        assert!(window
1967            .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(4)))
1968            .is_none());
1969
1970        // Event after 6s gap — closes session
1971        let result =
1972            window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(10)));
1973        assert!(result.is_some());
1974        assert_eq!(
1975            result.unwrap().len(),
1976            3,
1977            "First session should have 3 events"
1978        );
1979    }
1980
1981    #[test]
1982    fn test_session_window_no_gap() {
1983        let mut window = SessionWindow::new(Duration::seconds(10));
1984        let base_time = Utc::now();
1985
1986        // All events within gap — no session closes
1987        for i in 0..5 {
1988            assert!(window
1989                .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(i)))
1990                .is_none());
1991        }
1992
1993        // Flush returns all events
1994        let flushed = window.flush();
1995        assert_eq!(flushed.len(), 5);
1996    }
1997
1998    #[test]
1999    fn test_session_window_multiple_sessions() {
2000        let mut window = SessionWindow::new(Duration::seconds(3));
2001        let base_time = Utc::now();
2002
2003        // Session 1: t=0, t=1, t=2
2004        window.add(Event::new("Test").with_timestamp(base_time));
2005        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2006        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2007
2008        // Gap of 5s -> session 1 closes
2009        let result =
2010            window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(7)));
2011        assert!(result.is_some());
2012        assert_eq!(result.unwrap().len(), 3);
2013
2014        // Session 2: t=7, t=8
2015        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(8)));
2016
2017        // Gap of 4s -> session 2 closes
2018        let result =
2019            window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(12)));
2020        assert!(result.is_some());
2021        assert_eq!(result.unwrap().len(), 2);
2022    }
2023
2024    #[test]
2025    fn test_session_window_flush() {
2026        let mut window = SessionWindow::new(Duration::seconds(5));
2027        let base_time = Utc::now();
2028
2029        window.add(Event::new("Test").with_timestamp(base_time));
2030        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2031
2032        let flushed = window.flush();
2033        assert_eq!(flushed.len(), 2);
2034
2035        // After flush, window is empty
2036        let flushed_again = window.flush();
2037        assert_eq!(flushed_again.len(), 0);
2038    }
2039
    // ==========================================================================
    // SessionWindow Expiry and PartitionedSessionWindow Tests
    // ==========================================================================
2043
2044    #[test]
2045    fn test_session_window_check_expired_not_expired() {
2046        let mut window = SessionWindow::new(Duration::seconds(5));
2047        let base_time = Utc::now();
2048
2049        // Add events
2050        window.add(Event::new("Test").with_timestamp(base_time));
2051        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2052
2053        // Check at a time within the gap — should not expire
2054        let check_time = base_time + Duration::seconds(4);
2055        assert!(window.check_expired(check_time).is_none());
2056    }
2057
2058    #[test]
2059    fn test_session_window_check_expired_returns_events() {
2060        let mut window = SessionWindow::new(Duration::seconds(5));
2061        let base_time = Utc::now();
2062
2063        // Add events
2064        window.add(Event::new("Test").with_timestamp(base_time));
2065        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2066        window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2067
2068        // Check at a time past the gap — should expire
2069        let check_time = base_time + Duration::seconds(8);
2070        let result = window.check_expired(check_time);
2071        assert!(result.is_some());
2072        assert_eq!(result.unwrap().len(), 3);
2073
2074        // After expiry, window should be empty
2075        let flushed = window.flush();
2076        assert_eq!(flushed.len(), 0);
2077    }
2078
2079    #[test]
2080    fn test_session_window_check_expired_empty() {
2081        let mut window = SessionWindow::new(Duration::seconds(5));
2082        // Empty window should return None
2083        assert!(window.check_expired(Utc::now()).is_none());
2084    }
2085
2086    #[test]
2087    fn test_session_window_gap_getter() {
2088        let window = SessionWindow::new(Duration::seconds(7));
2089        assert_eq!(window.gap(), Duration::seconds(7));
2090    }
2091
2092    #[test]
2093    fn test_partitioned_session_window_check_expired() {
2094        let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(5));
2095        let base_time = Utc::now();
2096
2097        // Add events to two partitions at different times
2098        window.add(
2099            Event::new("Test")
2100                .with_timestamp(base_time)
2101                .with_field("region", "east"),
2102        );
2103        window.add(
2104            Event::new("Test")
2105                .with_timestamp(base_time + Duration::seconds(3))
2106                .with_field("region", "west"),
2107        );
2108
2109        // At base_time + 6s: east expired (last=0, gap=5), west still active (last=3)
2110        // Note: partition keys are formatted via Display, which quotes strings
2111        let check_time = base_time + Duration::seconds(6);
2112        let expired = window.check_expired(check_time);
2113        assert_eq!(expired.len(), 1);
2114        assert_eq!(expired[0].1.len(), 1);
2115
2116        // At base_time + 9s: west now expired too
2117        let check_time2 = base_time + Duration::seconds(9);
2118        let expired2 = window.check_expired(check_time2);
2119        assert_eq!(expired2.len(), 1);
2120    }
2121
2122    #[test]
2123    fn test_partitioned_session_window_removes_expired() {
2124        let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(3));
2125        let base_time = Utc::now();
2126
2127        window.add(
2128            Event::new("Test")
2129                .with_timestamp(base_time)
2130                .with_field("region", "east"),
2131        );
2132        window.add(
2133            Event::new("Test")
2134                .with_timestamp(base_time)
2135                .with_field("region", "west"),
2136        );
2137
2138        // Expire both partitions
2139        let check_time = base_time + Duration::seconds(5);
2140        let expired = window.check_expired(check_time);
2141        assert_eq!(expired.len(), 2);
2142
2143        // After expiry, flush should return nothing
2144        let flushed = window.flush();
2145        assert_eq!(flushed.len(), 0);
2146    }
2147
2148    #[test]
2149    fn test_partitioned_session_window_gap_getter() {
2150        let window = PartitionedSessionWindow::new("k".to_string(), Duration::seconds(10));
2151        assert_eq!(window.gap(), Duration::seconds(10));
2152    }
2153
2154    #[test]
2155    fn test_partitioned_session_window() {
2156        let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(5));
2157        let base_time = Utc::now();
2158
2159        // East: t=0, t=2
2160        window.add(
2161            Event::new("Test")
2162                .with_timestamp(base_time)
2163                .with_field("region", "east"),
2164        );
2165        window.add(
2166            Event::new("Test")
2167                .with_timestamp(base_time + Duration::seconds(2))
2168                .with_field("region", "east"),
2169        );
2170
2171        // West: t=0
2172        window.add(
2173            Event::new("Test")
2174                .with_timestamp(base_time)
2175                .with_field("region", "west"),
2176        );
2177
2178        // East: gap of 6s -> east session closes
2179        let result = window.add(
2180            Event::new("Test")
2181                .with_timestamp(base_time + Duration::seconds(8))
2182                .with_field("region", "east"),
2183        );
2184        assert!(result.is_some());
2185        assert_eq!(
2186            result.unwrap().len(),
2187            2,
2188            "East session should have 2 events"
2189        );
2190
2191        // West still open (no gap exceeded)
2192        let result = window.add(
2193            Event::new("Test")
2194                .with_timestamp(base_time + Duration::seconds(3))
2195                .with_field("region", "west"),
2196        );
2197        assert!(result.is_none(), "West session should still be open");
2198    }
2199
2200    #[test]
2201    fn test_tumbling_window_checkpoint_restore() {
2202        let mut window = TumblingWindow::new(Duration::seconds(5));
2203        let base_time = Utc::now();
2204
2205        // Add 2 events within first window
2206        let e1 = Event::new("Test").with_timestamp(base_time);
2207        assert!(window.add(e1).is_none());
2208        let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(1));
2209        assert!(window.add(e2).is_none());
2210
2211        // Checkpoint
2212        let cp = window.checkpoint();
2213
2214        // Restore into a new window
2215        let mut restored = TumblingWindow::new(Duration::seconds(5));
2216        restored.restore(&cp);
2217
2218        // Add event that triggers window close (falls into next window)
2219        let e3 = Event::new("Test").with_timestamp(base_time + Duration::seconds(6));
2220        let result = restored.add(e3);
2221        assert!(
2222            result.is_some(),
2223            "Window should close on event past boundary"
2224        );
2225        let emitted = result.unwrap();
2226        assert_eq!(
2227            emitted.len(),
2228            2,
2229            "Restored window should contain the 2 checkpointed events"
2230        );
2231    }
2232
2233    #[test]
2234    fn test_count_window_checkpoint_restore() {
2235        let mut window = CountWindow::new(3);
2236
2237        // Add 2 events
2238        let e1 = Event::new("Test").with_field("value", 1i64);
2239        assert!(window.add(e1).is_none());
2240        let e2 = Event::new("Test").with_field("value", 2i64);
2241        assert!(window.add(e2).is_none());
2242
2243        // Checkpoint
2244        let cp = window.checkpoint();
2245
2246        // Restore into a new window
2247        let mut restored = CountWindow::new(3);
2248        restored.restore(&cp);
2249
2250        // Add 1 more event to fill window
2251        let e3 = Event::new("Test").with_field("value", 3i64);
2252        let result = restored.add(e3);
2253        assert!(result.is_some(), "Window should emit after 3rd event");
2254        let emitted = result.unwrap();
2255        assert_eq!(emitted.len(), 3, "All 3 events should be returned");
2256    }
2257
2258    #[test]
2259    fn test_sliding_count_window_checkpoint_restore() {
2260        // window_size=4, slide_size=2
2261        let mut window = SlidingCountWindow::new(4, 2);
2262
2263        // Add 2 events (not enough to fill window yet)
2264        let e1 = Event::new("Test").with_field("seq", 1i64);
2265        assert!(window.add(e1).is_none());
2266        let e2 = Event::new("Test").with_field("seq", 2i64);
2267        assert!(window.add(e2).is_none());
2268
2269        // Checkpoint
2270        let cp = window.checkpoint();
2271
2272        // Restore into a new window
2273        let mut restored = SlidingCountWindow::new(4, 2);
2274        restored.restore(&cp);
2275
2276        // Add 2 more events to fill the window (4 total) and reach slide_size
2277        let e3 = Event::new("Test").with_field("seq", 3i64);
2278        assert!(restored.add(e3).is_none());
2279        let e4 = Event::new("Test").with_field("seq", 4i64);
2280        let result = restored.add(e4);
2281        assert!(
2282            result.is_some(),
2283            "Should emit when window is full and slide interval reached"
2284        );
2285        assert_eq!(
2286            result.unwrap().len(),
2287            4,
2288            "Window should contain all 4 events"
2289        );
2290
2291        // Continue sliding: add 2 more events
2292        let e5 = Event::new("Test").with_field("seq", 5i64);
2293        assert!(restored.add(e5).is_none());
2294        let e6 = Event::new("Test").with_field("seq", 6i64);
2295        let result = restored.add(e6);
2296        assert!(result.is_some(), "Should emit after slide_size more events");
2297        let emitted = result.unwrap();
2298        assert_eq!(
2299            emitted.len(),
2300            4,
2301            "Sliding window should still hold 4 events"
2302        );
2303    }
2304
2305    #[test]
2306    fn test_session_window_checkpoint_restore() {
2307        let mut window = SessionWindow::new(Duration::seconds(5));
2308        let base_time = Utc::now();
2309
2310        // Add 2 events 1 second apart (within session gap)
2311        let e1 = Event::new("Test").with_timestamp(base_time);
2312        assert!(window.add(e1).is_none());
2313        let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(1));
2314        assert!(window.add(e2).is_none());
2315
2316        // Checkpoint
2317        let cp = window.checkpoint();
2318
2319        // Restore into a new window
2320        let mut restored = SessionWindow::new(Duration::seconds(5));
2321        restored.restore(&cp);
2322
2323        // Add event with 6s gap from last event to close the session
2324        let e3 = Event::new("Test").with_timestamp(base_time + Duration::seconds(7));
2325        let result = restored.add(e3);
2326        assert!(
2327            result.is_some(),
2328            "Session should close due to gap exceeding 5s"
2329        );
2330        let emitted = result.unwrap();
2331        assert_eq!(
2332            emitted.len(),
2333            2,
2334            "Closed session should contain the 2 checkpointed events"
2335        );
2336    }
2337
2338    #[test]
2339    fn test_tumbling_window_advance_watermark() {
2340        let mut window = TumblingWindow::new(Duration::seconds(5));
2341        let base_time = Utc::now();
2342
2343        // Add 3 events within first window
2344        for i in 0..3 {
2345            let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(i));
2346            assert!(window.add(event).is_none());
2347        }
2348
2349        // Advance watermark past window boundary (base_time + 5s)
2350        let result = window.advance_watermark(base_time + Duration::seconds(6));
2351        assert!(
2352            result.is_some(),
2353            "Watermark past window end should emit events"
2354        );
2355        let emitted = result.unwrap();
2356        assert_eq!(
2357            emitted.len(),
2358            3,
2359            "All 3 events in the window should be emitted"
2360        );
2361    }
2362
2363    #[test]
2364    fn test_session_window_advance_watermark() {
2365        let mut window = SessionWindow::new(Duration::seconds(5));
2366        let base_time = Utc::now();
2367
2368        // Add events within session
2369        let e1 = Event::new("Test").with_timestamp(base_time);
2370        assert!(window.add(e1).is_none());
2371        let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(2));
2372        assert!(window.add(e2).is_none());
2373
2374        // Advance watermark past last_event_time + gap (base_time + 2s + 5s = base_time + 7s)
2375        let result = window.advance_watermark(base_time + Duration::seconds(8));
2376        assert!(
2377            result.is_some(),
2378            "Watermark past session gap should close the session"
2379        );
2380        let emitted = result.unwrap();
2381        assert_eq!(
2382            emitted.len(),
2383            2,
2384            "Closed session should contain the 2 events"
2385        );
2386    }
2387}