Skip to main content

oxigdal_streaming/windowing/
session.rs

1//! Session window implementation.
2
3use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::BTreeMap;
9
10/// Configuration for session windows.
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SessionWindowConfig {
13    /// Gap duration between sessions
14    pub gap: Duration,
15
16    /// Maximum session duration (optional)
17    pub max_duration: Option<Duration>,
18}
19
20impl SessionWindowConfig {
21    /// Create a new session window configuration.
22    pub fn new(gap: Duration) -> Self {
23        Self {
24            gap,
25            max_duration: None,
26        }
27    }
28
29    /// Set the maximum session duration.
30    pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
31        self.max_duration = Some(max_duration);
32        self
33    }
34}
35
36/// Session window (dynamic windows based on activity).
37#[derive(Debug)]
38pub struct SessionWindow {
39    config: SessionWindowConfig,
40    sessions: BTreeMap<DateTime<Utc>, Window>,
41}
42
43impl SessionWindow {
44    /// Create a new session window.
45    pub fn new(gap: Duration) -> Self {
46        Self {
47            config: SessionWindowConfig::new(gap),
48            sessions: BTreeMap::new(),
49        }
50    }
51
52    /// Create a new session window with maximum duration.
53    pub fn with_max_duration(gap: Duration, max_duration: Duration) -> Self {
54        Self {
55            config: SessionWindowConfig::new(gap).with_max_duration(max_duration),
56            sessions: BTreeMap::new(),
57        }
58    }
59
60    /// Assign an element to a session window.
61    pub fn assign(&mut self, timestamp: DateTime<Utc>) -> Result<Window> {
62        let mut merged_window = None;
63        let mut windows_to_remove = Vec::new();
64
65        for (start, window) in &self.sessions {
66            if timestamp >= window.start && timestamp <= window.end {
67                merged_window = Some(window.clone());
68                windows_to_remove.push(*start);
69            } else if timestamp > window.end && timestamp - window.end < self.config.gap {
70                let new_end = timestamp + self.config.gap;
71                let mut new_window = Window::new(window.start, new_end)?;
72
73                if let Some(max_dur) = self.config.max_duration {
74                    if new_window.duration() > max_dur {
75                        new_window = Window::new(new_window.end - max_dur, new_window.end)?;
76                    }
77                }
78
79                if let Some(existing) = merged_window {
80                    merged_window = existing.merge(&new_window);
81                } else {
82                    merged_window = Some(new_window);
83                }
84
85                windows_to_remove.push(*start);
86            }
87        }
88
89        for start in windows_to_remove {
90            self.sessions.remove(&start);
91        }
92
93        let result_window = if let Some(window) = merged_window {
94            window
95        } else {
96            Window::new(timestamp, timestamp + self.config.gap)?
97        };
98
99        self.sessions
100            .insert(result_window.start, result_window.clone());
101
102        Ok(result_window)
103    }
104
105    /// Get all active sessions.
106    pub fn active_sessions(&self) -> Vec<Window> {
107        self.sessions.values().cloned().collect()
108    }
109
110    /// Clear expired sessions.
111    pub fn clear_expired(&mut self, watermark: DateTime<Utc>) {
112        self.sessions.retain(|_, window| window.end > watermark);
113    }
114}
115
116/// Assigner for session windows.
117pub struct SessionAssigner {
118    config: SessionWindowConfig,
119}
120
121impl SessionAssigner {
122    /// Create a new session window assigner.
123    pub fn new(gap: Duration) -> Self {
124        Self {
125            config: SessionWindowConfig::new(gap),
126        }
127    }
128
129    /// Create a new session window assigner with maximum duration.
130    pub fn with_max_duration(gap: Duration, max_duration: Duration) -> Self {
131        Self {
132            config: SessionWindowConfig::new(gap).with_max_duration(max_duration),
133        }
134    }
135}
136
137impl WindowAssigner for SessionAssigner {
138    fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
139        let start = element.event_time;
140        let end = start + self.config.gap;
141        Ok(vec![Window::new(start, end)?])
142    }
143
144    fn assigner_type(&self) -> &str {
145        "SessionAssigner"
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use super::*;
152
153    #[test]
154    fn test_session_window() {
155        let mut window = SessionWindow::new(Duration::seconds(60));
156        let ts1 =
157            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
158
159        let w1 = window
160            .assign(ts1)
161            .expect("Session window assignment should succeed in test");
162        assert!(w1.contains(&ts1));
163        assert_eq!(w1.duration(), Duration::seconds(60));
164    }
165
166    #[test]
167    fn test_session_window_merge() {
168        let mut window = SessionWindow::new(Duration::seconds(60));
169
170        let ts1 =
171            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
172        let ts2 = ts1 + Duration::seconds(30);
173
174        let _w1 = window
175            .assign(ts1)
176            .expect("Session window assignment should succeed in test");
177        let w2 = window
178            .assign(ts2)
179            .expect("Session window assignment should succeed in test");
180
181        assert!(w2.contains(&ts1));
182        assert!(w2.contains(&ts2));
183    }
184
185    #[test]
186    fn test_session_window_separate() {
187        let mut window = SessionWindow::new(Duration::seconds(60));
188
189        let ts1 =
190            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
191        let ts2 = ts1 + Duration::seconds(120);
192
193        let w1 = window
194            .assign(ts1)
195            .expect("Session window assignment should succeed in test");
196        let w2 = window
197            .assign(ts2)
198            .expect("Session window assignment should succeed in test");
199
200        assert!(!w1.contains(&ts2));
201        assert!(!w2.contains(&ts1));
202    }
203
204    #[test]
205    fn test_session_window_max_duration() {
206        let mut window =
207            SessionWindow::with_max_duration(Duration::seconds(10), Duration::seconds(100));
208
209        let ts1 =
210            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
211        window
212            .assign(ts1)
213            .expect("Session window assignment should succeed in test");
214
215        let ts2 = ts1 + Duration::seconds(200);
216        let w = window
217            .assign(ts2)
218            .expect("Session window assignment should succeed in test");
219
220        assert!(w.duration() <= Duration::seconds(100));
221    }
222
223    #[test]
224    fn test_session_assigner() {
225        let assigner = SessionAssigner::new(Duration::seconds(60));
226        let elem = StreamElement::new(
227            vec![1, 2, 3],
228            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed"),
229        );
230
231        let windows = assigner
232            .assign_windows(&elem)
233            .expect("Session window assigner should succeed in test");
234        assert_eq!(windows.len(), 1);
235        assert!(windows[0].contains(&elem.event_time));
236    }
237
238    #[test]
239    fn test_clear_expired() {
240        let mut window = SessionWindow::new(Duration::seconds(60));
241
242        let ts1 =
243            DateTime::from_timestamp(1000, 0).expect("Test timestamp creation should succeed");
244        let ts2 = ts1 + Duration::seconds(200);
245
246        window
247            .assign(ts1)
248            .expect("Session window assignment should succeed in test");
249        window
250            .assign(ts2)
251            .expect("Session window assignment should succeed in test");
252
253        assert_eq!(window.active_sessions().len(), 2);
254
255        let watermark = ts1 + Duration::seconds(100);
256        window.clear_expired(watermark);
257
258        assert_eq!(window.active_sessions().len(), 1);
259    }
260}