1use std::time::{Duration, SystemTime};
15
16use crate::error::StreamingError;
17
/// A single timestamped item travelling through a stream.
///
/// The payload type `T` is opaque to this type; it is stored and handed back
/// unchanged.
#[derive(Debug, Clone)]
pub struct StreamEvent<T> {
    /// Time associated with the event.
    pub timestamp: SystemTime,
    /// Caller-supplied data carried by the event.
    pub payload: T,
    /// Caller-supplied sequence number, stored exactly as given.
    pub sequence: u64,
}

impl<T> StreamEvent<T> {
    /// Creates an event from its three parts.
    ///
    /// No validation is performed: the arguments are stored as-is.
    pub fn new(timestamp: SystemTime, payload: T, sequence: u64) -> Self {
        StreamEvent {
            sequence,
            payload,
            timestamp,
        }
    }
}
42
43#[derive(Debug, Clone)]
47pub struct SessionWindow<T> {
48 pub start: SystemTime,
50 pub end: SystemTime,
52 pub events: Vec<StreamEvent<T>>,
54 pub session_id: u64,
56}
57
58impl<T> SessionWindow<T> {
59 pub fn duration(&self) -> Duration {
63 self.end
64 .duration_since(self.start)
65 .unwrap_or(Duration::ZERO)
66 }
67
68 pub fn event_count(&self) -> usize {
70 self.events.len()
71 }
72
73 pub fn is_empty(&self) -> bool {
75 self.events.is_empty()
76 }
77}
78
/// Tunable parameters for session-window detection.
#[derive(Debug, Clone)]
pub struct SessionWindowConfig {
    /// Inactivity threshold: an event arriving more than this long after the
    /// previous one ends the current session.
    pub gap_duration: Duration,
    /// Minimum number of events a session needs in order to be kept when it
    /// is closed; shorter sessions are discarded.
    pub min_events: usize,
    /// Optional cap on how far an event may lie from the session start before
    /// the session is closed. `None` disables the cap.
    pub max_session_duration: Option<Duration>,
}

impl Default for SessionWindowConfig {
    /// Returns a configuration with a 30-second gap, a one-event minimum and
    /// no cap on session length.
    fn default() -> Self {
        const DEFAULT_GAP_SECS: u64 = 30;

        SessionWindowConfig {
            max_session_duration: None,
            min_events: 1,
            gap_duration: Duration::from_secs(DEFAULT_GAP_SECS),
        }
    }
}
102
103pub struct SessionWindowProcessor<T: Clone> {
111 config: SessionWindowConfig,
112 current_session: Option<Vec<StreamEvent<T>>>,
114 session_start: Option<SystemTime>,
116 last_event_time: Option<SystemTime>,
118 next_session_id: u64,
120 closed_sessions: Vec<SessionWindow<T>>,
122}
123
124impl<T: Clone> SessionWindowProcessor<T> {
125 pub fn new(config: SessionWindowConfig) -> Self {
127 Self {
128 config,
129 current_session: None,
130 session_start: None,
131 last_event_time: None,
132 next_session_id: 0,
133 closed_sessions: Vec::new(),
134 }
135 }
136
137 pub fn process(&mut self, event: StreamEvent<T>) -> Result<(), StreamingError> {
144 let event_time = event.timestamp;
145
146 let gap_exceeded = self.last_event_time.map(|last| {
148 event_time.duration_since(last).unwrap_or(Duration::ZERO) > self.config.gap_duration
149 });
150
151 let max_exceeded = self
152 .session_start
153 .zip(self.config.max_session_duration)
154 .map(|(start, max)| event_time.duration_since(start).unwrap_or(Duration::ZERO) > max);
155
156 let should_close = gap_exceeded.unwrap_or(false) || max_exceeded.unwrap_or(false);
157
158 if should_close {
159 self.close_current_session();
160 }
161
162 if self.current_session.is_none() {
164 self.current_session = Some(Vec::new());
165 self.session_start = Some(event_time);
166 }
167
168 self.last_event_time = Some(event_time);
169 if let Some(ref mut session) = self.current_session {
170 session.push(event);
171 }
172
173 Ok(())
174 }
175
176 pub fn flush(&mut self) {
181 self.close_current_session();
182 }
183
184 pub fn drain_sessions(&mut self) -> Vec<SessionWindow<T>> {
189 std::mem::take(&mut self.closed_sessions)
190 }
191
192 pub fn pending_event_count(&self) -> usize {
194 self.current_session.as_ref().map(|s| s.len()).unwrap_or(0)
195 }
196
197 pub fn total_sessions_closed(&self) -> u64 {
199 self.next_session_id
200 }
201
202 fn close_current_session(&mut self) {
205 if let (Some(events), Some(start)) =
206 (self.current_session.take(), self.session_start.take())
207 {
208 let session_id = self.next_session_id;
209 self.next_session_id += 1;
210
211 if events.len() >= self.config.min_events {
212 let end = self.last_event_time.unwrap_or(start);
213 self.closed_sessions.push(SessionWindow {
214 start,
215 end,
216 events,
217 session_id,
218 });
219 }
220 }
222 self.last_event_time = None;
223 }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// Builds a timestamp `secs` seconds after the Unix epoch.
    fn ts(secs: u64) -> SystemTime {
        let offset = Duration::from_secs(secs);
        UNIX_EPOCH + offset
    }

    /// Builds an event at `secs` whose payload mirrors its sequence number.
    fn event(secs: u64, seq: u64) -> StreamEvent<u32> {
        let payload = seq as u32;
        StreamEvent::new(ts(secs), payload, seq)
    }

    /// Builds a config with no session-length cap from a gap (in seconds)
    /// and a minimum event count.
    fn uncapped(gap_secs: u64, min_events: usize) -> SessionWindowConfig {
        SessionWindowConfig {
            gap_duration: Duration::from_secs(gap_secs),
            min_events,
            max_session_duration: None,
        }
    }

    /// Flushes the processor and returns everything it has closed.
    fn finish(processor: &mut SessionWindowProcessor<u32>) -> Vec<SessionWindow<u32>> {
        processor.flush();
        processor.drain_sessions()
    }

    #[test]
    fn test_single_session_from_close_events() {
        let mut processor = SessionWindowProcessor::new(uncapped(60, 1));
        for (&secs, seq) in [0u64, 10, 20].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("process ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].event_count(), 3);
    }

    #[test]
    fn test_gap_detection_closes_session() {
        let mut processor = SessionWindowProcessor::new(uncapped(30, 1));
        for (&secs, seq) in [0u64, 60].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("process ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 2);
    }

    #[test]
    fn test_min_events_filter_drops_small_sessions() {
        let mut processor = SessionWindowProcessor::new(uncapped(5, 3));
        for (&secs, seq) in [0u64, 1].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("process ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 0);
    }

    #[test]
    fn test_max_session_duration_force_closes() {
        let capped = SessionWindowConfig {
            gap_duration: Duration::from_secs(100),
            min_events: 1,
            max_session_duration: Some(Duration::from_secs(50)),
        };
        let mut processor = SessionWindowProcessor::new(capped);
        for (&secs, seq) in [0u64, 60].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("process ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 2);
    }

    #[test]
    fn test_flush_closes_open_session() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        processor.process(event(0, 0)).expect("process ok");
        assert_eq!(processor.pending_event_count(), 1);
        processor.flush();
        assert_eq!(processor.pending_event_count(), 0);
        let sessions = processor.drain_sessions();
        assert_eq!(sessions.len(), 1);
    }

    #[test]
    fn test_multiple_sessions_from_gapped_stream() {
        let mut processor = SessionWindowProcessor::new(uncapped(10, 1));
        // Expected grouping with a 10s gap: {0, 5}, {35, 40}, {100}.
        for (&secs, seq) in [0u64, 5, 35, 40, 100].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 3);
    }

    #[test]
    fn test_session_id_increments() {
        let mut processor = SessionWindowProcessor::new(uncapped(5, 1));
        for (&secs, seq) in [0u64, 20].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions[0].session_id, 0);
        assert_eq!(sessions[1].session_id, 1);
    }

    #[test]
    fn test_session_duration_computation() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        for (&secs, seq) in [100u64, 110].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions[0].duration(), Duration::from_secs(10));
    }

    #[test]
    fn test_empty_processor_has_no_sessions() {
        let mut processor: SessionWindowProcessor<u32> =
            SessionWindowProcessor::new(Default::default());
        processor.flush();
        assert_eq!(processor.drain_sessions().len(), 0);
    }

    #[test]
    fn test_events_within_gap_stay_in_same_session() {
        let mut processor = SessionWindowProcessor::new(uncapped(60, 1));
        for (seq, secs) in (0..10u64).map(|i| (i, i * 5)) {
            processor.process(event(secs, seq)).expect("ok");
        }
        let sessions = finish(&mut processor);
        assert_eq!(sessions.len(), 1);
        assert_eq!(sessions[0].event_count(), 10);
    }

    #[test]
    fn test_pending_event_count_resets_after_flush() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        for (&secs, seq) in [0u64, 1].iter().zip(0u64..) {
            processor.process(event(secs, seq)).expect("ok");
        }
        assert_eq!(processor.pending_event_count(), 2);
        processor.flush();
        assert_eq!(processor.pending_event_count(), 0);
    }
}