1use std::collections::VecDeque;
10use std::sync::Arc;
11
12use chrono::{DateTime, Duration, Utc};
13use rustc_hash::FxHashMap;
14
15use crate::columnar::{ColumnarAccess, ColumnarBuffer};
16use crate::event::{Event, SharedEvent};
17use crate::persistence::{PartitionedWindowCheckpoint, SerializableEvent, WindowCheckpoint};
18
19#[derive(Debug)]
25pub struct TumblingWindow {
26 duration: Duration,
27 pub(crate) columnar: ColumnarBuffer,
29 pub(crate) window_start: Option<DateTime<Utc>>,
31}
32
33impl TumblingWindow {
34 pub fn new(duration: Duration) -> Self {
35 Self {
36 duration,
37 columnar: ColumnarBuffer::new(),
38 window_start: None,
39 }
40 }
41
42 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
44 let event_time = event.timestamp;
45
46 if self.window_start.is_none() {
48 self.window_start = Some(event_time);
49 }
50
51 let window_start = self.window_start?;
53 let window_end = window_start + self.duration;
54
55 if event_time >= window_end {
56 let completed = self.columnar.take_all();
58 self.window_start = Some(event_time);
59 self.columnar.push(event);
60 Some(completed)
61 } else {
62 self.columnar.push(event);
63 None
64 }
65 }
66
67 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
69 self.add_shared(Arc::new(event))
70 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
71 }
72
73 pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
75 self.columnar.take_all()
76 }
77
78 pub fn flush_columnar(&mut self) -> ColumnarBuffer {
82 self.window_start = None;
83 std::mem::take(&mut self.columnar)
84 }
85
86 pub fn flush(&mut self) -> Vec<Event> {
88 self.flush_shared()
89 .into_iter()
90 .map(|e| (*e).clone())
91 .collect()
92 }
93
94 pub const fn len(&self) -> usize {
96 self.columnar.len()
97 }
98
99 pub const fn is_empty(&self) -> bool {
101 self.columnar.is_empty()
102 }
103
104 pub fn checkpoint(&self) -> WindowCheckpoint {
106 WindowCheckpoint {
107 events: self
108 .columnar
109 .events()
110 .iter()
111 .map(|e| SerializableEvent::from(e.as_ref()))
112 .collect(),
113 window_start_ms: self.window_start.map(|t| t.timestamp_millis()),
114 last_emit_ms: None,
115 partitions: std::collections::HashMap::new(),
116 }
117 }
118
119 pub fn restore(&mut self, cp: &WindowCheckpoint) {
121 let events: Vec<SharedEvent> = cp
122 .events
123 .iter()
124 .map(|se| Arc::new(Event::from(se.clone())))
125 .collect();
126 self.columnar = ColumnarBuffer::from_events(events);
127 self.window_start = cp.window_start_ms.and_then(DateTime::from_timestamp_millis);
128 }
129
130 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
132 if let Some(start) = self.window_start {
133 if wm >= start + self.duration && !self.columnar.is_empty() {
134 let completed = self.columnar.take_all();
135 self.window_start = Some(wm);
136 return Some(completed);
137 }
138 }
139 None
140 }
141}
142
143impl ColumnarAccess for TumblingWindow {
144 fn columnar(&self) -> &ColumnarBuffer {
145 &self.columnar
146 }
147
148 fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
149 &mut self.columnar
150 }
151}
152
153#[derive(Debug)]
155pub struct SlidingWindow {
156 window_size: Duration,
157 slide_interval: Duration,
158 events: VecDeque<SharedEvent>,
159 last_emit: Option<DateTime<Utc>>,
160}
161
162impl SlidingWindow {
163 pub const fn new(window_size: Duration, slide_interval: Duration) -> Self {
164 Self {
165 window_size,
166 slide_interval,
167 events: VecDeque::new(),
168 last_emit: None,
169 }
170 }
171
172 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
174 let event_time = event.timestamp;
175 self.events.push_back(event);
176
177 let cutoff = event_time - self.window_size;
180 let expired_count = self
181 .events
182 .iter()
183 .position(|e| e.timestamp >= cutoff)
184 .unwrap_or(self.events.len());
185
186 if expired_count > 0 {
187 self.events.drain(0..expired_count);
189 }
190
191 let should_emit = match self.last_emit {
193 None => true,
194 Some(last) => event_time >= last + self.slide_interval,
195 };
196
197 should_emit.then(|| {
198 self.last_emit = Some(event_time);
199 self.events.iter().map(Arc::clone).collect()
200 })
201 }
202
203 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
205 self.add_shared(Arc::new(event))
206 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
207 }
208
209 pub fn current_shared(&self) -> Vec<SharedEvent> {
211 self.events.iter().map(Arc::clone).collect()
212 }
213
214 pub fn current(&self) -> Vec<Event> {
216 self.events.iter().map(|e| (**e).clone()).collect()
217 }
218
219 pub fn checkpoint(&self) -> WindowCheckpoint {
221 WindowCheckpoint {
222 events: self
223 .events
224 .iter()
225 .map(|e| SerializableEvent::from(e.as_ref()))
226 .collect(),
227 window_start_ms: None,
228 last_emit_ms: self.last_emit.map(|t| t.timestamp_millis()),
229 partitions: std::collections::HashMap::new(),
230 }
231 }
232
233 pub fn restore(&mut self, cp: &WindowCheckpoint) {
235 self.events = cp
236 .events
237 .iter()
238 .map(|se| Arc::new(Event::from(se.clone())))
239 .collect::<Vec<_>>()
240 .into();
241 self.last_emit = cp.last_emit_ms.and_then(DateTime::from_timestamp_millis);
242 }
243
244 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
246 let cutoff = wm - self.window_size;
248 let expired_count = self
249 .events
250 .iter()
251 .position(|e| e.timestamp >= cutoff)
252 .unwrap_or(self.events.len());
253 if expired_count > 0 {
254 self.events.drain(0..expired_count);
255 }
256
257 let should_emit = match self.last_emit {
258 None => !self.events.is_empty(),
259 Some(last) => wm >= last + self.slide_interval && !self.events.is_empty(),
260 };
261
262 should_emit.then(|| {
263 self.last_emit = Some(wm);
264 self.events.iter().map(Arc::clone).collect()
265 })
266 }
267}
268
269#[derive(Debug)]
275pub struct CountWindow {
276 count: usize,
277 columnar: ColumnarBuffer,
278}
279
280impl CountWindow {
281 pub fn new(count: usize) -> Self {
282 Self {
283 count,
284 columnar: ColumnarBuffer::with_capacity(count),
285 }
286 }
287
288 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
290 self.columnar.push(event);
291
292 (self.columnar.len() >= self.count).then(|| self.columnar.take_all())
293 }
294
295 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
297 self.add_shared(Arc::new(event))
298 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
299 }
300
301 pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
303 self.columnar.take_all()
304 }
305
306 pub fn flush_columnar(&mut self) -> ColumnarBuffer {
308 std::mem::take(&mut self.columnar)
309 }
310
311 pub fn flush(&mut self) -> Vec<Event> {
313 self.flush_shared()
314 .into_iter()
315 .map(|e| (*e).clone())
316 .collect()
317 }
318
319 pub const fn current_count(&self) -> usize {
321 self.columnar.len()
322 }
323
324 pub fn checkpoint(&self) -> WindowCheckpoint {
326 WindowCheckpoint {
327 events: self
328 .columnar
329 .events()
330 .iter()
331 .map(|e| SerializableEvent::from(e.as_ref()))
332 .collect(),
333 window_start_ms: None,
334 last_emit_ms: None,
335 partitions: std::collections::HashMap::new(),
336 }
337 }
338
339 pub fn restore(&mut self, cp: &WindowCheckpoint) {
341 let events: Vec<SharedEvent> = cp
342 .events
343 .iter()
344 .map(|se| Arc::new(Event::from(se.clone())))
345 .collect();
346 self.columnar = ColumnarBuffer::from_events(events);
347 }
348}
349
350impl ColumnarAccess for CountWindow {
351 fn columnar(&self) -> &ColumnarBuffer {
352 &self.columnar
353 }
354
355 fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
356 &mut self.columnar
357 }
358}
359
360#[derive(Debug)]
362pub struct SlidingCountWindow {
363 window_size: usize,
364 slide_size: usize,
365 events: VecDeque<SharedEvent>,
366 events_since_emit: usize,
367}
368
369impl SlidingCountWindow {
370 pub fn new(window_size: usize, slide_size: usize) -> Self {
371 Self {
372 window_size,
373 slide_size,
374 events: VecDeque::with_capacity(window_size),
375 events_since_emit: 0,
376 }
377 }
378
379 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
381 self.events.push_back(event);
382 self.events_since_emit += 1;
383
384 let overflow = self.events.len().saturating_sub(self.window_size);
386 if overflow > 0 {
387 self.events.drain(0..overflow);
388 }
389
390 (self.events.len() >= self.window_size && self.events_since_emit >= self.slide_size).then(
392 || {
393 self.events_since_emit = 0;
394 self.events.iter().map(Arc::clone).collect()
395 },
396 )
397 }
398
399 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
401 self.add_shared(Arc::new(event))
402 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
403 }
404
405 pub fn current_count(&self) -> usize {
407 self.events.len()
408 }
409
410 pub fn checkpoint(&self) -> WindowCheckpoint {
412 WindowCheckpoint {
413 events: self
414 .events
415 .iter()
416 .map(|e| SerializableEvent::from(e.as_ref()))
417 .collect(),
418 window_start_ms: None,
419 last_emit_ms: None,
420 partitions: std::collections::HashMap::new(),
421 }
422 }
423
424 pub fn restore(&mut self, cp: &WindowCheckpoint) {
426 self.events = cp
427 .events
428 .iter()
429 .map(|se| Arc::new(Event::from(se.clone())))
430 .collect::<Vec<_>>()
431 .into();
432 self.events_since_emit = 0;
433 }
434}
435
436#[derive(Debug)]
446pub struct SessionWindow {
447 gap: Duration,
448 pub(crate) columnar: ColumnarBuffer,
450 pub(crate) last_event_time: Option<DateTime<Utc>>,
452}
453
454impl SessionWindow {
455 pub fn new(gap: Duration) -> Self {
456 Self {
457 gap,
458 columnar: ColumnarBuffer::new(),
459 last_event_time: None,
460 }
461 }
462
463 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
466 let event_time = event.timestamp;
467 if let Some(last_time) = self.last_event_time {
468 if event_time - last_time > self.gap {
469 let completed = self.columnar.take_all();
471 self.columnar.push(event);
472 self.last_event_time = Some(event_time);
473 return Some(completed);
474 }
475 }
476 self.columnar.push(event);
478 self.last_event_time = Some(event_time);
479 None
480 }
481
482 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
484 self.add_shared(Arc::new(event))
485 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
486 }
487
488 pub fn check_expired(&mut self, now: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
491 if let Some(last) = self.last_event_time {
492 if now - last > self.gap {
493 return Some(self.flush_shared());
494 }
495 }
496 None
497 }
498
499 pub const fn gap(&self) -> Duration {
501 self.gap
502 }
503
504 pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
506 self.last_event_time = None;
507 self.columnar.take_all()
508 }
509
510 pub fn flush_columnar(&mut self) -> ColumnarBuffer {
512 self.last_event_time = None;
513 std::mem::take(&mut self.columnar)
514 }
515
516 pub fn flush(&mut self) -> Vec<Event> {
518 self.flush_shared()
519 .into_iter()
520 .map(|e| (*e).clone())
521 .collect()
522 }
523
524 pub fn checkpoint(&self) -> WindowCheckpoint {
526 WindowCheckpoint {
527 events: self
528 .columnar
529 .events()
530 .iter()
531 .map(|e| SerializableEvent::from(e.as_ref()))
532 .collect(),
533 window_start_ms: self.last_event_time.map(|t| t.timestamp_millis()),
534 last_emit_ms: None,
535 partitions: std::collections::HashMap::new(),
536 }
537 }
538
539 pub fn restore(&mut self, cp: &WindowCheckpoint) {
541 let events: Vec<SharedEvent> = cp
542 .events
543 .iter()
544 .map(|se| Arc::new(Event::from(se.clone())))
545 .collect();
546 self.columnar = ColumnarBuffer::from_events(events);
547 self.last_event_time = cp.window_start_ms.and_then(DateTime::from_timestamp_millis);
548 }
549
550 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Option<Vec<SharedEvent>> {
552 if let Some(last) = self.last_event_time {
553 if wm >= last + self.gap && !self.columnar.is_empty() {
554 return Some(self.flush_shared());
555 }
556 }
557 None
558 }
559}
560
561impl ColumnarAccess for SessionWindow {
562 fn columnar(&self) -> &ColumnarBuffer {
563 &self.columnar
564 }
565
566 fn columnar_mut(&mut self) -> &mut ColumnarBuffer {
567 &mut self.columnar
568 }
569}
570
571#[derive(Debug)]
573pub struct PartitionedSessionWindow {
574 partition_key: String,
575 gap: Duration,
576 windows: FxHashMap<String, SessionWindow>,
577}
578
579impl PartitionedSessionWindow {
580 pub fn new(partition_key: String, gap: Duration) -> Self {
581 Self {
582 partition_key,
583 gap,
584 windows: FxHashMap::default(),
585 }
586 }
587
588 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
590 let key = event.get(&self.partition_key).map_or_else(
591 || "default".to_string(),
592 |v| v.to_partition_key().into_owned(),
593 );
594
595 let window = self
596 .windows
597 .entry(key)
598 .or_insert_with(|| SessionWindow::new(self.gap));
599
600 window.add_shared(event)
601 }
602
603 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
605 self.add_shared(Arc::new(event))
606 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
607 }
608
609 pub fn check_expired(&mut self, now: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
613 let mut expired = Vec::with_capacity(self.windows.len());
614 let mut to_remove = Vec::new();
615 for (key, window) in &mut self.windows {
616 if let Some(events) = window.check_expired(now) {
617 if !events.is_empty() {
618 expired.push((key.clone(), events));
619 }
620 to_remove.push(key.clone());
621 }
622 }
623 for key in to_remove {
624 self.windows.remove(&key);
625 }
626 expired
627 }
628
629 pub const fn gap(&self) -> Duration {
631 self.gap
632 }
633
634 pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
636 let mut all_events = Vec::new();
637 for window in self.windows.values_mut() {
638 all_events.extend(window.flush_shared());
639 }
640 all_events
641 }
642
643 pub fn flush(&mut self) -> Vec<Event> {
645 self.flush_shared()
646 .into_iter()
647 .map(|e| (*e).clone())
648 .collect()
649 }
650
651 pub fn checkpoint(&self) -> WindowCheckpoint {
653 let partitions = self
654 .windows
655 .iter()
656 .map(|(key, window)| {
657 (
658 key.clone(),
659 PartitionedWindowCheckpoint {
660 events: window
661 .columnar
662 .events()
663 .iter()
664 .map(|e| SerializableEvent::from(e.as_ref()))
665 .collect(),
666 window_start_ms: window.last_event_time.map(|t| t.timestamp_millis()),
667 },
668 )
669 })
670 .collect();
671
672 WindowCheckpoint {
673 events: Vec::new(),
674 window_start_ms: None,
675 last_emit_ms: None,
676 partitions,
677 }
678 }
679
680 pub fn restore(&mut self, cp: &WindowCheckpoint) {
682 self.windows.clear();
683 for (key, pcp) in &cp.partitions {
684 let mut window = SessionWindow::new(self.gap);
685 let events: Vec<SharedEvent> = pcp
686 .events
687 .iter()
688 .map(|se| Arc::new(Event::from(se.clone())))
689 .collect();
690 window.columnar = ColumnarBuffer::from_events(events);
691 window.last_event_time = pcp
692 .window_start_ms
693 .and_then(DateTime::from_timestamp_millis);
694 self.windows.insert(key.clone(), window);
695 }
696 }
697
698 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
700 let mut expired = Vec::with_capacity(self.windows.len());
701 let mut to_remove = Vec::new();
702 for (key, window) in &mut self.windows {
703 if let Some(events) = window.advance_watermark(wm) {
704 if !events.is_empty() {
705 expired.push((key.clone(), events));
706 }
707 to_remove.push(key.clone());
708 }
709 }
710 for key in to_remove {
711 self.windows.remove(&key);
712 }
713 expired
714 }
715}
716
717#[derive(Debug)]
719pub struct PartitionedTumblingWindow {
720 partition_key: String,
721 duration: Duration,
722 windows: FxHashMap<String, TumblingWindow>,
723}
724
725impl PartitionedTumblingWindow {
726 pub fn new(partition_key: String, duration: Duration) -> Self {
727 Self {
728 partition_key,
729 duration,
730 windows: FxHashMap::default(),
731 }
732 }
733
734 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
736 let key = event.get(&self.partition_key).map_or_else(
737 || "default".to_string(),
738 |v| v.to_partition_key().into_owned(),
739 );
740
741 let window = self
742 .windows
743 .entry(key)
744 .or_insert_with(|| TumblingWindow::new(self.duration));
745
746 window.add_shared(event)
747 }
748
749 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
751 self.add_shared(Arc::new(event))
752 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
753 }
754
755 pub fn flush_shared(&mut self) -> Vec<SharedEvent> {
757 let mut all_events = Vec::new();
758 for window in self.windows.values_mut() {
759 all_events.extend(window.flush_shared());
760 }
761 all_events
762 }
763
764 pub fn flush(&mut self) -> Vec<Event> {
766 self.flush_shared()
767 .into_iter()
768 .map(|e| (*e).clone())
769 .collect()
770 }
771
772 pub fn checkpoint(&self) -> WindowCheckpoint {
774 let partitions = self
775 .windows
776 .iter()
777 .map(|(key, window)| {
778 (
779 key.clone(),
780 PartitionedWindowCheckpoint {
781 events: window
782 .columnar
783 .events()
784 .iter()
785 .map(|e| SerializableEvent::from(e.as_ref()))
786 .collect(),
787 window_start_ms: window.window_start.map(|t| t.timestamp_millis()),
788 },
789 )
790 })
791 .collect();
792
793 WindowCheckpoint {
794 events: Vec::new(),
795 window_start_ms: None,
796 last_emit_ms: None,
797 partitions,
798 }
799 }
800
801 pub fn restore(&mut self, cp: &WindowCheckpoint) {
803 self.windows.clear();
804 for (key, pcp) in &cp.partitions {
805 let mut window = TumblingWindow::new(self.duration);
806 let events: Vec<SharedEvent> = pcp
807 .events
808 .iter()
809 .map(|se| Arc::new(Event::from(se.clone())))
810 .collect();
811 window.columnar = ColumnarBuffer::from_events(events);
812 window.window_start = pcp
813 .window_start_ms
814 .and_then(DateTime::from_timestamp_millis);
815 self.windows.insert(key.clone(), window);
816 }
817 }
818
819 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
821 let mut results = Vec::new();
822 for (key, window) in &mut self.windows {
823 if let Some(events) = window.advance_watermark(wm) {
824 results.push((key.clone(), events));
825 }
826 }
827 results
828 }
829}
830
831#[derive(Debug)]
833pub struct PartitionedSlidingWindow {
834 partition_key: String,
835 window_size: Duration,
836 slide_interval: Duration,
837 windows: FxHashMap<String, SlidingWindow>,
838}
839
840impl PartitionedSlidingWindow {
841 pub fn new(partition_key: String, window_size: Duration, slide_interval: Duration) -> Self {
842 Self {
843 partition_key,
844 window_size,
845 slide_interval,
846 windows: FxHashMap::default(),
847 }
848 }
849
850 pub fn add_shared(&mut self, event: SharedEvent) -> Option<Vec<SharedEvent>> {
852 let key = event.get(&self.partition_key).map_or_else(
853 || "default".to_string(),
854 |v| v.to_partition_key().into_owned(),
855 );
856
857 let window = self
858 .windows
859 .entry(key)
860 .or_insert_with(|| SlidingWindow::new(self.window_size, self.slide_interval));
861
862 window.add_shared(event)
863 }
864
865 pub fn add(&mut self, event: Event) -> Option<Vec<Event>> {
867 self.add_shared(Arc::new(event))
868 .map(|events| events.into_iter().map(|e| (*e).clone()).collect())
869 }
870
871 pub fn current_all_shared(&self) -> Vec<SharedEvent> {
873 let mut all_events = Vec::new();
874 for window in self.windows.values() {
875 all_events.extend(window.current_shared());
876 }
877 all_events
878 }
879
880 pub fn current_all(&self) -> Vec<Event> {
882 self.current_all_shared()
883 .into_iter()
884 .map(|e| (*e).clone())
885 .collect()
886 }
887
888 pub fn checkpoint(&self) -> WindowCheckpoint {
890 let partitions = self
891 .windows
892 .iter()
893 .map(|(key, window)| {
894 (
895 key.clone(),
896 PartitionedWindowCheckpoint {
897 events: window
898 .events
899 .iter()
900 .map(|e| SerializableEvent::from(e.as_ref()))
901 .collect(),
902 window_start_ms: window.last_emit.map(|t| t.timestamp_millis()),
903 },
904 )
905 })
906 .collect();
907
908 WindowCheckpoint {
909 events: Vec::new(),
910 window_start_ms: None,
911 last_emit_ms: None,
912 partitions,
913 }
914 }
915
916 pub fn restore(&mut self, cp: &WindowCheckpoint) {
918 self.windows.clear();
919 for (key, pcp) in &cp.partitions {
920 let mut window = SlidingWindow::new(self.window_size, self.slide_interval);
921 window.events = pcp
922 .events
923 .iter()
924 .map(|se| Arc::new(Event::from(se.clone())))
925 .collect::<Vec<_>>()
926 .into();
927 window.last_emit = pcp
928 .window_start_ms
929 .and_then(DateTime::from_timestamp_millis);
930 self.windows.insert(key.clone(), window);
931 }
932 }
933
934 pub fn advance_watermark(&mut self, wm: DateTime<Utc>) -> Vec<(String, Vec<SharedEvent>)> {
936 let mut results = Vec::new();
937 for (key, window) in &mut self.windows {
938 if let Some(events) = window.advance_watermark(wm) {
939 results.push((key.clone(), events));
940 }
941 }
942 results
943 }
944}
945
946#[derive(Debug, Clone)]
976pub struct DelayBuffer<T> {
977 buffer: VecDeque<T>,
978 delay: usize,
979}
980
981impl<T: Clone> DelayBuffer<T> {
982 pub fn new(delay: usize) -> Self {
986 Self {
987 buffer: VecDeque::with_capacity(delay + 1),
988 delay: delay.max(1), }
990 }
991
992 pub fn push(&mut self, item: T) -> Option<T> {
997 self.buffer.push_back(item);
998
999 if self.buffer.len() > self.delay {
1000 self.buffer.pop_front()
1001 } else {
1002 None
1003 }
1004 }
1005
1006 pub fn current(&self) -> Option<&T> {
1008 self.buffer.back()
1009 }
1010
1011 pub fn previous(&self) -> Option<&T> {
1013 if self.buffer.len() >= 2 {
1014 self.buffer.get(self.buffer.len() - 2)
1015 } else {
1016 None
1017 }
1018 }
1019
1020 pub fn oldest(&self) -> Option<&T> {
1022 self.buffer.front()
1023 }
1024
1025 pub fn is_ready(&self) -> bool {
1027 self.buffer.len() >= self.delay
1028 }
1029
1030 pub fn len(&self) -> usize {
1032 self.buffer.len()
1033 }
1034
1035 pub fn is_empty(&self) -> bool {
1037 self.buffer.is_empty()
1038 }
1039
1040 pub fn clear(&mut self) {
1042 self.buffer.clear();
1043 }
1044
1045 pub fn flush(&mut self) -> Vec<T> {
1047 self.buffer.drain(..).collect()
1048 }
1049}
1050
1051#[derive(Debug, Clone, Default)]
1067pub struct PreviousValueTracker<T> {
1068 current: Option<T>,
1069 previous: Option<T>,
1070}
1071
1072impl<T: Clone> PreviousValueTracker<T> {
1073 pub const fn new() -> Self {
1075 Self {
1076 current: None,
1077 previous: None,
1078 }
1079 }
1080
1081 pub fn update(&mut self, value: T) {
1083 self.previous = self.current.take();
1084 self.current = Some(value);
1085 }
1086
1087 pub const fn current(&self) -> Option<&T> {
1089 self.current.as_ref()
1090 }
1091
1092 pub const fn previous(&self) -> Option<&T> {
1094 self.previous.as_ref()
1095 }
1096
1097 pub const fn has_both(&self) -> bool {
1099 self.current.is_some() && self.previous.is_some()
1100 }
1101
1102 pub const fn get_pair(&self) -> Option<(&T, &T)> {
1104 match (&self.current, &self.previous) {
1105 (Some(curr), Some(prev)) => Some((curr, prev)),
1106 _ => None,
1107 }
1108 }
1109
1110 pub fn reset(&mut self) {
1112 self.current = None;
1113 self.previous = None;
1114 }
1115}
1116
1117#[derive(Debug)]
1119pub struct PartitionedDelayBuffer<T> {
1120 delay: usize,
1121 buffers: FxHashMap<String, DelayBuffer<T>>,
1122}
1123
1124impl<T: Clone> PartitionedDelayBuffer<T> {
1125 pub fn new(delay: usize) -> Self {
1127 Self {
1128 delay,
1129 buffers: FxHashMap::default(),
1130 }
1131 }
1132
1133 pub fn push(&mut self, key: &str, item: T) -> Option<T> {
1135 let buffer = self
1136 .buffers
1137 .entry(key.to_string())
1138 .or_insert_with(|| DelayBuffer::new(self.delay));
1139 buffer.push(item)
1140 }
1141
1142 pub fn current(&self, key: &str) -> Option<&T> {
1144 self.buffers.get(key).and_then(|b| b.current())
1145 }
1146
1147 pub fn previous(&self, key: &str) -> Option<&T> {
1149 self.buffers.get(key).and_then(|b| b.previous())
1150 }
1151
1152 pub fn is_ready(&self, key: &str) -> bool {
1154 self.buffers.get(key).is_some_and(|b| b.is_ready())
1155 }
1156}
1157
1158#[derive(Debug)]
1160pub struct PartitionedPreviousValueTracker<T> {
1161 trackers: FxHashMap<String, PreviousValueTracker<T>>,
1162}
1163
1164impl<T: Clone> Default for PartitionedPreviousValueTracker<T> {
1165 fn default() -> Self {
1166 Self::new()
1167 }
1168}
1169
1170impl<T: Clone> PartitionedPreviousValueTracker<T> {
1171 pub fn new() -> Self {
1173 Self {
1174 trackers: FxHashMap::default(),
1175 }
1176 }
1177
1178 pub fn update(&mut self, key: &str, value: T) {
1180 let tracker = self
1181 .trackers
1182 .entry(key.to_string())
1183 .or_insert_with(PreviousValueTracker::new);
1184 tracker.update(value);
1185 }
1186
1187 pub fn current(&self, key: &str) -> Option<&T> {
1189 self.trackers.get(key).and_then(|t| t.current())
1190 }
1191
1192 pub fn previous(&self, key: &str) -> Option<&T> {
1194 self.trackers.get(key).and_then(|t| t.previous())
1195 }
1196
1197 pub fn has_both(&self, key: &str) -> bool {
1199 self.trackers.get(key).is_some_and(|t| t.has_both())
1200 }
1201
1202 pub fn get_pair(&self, key: &str) -> Option<(&T, &T)> {
1204 self.trackers.get(key).and_then(|t| t.get_pair())
1205 }
1206}
1207
1208#[derive(Debug)]
1222pub struct IncrementalSlidingWindow {
1223 window_size: Duration,
1224 slide_interval: Duration,
1225 events: VecDeque<(SharedEvent, Option<f64>)>,
1227 last_emit: Option<DateTime<Utc>>,
1228 field: String,
1230 sum: f64,
1232 count: usize,
1233 minmax: crate::simd::IncrementalMinMax,
1236}
1237
1238impl IncrementalSlidingWindow {
1239 pub fn new(window_size: Duration, slide_interval: Duration, field: impl Into<String>) -> Self {
1241 Self {
1242 window_size,
1243 slide_interval,
1244 events: VecDeque::new(),
1245 last_emit: None,
1246 field: field.into(),
1247 sum: 0.0,
1248 count: 0,
1249 minmax: crate::simd::IncrementalMinMax::new(),
1250 }
1251 }
1252
1253 pub fn add(&mut self, event: SharedEvent) -> Option<IncrementalAggregates> {
1255 let event_time = event.timestamp;
1256
1257 let value = event.get_float(&self.field);
1259
1260 if let Some(v) = value {
1262 if !v.is_nan() {
1263 self.sum += v;
1264 self.count += 1;
1265 self.minmax.add(v);
1266 }
1267 }
1268
1269 self.events.push_back((event, value));
1270
1271 let cutoff = event_time - self.window_size;
1273 while let Some((front_event, front_value)) = self.events.front() {
1274 if front_event.timestamp >= cutoff {
1275 break;
1276 }
1277 if let Some(v) = front_value {
1279 if !v.is_nan() {
1280 self.sum -= v;
1281 self.count = self.count.saturating_sub(1);
1282 self.minmax.remove(*v);
1283 }
1284 }
1285 self.events.pop_front();
1286 }
1287
1288 let should_emit = match self.last_emit {
1290 None => true,
1291 Some(last) => event_time >= last + self.slide_interval,
1292 };
1293
1294 should_emit.then(|| {
1295 self.last_emit = Some(event_time);
1296 IncrementalAggregates {
1297 sum: self.sum,
1298 count: self.count,
1299 avg: if self.count > 0 {
1300 Some(self.sum / self.count as f64)
1301 } else {
1302 None
1303 },
1304 min: self.minmax.min(),
1305 max: self.minmax.max(),
1306 event_count: self.events.len(),
1307 }
1308 })
1309 }
1310
1311 pub fn current_aggregates(&mut self) -> IncrementalAggregates {
1313 IncrementalAggregates {
1314 sum: self.sum,
1315 count: self.count,
1316 avg: if self.count > 0 {
1317 Some(self.sum / self.count as f64)
1318 } else {
1319 None
1320 },
1321 min: self.minmax.min(),
1322 max: self.minmax.max(),
1323 event_count: self.events.len(),
1324 }
1325 }
1326
1327 pub fn events(&self) -> impl Iterator<Item = &SharedEvent> {
1329 self.events.iter().map(|(e, _)| e)
1330 }
1331
1332 pub fn reset(&mut self) {
1334 self.events.clear();
1335 self.last_emit = None;
1336 self.sum = 0.0;
1337 self.count = 0;
1338 self.minmax.reset();
1339 }
1340}
1341
1342#[derive(Debug, Clone)]
1344pub struct IncrementalAggregates {
1345 pub sum: f64,
1346 pub count: usize,
1347 pub avg: Option<f64>,
1348 pub min: Option<f64>,
1349 pub max: Option<f64>,
1350 pub event_count: usize,
1352}
1353
1354#[cfg(test)]
1355mod tests {
1356 use super::*;
1357
1358 #[test]
1359 fn test_tumbling_window() {
1360 let mut window = TumblingWindow::new(Duration::seconds(5));
1361 let base_time = Utc::now();
1362
1363 for i in 0..3 {
1365 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(i));
1366 assert!(window.add(event).is_none());
1367 }
1368
1369 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(6));
1371 let result = window.add(event);
1372 assert!(result.is_some());
1373 assert_eq!(result.unwrap().len(), 3);
1374 }
1375
1376 #[test]
1377 fn test_sliding_window() {
1378 let mut window = SlidingWindow::new(Duration::seconds(10), Duration::seconds(2));
1379 let base_time = Utc::now();
1380
1381 let event = Event::new("Test").with_timestamp(base_time);
1383 assert!(window.add(event).is_some());
1384
1385 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(1));
1387 assert!(window.add(event).is_none());
1388
1389 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(3));
1391 let result = window.add(event);
1392 assert!(result.is_some());
1393 assert_eq!(result.unwrap().len(), 3);
1394 }
1395
1396 #[test]
1397 fn test_count_window() {
1398 let mut window = CountWindow::new(3);
1399
1400 let event1 = Event::new("Test").with_field("value", 1i64);
1402 assert!(window.add(event1).is_none());
1403 assert_eq!(window.current_count(), 1);
1404
1405 let event2 = Event::new("Test").with_field("value", 2i64);
1406 assert!(window.add(event2).is_none());
1407 assert_eq!(window.current_count(), 2);
1408
1409 let event3 = Event::new("Test").with_field("value", 3i64);
1411 let result = window.add(event3);
1412 assert!(result.is_some());
1413 assert_eq!(result.unwrap().len(), 3);
1414 assert_eq!(window.current_count(), 0);
1415 }
1416
1417 #[test]
1418 fn test_count_window_flush() {
1419 let mut window = CountWindow::new(5);
1420
1421 for i in 0..3 {
1423 let event = Event::new("Test").with_field("value", i as i64);
1424 window.add(event);
1425 }
1426
1427 let flushed = window.flush();
1429 assert_eq!(flushed.len(), 3);
1430 assert_eq!(window.current_count(), 0);
1431 }
1432
1433 #[test]
1434 fn test_sliding_count_window() {
1435 let mut window = SlidingCountWindow::new(5, 2);
1436
1437 for i in 0..5 {
1439 let event = Event::new("Test").with_field("value", i as i64);
1440 let result = window.add(event);
1441 if i < 4 {
1442 assert!(result.is_none(), "Should not emit before window is full");
1443 } else {
1444 assert!(result.is_some(), "Should emit when window is full");
1445 assert_eq!(result.unwrap().len(), 5);
1446 }
1447 }
1448
1449 for i in 5..7 {
1451 let event = Event::new("Test").with_field("value", i as i64);
1452 let result = window.add(event);
1453 if i < 6 {
1454 assert!(result.is_none());
1455 } else {
1456 assert!(result.is_some());
1457 assert_eq!(result.unwrap().len(), 5);
1459 }
1460 }
1461 }
1462
1463 #[test]
1464 fn test_tumbling_window_flush() {
1465 let mut window = TumblingWindow::new(Duration::seconds(10));
1466 let base_time = Utc::now();
1467
1468 for i in 0..3 {
1470 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(i));
1471 window.add(event);
1472 }
1473
1474 let flushed = window.flush();
1476 assert_eq!(flushed.len(), 3);
1477 }
1478
1479 #[test]
1480 fn test_sliding_window_current() {
1481 let mut window = SlidingWindow::new(Duration::seconds(10), Duration::seconds(1));
1482 let base_time = Utc::now();
1483
1484 window.add(Event::new("Test").with_timestamp(base_time));
1486 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
1487 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
1488
1489 let current = window.current();
1491 assert_eq!(current.len(), 3);
1492 }
1493
1494 #[test]
1495 fn test_sliding_window_expiry() {
1496 let mut window = SlidingWindow::new(Duration::seconds(5), Duration::seconds(1));
1497 let base_time = Utc::now();
1498
1499 window.add(Event::new("Test").with_timestamp(base_time));
1501
1502 let later_event = Event::new("Test").with_timestamp(base_time + Duration::seconds(10));
1504 window.add(later_event);
1505
1506 let current = window.current();
1508 assert_eq!(current.len(), 1);
1509 }
1510
1511 #[test]
1512 fn test_partitioned_tumbling_window() {
1513 let mut window = PartitionedTumblingWindow::new("region".to_string(), Duration::seconds(5));
1514 let base_time = Utc::now();
1515
1516 let event1 = Event::new("Test")
1518 .with_timestamp(base_time)
1519 .with_field("region", "east");
1520 window.add(event1);
1521
1522 let event2 = Event::new("Test")
1524 .with_timestamp(base_time)
1525 .with_field("region", "west");
1526 window.add(event2);
1527
1528 let event3 = Event::new("Test")
1530 .with_timestamp(base_time + Duration::seconds(6))
1531 .with_field("region", "east");
1532 let result = window.add(event3);
1533 assert!(result.is_some());
1534 assert_eq!(result.unwrap().len(), 1);
1536 }
1537
1538 #[test]
1539 fn test_partitioned_sliding_window() {
1540 let mut window = PartitionedSlidingWindow::new(
1541 "region".to_string(),
1542 Duration::seconds(10),
1543 Duration::seconds(2),
1544 );
1545 let base_time = Utc::now();
1546
1547 let event = Event::new("Test")
1549 .with_timestamp(base_time)
1550 .with_field("region", "east");
1551 let result = window.add(event);
1552 assert!(result.is_some());
1553 assert_eq!(result.unwrap().len(), 1);
1554
1555 let event = Event::new("Test")
1557 .with_timestamp(base_time + Duration::seconds(1))
1558 .with_field("region", "west");
1559 let result = window.add(event);
1560 assert!(result.is_some());
1562 }
1563
1564 #[test]
1565 fn test_count_window_multiple_completions() {
1566 let mut window = CountWindow::new(2);
1567
1568 window.add(Event::new("Test").with_field("batch", 1i64));
1570 let result = window.add(Event::new("Test").with_field("batch", 1i64));
1571 assert!(result.is_some());
1572 assert_eq!(result.unwrap().len(), 2);
1573
1574 window.add(Event::new("Test").with_field("batch", 2i64));
1576 let result = window.add(Event::new("Test").with_field("batch", 2i64));
1577 assert!(result.is_some());
1578 assert_eq!(result.unwrap().len(), 2);
1579 }
1580
1581 #[test]
1586 fn test_delay_buffer_basic() {
1587 let mut delay: DelayBuffer<i32> = DelayBuffer::new(1);
1588
1589 assert_eq!(delay.push(10), None);
1591 assert!(!delay.is_empty());
1592 assert_eq!(delay.len(), 1);
1593
1594 assert_eq!(delay.push(20), Some(10));
1596 assert_eq!(delay.len(), 1);
1597
1598 assert_eq!(delay.push(30), Some(20));
1600 assert_eq!(delay.len(), 1);
1601 }
1602
1603 #[test]
1604 fn test_delay_buffer_delay_2() {
1605 let mut delay: DelayBuffer<i32> = DelayBuffer::new(2);
1606
1607 assert_eq!(delay.push(10), None);
1609 assert_eq!(delay.push(20), None);
1610 assert_eq!(delay.len(), 2);
1611
1612 assert_eq!(delay.push(30), Some(10));
1614
1615 assert_eq!(delay.push(40), Some(20));
1617 }
1618
1619 #[test]
1620 fn test_delay_buffer_current_previous() {
1621 let mut delay: DelayBuffer<i32> = DelayBuffer::new(2);
1622
1623 delay.push(10);
1624 delay.push(20);
1625
1626 assert_eq!(delay.current(), Some(&20));
1627 assert_eq!(delay.previous(), Some(&10));
1628 assert_eq!(delay.oldest(), Some(&10));
1629 }
1630
1631 #[test]
1632 fn test_delay_buffer_is_ready() {
1633 let mut delay: DelayBuffer<i32> = DelayBuffer::new(3);
1634
1635 assert!(!delay.is_ready());
1636 delay.push(1);
1637 assert!(!delay.is_ready());
1638 delay.push(2);
1639 assert!(!delay.is_ready());
1640 delay.push(3);
1641 assert!(delay.is_ready());
1642 }
1643
1644 #[test]
1645 fn test_delay_buffer_flush() {
1646 let mut delay: DelayBuffer<i32> = DelayBuffer::new(2);
1647
1648 delay.push(10);
1649 delay.push(20);
1650 delay.push(30);
1651
1652 let flushed = delay.flush();
1653 assert_eq!(flushed, vec![20, 30]);
1654 assert!(delay.is_empty());
1655 }
1656
1657 #[test]
1658 fn test_delay_buffer_clear() {
1659 let mut delay: DelayBuffer<i32> = DelayBuffer::new(2);
1660
1661 delay.push(10);
1662 delay.push(20);
1663
1664 delay.clear();
1665 assert!(delay.is_empty());
1666 assert_eq!(delay.len(), 0);
1667 }
1668
1669 #[test]
1670 fn test_delay_buffer_with_events() {
1671 let mut delay: DelayBuffer<Event> = DelayBuffer::new(1);
1672
1673 let e1 = Event::new("Test").with_field("value", 100i64);
1674 let e2 = Event::new("Test").with_field("value", 200i64);
1675
1676 assert!(delay.push(e1).is_none());
1677 let output = delay.push(e2);
1678 assert!(output.is_some());
1679 assert_eq!(output.unwrap().get_int("value"), Some(100));
1680 }
1681
1682 #[test]
1687 fn test_previous_value_tracker_basic() {
1688 let mut tracker: PreviousValueTracker<f64> = PreviousValueTracker::new();
1689
1690 assert!(tracker.current().is_none());
1692 assert!(tracker.previous().is_none());
1693 assert!(!tracker.has_both());
1694
1695 tracker.update(10.0);
1697 assert_eq!(tracker.current(), Some(&10.0));
1698 assert!(tracker.previous().is_none());
1699 assert!(!tracker.has_both());
1700
1701 tracker.update(20.0);
1703 assert_eq!(tracker.current(), Some(&20.0));
1704 assert_eq!(tracker.previous(), Some(&10.0));
1705 assert!(tracker.has_both());
1706
1707 tracker.update(30.0);
1709 assert_eq!(tracker.current(), Some(&30.0));
1710 assert_eq!(tracker.previous(), Some(&20.0));
1711 }
1712
1713 #[test]
1714 fn test_previous_value_tracker_get_pair() {
1715 let mut tracker: PreviousValueTracker<i32> = PreviousValueTracker::new();
1716
1717 assert!(tracker.get_pair().is_none());
1719
1720 tracker.update(1);
1721 assert!(tracker.get_pair().is_none());
1722
1723 tracker.update(2);
1724 let pair = tracker.get_pair();
1725 assert!(pair.is_some());
1726 let (curr, prev) = pair.unwrap();
1727 assert_eq!(*curr, 2);
1728 assert_eq!(*prev, 1);
1729 }
1730
1731 #[test]
1732 fn test_previous_value_tracker_reset() {
1733 let mut tracker: PreviousValueTracker<i32> = PreviousValueTracker::new();
1734
1735 tracker.update(1);
1736 tracker.update(2);
1737 assert!(tracker.has_both());
1738
1739 tracker.reset();
1740 assert!(!tracker.has_both());
1741 assert!(tracker.current().is_none());
1742 assert!(tracker.previous().is_none());
1743 }
1744
1745 #[test]
1746 fn test_previous_value_tracker_threshold_comparison() {
1747 let mut tracker: PreviousValueTracker<f64> = PreviousValueTracker::new();
1748 let threshold = 5.0;
1749
1750 tracker.update(100.0);
1751 tracker.update(107.0);
1752
1753 if let Some((curr, prev)) = tracker.get_pair() {
1754 let diff = curr - prev;
1755 assert!(diff > threshold, "Should detect change > threshold");
1756 }
1757 }
1758
1759 #[test]
1764 fn test_partitioned_delay_buffer() {
1765 let mut buffer: PartitionedDelayBuffer<i32> = PartitionedDelayBuffer::new(1);
1766
1767 assert_eq!(buffer.push("a", 10), None);
1769 assert_eq!(buffer.push("a", 20), Some(10));
1770
1771 assert_eq!(buffer.push("b", 100), None);
1773 assert_eq!(buffer.push("b", 200), Some(100));
1774
1775 assert_eq!(buffer.current("a"), Some(&20));
1777 assert_eq!(buffer.current("b"), Some(&200));
1778 }
1779
1780 #[test]
1781 fn test_partitioned_delay_buffer_previous() {
1782 let mut buffer: PartitionedDelayBuffer<i32> = PartitionedDelayBuffer::new(2);
1783
1784 buffer.push("x", 1);
1785 buffer.push("x", 2);
1786
1787 assert_eq!(buffer.current("x"), Some(&2));
1788 assert_eq!(buffer.previous("x"), Some(&1));
1789 assert!(buffer.is_ready("x"));
1790 assert!(!buffer.is_ready("y")); }
1792
1793 #[test]
1798 fn test_partitioned_previous_tracker() {
1799 let mut tracker: PartitionedPreviousValueTracker<f64> =
1800 PartitionedPreviousValueTracker::new();
1801
1802 tracker.update("IBM", 100.0);
1804 tracker.update("IBM", 105.0);
1805
1806 tracker.update("MSFT", 200.0);
1808 tracker.update("MSFT", 198.0);
1809
1810 assert!(tracker.has_both("IBM"));
1812 let (curr, prev) = tracker.get_pair("IBM").unwrap();
1813 assert_eq!(*curr, 105.0);
1814 assert_eq!(*prev, 100.0);
1815
1816 assert!(tracker.has_both("MSFT"));
1818 let (curr, prev) = tracker.get_pair("MSFT").unwrap();
1819 assert_eq!(*curr, 198.0);
1820 assert_eq!(*prev, 200.0);
1821
1822 assert!(!tracker.has_both("AAPL"));
1824 }
1825
1826 #[test]
1827 fn test_partitioned_previous_tracker_avg_change_detection() {
1828 let mut tracker: PartitionedPreviousValueTracker<f64> =
1831 PartitionedPreviousValueTracker::new();
1832 let threshold = 1.0;
1833
1834 let averages = vec![
1836 ("ibm", 100.0),
1837 ("msft", 50.0),
1838 ("ibm", 100.5), ("msft", 52.5), ("ibm", 102.5), ];
1842
1843 let mut alerts = Vec::new();
1844
1845 for (symbol, avg) in averages {
1846 tracker.update(symbol, avg);
1847
1848 if let Some((curr, prev)) = tracker.get_pair(symbol) {
1849 let diff = (curr - prev).abs();
1850 if diff > threshold {
1851 alerts.push((symbol.to_string(), diff));
1852 }
1853 }
1854 }
1855
1856 assert_eq!(alerts.len(), 2);
1857 assert_eq!(alerts[0].0, "msft");
1858 assert!((alerts[0].1 - 2.5).abs() < 0.001);
1859 assert_eq!(alerts[1].0, "ibm");
1860 assert!((alerts[1].1 - 2.0).abs() < 0.001);
1861 }
1862
1863 #[test]
1864 fn test_incremental_sliding_window_basic() {
1865 let mut window =
1866 IncrementalSlidingWindow::new(Duration::seconds(5), Duration::seconds(1), "price");
1867 let base_time = Utc::now();
1868
1869 for i in 0..5 {
1871 let event = Event::new("Trade")
1872 .with_timestamp(base_time + Duration::milliseconds(i * 500))
1873 .with_field("price", varpulis_core::Value::Float(100.0 + i as f64));
1874 let shared = Arc::new(event);
1875 let _ = window.add(shared);
1876 }
1877
1878 let agg = window.current_aggregates();
1880 assert_eq!(agg.count, 5);
1881 assert_eq!(agg.sum, 510.0); assert!((agg.avg.unwrap() - 102.0).abs() < 0.001);
1883 assert_eq!(agg.min, Some(100.0));
1884 assert_eq!(agg.max, Some(104.0));
1885 }
1886
1887 #[test]
1888 fn test_incremental_sliding_window_expiry() {
1889 let mut window =
1890 IncrementalSlidingWindow::new(Duration::seconds(2), Duration::seconds(1), "value");
1891 let base_time = Utc::now();
1892
1893 for i in 0..3 {
1895 let event = Event::new("Metric")
1896 .with_timestamp(base_time + Duration::seconds(i))
1897 .with_field("value", varpulis_core::Value::Float(10.0 * (i + 1) as f64));
1898 window.add(Arc::new(event));
1899 }
1900
1901 let agg = window.current_aggregates();
1903 assert_eq!(agg.count, 3);
1904 assert_eq!(agg.sum, 60.0); let event = Event::new("Metric")
1908 .with_timestamp(base_time + Duration::seconds(3))
1909 .with_field("value", varpulis_core::Value::Float(40.0));
1910 window.add(Arc::new(event));
1911
1912 let agg = window.current_aggregates();
1913 assert_eq!(agg.count, 3); assert_eq!(agg.sum, 90.0); assert_eq!(agg.min, Some(20.0)); }
1917
1918 #[test]
1919 fn test_incremental_sliding_window_emit() {
1920 let mut window =
1921 IncrementalSlidingWindow::new(Duration::seconds(5), Duration::seconds(2), "value");
1922 let base_time = Utc::now();
1923
1924 let event = Event::new("Test")
1926 .with_timestamp(base_time)
1927 .with_field("value", varpulis_core::Value::Float(100.0));
1928 let result = window.add(Arc::new(event));
1929 assert!(result.is_some());
1930
1931 let event = Event::new("Test")
1933 .with_timestamp(base_time + Duration::seconds(1))
1934 .with_field("value", varpulis_core::Value::Float(200.0));
1935 let result = window.add(Arc::new(event));
1936 assert!(result.is_none());
1937
1938 let event = Event::new("Test")
1940 .with_timestamp(base_time + Duration::seconds(2))
1941 .with_field("value", varpulis_core::Value::Float(300.0));
1942 let result = window.add(Arc::new(event));
1943 assert!(result.is_some());
1944
1945 let agg = result.unwrap();
1946 assert_eq!(agg.count, 3);
1947 assert_eq!(agg.sum, 600.0);
1948 }
1949
1950 #[test]
1955 fn test_session_window_basic() {
1956 let mut window = SessionWindow::new(Duration::seconds(5));
1957 let base_time = Utc::now();
1958
1959 assert!(window
1961 .add(Event::new("Test").with_timestamp(base_time))
1962 .is_none());
1963 assert!(window
1964 .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)))
1965 .is_none());
1966 assert!(window
1967 .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(4)))
1968 .is_none());
1969
1970 let result =
1972 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(10)));
1973 assert!(result.is_some());
1974 assert_eq!(
1975 result.unwrap().len(),
1976 3,
1977 "First session should have 3 events"
1978 );
1979 }
1980
1981 #[test]
1982 fn test_session_window_no_gap() {
1983 let mut window = SessionWindow::new(Duration::seconds(10));
1984 let base_time = Utc::now();
1985
1986 for i in 0..5 {
1988 assert!(window
1989 .add(Event::new("Test").with_timestamp(base_time + Duration::seconds(i)))
1990 .is_none());
1991 }
1992
1993 let flushed = window.flush();
1995 assert_eq!(flushed.len(), 5);
1996 }
1997
1998 #[test]
1999 fn test_session_window_multiple_sessions() {
2000 let mut window = SessionWindow::new(Duration::seconds(3));
2001 let base_time = Utc::now();
2002
2003 window.add(Event::new("Test").with_timestamp(base_time));
2005 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2006 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2007
2008 let result =
2010 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(7)));
2011 assert!(result.is_some());
2012 assert_eq!(result.unwrap().len(), 3);
2013
2014 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(8)));
2016
2017 let result =
2019 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(12)));
2020 assert!(result.is_some());
2021 assert_eq!(result.unwrap().len(), 2);
2022 }
2023
2024 #[test]
2025 fn test_session_window_flush() {
2026 let mut window = SessionWindow::new(Duration::seconds(5));
2027 let base_time = Utc::now();
2028
2029 window.add(Event::new("Test").with_timestamp(base_time));
2030 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2031
2032 let flushed = window.flush();
2033 assert_eq!(flushed.len(), 2);
2034
2035 let flushed_again = window.flush();
2037 assert_eq!(flushed_again.len(), 0);
2038 }
2039
2040 #[test]
2045 fn test_session_window_check_expired_not_expired() {
2046 let mut window = SessionWindow::new(Duration::seconds(5));
2047 let base_time = Utc::now();
2048
2049 window.add(Event::new("Test").with_timestamp(base_time));
2051 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2052
2053 let check_time = base_time + Duration::seconds(4);
2055 assert!(window.check_expired(check_time).is_none());
2056 }
2057
2058 #[test]
2059 fn test_session_window_check_expired_returns_events() {
2060 let mut window = SessionWindow::new(Duration::seconds(5));
2061 let base_time = Utc::now();
2062
2063 window.add(Event::new("Test").with_timestamp(base_time));
2065 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(1)));
2066 window.add(Event::new("Test").with_timestamp(base_time + Duration::seconds(2)));
2067
2068 let check_time = base_time + Duration::seconds(8);
2070 let result = window.check_expired(check_time);
2071 assert!(result.is_some());
2072 assert_eq!(result.unwrap().len(), 3);
2073
2074 let flushed = window.flush();
2076 assert_eq!(flushed.len(), 0);
2077 }
2078
2079 #[test]
2080 fn test_session_window_check_expired_empty() {
2081 let mut window = SessionWindow::new(Duration::seconds(5));
2082 assert!(window.check_expired(Utc::now()).is_none());
2084 }
2085
2086 #[test]
2087 fn test_session_window_gap_getter() {
2088 let window = SessionWindow::new(Duration::seconds(7));
2089 assert_eq!(window.gap(), Duration::seconds(7));
2090 }
2091
2092 #[test]
2093 fn test_partitioned_session_window_check_expired() {
2094 let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(5));
2095 let base_time = Utc::now();
2096
2097 window.add(
2099 Event::new("Test")
2100 .with_timestamp(base_time)
2101 .with_field("region", "east"),
2102 );
2103 window.add(
2104 Event::new("Test")
2105 .with_timestamp(base_time + Duration::seconds(3))
2106 .with_field("region", "west"),
2107 );
2108
2109 let check_time = base_time + Duration::seconds(6);
2112 let expired = window.check_expired(check_time);
2113 assert_eq!(expired.len(), 1);
2114 assert_eq!(expired[0].1.len(), 1);
2115
2116 let check_time2 = base_time + Duration::seconds(9);
2118 let expired2 = window.check_expired(check_time2);
2119 assert_eq!(expired2.len(), 1);
2120 }
2121
2122 #[test]
2123 fn test_partitioned_session_window_removes_expired() {
2124 let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(3));
2125 let base_time = Utc::now();
2126
2127 window.add(
2128 Event::new("Test")
2129 .with_timestamp(base_time)
2130 .with_field("region", "east"),
2131 );
2132 window.add(
2133 Event::new("Test")
2134 .with_timestamp(base_time)
2135 .with_field("region", "west"),
2136 );
2137
2138 let check_time = base_time + Duration::seconds(5);
2140 let expired = window.check_expired(check_time);
2141 assert_eq!(expired.len(), 2);
2142
2143 let flushed = window.flush();
2145 assert_eq!(flushed.len(), 0);
2146 }
2147
2148 #[test]
2149 fn test_partitioned_session_window_gap_getter() {
2150 let window = PartitionedSessionWindow::new("k".to_string(), Duration::seconds(10));
2151 assert_eq!(window.gap(), Duration::seconds(10));
2152 }
2153
2154 #[test]
2155 fn test_partitioned_session_window() {
2156 let mut window = PartitionedSessionWindow::new("region".to_string(), Duration::seconds(5));
2157 let base_time = Utc::now();
2158
2159 window.add(
2161 Event::new("Test")
2162 .with_timestamp(base_time)
2163 .with_field("region", "east"),
2164 );
2165 window.add(
2166 Event::new("Test")
2167 .with_timestamp(base_time + Duration::seconds(2))
2168 .with_field("region", "east"),
2169 );
2170
2171 window.add(
2173 Event::new("Test")
2174 .with_timestamp(base_time)
2175 .with_field("region", "west"),
2176 );
2177
2178 let result = window.add(
2180 Event::new("Test")
2181 .with_timestamp(base_time + Duration::seconds(8))
2182 .with_field("region", "east"),
2183 );
2184 assert!(result.is_some());
2185 assert_eq!(
2186 result.unwrap().len(),
2187 2,
2188 "East session should have 2 events"
2189 );
2190
2191 let result = window.add(
2193 Event::new("Test")
2194 .with_timestamp(base_time + Duration::seconds(3))
2195 .with_field("region", "west"),
2196 );
2197 assert!(result.is_none(), "West session should still be open");
2198 }
2199
2200 #[test]
2201 fn test_tumbling_window_checkpoint_restore() {
2202 let mut window = TumblingWindow::new(Duration::seconds(5));
2203 let base_time = Utc::now();
2204
2205 let e1 = Event::new("Test").with_timestamp(base_time);
2207 assert!(window.add(e1).is_none());
2208 let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(1));
2209 assert!(window.add(e2).is_none());
2210
2211 let cp = window.checkpoint();
2213
2214 let mut restored = TumblingWindow::new(Duration::seconds(5));
2216 restored.restore(&cp);
2217
2218 let e3 = Event::new("Test").with_timestamp(base_time + Duration::seconds(6));
2220 let result = restored.add(e3);
2221 assert!(
2222 result.is_some(),
2223 "Window should close on event past boundary"
2224 );
2225 let emitted = result.unwrap();
2226 assert_eq!(
2227 emitted.len(),
2228 2,
2229 "Restored window should contain the 2 checkpointed events"
2230 );
2231 }
2232
2233 #[test]
2234 fn test_count_window_checkpoint_restore() {
2235 let mut window = CountWindow::new(3);
2236
2237 let e1 = Event::new("Test").with_field("value", 1i64);
2239 assert!(window.add(e1).is_none());
2240 let e2 = Event::new("Test").with_field("value", 2i64);
2241 assert!(window.add(e2).is_none());
2242
2243 let cp = window.checkpoint();
2245
2246 let mut restored = CountWindow::new(3);
2248 restored.restore(&cp);
2249
2250 let e3 = Event::new("Test").with_field("value", 3i64);
2252 let result = restored.add(e3);
2253 assert!(result.is_some(), "Window should emit after 3rd event");
2254 let emitted = result.unwrap();
2255 assert_eq!(emitted.len(), 3, "All 3 events should be returned");
2256 }
2257
2258 #[test]
2259 fn test_sliding_count_window_checkpoint_restore() {
2260 let mut window = SlidingCountWindow::new(4, 2);
2262
2263 let e1 = Event::new("Test").with_field("seq", 1i64);
2265 assert!(window.add(e1).is_none());
2266 let e2 = Event::new("Test").with_field("seq", 2i64);
2267 assert!(window.add(e2).is_none());
2268
2269 let cp = window.checkpoint();
2271
2272 let mut restored = SlidingCountWindow::new(4, 2);
2274 restored.restore(&cp);
2275
2276 let e3 = Event::new("Test").with_field("seq", 3i64);
2278 assert!(restored.add(e3).is_none());
2279 let e4 = Event::new("Test").with_field("seq", 4i64);
2280 let result = restored.add(e4);
2281 assert!(
2282 result.is_some(),
2283 "Should emit when window is full and slide interval reached"
2284 );
2285 assert_eq!(
2286 result.unwrap().len(),
2287 4,
2288 "Window should contain all 4 events"
2289 );
2290
2291 let e5 = Event::new("Test").with_field("seq", 5i64);
2293 assert!(restored.add(e5).is_none());
2294 let e6 = Event::new("Test").with_field("seq", 6i64);
2295 let result = restored.add(e6);
2296 assert!(result.is_some(), "Should emit after slide_size more events");
2297 let emitted = result.unwrap();
2298 assert_eq!(
2299 emitted.len(),
2300 4,
2301 "Sliding window should still hold 4 events"
2302 );
2303 }
2304
2305 #[test]
2306 fn test_session_window_checkpoint_restore() {
2307 let mut window = SessionWindow::new(Duration::seconds(5));
2308 let base_time = Utc::now();
2309
2310 let e1 = Event::new("Test").with_timestamp(base_time);
2312 assert!(window.add(e1).is_none());
2313 let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(1));
2314 assert!(window.add(e2).is_none());
2315
2316 let cp = window.checkpoint();
2318
2319 let mut restored = SessionWindow::new(Duration::seconds(5));
2321 restored.restore(&cp);
2322
2323 let e3 = Event::new("Test").with_timestamp(base_time + Duration::seconds(7));
2325 let result = restored.add(e3);
2326 assert!(
2327 result.is_some(),
2328 "Session should close due to gap exceeding 5s"
2329 );
2330 let emitted = result.unwrap();
2331 assert_eq!(
2332 emitted.len(),
2333 2,
2334 "Closed session should contain the 2 checkpointed events"
2335 );
2336 }
2337
2338 #[test]
2339 fn test_tumbling_window_advance_watermark() {
2340 let mut window = TumblingWindow::new(Duration::seconds(5));
2341 let base_time = Utc::now();
2342
2343 for i in 0..3 {
2345 let event = Event::new("Test").with_timestamp(base_time + Duration::seconds(i));
2346 assert!(window.add(event).is_none());
2347 }
2348
2349 let result = window.advance_watermark(base_time + Duration::seconds(6));
2351 assert!(
2352 result.is_some(),
2353 "Watermark past window end should emit events"
2354 );
2355 let emitted = result.unwrap();
2356 assert_eq!(
2357 emitted.len(),
2358 3,
2359 "All 3 events in the window should be emitted"
2360 );
2361 }
2362
2363 #[test]
2364 fn test_session_window_advance_watermark() {
2365 let mut window = SessionWindow::new(Duration::seconds(5));
2366 let base_time = Utc::now();
2367
2368 let e1 = Event::new("Test").with_timestamp(base_time);
2370 assert!(window.add(e1).is_none());
2371 let e2 = Event::new("Test").with_timestamp(base_time + Duration::seconds(2));
2372 assert!(window.add(e2).is_none());
2373
2374 let result = window.advance_watermark(base_time + Duration::seconds(8));
2376 assert!(
2377 result.is_some(),
2378 "Watermark past session gap should close the session"
2379 );
2380 let emitted = result.unwrap();
2381 assert_eq!(
2382 emitted.len(),
2383 2,
2384 "Closed session should contain the 2 events"
2385 );
2386 }
2387}