fluxus_utils/
window.rs

1use std::time::Duration;
2
3/// Window type for stream processing
4#[derive(Debug, Clone)]
5pub enum WindowType {
6    /// Tumbling window with fixed size
7    Tumbling(Duration),
8    /// Sliding window with size and slide interval
9    Sliding(Duration, Duration),
10    /// Session window with gap timeout
11    Session(Duration),
12    /// Global window, no window boundaries
13    Global,
14}
15
16/// Configuration for windowed operations
17#[derive(Debug, Clone)]
18pub struct WindowConfig {
19    /// Type of the window
20    pub window_type: WindowType,
21    /// Whether to allow late arrivals
22    pub allow_lateness: Duration,
23    /// Watermark strategy (time to wait before processing)
24    pub watermark_delay: Duration,
25}
26
27impl WindowConfig {
28    /// Create a new tumbling window configuration
29    pub fn tumbling(size: Duration) -> Self {
30        Self {
31            window_type: WindowType::Tumbling(size),
32            allow_lateness: Duration::from_secs(0),
33            watermark_delay: Duration::from_secs(0),
34        }
35    }
36
37    /// Create a new sliding window configuration
38    pub fn sliding(size: Duration, slide: Duration) -> Self {
39        Self {
40            window_type: WindowType::Sliding(size, slide),
41            allow_lateness: Duration::from_secs(0),
42            watermark_delay: Duration::from_secs(0),
43        }
44    }
45
46    /// Create a new session window configuration
47    pub fn session(gap: Duration) -> Self {
48        Self {
49            window_type: WindowType::Session(gap),
50            allow_lateness: Duration::from_secs(0),
51            watermark_delay: Duration::from_secs(0),
52        }
53    }
54
55    /// Create a new global window configuration
56    pub fn global() -> Self {
57        Self {
58            window_type: WindowType::Global,
59            allow_lateness: Duration::from_secs(0),
60            watermark_delay: Duration::from_secs(0),
61        }
62    }
63
64    /// Set the allowed lateness for this window
65    pub fn with_lateness(mut self, lateness: Duration) -> Self {
66        self.allow_lateness = lateness;
67        self
68    }
69
70    /// Set the watermark delay for this window
71    pub fn with_watermark_delay(mut self, delay: Duration) -> Self {
72        self.watermark_delay = delay;
73        self
74    }
75}
76
77impl WindowType {
78    fn get_common_windows(&self, timestamp: i64) -> Vec<i64> {
79        match self {
80            WindowType::Tumbling(duration) => {
81                let duration_ms = duration.as_millis() as i64;
82                vec![(timestamp / duration_ms) * duration_ms]
83            }
84            WindowType::Sliding(size, slide) => {
85                let slide_ms = slide.as_millis() as i64;
86                let size_ms = size.as_millis() as i64;
87                let earliest_window = ((timestamp - size_ms) / slide_ms) * slide_ms;
88                let latest_window = (timestamp / slide_ms) * slide_ms;
89
90                (earliest_window..=latest_window)
91                    .step_by(slide.as_millis() as usize)
92                    .filter(|&start| timestamp - start < size_ms)
93                    .collect()
94            }
95            WindowType::Session(gap) => {
96                let gap_ms = gap.as_millis() as i64;
97                vec![timestamp / gap_ms]
98            }
99            WindowType::Global => {
100                vec![0]
101            }
102        }
103    }
104
105    pub fn get_affected_windows(&self, timestamp: i64) -> Vec<i64> {
106        self.get_common_windows(timestamp)
107    }
108
109    pub fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
110        self.get_common_windows(timestamp)
111            .iter()
112            .map(|&ts| ts as u64)
113            .collect()
114    }
115}