Skip to main content

oxigdal_streaming/v2/
session_window.rs

1//! Session window processing for geospatial event streams.
2//!
3//! A session window groups events that occur within `gap_duration` of each other.
4//! When a gap larger than `gap_duration` is detected between consecutive events,
5//! the current session closes and a new one starts automatically.
6//!
7//! Additional v2 features:
8//! - Configurable **minimum session size** (sessions with fewer than `min_events`
9//!   events are silently discarded).
10//! - **Maximum session duration** guard: a session that exceeds `max_session_duration`
11//!   is force-closed even if no time-gap has been detected.
12//! - **`flush()`** to close the currently open session at end of stream.
13
14use std::time::{Duration, SystemTime};
15
16use crate::error::StreamingError;
17
18// ─── StreamEvent ──────────────────────────────────────────────────────────────
19
/// One element of an event stream: an application payload tagged with the
/// wall-clock time at which it occurred and its position in the stream.
#[derive(Debug, Clone)]
pub struct StreamEvent<T> {
    /// Wall-clock instant at which the event occurred.
    pub timestamp: SystemTime,
    /// Application-defined data carried by the event.
    pub payload: T,
    /// Position of the event in its stream. Producers are expected to keep
    /// this strictly increasing; this type does not check it.
    pub sequence: u64,
}

impl<T> StreamEvent<T> {
    /// Assemble a `StreamEvent` from its three parts.
    ///
    /// The values are stored exactly as given; no validation is performed.
    pub fn new(timestamp: SystemTime, payload: T, sequence: u64) -> Self {
        StreamEvent {
            timestamp,
            payload,
            sequence,
        }
    }
}
42
43// ─── SessionWindow ────────────────────────────────────────────────────────────
44
45/// A closed session window containing all events that fell within the session.
46#[derive(Debug, Clone)]
47pub struct SessionWindow<T> {
48    /// Wall-clock timestamp of the first event.
49    pub start: SystemTime,
50    /// Wall-clock timestamp of the last event.
51    pub end: SystemTime,
52    /// All events belonging to this session, in arrival order.
53    pub events: Vec<StreamEvent<T>>,
54    /// Monotonically increasing session identifier (0-based).
55    pub session_id: u64,
56}
57
58impl<T> SessionWindow<T> {
59    /// Duration from first to last event.
60    ///
61    /// Returns `Duration::ZERO` if `end` is before `start` (clock anomaly).
62    pub fn duration(&self) -> Duration {
63        self.end
64            .duration_since(self.start)
65            .unwrap_or(Duration::ZERO)
66    }
67
68    /// Number of events in this session.
69    pub fn event_count(&self) -> usize {
70        self.events.len()
71    }
72
73    /// `true` if the session contains no events.
74    pub fn is_empty(&self) -> bool {
75        self.events.is_empty()
76    }
77}
78
79// ─── SessionWindowConfig ──────────────────────────────────────────────────────
80
/// Tunable parameters of the session window processor.
#[derive(Debug, Clone)]
pub struct SessionWindowConfig {
    /// Largest allowed pause between two consecutive events; a longer pause
    /// closes the session.
    pub gap_duration: Duration,
    /// Smallest number of events a session needs in order to be emitted.
    /// Smaller sessions are dropped without notice.
    pub min_events: usize,
    /// Optional cap on a session's span; when present, a session that would
    /// grow beyond it is force-closed.
    pub max_session_duration: Option<Duration>,
}

impl Default for SessionWindowConfig {
    /// Defaults: a 30-second gap, every non-empty session emitted, and no cap
    /// on session length.
    fn default() -> Self {
        let gap_duration = Duration::from_secs(30);
        SessionWindowConfig {
            gap_duration,
            min_events: 1,
            max_session_duration: None,
        }
    }
}
102
103// ─── SessionWindowProcessor ───────────────────────────────────────────────────
104
105/// Stateful processor that groups a time-ordered event stream into session windows.
106///
107/// Call [`Self::process`] for each incoming event (events **must** arrive in
108/// non-decreasing timestamp order). Call [`Self::flush`] at end-of-stream to close
109/// any pending session. Completed windows are collected via [`Self::drain_sessions`].
110pub struct SessionWindowProcessor<T: Clone> {
111    config: SessionWindowConfig,
112    /// Events buffered in the currently open session.
113    current_session: Option<Vec<StreamEvent<T>>>,
114    /// Timestamp of the first event in the current session.
115    session_start: Option<SystemTime>,
116    /// Timestamp of the most recently processed event.
117    last_event_time: Option<SystemTime>,
118    /// Counter for generating session IDs.
119    next_session_id: u64,
120    /// Completed sessions waiting to be drained.
121    closed_sessions: Vec<SessionWindow<T>>,
122}
123
124impl<T: Clone> SessionWindowProcessor<T> {
125    /// Create a new processor with the given configuration.
126    pub fn new(config: SessionWindowConfig) -> Self {
127        Self {
128            config,
129            current_session: None,
130            session_start: None,
131            last_event_time: None,
132            next_session_id: 0,
133            closed_sessions: Vec::new(),
134        }
135    }
136
137    /// Process an incoming event.
138    ///
139    /// A new session is started if:
140    /// - There is no open session, **or**
141    /// - The gap since the previous event exceeds `gap_duration`, **or**
142    /// - The current session has exceeded `max_session_duration`.
143    pub fn process(&mut self, event: StreamEvent<T>) -> Result<(), StreamingError> {
144        let event_time = event.timestamp;
145
146        // Check whether we should close the current session.
147        let gap_exceeded = self.last_event_time.map(|last| {
148            event_time.duration_since(last).unwrap_or(Duration::ZERO) > self.config.gap_duration
149        });
150
151        let max_exceeded = self
152            .session_start
153            .zip(self.config.max_session_duration)
154            .map(|(start, max)| event_time.duration_since(start).unwrap_or(Duration::ZERO) > max);
155
156        let should_close = gap_exceeded.unwrap_or(false) || max_exceeded.unwrap_or(false);
157
158        if should_close {
159            self.close_current_session();
160        }
161
162        // Open a new session if none is active.
163        if self.current_session.is_none() {
164            self.current_session = Some(Vec::new());
165            self.session_start = Some(event_time);
166        }
167
168        self.last_event_time = Some(event_time);
169        if let Some(ref mut session) = self.current_session {
170            session.push(event);
171        }
172
173        Ok(())
174    }
175
176    /// Force-close the currently open session (call at end of stream).
177    ///
178    /// After flushing, any sessions that meet the `min_events` threshold are
179    /// available via [`Self::drain_sessions`].
180    pub fn flush(&mut self) {
181        self.close_current_session();
182    }
183
184    /// Drain and return all completed session windows.
185    ///
186    /// The internal buffer is cleared; subsequent calls return an empty `Vec`
187    /// until more sessions are closed.
188    pub fn drain_sessions(&mut self) -> Vec<SessionWindow<T>> {
189        std::mem::take(&mut self.closed_sessions)
190    }
191
192    /// Number of events buffered in the currently open (not yet closed) session.
193    pub fn pending_event_count(&self) -> usize {
194        self.current_session.as_ref().map(|s| s.len()).unwrap_or(0)
195    }
196
197    /// Total number of sessions that have been closed (includes discarded ones).
198    pub fn total_sessions_closed(&self) -> u64 {
199        self.next_session_id
200    }
201
202    // ── internals ────────────────────────────────────────────────────────────
203
204    fn close_current_session(&mut self) {
205        if let (Some(events), Some(start)) =
206            (self.current_session.take(), self.session_start.take())
207        {
208            let session_id = self.next_session_id;
209            self.next_session_id += 1;
210
211            if events.len() >= self.config.min_events {
212                let end = self.last_event_time.unwrap_or(start);
213                self.closed_sessions.push(SessionWindow {
214                    start,
215                    end,
216                    events,
217                    session_id,
218                });
219            }
220            // If min_events not met the session is discarded; ID is still consumed.
221        }
222        self.last_event_time = None;
223    }
224}
225
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// Timestamp `secs` seconds after the Unix epoch.
    fn ts(secs: u64) -> SystemTime {
        UNIX_EPOCH + Duration::from_secs(secs)
    }

    /// Event at `secs` seconds whose payload mirrors its sequence number.
    fn event(secs: u64, seq: u64) -> StreamEvent<u32> {
        StreamEvent::new(ts(secs), seq as u32, seq)
    }

    /// Processor configured with a gap and an optional session cap (both in
    /// seconds) and a minimum session size.
    fn windower_with(
        gap_secs: u64,
        min_events: usize,
        max_secs: Option<u64>,
    ) -> SessionWindowProcessor<u32> {
        SessionWindowProcessor::new(SessionWindowConfig {
            gap_duration: Duration::from_secs(gap_secs),
            min_events,
            max_session_duration: max_secs.map(Duration::from_secs),
        })
    }

    #[test]
    fn test_single_session_from_close_events() {
        let mut windower = windower_with(60, 1, None);
        for (seq, &secs) in (0u64..).zip([0u64, 10, 20].iter()) {
            windower.process(event(secs, seq)).expect("process ok");
        }
        windower.flush();

        let closed = windower.drain_sessions();
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].event_count(), 3);
    }

    #[test]
    fn test_gap_detection_closes_session() {
        let mut windower = windower_with(30, 1, None);
        // The second event arrives 60 s after the first, which is more than
        // the 30 s gap, so the first session must be closed.
        for (seq, &secs) in (0u64..).zip([0u64, 60].iter()) {
            windower.process(event(secs, seq)).expect("process ok");
        }
        windower.flush();

        assert_eq!(windower.drain_sessions().len(), 2);
    }

    #[test]
    fn test_min_events_filter_drops_small_sessions() {
        let mut windower = windower_with(5, 3, None);
        // Two events only, below the minimum of three.
        for (seq, &secs) in (0u64..).zip([0u64, 1].iter()) {
            windower.process(event(secs, seq)).expect("process ok");
        }
        windower.flush();

        assert_eq!(windower.drain_sessions().len(), 0);
    }

    #[test]
    fn test_max_session_duration_force_closes() {
        let mut windower = windower_with(100, 1, Some(50));
        // The gap (100 s) is not exceeded, but 60 s is beyond the 50 s cap.
        for (seq, &secs) in (0u64..).zip([0u64, 60].iter()) {
            windower.process(event(secs, seq)).expect("process ok");
        }
        windower.flush();

        // One force-closed session plus the one opened by the event at t=60.
        assert_eq!(windower.drain_sessions().len(), 2);
    }

    #[test]
    fn test_flush_closes_open_session() {
        let mut windower = SessionWindowProcessor::new(SessionWindowConfig::default());
        windower.process(event(0, 0)).expect("process ok");
        assert_eq!(windower.pending_event_count(), 1);

        windower.flush();
        assert_eq!(windower.pending_event_count(), 0);
        assert_eq!(windower.drain_sessions().len(), 1);
    }

    #[test]
    fn test_multiple_sessions_from_gapped_stream() {
        let mut windower = windower_with(10, 1, None);
        // Session 1: t=0, 5 | 30 s pause | session 2: t=35, 40 | 60 s pause |
        // session 3: t=100.
        for (seq, &secs) in (0u64..).zip([0u64, 5, 35, 40, 100].iter()) {
            windower.process(event(secs, seq)).expect("ok");
        }
        windower.flush();

        assert_eq!(windower.drain_sessions().len(), 3);
    }

    #[test]
    fn test_session_id_increments() {
        let mut windower = windower_with(5, 1, None);
        windower.process(event(0, 0)).expect("ok");
        // The 20 s pause closes session 0; the flush closes session 1.
        windower.process(event(20, 1)).expect("ok");
        windower.flush();

        let closed = windower.drain_sessions();
        assert_eq!(closed[0].session_id, 0);
        assert_eq!(closed[1].session_id, 1);
    }

    #[test]
    fn test_session_duration_computation() {
        let mut windower = SessionWindowProcessor::new(SessionWindowConfig::default());
        for (seq, &secs) in (0u64..).zip([100u64, 110].iter()) {
            windower.process(event(secs, seq)).expect("ok");
        }
        windower.flush();

        let closed = windower.drain_sessions();
        assert_eq!(closed[0].duration(), Duration::from_secs(10));
    }

    #[test]
    fn test_empty_processor_has_no_sessions() {
        let mut windower = SessionWindowProcessor::<u32>::new(SessionWindowConfig::default());
        windower.flush();
        assert_eq!(windower.drain_sessions().len(), 0);
    }

    #[test]
    fn test_events_within_gap_stay_in_same_session() {
        let mut windower = windower_with(60, 1, None);
        // Ten events spaced 5 s apart, well inside the 60 s gap.
        (0..10u64)
            .map(|i| event(i * 5, i))
            .for_each(|e| windower.process(e).expect("ok"));
        windower.flush();

        let closed = windower.drain_sessions();
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].event_count(), 10);
    }

    #[test]
    fn test_pending_event_count_resets_after_flush() {
        let mut windower = SessionWindowProcessor::new(SessionWindowConfig::default());
        for (seq, &secs) in (0u64..).zip([0u64, 1].iter()) {
            windower.process(event(secs, seq)).expect("ok");
        }
        assert_eq!(windower.pending_event_count(), 2);

        windower.flush();
        assert_eq!(windower.pending_event_count(), 0);
    }
}