1use chrono::{DateTime, Duration as ChronoDuration, Utc};
39use std::cmp::Ordering;
40use std::collections::HashMap;
41use std::fmt::{self, Debug, Display, Formatter};
42use std::hash::{Hash, Hasher};
43use std::sync::Arc;
44use std::time::Duration;
45
/// Outcome reported by a [`WindowTrigger`] callback.
///
/// NOTE(review): how each variant is acted upon is decided by the window operator,
/// which is not part of this file; the descriptions below follow the variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerResult {
    /// No action requested.
    Continue,
    /// Request that the window's contents be emitted.
    Fire,
    /// Request emission followed by discarding the window's state.
    FireAndPurge,
    /// Request that the window's state be discarded without emission.
    Purge,
}
58
/// How elements whose timestamp is behind the current watermark are treated.
///
/// The default policy is [`LateDataPolicy::Drop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LateDataPolicy {
    /// Discard late elements.
    #[default]
    Drop,
    /// Redirect late elements to a side output.
    SideOutput,
    /// Tolerate elements that are at most this far behind the watermark.
    AllowLateness(Duration),
}
70
/// Errors reported by windowing operations; each variant carries a detail message.
#[derive(Debug)]
pub enum WindowError {
    /// The window configuration is invalid.
    InvalidConfig(String),
    /// A requested window could not be found.
    NotFound(String),
    /// The targeted window is already closed.
    WindowClosed(String),
    /// Window state could not be read or updated.
    StateError(String),
}

impl fmt::Display for WindowError {
    /// Renders as `<category>: <detail>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidConfig(detail) => write!(f, "Invalid window config: {}", detail),
            Self::NotFound(detail) => write!(f, "Window not found: {}", detail),
            Self::WindowClosed(detail) => write!(f, "Window closed: {}", detail),
            Self::StateError(detail) => write!(f, "State error: {}", detail),
        }
    }
}

impl std::error::Error for WindowError {}

/// Shorthand for results whose error type is [`WindowError`].
pub type WindowResult<T> = Result<T, WindowError>;
99
100pub trait Window: Clone + Debug + Send + Sync + 'static {
105 fn end_time(&self) -> Option<DateTime<Utc>>;
108
109 fn start_time(&self) -> Option<DateTime<Utc>>;
111
112 fn contains(&self, timestamp: DateTime<Utc>) -> bool;
114}
115
116#[derive(Debug, Clone)]
120pub struct TimeWindow {
121 start: DateTime<Utc>,
123 end: DateTime<Utc>,
125}
126
127impl TimeWindow {
128 pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
130 Self { start, end }
131 }
132
133 pub fn start(&self) -> DateTime<Utc> {
135 self.start
136 }
137
138 pub fn end(&self) -> DateTime<Utc> {
140 self.end
141 }
142
143 pub fn duration(&self) -> ChronoDuration {
145 self.end - self.start
146 }
147
148 pub fn max_timestamp(&self) -> DateTime<Utc> {
150 self.end - ChronoDuration::milliseconds(1)
151 }
152
153 pub fn contains(&self, timestamp: DateTime<Utc>) -> bool {
155 timestamp >= self.start && timestamp < self.end
156 }
157
158 pub fn intersects(&self, other: &TimeWindow) -> bool {
160 self.start < other.end && other.start < self.end
161 }
162
163 pub fn merge(&self, other: &TimeWindow) -> Option<TimeWindow> {
165 if self.intersects(other) || self.end == other.start || other.end == self.start {
166 Some(TimeWindow::new(
167 self.start.min(other.start),
168 self.end.max(other.end),
169 ))
170 } else {
171 None
172 }
173 }
174}
175
176impl PartialEq for TimeWindow {
177 fn eq(&self, other: &Self) -> bool {
178 self.start == other.start && self.end == other.end
179 }
180}
181
182impl Eq for TimeWindow {}
183
184impl Hash for TimeWindow {
185 fn hash<H: Hasher>(&self, state: &mut H) {
186 self.start.hash(state);
187 self.end.hash(state);
188 }
189}
190
191impl PartialOrd for TimeWindow {
192 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
193 Some(self.cmp(other))
194 }
195}
196
197impl Ord for TimeWindow {
198 fn cmp(&self, other: &Self) -> Ordering {
199 self
200 .start
201 .cmp(&other.start)
202 .then_with(|| self.end.cmp(&other.end))
203 }
204}
205
206impl fmt::Display for TimeWindow {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208 write!(
209 f,
210 "[{}, {})",
211 self.start.format("%H:%M:%S"),
212 self.end.format("%H:%M:%S")
213 )
214 }
215}
216
217impl Window for TimeWindow {
218 fn end_time(&self) -> Option<DateTime<Utc>> {
219 Some(self.end)
220 }
221
222 fn start_time(&self) -> Option<DateTime<Utc>> {
223 Some(self.start)
224 }
225
226 fn contains(&self, timestamp: DateTime<Utc>) -> bool {
227 timestamp >= self.start && timestamp < self.end
228 }
229}
230
/// A window identified by a sequence number instead of by time.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CountWindow {
    /// Sequence number distinguishing this window from the others.
    id: u64,
    /// Element capacity associated with this window.
    max_count: usize,
}

impl CountWindow {
    /// Creates the window with sequence number `id` and capacity `max_count`.
    pub fn new(id: u64, max_count: usize) -> Self {
        CountWindow { max_count, id }
    }

    /// Sequence number of the window.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Capacity of the window.
    pub fn max_count(&self) -> usize {
        self.max_count
    }
}

impl fmt::Display for CountWindow {
    /// Renders as `CountWindow(id=<id>, max=<max_count>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let CountWindow { id, max_count } = self;
        write!(f, "CountWindow(id={}, max={})", id, max_count)
    }
}
262
263#[derive(Debug, Clone)]
265pub struct SessionWindow {
266 start: DateTime<Utc>,
268 end: DateTime<Utc>,
270 gap: Duration,
272}
273
274impl SessionWindow {
275 pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, gap: Duration) -> Self {
277 Self { start, end, gap }
278 }
279
280 pub fn from_element(timestamp: DateTime<Utc>, gap: Duration) -> Self {
282 let gap_chrono = ChronoDuration::from_std(gap).unwrap_or(ChronoDuration::seconds(0));
283 Self {
284 start: timestamp,
285 end: timestamp + gap_chrono,
286 gap,
287 }
288 }
289
290 pub fn start(&self) -> DateTime<Utc> {
292 self.start
293 }
294
295 pub fn end(&self) -> DateTime<Utc> {
297 self.end
298 }
299
300 pub fn gap(&self) -> Duration {
302 self.gap
303 }
304
305 pub fn contains(&self, timestamp: DateTime<Utc>) -> bool {
307 timestamp >= self.start && timestamp < self.end
308 }
309
310 pub fn should_merge(&self, other: &SessionWindow) -> bool {
312 self.start < other.end && other.start < self.end
314 }
315
316 pub fn merge(&self, other: &SessionWindow) -> Option<SessionWindow> {
318 if self.should_merge(other) {
319 Some(SessionWindow::new(
320 self.start.min(other.start),
321 self.end.max(other.end),
322 self.gap.max(other.gap),
323 ))
324 } else {
325 None
326 }
327 }
328
329 pub fn extend(&mut self, timestamp: DateTime<Utc>) {
331 let gap_chrono = ChronoDuration::from_std(self.gap).unwrap_or(ChronoDuration::seconds(0));
332 if timestamp < self.start {
333 self.start = timestamp;
334 }
335 let new_end = timestamp + gap_chrono;
336 if new_end > self.end {
337 self.end = new_end;
338 }
339 }
340}
341
342impl PartialEq for SessionWindow {
343 fn eq(&self, other: &Self) -> bool {
344 self.start == other.start && self.end == other.end
345 }
346}
347
348impl Eq for SessionWindow {}
349
350impl Hash for SessionWindow {
351 fn hash<H: Hasher>(&self, state: &mut H) {
352 self.start.hash(state);
353 self.end.hash(state);
354 }
355}
356
357impl fmt::Display for SessionWindow {
358 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359 write!(
360 f,
361 "Session[{}, {}) gap={:?}",
362 self.start.format("%H:%M:%S"),
363 self.end.format("%H:%M:%S"),
364 self.gap
365 )
366 }
367}
368
369pub trait WindowAssigner: Send + Sync {
374 type W: Clone + Debug + PartialEq + Eq + Hash + Send + Sync;
376
377 fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W>;
387
388 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>>;
390
391 fn is_event_time(&self) -> bool {
393 true
394 }
395}
396
397pub trait WindowTrigger<W>: Send + Sync
404where
405 W: Clone + Debug + PartialEq + Eq + Hash,
406{
407 fn on_element(&mut self, timestamp: DateTime<Utc>, window: &W) -> TriggerResult;
409
410 fn on_processing_time(&mut self, time: DateTime<Utc>, window: &W) -> TriggerResult;
412
413 fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &W) -> TriggerResult;
415
416 fn clear(&mut self, window: &W);
418
419 fn clone_trigger(&self) -> Box<dyn WindowTrigger<W>>;
421}
422
423#[derive(Debug, Clone, Default)]
425pub struct EventTimeTrigger;
426
427impl EventTimeTrigger {
428 pub fn new() -> Self {
430 Self
431 }
432}
433
434impl WindowTrigger<TimeWindow> for EventTimeTrigger {
435 fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
436 TriggerResult::Continue
437 }
438
439 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
440 TriggerResult::Continue
441 }
442
443 fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &TimeWindow) -> TriggerResult {
444 if watermark >= window.end() {
445 TriggerResult::FireAndPurge
446 } else {
447 TriggerResult::Continue
448 }
449 }
450
451 fn clear(&mut self, _window: &TimeWindow) {}
452
453 fn clone_trigger(&self) -> Box<dyn WindowTrigger<TimeWindow>> {
454 Box::new(self.clone())
455 }
456}
457
458#[derive(Debug, Clone)]
460pub struct CountTrigger {
461 count: usize,
463 window_counts: HashMap<CountWindow, usize>,
465}
466
467impl CountTrigger {
468 pub fn new(count: usize) -> Self {
470 Self {
471 count,
472 window_counts: HashMap::new(),
473 }
474 }
475
476 pub fn count(&self) -> usize {
478 self.count
479 }
480}
481
482impl WindowTrigger<CountWindow> for CountTrigger {
483 fn on_element(&mut self, _timestamp: DateTime<Utc>, window: &CountWindow) -> TriggerResult {
484 let count = self.window_counts.entry(window.clone()).or_insert(0);
485 *count += 1;
486
487 if *count >= self.count {
488 TriggerResult::FireAndPurge
489 } else {
490 TriggerResult::Continue
491 }
492 }
493
494 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &CountWindow) -> TriggerResult {
495 TriggerResult::Continue
496 }
497
498 fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &CountWindow) -> TriggerResult {
499 TriggerResult::Continue
500 }
501
502 fn clear(&mut self, window: &CountWindow) {
503 self.window_counts.remove(window);
504 }
505
506 fn clone_trigger(&self) -> Box<dyn WindowTrigger<CountWindow>> {
507 Box::new(self.clone())
508 }
509}
510
511#[derive(Debug, Clone)]
513pub struct SessionTrigger {
514 last_event_times: HashMap<SessionWindow, DateTime<Utc>>,
516}
517
518impl SessionTrigger {
519 pub fn new() -> Self {
521 Self {
522 last_event_times: HashMap::new(),
523 }
524 }
525}
526
527impl Default for SessionTrigger {
528 fn default() -> Self {
529 Self::new()
530 }
531}
532
533impl WindowTrigger<SessionWindow> for SessionTrigger {
534 fn on_element(&mut self, timestamp: DateTime<Utc>, window: &SessionWindow) -> TriggerResult {
535 self.last_event_times.insert(window.clone(), timestamp);
536 TriggerResult::Continue
537 }
538
539 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &SessionWindow) -> TriggerResult {
540 TriggerResult::Continue
541 }
542
543 fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &SessionWindow) -> TriggerResult {
544 if watermark >= window.end() {
546 TriggerResult::FireAndPurge
547 } else {
548 TriggerResult::Continue
549 }
550 }
551
552 fn clear(&mut self, window: &SessionWindow) {
553 self.last_event_times.remove(window);
554 }
555
556 fn clone_trigger(&self) -> Box<dyn WindowTrigger<SessionWindow>> {
557 Box::new(self.clone())
558 }
559}
560
561#[derive(Debug, Clone)]
563pub struct ProcessingTimeTrigger {
564 interval: Duration,
566 last_fire_times: HashMap<TimeWindow, DateTime<Utc>>,
568}
569
570impl ProcessingTimeTrigger {
571 pub fn new(interval: Duration) -> Self {
573 Self {
574 interval,
575 last_fire_times: HashMap::new(),
576 }
577 }
578
579 pub fn interval(&self) -> Duration {
581 self.interval
582 }
583}
584
585impl WindowTrigger<TimeWindow> for ProcessingTimeTrigger {
586 fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
587 TriggerResult::Continue
588 }
589
590 fn on_processing_time(&mut self, time: DateTime<Utc>, window: &TimeWindow) -> TriggerResult {
591 let interval_chrono =
592 ChronoDuration::from_std(self.interval).unwrap_or(ChronoDuration::seconds(1));
593
594 let last_fire = self
595 .last_fire_times
596 .get(window)
597 .copied()
598 .unwrap_or_else(|| time - interval_chrono);
599
600 if time >= last_fire + interval_chrono {
601 self.last_fire_times.insert(window.clone(), time);
602 TriggerResult::Fire
603 } else {
604 TriggerResult::Continue
605 }
606 }
607
608 fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
609 TriggerResult::Continue
610 }
611
612 fn clear(&mut self, window: &TimeWindow) {
613 self.last_fire_times.remove(window);
614 }
615
616 fn clone_trigger(&self) -> Box<dyn WindowTrigger<TimeWindow>> {
617 Box::new(self.clone())
618 }
619}
620
621#[derive(Debug, Clone)]
625pub struct TumblingWindowAssigner {
626 size: Duration,
628 offset: Duration,
630}
631
632impl TumblingWindowAssigner {
633 pub fn new(size: Duration) -> Self {
635 Self {
636 size,
637 offset: Duration::ZERO,
638 }
639 }
640
641 pub fn with_offset(mut self, offset: Duration) -> Self {
643 self.offset = offset;
644 self
645 }
646
647 pub fn size(&self) -> Duration {
649 self.size
650 }
651
652 pub fn offset(&self) -> Duration {
654 self.offset
655 }
656
657 fn window_start(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
659 let ts_millis = timestamp.timestamp_millis();
660 let size_millis = self.size.as_millis() as i64;
661 let offset_millis = self.offset.as_millis() as i64;
662
663 let window_start_millis =
664 ((ts_millis - offset_millis) / size_millis) * size_millis + offset_millis;
665
666 DateTime::from_timestamp_millis(window_start_millis).unwrap_or(timestamp)
667 }
668}
669
670impl WindowAssigner for TumblingWindowAssigner {
671 type W = TimeWindow;
672
673 fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
674 let start = self.window_start(timestamp);
675 let size_chrono = ChronoDuration::from_std(self.size).unwrap_or(ChronoDuration::seconds(1));
676 let end = start + size_chrono;
677
678 vec![TimeWindow::new(start, end)]
679 }
680
681 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
682 Box::new(EventTimeTrigger::new())
683 }
684}
685
686#[derive(Debug, Clone)]
690pub struct SlidingWindowAssigner {
691 size: Duration,
693 slide: Duration,
695 offset: Duration,
697}
698
699impl SlidingWindowAssigner {
700 pub fn new(size: Duration, slide: Duration) -> Self {
702 Self {
703 size,
704 slide,
705 offset: Duration::ZERO,
706 }
707 }
708
709 pub fn with_offset(mut self, offset: Duration) -> Self {
711 self.offset = offset;
712 self
713 }
714
715 pub fn size(&self) -> Duration {
717 self.size
718 }
719
720 pub fn slide(&self) -> Duration {
722 self.slide
723 }
724
725 pub fn offset(&self) -> Duration {
727 self.offset
728 }
729}
730
731impl WindowAssigner for SlidingWindowAssigner {
732 type W = TimeWindow;
733
734 fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
735 let ts_millis = timestamp.timestamp_millis();
736 let size_millis = self.size.as_millis() as i64;
737 let slide_millis = self.slide.as_millis() as i64;
738 let offset_millis = self.offset.as_millis() as i64;
739
740 let size_chrono = ChronoDuration::from_std(self.size).unwrap_or(ChronoDuration::seconds(1));
741
742 let last_start = ((ts_millis - offset_millis) / slide_millis) * slide_millis + offset_millis;
744
745 let num_windows = (size_millis / slide_millis) as usize;
747
748 (0..num_windows)
749 .map(|i| {
750 let start_millis = last_start - (i as i64) * slide_millis;
751 let start = DateTime::from_timestamp_millis(start_millis).unwrap_or(timestamp);
752 let end = start + size_chrono;
753 TimeWindow::new(start, end)
754 })
755 .filter(|w| w.contains(timestamp))
756 .collect()
757 }
758
759 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
760 Box::new(EventTimeTrigger::new())
761 }
762}
763
764#[derive(Debug, Clone)]
768pub struct SessionWindowAssigner {
769 gap: Duration,
771}
772
773impl SessionWindowAssigner {
774 pub fn new(gap: Duration) -> Self {
776 Self { gap }
777 }
778
779 pub fn gap(&self) -> Duration {
781 self.gap
782 }
783}
784
785impl WindowAssigner for SessionWindowAssigner {
786 type W = SessionWindow;
787
788 fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
789 vec![SessionWindow::from_element(timestamp, self.gap)]
790 }
791
792 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
793 Box::new(SessionTrigger::new())
794 }
795}
796
797#[derive(Debug)]
801pub struct CountWindowAssigner {
802 count: usize,
804 current_id: std::sync::atomic::AtomicU64,
806 current_count: std::sync::atomic::AtomicUsize,
808}
809
810impl Clone for CountWindowAssigner {
811 fn clone(&self) -> Self {
812 Self {
813 count: self.count,
814 current_id: std::sync::atomic::AtomicU64::new(
815 self.current_id.load(std::sync::atomic::Ordering::Relaxed),
816 ),
817 current_count: std::sync::atomic::AtomicUsize::new(
818 self
819 .current_count
820 .load(std::sync::atomic::Ordering::Relaxed),
821 ),
822 }
823 }
824}
825
826impl CountWindowAssigner {
827 pub fn new(count: usize) -> Self {
829 Self {
830 count,
831 current_id: std::sync::atomic::AtomicU64::new(0),
832 current_count: std::sync::atomic::AtomicUsize::new(0),
833 }
834 }
835
836 pub fn count(&self) -> usize {
838 self.count
839 }
840}
841
842impl WindowAssigner for CountWindowAssigner {
843 type W = CountWindow;
844
845 fn assign_windows(&self, _timestamp: DateTime<Utc>) -> Vec<Self::W> {
846 let prev_count = self
848 .current_count
849 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
850
851 if prev_count > 0 && prev_count.is_multiple_of(self.count) {
853 self
854 .current_id
855 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
856 self
857 .current_count
858 .store(1, std::sync::atomic::Ordering::Relaxed);
859 }
860
861 let id = self.current_id.load(std::sync::atomic::Ordering::Relaxed);
862
863 vec![CountWindow::new(id, self.count)]
864 }
865
866 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
867 Box::new(CountTrigger::new(self.count))
868 }
869
870 fn is_event_time(&self) -> bool {
871 false
872 }
873}
874
875#[derive(Debug, Clone, Default)]
877pub struct GlobalWindowAssigner;
878
879#[derive(Debug, Clone, PartialEq, Eq, Hash)]
881pub struct GlobalWindow;
882
883impl fmt::Display for GlobalWindow {
884 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
885 write!(f, "GlobalWindow")
886 }
887}
888
889impl Window for GlobalWindow {
890 fn end_time(&self) -> Option<DateTime<Utc>> {
891 None }
893
894 fn start_time(&self) -> Option<DateTime<Utc>> {
895 None }
897
898 fn contains(&self, _timestamp: DateTime<Utc>) -> bool {
899 true }
901}
902
903impl GlobalWindowAssigner {
904 pub fn new() -> Self {
906 Self
907 }
908}
909
910impl WindowAssigner for GlobalWindowAssigner {
911 type W = GlobalWindow;
912
913 fn assign_windows(&self, _timestamp: DateTime<Utc>) -> Vec<Self::W> {
914 vec![GlobalWindow]
915 }
916
917 fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
918 Box::new(NeverTrigger)
919 }
920}
921
922#[derive(Debug, Clone, Default)]
924pub struct NeverTrigger;
925
926impl WindowTrigger<GlobalWindow> for NeverTrigger {
927 fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
928 TriggerResult::Continue
929 }
930
931 fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
932 TriggerResult::Continue
933 }
934
935 fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
936 TriggerResult::Continue
937 }
938
939 fn clear(&mut self, _window: &GlobalWindow) {}
940
941 fn clone_trigger(&self) -> Box<dyn WindowTrigger<GlobalWindow>> {
942 Box::new(self.clone())
943 }
944}
945
946#[derive(Debug, Clone)]
948pub struct WindowConfig {
949 pub late_data_policy: LateDataPolicy,
951 pub allowed_lateness: Duration,
953}
954
955impl Default for WindowConfig {
956 fn default() -> Self {
957 Self {
958 late_data_policy: LateDataPolicy::Drop,
959 allowed_lateness: Duration::ZERO,
960 }
961 }
962}
963
964impl WindowConfig {
965 pub fn new() -> Self {
967 Self::default()
968 }
969
970 pub fn with_late_data_policy(mut self, policy: LateDataPolicy) -> Self {
972 self.late_data_policy = policy;
973 self
974 }
975
976 pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
978 self.allowed_lateness = lateness;
979 self
980 }
981}
982
983#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
993pub struct Watermark {
994 pub timestamp: DateTime<Utc>,
996}
997
998impl Watermark {
999 #[must_use]
1001 pub fn new(timestamp: DateTime<Utc>) -> Self {
1002 Self { timestamp }
1003 }
1004
1005 #[must_use]
1007 pub fn min() -> Self {
1008 Self {
1009 timestamp: DateTime::<Utc>::MIN_UTC,
1010 }
1011 }
1012
1013 #[must_use]
1015 pub fn max() -> Self {
1016 Self {
1017 timestamp: DateTime::<Utc>::MAX_UTC,
1018 }
1019 }
1020
1021 #[must_use]
1023 pub fn is_end_of_stream(&self) -> bool {
1024 self.timestamp == DateTime::<Utc>::MAX_UTC
1025 }
1026
1027 pub fn advance(&mut self, timestamp: DateTime<Utc>) -> bool {
1030 if timestamp > self.timestamp {
1031 self.timestamp = timestamp;
1032 true
1033 } else {
1034 false
1035 }
1036 }
1037}
1038
1039impl Display for Watermark {
1040 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1041 if self.is_end_of_stream() {
1042 write!(f, "Watermark(END)")
1043 } else {
1044 write!(f, "Watermark({})", self.timestamp)
1045 }
1046 }
1047}
1048
1049pub trait WatermarkGenerator: Send + Sync + std::fmt::Debug + 'static {
1056 fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark>;
1059
1060 fn on_periodic_emit(&mut self) -> Option<Watermark>;
1063
1064 fn current_watermark(&self) -> Watermark;
1066
1067 fn on_end_of_stream(&mut self) -> Watermark {
1069 Watermark::max()
1070 }
1071}
1072
1073#[derive(Debug, Clone)]
1077pub struct MonotonicWatermarkGenerator {
1078 current: Watermark,
1079}
1080
1081impl MonotonicWatermarkGenerator {
1082 #[must_use]
1084 pub fn new() -> Self {
1085 Self {
1086 current: Watermark::min(),
1087 }
1088 }
1089}
1090
1091impl Default for MonotonicWatermarkGenerator {
1092 fn default() -> Self {
1093 Self::new()
1094 }
1095}
1096
1097impl WatermarkGenerator for MonotonicWatermarkGenerator {
1098 fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1099 if self.current.advance(timestamp) {
1100 Some(self.current.clone())
1101 } else {
1102 None
1103 }
1104 }
1105
1106 fn on_periodic_emit(&mut self) -> Option<Watermark> {
1107 None
1109 }
1110
1111 fn current_watermark(&self) -> Watermark {
1112 self.current.clone()
1113 }
1114}
1115
1116#[derive(Debug, Clone)]
1121pub struct BoundedOutOfOrdernessGenerator {
1122 max_out_of_orderness: Duration,
1124 max_timestamp: Option<DateTime<Utc>>,
1126 current: Watermark,
1128}
1129
1130impl BoundedOutOfOrdernessGenerator {
1131 #[must_use]
1133 pub fn new(max_out_of_orderness: Duration) -> Self {
1134 Self {
1135 max_out_of_orderness,
1136 max_timestamp: None,
1137 current: Watermark::min(),
1138 }
1139 }
1140
1141 fn calculate_watermark(&self) -> Watermark {
1142 match self.max_timestamp {
1143 Some(max_ts) => Watermark::new(max_ts - self.max_out_of_orderness),
1144 None => Watermark::min(),
1145 }
1146 }
1147}
1148
1149impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
1150 fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1151 self.max_timestamp = Some(
1153 self
1154 .max_timestamp
1155 .map(|current| current.max(timestamp))
1156 .unwrap_or(timestamp),
1157 );
1158
1159 let new_watermark = self.calculate_watermark();
1161
1162 if new_watermark.timestamp > self.current.timestamp {
1164 self.current = new_watermark.clone();
1165 Some(new_watermark)
1166 } else {
1167 None
1168 }
1169 }
1170
1171 fn on_periodic_emit(&mut self) -> Option<Watermark> {
1172 Some(self.current.clone())
1174 }
1175
1176 fn current_watermark(&self) -> Watermark {
1177 self.current.clone()
1178 }
1179}
1180
1181#[derive(Debug, Clone)]
1183pub struct PeriodicWatermarkGenerator {
1184 inner: BoundedOutOfOrdernessGenerator,
1186 interval: Duration,
1188 last_emit: Option<DateTime<Utc>>,
1190}
1191
1192impl PeriodicWatermarkGenerator {
1193 #[must_use]
1195 pub fn new(max_out_of_orderness: Duration, interval: Duration) -> Self {
1196 Self {
1197 inner: BoundedOutOfOrdernessGenerator::new(max_out_of_orderness),
1198 interval,
1199 last_emit: None,
1200 }
1201 }
1202}
1203
1204impl WatermarkGenerator for PeriodicWatermarkGenerator {
1205 fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1206 self.inner.on_event(timestamp);
1208 None
1209 }
1210
1211 fn on_periodic_emit(&mut self) -> Option<Watermark> {
1212 let now = Utc::now();
1213 let interval_chrono = ChronoDuration::from_std(self.interval).unwrap_or(ChronoDuration::zero());
1214 let should_emit = self
1215 .last_emit
1216 .map(|last| now - last >= interval_chrono)
1217 .unwrap_or(true);
1218
1219 if should_emit {
1220 self.last_emit = Some(now);
1221 Some(self.inner.current_watermark())
1222 } else {
1223 None
1224 }
1225 }
1226
1227 fn current_watermark(&self) -> Watermark {
1228 self.inner.current_watermark()
1229 }
1230}
1231
/// Classification of an element relative to the watermark.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataResult<T> {
    /// At or after the watermark.
    OnTime(T),
    /// Behind the watermark but accepted.
    WithinLateness(T),
    /// Discarded; the element is not returned.
    Drop,
    /// Behind the watermark and redirected to a side output.
    SideOutput(T),
}

impl<T> LateDataResult<T> {
    /// `true` for `OnTime` and `WithinLateness`.
    #[must_use]
    pub fn should_process(&self) -> bool {
        matches!(self, Self::OnTime(_) | Self::WithinLateness(_))
    }

    /// The element when it should be processed normally, otherwise `None`.
    #[must_use]
    pub fn into_processable(self) -> Option<T> {
        match self {
            Self::OnTime(value) | Self::WithinLateness(value) => Some(value),
            Self::Drop | Self::SideOutput(_) => None,
        }
    }

    /// `true` for `SideOutput`.
    #[must_use]
    pub fn is_side_output(&self) -> bool {
        matches!(self, Self::SideOutput(_))
    }

    /// The element when it was redirected to the side output, otherwise `None`.
    #[must_use]
    pub fn into_side_output(self) -> Option<T> {
        if let Self::SideOutput(value) = self {
            Some(value)
        } else {
            None
        }
    }
}
1283
1284#[derive(Debug, Clone)]
1286pub struct LateDataHandler {
1287 policy: LateDataPolicy,
1289 stats: LateDataStats,
1291}
1292
1293#[derive(Debug, Clone, Default)]
1295pub struct LateDataStats {
1296 pub on_time: u64,
1298 pub within_lateness: u64,
1300 pub dropped: u64,
1302 pub side_output: u64,
1304}
1305
1306impl LateDataHandler {
1307 #[must_use]
1309 pub fn new(policy: LateDataPolicy) -> Self {
1310 Self {
1311 policy,
1312 stats: LateDataStats::default(),
1313 }
1314 }
1315
1316 #[must_use]
1318 pub fn drop_late() -> Self {
1319 Self::new(LateDataPolicy::Drop)
1320 }
1321
1322 #[must_use]
1324 pub fn with_allowed_lateness(lateness: Duration) -> Self {
1325 Self::new(LateDataPolicy::AllowLateness(lateness))
1326 }
1327
1328 #[must_use]
1330 pub fn redirect_to_side_output() -> Self {
1331 Self::new(LateDataPolicy::SideOutput)
1332 }
1333
1334 fn allowed_lateness(&self) -> Duration {
1336 match &self.policy {
1337 LateDataPolicy::AllowLateness(d) => *d,
1338 _ => Duration::ZERO,
1339 }
1340 }
1341
1342 pub fn evaluate<T>(
1349 &mut self,
1350 element_timestamp: DateTime<Utc>,
1351 current_watermark: &Watermark,
1352 element: T,
1353 ) -> LateDataResult<T> {
1354 if element_timestamp >= current_watermark.timestamp {
1356 self.stats.on_time += 1;
1357 return LateDataResult::OnTime(element);
1358 }
1359
1360 let lateness_duration = current_watermark.timestamp - element_timestamp;
1362 let allowed = self.allowed_lateness();
1363
1364 if let Ok(lateness_std) = lateness_duration.to_std()
1366 && lateness_std <= allowed
1367 {
1368 self.stats.within_lateness += 1;
1369 return LateDataResult::WithinLateness(element);
1370 }
1371
1372 match &self.policy {
1374 LateDataPolicy::Drop => {
1375 self.stats.dropped += 1;
1376 LateDataResult::Drop
1377 }
1378 LateDataPolicy::AllowLateness(_) => {
1379 self.stats.within_lateness += 1;
1381 LateDataResult::WithinLateness(element)
1382 }
1383 LateDataPolicy::SideOutput => {
1384 self.stats.side_output += 1;
1385 LateDataResult::SideOutput(element)
1386 }
1387 }
1388 }
1389
1390 #[must_use]
1392 pub fn stats(&self) -> &LateDataStats {
1393 &self.stats
1394 }
1395
1396 pub fn reset_stats(&mut self) {
1398 self.stats = LateDataStats::default();
1399 }
1400
1401 #[must_use]
1403 pub fn policy(&self) -> &LateDataPolicy {
1404 &self.policy
1405 }
1406
1407 #[must_use]
1409 pub fn get_allowed_lateness(&self) -> Duration {
1410 match &self.policy {
1411 LateDataPolicy::AllowLateness(d) => *d,
1412 _ => Duration::ZERO,
1413 }
1414 }
1415}
1416
1417#[derive(Debug)]
1423pub struct WindowState<W: Window, T> {
1424 pub window: W,
1426 pub elements: Vec<(DateTime<Utc>, T)>,
1428 pub count: usize,
1430 pub triggered: bool,
1432 pub last_watermark: Option<Watermark>,
1434}
1435
1436impl<W: Window, T: Clone> WindowState<W, T> {
1437 #[must_use]
1439 pub fn new(window: W) -> Self {
1440 Self {
1441 window,
1442 elements: Vec::new(),
1443 count: 0,
1444 triggered: false,
1445 last_watermark: None,
1446 }
1447 }
1448
1449 pub fn add(&mut self, timestamp: DateTime<Utc>, element: T) {
1451 self.elements.push((timestamp, element));
1452 self.count += 1;
1453 }
1454
1455 pub fn update_watermark(&mut self, watermark: Watermark) {
1457 self.last_watermark = Some(watermark);
1458 }
1459
1460 #[must_use]
1465 pub fn can_be_gc(&self, current_watermark: &Watermark, allowed_lateness: ChronoDuration) -> bool {
1466 if !self.triggered {
1467 return false;
1468 }
1469 match self.window.end_time() {
1470 Some(end_time) => current_watermark.timestamp > end_time + allowed_lateness,
1471 None => false, }
1473 }
1474
1475 pub fn clear(&mut self) {
1477 self.elements.clear();
1478 self.count = 0;
1479 }
1480
1481 pub fn mark_triggered(&mut self) {
1483 self.triggered = true;
1484 }
1485
1486 #[must_use]
1488 pub fn elements(&self) -> &[(DateTime<Utc>, T)] {
1489 &self.elements
1490 }
1491
1492 #[must_use]
1494 pub fn values(&self) -> Vec<T> {
1495 self.elements.iter().map(|(_, v)| v.clone()).collect()
1496 }
1497}
1498
1499impl<W: Window, T: Clone> Clone for WindowState<W, T> {
1500 fn clone(&self) -> Self {
1501 Self {
1502 window: self.window.clone(),
1503 elements: self.elements.clone(),
1504 count: self.count,
1505 triggered: self.triggered,
1506 last_watermark: self.last_watermark.clone(),
1507 }
1508 }
1509}
1510
1511pub type SharedWindowAssigner<W> = Arc<dyn WindowAssigner<W = W>>;
1513
1514#[cfg(test)]
1515mod tests {
1516 use super::*;
1517
1518 fn timestamp(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
1519 use chrono::TimeZone;
1520 Utc
1521 .with_ymd_and_hms(2024, 1, 1, hour, minute, second)
1522 .unwrap()
1523 }
1524
1525 #[test]
1527 fn test_time_window_basic() {
1528 let start = timestamp(10, 0, 0);
1529 let end = timestamp(10, 5, 0);
1530 let window = TimeWindow::new(start, end);
1531
1532 assert_eq!(window.start(), start);
1533 assert_eq!(window.end(), end);
1534 assert_eq!(window.duration(), ChronoDuration::minutes(5));
1535 }
1536
1537 #[test]
1538 fn test_time_window_contains() {
1539 let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1540
1541 assert!(window.contains(timestamp(10, 0, 0))); assert!(window.contains(timestamp(10, 2, 30)));
1543 assert!(!window.contains(timestamp(10, 5, 0))); assert!(!window.contains(timestamp(9, 59, 59)));
1545 }
1546
1547 #[test]
1548 fn test_time_window_intersects() {
1549 let w1 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1550 let w2 = TimeWindow::new(timestamp(10, 3, 0), timestamp(10, 8, 0));
1551 let w3 = TimeWindow::new(timestamp(10, 5, 0), timestamp(10, 10, 0));
1552 let w4 = TimeWindow::new(timestamp(10, 10, 0), timestamp(10, 15, 0));
1553
1554 assert!(w1.intersects(&w2)); assert!(!w1.intersects(&w3)); assert!(!w1.intersects(&w4)); }
1558
1559 #[test]
1560 fn test_time_window_merge() {
1561 let w1 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1562 let w2 = TimeWindow::new(timestamp(10, 3, 0), timestamp(10, 8, 0));
1563 let w3 = TimeWindow::new(timestamp(10, 10, 0), timestamp(10, 15, 0));
1564
1565 let merged = w1.merge(&w2).unwrap();
1566 assert_eq!(merged.start(), timestamp(10, 0, 0));
1567 assert_eq!(merged.end(), timestamp(10, 8, 0));
1568
1569 assert!(w1.merge(&w3).is_none()); }
1571
1572 #[test]
1573 fn test_time_window_display() {
1574 let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1575 let s = format!("{}", window);
1576 assert!(s.contains("10:00:00"));
1577 assert!(s.contains("10:05:00"));
1578 }
1579
1580 #[test]
1582 fn test_count_window() {
1583 let window = CountWindow::new(42, 100);
1584 assert_eq!(window.id(), 42);
1585 assert_eq!(window.max_count(), 100);
1586 }
1587
/// A session seeded from one element starts at that element and keeps the given gap.
#[test]
fn test_session_window_from_element() {
    let origin = timestamp(10, 0, 0);
    let inactivity = Duration::from_secs(5 * 60);
    let sw = SessionWindow::from_element(origin, inactivity);

    assert_eq!(sw.start(), origin);
    assert_eq!(sw.gap(), inactivity);
    assert!(sw.contains(origin));
}
1599
/// Extending a session forward keeps its start; extending backward moves the start earlier.
#[test]
fn test_session_window_extend() {
    let gap = Duration::from_secs(300);
    let mut sess = SessionWindow::from_element(timestamp(10, 0, 0), gap);

    sess.extend(timestamp(10, 2, 0));
    assert_eq!(sess.start(), timestamp(10, 0, 0));

    sess.extend(timestamp(9, 58, 0));
    assert_eq!(sess.start(), timestamp(9, 58, 0));
}
1611
/// Sessions within the gap of each other merge; distant sessions do not.
#[test]
fn test_session_window_merge() {
    let gap = Duration::from_secs(300);
    let first = SessionWindow::from_element(timestamp(10, 0, 0), gap);
    let near = SessionWindow::from_element(timestamp(10, 2, 0), gap);
    let distant = SessionWindow::from_element(timestamp(10, 30, 0), gap);

    assert!(first.should_merge(&near));
    assert!(!first.should_merge(&distant));

    let union = first.merge(&near).unwrap();
    assert_eq!(union.start(), timestamp(10, 0, 0));
}
1624
/// A 5-minute tumbling assigner places 10:02:30 into exactly the [10:00, 10:05) window.
#[test]
fn test_tumbling_assigner() {
    let tumbling = TumblingWindowAssigner::new(Duration::from_secs(300));
    let assigned = tumbling.assign_windows(timestamp(10, 2, 30));
    assert_eq!(assigned.len(), 1);

    let only = &assigned[0];
    assert_eq!(only.start(), timestamp(10, 0, 0));
    assert_eq!(only.end(), timestamp(10, 5, 0));
}
1638
/// A one-minute offset shifts the tumbling window boundaries to [10:01, 10:06).
#[test]
fn test_tumbling_assigner_with_offset() {
    let shifted = TumblingWindowAssigner::new(Duration::from_secs(300))
        .with_offset(Duration::from_secs(60));
    let assigned = shifted.assign_windows(timestamp(10, 2, 30));
    assert_eq!(assigned.len(), 1);

    let only = &assigned[0];
    assert_eq!(only.start(), timestamp(10, 1, 0));
    assert_eq!(only.end(), timestamp(10, 6, 0));
}
1652
/// A sliding assigner places an event into several overlapping windows, each containing it.
#[test]
fn test_sliding_assigner() {
    let sliding = SlidingWindowAssigner::new(Duration::from_secs(300), Duration::from_secs(60));
    let event_time = timestamp(10, 2, 30);
    let assigned = sliding.assign_windows(event_time);

    assert!(assigned.len() > 1);
    assert!(assigned.iter().all(|w| w.contains(event_time)));
}
1671
/// The session assigner yields one session per element, carrying the configured gap.
#[test]
fn test_session_assigner() {
    let gap = Duration::from_secs(300);
    let sessions = SessionWindowAssigner::new(gap);
    let assigned = sessions.assign_windows(timestamp(10, 0, 0));

    assert_eq!(assigned.len(), 1);
    assert_eq!(assigned[0].gap(), gap);
}
1681
/// With capacity 3, the first three elements share window 0 and the fourth opens window 1.
#[test]
fn test_count_assigner() {
    let counting = CountWindowAssigner::new(3);

    let first_ids: Vec<_> = (0..3)
        .map(|_| counting.assign_windows(Utc::now())[0].id())
        .collect();
    assert_eq!(first_ids, vec![0, 0, 0]);

    let overflow = counting.assign_windows(Utc::now());
    assert_eq!(overflow[0].id(), 1);
}
1697
/// Every element is assigned to the single `GlobalWindow`.
#[test]
fn test_global_assigner() {
    let global = GlobalWindowAssigner::new();
    let assigned = global.assign_windows(Utc::now());

    assert_eq!(assigned.len(), 1);
    assert_eq!(assigned[0], GlobalWindow);
}
1707
/// The event-time trigger waits before the window end and fires-and-purges at the end.
#[test]
fn test_event_time_trigger() {
    let mut et_trigger = EventTimeTrigger::new();
    let win = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));

    assert_eq!(
        et_trigger.on_event_time(timestamp(10, 3, 0), &win),
        TriggerResult::Continue
    );
    assert_eq!(
        et_trigger.on_event_time(timestamp(10, 5, 0), &win),
        TriggerResult::FireAndPurge
    );
}
1722
/// A count trigger of 3 continues twice and then fires-and-purges on the third element.
#[test]
fn test_count_trigger() {
    let mut counter = CountTrigger::new(3);
    let win = CountWindow::new(0, 3);

    let outcomes: Vec<TriggerResult> = (0..3)
        .map(|_| counter.on_element(Utc::now(), &win))
        .collect();

    assert_eq!(
        outcomes,
        vec![
            TriggerResult::Continue,
            TriggerResult::Continue,
            TriggerResult::FireAndPurge,
        ]
    );
}
1744
/// A 60-second processing-time trigger fires at the first tick, skips 30s later, fires again at 60s.
#[test]
fn test_processing_time_trigger() {
    let mut pt_trigger = ProcessingTimeTrigger::new(Duration::from_secs(60));
    let win = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));

    let schedule = [
        (timestamp(10, 0, 0), TriggerResult::Fire),
        (timestamp(10, 0, 30), TriggerResult::Continue),
        (timestamp(10, 1, 0), TriggerResult::Fire),
    ];
    for &(now, expected) in schedule.iter() {
        assert_eq!(pt_trigger.on_processing_time(now, &win), expected);
    }
}
1762
/// Builder methods on `WindowConfig` set the late-data policy and allowed lateness.
#[test]
fn test_window_config() {
    let lateness = Duration::from_secs(60);
    let cfg = WindowConfig::new()
        .with_late_data_policy(LateDataPolicy::SideOutput)
        .with_allowed_lateness(lateness);

    assert_eq!(cfg.late_data_policy, LateDataPolicy::SideOutput);
    assert_eq!(cfg.allowed_lateness, lateness);
}
1773
/// Each `WindowError` variant renders a message identifying its kind.
#[test]
fn test_window_error_display() {
    let cases = vec![
        (WindowError::InvalidConfig("bad config".to_string()), "Invalid window config"),
        (WindowError::NotFound("window1".to_string()), "not found"),
        (WindowError::WindowClosed("window1".to_string()), "closed"),
        (WindowError::StateError("state issue".to_string()), "State error"),
    ];
    for (err, needle) in cases {
        let msg = err.to_string();
        assert!(msg.contains(needle), "{:?} should mention {:?}", msg, needle);
    }
}
1789
/// `TriggerResult` equality is variant-based.
#[test]
fn test_trigger_result_equality() {
    let cont = TriggerResult::Continue;
    let fire = TriggerResult::Fire;

    assert_eq!(cont, TriggerResult::Continue);
    assert_eq!(fire, TriggerResult::Fire);
    assert_ne!(fire, TriggerResult::FireAndPurge);
}
1797
/// The default late-data policy is `Drop`.
#[test]
fn test_late_data_policy_default() {
    assert_eq!(LateDataPolicy::default(), LateDataPolicy::Drop);
}
1804
/// A trigger produced by `clone_trigger` behaves like a fresh `EventTimeTrigger`.
///
/// The previous version never used the first clone directly (it cloned a second time
/// because the binding was not `mut`). This version exercises the clone itself. It
/// checks both the pre-end `Continue` path and the `FireAndPurge` result at the window end.
#[test]
fn test_trigger_clone() {
    let trigger = EventTimeTrigger::new();
    let mut cloned = trigger.clone_trigger();
    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));

    assert_eq!(
        cloned.on_event_time(timestamp(10, 3, 0), &window),
        TriggerResult::Continue
    );
    assert_eq!(
        cloned.on_event_time(timestamp(10, 5, 0), &window),
        TriggerResult::FireAndPurge
    );
}
1819
/// A watermark stores its timestamp, is not end-of-stream, and displays as a watermark.
#[test]
fn test_watermark_basic() {
    let at = timestamp(10, 30, 0);
    let mark = Watermark::new(at);

    assert_eq!(mark.timestamp, at);
    assert!(!mark.is_end_of_stream());

    let shown = mark.to_string();
    assert!(shown.contains("Watermark"));
}
1832
/// `min()` precedes `max()`, and only `max()` signals end of stream.
#[test]
fn test_watermark_min_max() {
    let lowest = Watermark::min();
    let highest = Watermark::max();

    assert!(lowest.timestamp < highest.timestamp);
    assert!(!lowest.is_end_of_stream());
    assert!(highest.is_end_of_stream());

    let shown = highest.to_string();
    assert!(shown.contains("END"));
}
1843
/// `advance` accepts only strictly later timestamps and reports whether it moved.
#[test]
fn test_watermark_advance() {
    let mut mark = Watermark::new(timestamp(10, 0, 0));
    let steps = [
        (timestamp(10, 5, 0), true),  // forward progress
        (timestamp(10, 3, 0), false), // regression is ignored
        (timestamp(10, 5, 0), false), // equal timestamp is not progress
    ];
    for &(target, moved) in steps.iter() {
        assert_eq!(mark.advance(target), moved);
        assert_eq!(mark.timestamp, timestamp(10, 5, 0));
    }
}
1860
/// Watermarks compare by timestamp.
#[test]
fn test_watermark_ordering() {
    let earlier = Watermark::new(timestamp(10, 0, 0));
    let later = Watermark::new(timestamp(10, 5, 0));
    let earlier_again = Watermark::new(timestamp(10, 0, 0));

    assert!(earlier < later);
    assert!(later > earlier);
    assert_eq!(earlier, earlier_again);
}
1871
/// The monotonic generator emits on every new maximum and never on periodic ticks.
#[test]
fn test_monotonic_generator() {
    let mut wm_gen = MonotonicWatermarkGenerator::new();
    assert_eq!(wm_gen.current_watermark(), Watermark::min());

    let events = [
        (timestamp(10, 0, 0), Some(timestamp(10, 0, 0))),
        (timestamp(10, 5, 0), Some(timestamp(10, 5, 0))),
        // Out-of-order event: no regression, nothing emitted.
        (timestamp(10, 3, 0), None),
    ];
    for &(event_ts, expected) in events.iter() {
        assert_eq!(wm_gen.on_event(event_ts).map(|w| w.timestamp), expected);
    }

    assert_eq!(wm_gen.current_watermark().timestamp, timestamp(10, 5, 0));
    assert!(wm_gen.on_periodic_emit().is_none());
}
1899
/// The bounded generator trails the max event time by `max_delay` (5s here).
#[test]
fn test_bounded_out_of_orderness_generator() {
    let mut wm_gen = BoundedOutOfOrdernessGenerator::new(Duration::from_secs(5));
    assert_eq!(wm_gen.current_watermark(), Watermark::min());

    let events = [
        (timestamp(10, 0, 0), Some(timestamp(9, 59, 55))),
        (timestamp(10, 0, 10), Some(timestamp(10, 0, 5))),
        // Older than the current maximum: nothing emitted.
        (timestamp(10, 0, 5), None),
    ];
    for &(event_ts, expected) in events.iter() {
        assert_eq!(wm_gen.on_event(event_ts).map(|w| w.timestamp), expected);
    }

    assert_eq!(wm_gen.current_watermark().timestamp, timestamp(10, 0, 5));
    assert_eq!(
        wm_gen.on_periodic_emit().map(|w| w.timestamp),
        Some(timestamp(10, 0, 5))
    );
}
1926
/// The periodic generator stays silent on events and emits `max - delay` on a periodic tick.
#[test]
fn test_periodic_watermark_generator() {
    let delay = Duration::from_secs(5);
    let period = Duration::from_secs(10);
    let mut wm_gen = PeriodicWatermarkGenerator::new(delay, period);

    assert!(wm_gen.on_event(timestamp(10, 0, 0)).is_none());
    let _ = wm_gen.on_event(timestamp(10, 0, 10));
    assert!(wm_gen.on_event(timestamp(10, 0, 15)).is_none());

    let flushed = wm_gen.on_periodic_emit().map(|w| w.timestamp);
    assert_eq!(flushed, Some(timestamp(10, 0, 10)));
}
1946
/// `on_end_of_stream` yields an end-of-stream watermark.
#[test]
fn test_watermark_generator_end_of_stream() {
    let mut wm_gen = MonotonicWatermarkGenerator::new();
    let _ = wm_gen.on_event(timestamp(10, 0, 0));

    assert!(wm_gen.on_end_of_stream().is_end_of_stream());
}
1955
/// Only on-time and within-lateness results are processable; side output is routed separately.
#[test]
fn test_late_data_result() {
    let punctual = LateDataResult::<i32>::OnTime(42);
    assert!(punctual.should_process());
    assert_eq!(punctual.into_processable(), Some(42));

    let tolerated = LateDataResult::<i32>::WithinLateness(42);
    assert!(tolerated.should_process());

    let discarded = LateDataResult::<i32>::Drop;
    assert!(!discarded.should_process());
    assert_eq!(discarded.into_processable(), None);

    let diverted = LateDataResult::<i32>::SideOutput(42);
    assert!(!diverted.should_process());
    assert!(diverted.is_side_output());
    assert_eq!(LateDataResult::SideOutput(42).into_side_output(), Some(42));
}
1978
/// Elements at or after the watermark are classified as on time and counted.
#[test]
fn test_late_data_handler_on_time() {
    let mut handler = LateDataHandler::drop_late();
    let mark = Watermark::new(timestamp(10, 0, 0));

    for &event_ts in [timestamp(10, 0, 0), timestamp(10, 5, 0)].iter() {
        let verdict = handler.evaluate(event_ts, &mark, "data");
        assert!(matches!(verdict, LateDataResult::OnTime("data")));
    }

    assert_eq!(handler.stats().on_time, 2);
}
1994
/// An element 10s behind the watermark is tolerated under a 30s allowed lateness.
#[test]
fn test_late_data_handler_within_lateness() {
    let mut handler = LateDataHandler::with_allowed_lateness(Duration::from_secs(30));
    let mark = Watermark::new(timestamp(10, 0, 0));

    let verdict = handler.evaluate(timestamp(9, 59, 50), &mark, "data");
    assert!(matches!(verdict, LateDataResult::WithinLateness("data")));
    assert_eq!(handler.stats().within_lateness, 1);
}
2006
/// Under the drop policy, late elements are dropped and counted.
#[test]
fn test_late_data_handler_drop() {
    let mut handler = LateDataHandler::drop_late();
    let mark = Watermark::new(timestamp(10, 0, 0));

    let verdict = handler.evaluate(timestamp(9, 59, 30), &mark, "data");
    assert!(matches!(verdict, LateDataResult::Drop));
    assert_eq!(handler.stats().dropped, 1);
}
2018
/// Under the side-output policy, late elements are redirected and counted.
#[test]
fn test_late_data_handler_side_output() {
    let mut handler = LateDataHandler::redirect_to_side_output();
    let mark = Watermark::new(timestamp(10, 0, 0));

    let verdict = handler.evaluate(timestamp(9, 59, 30), &mark, "data");
    assert!(matches!(verdict, LateDataResult::SideOutput("data")));
    assert_eq!(handler.stats().side_output, 1);
}
2030
/// `reset_stats` zeroes counters that were previously incremented.
#[test]
fn test_late_data_handler_stats_reset() {
    let mut handler = LateDataHandler::drop_late();
    let mark = Watermark::new(timestamp(10, 0, 0));

    // One on-time element, then one late (dropped) element.
    handler.evaluate(timestamp(10, 0, 0), &mark, "data");
    handler.evaluate(timestamp(9, 59, 0), &mark, "data");

    assert_eq!(
        (handler.stats().on_time, handler.stats().dropped),
        (1, 1)
    );

    handler.reset_stats();
    assert_eq!(
        (handler.stats().on_time, handler.stats().dropped),
        (0, 0)
    );
}
2046
/// A handler built with allowed lateness reports that policy and duration.
#[test]
fn test_late_data_handler_accessors() {
    let grace = Duration::from_secs(30);
    let handler = LateDataHandler::with_allowed_lateness(grace);

    assert!(matches!(handler.policy(), LateDataPolicy::AllowLateness(_)));
    assert_eq!(handler.get_allowed_lateness(), grace);
}
2053
/// A new window state is empty; adding elements updates the count and the stored values in order.
#[test]
fn test_window_state_basic() {
    let win = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
    let mut st: WindowState<TimeWindow, i32> = WindowState::new(win);

    assert_eq!(st.count, 0);
    assert!(!st.triggered);
    assert!(st.last_watermark.is_none());

    for &(at, value) in [(timestamp(10, 1, 0), 42), (timestamp(10, 2, 0), 43)].iter() {
        st.add(at, value);
    }

    assert_eq!(st.count, 2);
    assert_eq!(st.elements().len(), 2);
    assert_eq!(st.values(), vec![42, 43]);
}
2074
/// `update_watermark` records the most recent watermark on the state.
#[test]
fn test_window_state_watermark() {
    let mut st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));

    let mark = Watermark::new(timestamp(10, 3, 0));
    st.update_watermark(mark.clone());
    assert_eq!(st.last_watermark, Some(mark));
}
2084
/// `mark_triggered` sets the flag; `clear` empties the buffered elements.
#[test]
fn test_window_state_trigger_and_clear() {
    let mut st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));

    st.add(timestamp(10, 1, 0), 42);
    st.mark_triggered();
    assert!(st.triggered);

    st.clear();
    assert_eq!(st.count, 0);
    assert!(st.elements().is_empty());
}
2098
/// State is collectable only after triggering and once the watermark passes end + lateness.
#[test]
fn test_window_state_gc() {
    let mut st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));
    let far_past_end = Watermark::new(timestamp(11, 0, 0));

    // Not yet triggered: must be retained regardless of the watermark.
    assert!(!st.can_be_gc(&far_past_end, ChronoDuration::zero()));

    st.mark_triggered();
    assert!(st.can_be_gc(&far_past_end, ChronoDuration::zero()));

    // At the window end, a minute of allowed lateness still keeps it alive.
    let at_end = Watermark::new(timestamp(10, 5, 0));
    assert!(!st.can_be_gc(&at_end, ChronoDuration::minutes(1)));
}
2116
/// Cloning a window state copies its count, trigger flag and buffered values.
#[test]
fn test_window_state_clone() {
    let mut source: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));
    source.add(timestamp(10, 1, 0), 42);
    source.mark_triggered();

    let copy = source.clone();
    assert_eq!(copy.count, 1);
    assert!(copy.triggered);
    assert_eq!(copy.values(), vec![42]);
}
2129
/// The `Display` form of a count window includes its id and max.
#[test]
fn test_count_window_display() {
    let shown = CountWindow::new(42, 100).to_string();
    for needle in ["id=42", "max=100"].iter() {
        assert!(shown.contains(*needle));
    }
}
2137
/// `SessionWindow::new` stores the given start, end and gap.
#[test]
fn test_session_window_new() {
    let (begin, finish) = (timestamp(10, 0, 0), timestamp(10, 5, 0));
    let gap = Duration::from_secs(300);
    let sw = SessionWindow::new(begin, finish, gap);

    assert_eq!(sw.start(), begin);
    assert_eq!(sw.end(), finish);
    assert_eq!(sw.gap(), gap);
}
2149
/// The `Display` form of a session window names it and shows the gap.
#[test]
fn test_session_window_display() {
    let sw = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
    let text = sw.to_string();

    assert!(text.contains("Session"));
    assert!(text.contains("gap="));
}
2157
/// Equal sessions hash equally; sessions with different starts hash differently.
#[test]
fn test_session_window_hash() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let digest = |sw: &SessionWindow| {
        let mut hasher = DefaultHasher::new();
        sw.hash(&mut hasher);
        hasher.finish()
    };

    let gap = Duration::from_secs(300);
    let a = SessionWindow::from_element(timestamp(10, 0, 0), gap);
    let b = SessionWindow::from_element(timestamp(10, 0, 0), gap);
    let c = SessionWindow::from_element(timestamp(10, 1, 0), gap);

    assert_eq!(digest(&a), digest(&b));
    assert_ne!(digest(&a), digest(&c));
}
2178
/// Time windows order by their bounds; identical bounds compare equal.
#[test]
fn test_time_window_ord() {
    let early = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
    let late = TimeWindow::new(timestamp(10, 5, 0), timestamp(10, 10, 0));
    let early_again = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));

    assert!(early < late);
    assert_eq!(early, early_again);
    assert!(late > early);
}
2189
/// A handler built with `Drop` reports that policy and zero allowed lateness.
#[test]
fn test_late_data_handler_new() {
    let dropper = LateDataHandler::new(LateDataPolicy::Drop);

    assert!(matches!(dropper.policy(), LateDataPolicy::Drop));
    assert_eq!(dropper.get_allowed_lateness(), Duration::ZERO);
}
2196
/// An `AllowLateness` policy exposes its duration through `get_allowed_lateness`.
#[test]
fn test_late_data_handler_allow_lateness() {
    let grace = Duration::from_secs(10);
    let tolerant = LateDataHandler::new(LateDataPolicy::AllowLateness(grace));

    assert_eq!(tolerant.get_allowed_lateness(), grace);
    assert!(matches!(tolerant.policy(), LateDataPolicy::AllowLateness(_)));
}
2203
/// A freshly constructed handler starts with every statistics counter at zero.
///
/// This also checks `on_time`, which the previous version left unverified.
#[test]
fn test_late_data_handler_stats() {
    let handler = LateDataHandler::new(LateDataPolicy::Drop);
    let stats = handler.stats();
    assert_eq!(stats.on_time, 0);
    assert_eq!(stats.dropped, 0);
    assert_eq!(stats.within_lateness, 0);
    assert_eq!(stats.side_output, 0);
}
2212
/// `reset_stats` returns non-zero counters to zero.
///
/// The previous version reset a brand-new handler whose counters were already zero,
/// so a no-op `reset_stats` would have passed. Here the counters are first driven
/// above zero: one on-time element and one element a minute behind the watermark.
#[test]
fn test_late_data_handler_reset_stats() {
    let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
    let watermark = Watermark::new(timestamp(10, 0, 0));

    handler.evaluate(timestamp(10, 0, 0), &watermark, "data");
    handler.evaluate(timestamp(9, 59, 0), &watermark, "data");
    assert_eq!(handler.stats().on_time, 1);
    assert_eq!(handler.stats().dropped, 1);

    handler.reset_stats();

    let stats = handler.stats();
    assert_eq!(stats.on_time, 0);
    assert_eq!(stats.dropped, 0);
}
2222
/// A new window state has no elements, is untriggered, and has seen no watermark.
#[test]
fn test_window_state_new() {
    let st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));

    assert_eq!(st.count, 0);
    assert!(!st.triggered);
    assert!(st.elements().is_empty());
    assert!(st.last_watermark.is_none());
}
2233
/// Adding three elements keeps all of them, in insertion order.
#[test]
fn test_window_state_add_multiple() {
    let mut st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));

    let inputs = [
        (timestamp(10, 1, 0), 42),
        (timestamp(10, 2, 0), 43),
        (timestamp(10, 3, 0), 44),
    ];
    for &(at, value) in inputs.iter() {
        st.add(at, value);
    }

    assert_eq!(st.count, 3);
    assert_eq!(st.elements().len(), 3);
    assert_eq!(st.values(), vec![42, 43, 44]);
}
2247
/// Global-window state is never collectable, even when triggered and far past any time.
#[test]
fn test_window_state_can_be_gc_global_window() {
    use crate::window::GlobalWindow;

    let mut st: WindowState<GlobalWindow, i32> = WindowState::new(GlobalWindow);
    st.mark_triggered();

    let late_mark = Watermark::new(timestamp(11, 0, 0));
    assert!(!st.can_be_gc(&late_mark, ChronoDuration::zero()));
}
2259
/// With one minute of lateness, state survives at the window end and is collectable just past end + 1m.
#[test]
fn test_window_state_can_be_gc_with_lateness() {
    let mut st: WindowState<TimeWindow, i32> =
        WindowState::new(TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0)));
    st.mark_triggered();
    let grace = ChronoDuration::minutes(1);

    let at_end = Watermark::new(timestamp(10, 5, 0));
    assert!(!st.can_be_gc(&at_end, grace));

    let past_grace = Watermark::new(timestamp(10, 6, 1));
    assert!(st.can_be_gc(&past_grace, grace));
}
2274}