1use std::time::Duration;
2
3#[derive(Debug, Clone)]
5pub enum WindowType {
6 Tumbling(Duration),
8 Sliding(Duration, Duration),
10 Session(Duration),
12 Global,
14}
15
16#[derive(Debug, Clone)]
18pub struct WindowConfig {
19 pub window_type: WindowType,
21 pub allow_lateness: Duration,
23 pub watermark_delay: Duration,
25}
26
27impl WindowConfig {
28 pub fn tumbling(size: Duration) -> Self {
30 Self {
31 window_type: WindowType::Tumbling(size),
32 allow_lateness: Duration::from_secs(0),
33 watermark_delay: Duration::from_secs(0),
34 }
35 }
36
37 pub fn sliding(size: Duration, slide: Duration) -> Self {
39 Self {
40 window_type: WindowType::Sliding(size, slide),
41 allow_lateness: Duration::from_secs(0),
42 watermark_delay: Duration::from_secs(0),
43 }
44 }
45
46 pub fn session(gap: Duration) -> Self {
48 Self {
49 window_type: WindowType::Session(gap),
50 allow_lateness: Duration::from_secs(0),
51 watermark_delay: Duration::from_secs(0),
52 }
53 }
54
55 pub fn global() -> Self {
57 Self {
58 window_type: WindowType::Global,
59 allow_lateness: Duration::from_secs(0),
60 watermark_delay: Duration::from_secs(0),
61 }
62 }
63
64 pub fn with_lateness(mut self, lateness: Duration) -> Self {
66 self.allow_lateness = lateness;
67 self
68 }
69
70 pub fn with_watermark_delay(mut self, delay: Duration) -> Self {
72 self.watermark_delay = delay;
73 self
74 }
75}
76
77impl WindowType {
78 fn get_common_windows(&self, timestamp: i64) -> Vec<i64> {
79 match self {
80 WindowType::Tumbling(duration) => {
81 let duration_ms = duration.as_millis() as i64;
82 vec![(timestamp / duration_ms) * duration_ms]
83 }
84 WindowType::Sliding(size, slide) => {
85 let slide_ms = slide.as_millis() as i64;
86 let size_ms = size.as_millis() as i64;
87 let earliest_window = ((timestamp - size_ms) / slide_ms) * slide_ms;
88 let latest_window = (timestamp / slide_ms) * slide_ms;
89
90 (earliest_window..=latest_window)
91 .step_by(slide.as_millis() as usize)
92 .filter(|&start| timestamp - start < size_ms)
93 .collect()
94 }
95 WindowType::Session(gap) => {
96 let gap_ms = gap.as_millis() as i64;
97 vec![timestamp / gap_ms]
98 }
99 WindowType::Global => {
100 vec![0]
101 }
102 }
103 }
104
105 pub fn get_affected_windows(&self, timestamp: i64) -> Vec<i64> {
106 self.get_common_windows(timestamp)
107 }
108
109 pub fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
110 self.get_common_windows(timestamp)
111 .iter()
112 .map(|&ts| ts as u64)
113 .collect()
114 }
115}