oxigdal_streaming/windowing/
session.rs1use super::window::{Window, WindowAssigner};
4use crate::core::stream::StreamElement;
5use crate::error::Result;
6use chrono::{DateTime, Duration, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::BTreeMap;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct SessionWindowConfig {
13 pub gap: Duration,
15
16 pub max_duration: Option<Duration>,
18}
19
20impl SessionWindowConfig {
21 pub fn new(gap: Duration) -> Self {
23 Self {
24 gap,
25 max_duration: None,
26 }
27 }
28
29 pub fn with_max_duration(mut self, max_duration: Duration) -> Self {
31 self.max_duration = Some(max_duration);
32 self
33 }
34}
35
36#[derive(Debug)]
38pub struct SessionWindow {
39 config: SessionWindowConfig,
40 sessions: BTreeMap<DateTime<Utc>, Window>,
41}
42
43impl SessionWindow {
44 pub fn new(gap: Duration) -> Self {
46 Self {
47 config: SessionWindowConfig::new(gap),
48 sessions: BTreeMap::new(),
49 }
50 }
51
52 pub fn with_max_duration(gap: Duration, max_duration: Duration) -> Self {
54 Self {
55 config: SessionWindowConfig::new(gap).with_max_duration(max_duration),
56 sessions: BTreeMap::new(),
57 }
58 }
59
60 pub fn assign(&mut self, timestamp: DateTime<Utc>) -> Result<Window> {
62 let mut merged_window = None;
63 let mut windows_to_remove = Vec::new();
64
65 for (start, window) in &self.sessions {
66 if timestamp >= window.start && timestamp <= window.end {
67 merged_window = Some(window.clone());
68 windows_to_remove.push(*start);
69 } else if timestamp > window.end && timestamp - window.end < self.config.gap {
70 let new_end = timestamp + self.config.gap;
71 let mut new_window = Window::new(window.start, new_end)?;
72
73 if let Some(max_dur) = self.config.max_duration {
74 if new_window.duration() > max_dur {
75 new_window = Window::new(new_window.end - max_dur, new_window.end)?;
76 }
77 }
78
79 if let Some(existing) = merged_window {
80 merged_window = existing.merge(&new_window);
81 } else {
82 merged_window = Some(new_window);
83 }
84
85 windows_to_remove.push(*start);
86 }
87 }
88
89 for start in windows_to_remove {
90 self.sessions.remove(&start);
91 }
92
93 let result_window = if let Some(window) = merged_window {
94 window
95 } else {
96 Window::new(timestamp, timestamp + self.config.gap)?
97 };
98
99 self.sessions
100 .insert(result_window.start, result_window.clone());
101
102 Ok(result_window)
103 }
104
105 pub fn active_sessions(&self) -> Vec<Window> {
107 self.sessions.values().cloned().collect()
108 }
109
110 pub fn clear_expired(&mut self, watermark: DateTime<Utc>) {
112 self.sessions.retain(|_, window| window.end > watermark);
113 }
114}
115
116pub struct SessionAssigner {
118 config: SessionWindowConfig,
119}
120
121impl SessionAssigner {
122 pub fn new(gap: Duration) -> Self {
124 Self {
125 config: SessionWindowConfig::new(gap),
126 }
127 }
128
129 pub fn with_max_duration(gap: Duration, max_duration: Duration) -> Self {
131 Self {
132 config: SessionWindowConfig::new(gap).with_max_duration(max_duration),
133 }
134 }
135}
136
137impl WindowAssigner for SessionAssigner {
138 fn assign_windows(&self, element: &StreamElement) -> Result<Vec<Window>> {
139 let start = element.event_time;
140 let end = start + self.config.gap;
141 Ok(vec![Window::new(start, end)?])
142 }
143
144 fn assigner_type(&self) -> &str {
145 "SessionAssigner"
146 }
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a UTC timestamp `secs` seconds after the Unix epoch.
    fn ts(secs: i64) -> DateTime<Utc> {
        DateTime::from_timestamp(secs, 0).expect("Test timestamp creation should succeed")
    }

    /// A first event opens a session that is one gap long.
    #[test]
    fn test_session_window() {
        let mut window = SessionWindow::new(Duration::seconds(60));
        let ts1 = ts(1000);

        let w1 = window
            .assign(ts1)
            .expect("Session window assignment should succeed in test");
        assert!(w1.contains(&ts1));
        assert_eq!(w1.duration(), Duration::seconds(60));
    }

    /// Two events closer together than the gap share one session.
    #[test]
    fn test_session_window_merge() {
        let mut window = SessionWindow::new(Duration::seconds(60));

        let ts1 = ts(1000);
        let ts2 = ts1 + Duration::seconds(30);

        let _w1 = window
            .assign(ts1)
            .expect("Session window assignment should succeed in test");
        let w2 = window
            .assign(ts2)
            .expect("Session window assignment should succeed in test");

        assert!(w2.contains(&ts1));
        assert!(w2.contains(&ts2));
    }

    /// Two events far apart end up in different sessions.
    #[test]
    fn test_session_window_separate() {
        let mut window = SessionWindow::new(Duration::seconds(60));

        let ts1 = ts(1000);
        let ts2 = ts1 + Duration::seconds(120);

        let w1 = window
            .assign(ts1)
            .expect("Session window assignment should succeed in test");
        let w2 = window
            .assign(ts2)
            .expect("Session window assignment should succeed in test");

        assert!(!w1.contains(&ts2));
        assert!(!w2.contains(&ts1));
    }

    /// Repeatedly extending a session must engage the maximum-duration cap.
    #[test]
    fn test_session_window_max_duration() {
        let max = Duration::seconds(25);
        let mut window = SessionWindow::with_max_duration(Duration::seconds(10), max);

        // Each event arrives 5s after the current session end, which is
        // inside the 10s gap, so it extends the same session.
        let first = ts(1000); // opens [1000, 1010]
        let second = ts(1015); // extends to [1000, 1025]: 25s, still within the cap
        let third = ts(1030); // would be [1000, 1040]: 40s, so it must be capped

        window
            .assign(first)
            .expect("Session window assignment should succeed in test");
        let w2 = window
            .assign(second)
            .expect("Session window assignment should succeed in test");
        assert!(w2.duration() <= max);

        let w3 = window
            .assign(third)
            .expect("Session window assignment should succeed in test");

        // An uncapped session would be 40s long here.
        assert_eq!(w3.duration(), max);
        assert!(w3.contains(&third));
        assert_eq!(window.active_sessions().len(), 1);
    }

    /// The stateless assigner yields one window containing the event time.
    #[test]
    fn test_session_assigner() {
        let assigner = SessionAssigner::new(Duration::seconds(60));
        let elem = StreamElement::new(vec![1, 2, 3], ts(1000));

        let windows = assigner
            .assign_windows(&elem)
            .expect("Session window assigner should succeed in test");
        assert_eq!(windows.len(), 1);
        assert!(windows[0].contains(&elem.event_time));
    }

    /// Sessions ending at or before the watermark are discarded.
    #[test]
    fn test_clear_expired() {
        let mut window = SessionWindow::new(Duration::seconds(60));

        let ts1 = ts(1000);
        let ts2 = ts1 + Duration::seconds(200);

        window
            .assign(ts1)
            .expect("Session window assignment should succeed in test");
        window
            .assign(ts2)
            .expect("Session window assignment should succeed in test");

        assert_eq!(window.active_sessions().len(), 2);

        let watermark = ts1 + Duration::seconds(100);
        window.clear_expired(watermark);

        assert_eq!(window.active_sessions().len(), 1);
    }
}