use std::time::{Duration, SystemTime};
use crate::error::StreamingError;
/// One item travelling through a stream: a payload tagged with a timestamp
/// and a sequence number.
#[derive(Clone, Debug)]
pub struct StreamEvent<T> {
    /// Time attached to the event.
    pub timestamp: SystemTime,
    /// Data carried by the event.
    pub payload: T,
    /// Sequence number supplied by whoever built the event.
    pub sequence: u64,
}

impl<T> StreamEvent<T> {
    /// Assembles an event from its three parts, storing each argument as given.
    pub fn new(timestamp: SystemTime, payload: T, sequence: u64) -> Self {
        StreamEvent {
            sequence,
            payload,
            timestamp,
        }
    }
}
#[derive(Debug, Clone)]
pub struct SessionWindow<T> {
pub start: SystemTime,
pub end: SystemTime,
pub events: Vec<StreamEvent<T>>,
pub session_id: u64,
}
impl<T> SessionWindow<T> {
pub fn duration(&self) -> Duration {
self.end
.duration_since(self.start)
.unwrap_or(Duration::ZERO)
}
pub fn event_count(&self) -> usize {
self.events.len()
}
pub fn is_empty(&self) -> bool {
self.events.is_empty()
}
}
/// Tunable parameters for session windowing.
#[derive(Clone, Debug)]
pub struct SessionWindowConfig {
    /// Idle time between events beyond which a new session is started.
    pub gap_duration: Duration,
    /// Smallest number of events a closed session must hold to be kept.
    pub min_events: usize,
    /// Optional cap on how far an event may lie from its session's start.
    pub max_session_duration: Option<Duration>,
}

impl Default for SessionWindowConfig {
    /// A 30-second gap, a minimum of one event, and no maximum duration.
    fn default() -> Self {
        let gap_duration = Duration::from_secs(30);
        SessionWindowConfig {
            max_session_duration: None,
            min_events: 1,
            gap_duration,
        }
    }
}
/// Splits a stream of [`StreamEvent`]s into [`SessionWindow`]s.
///
/// A session stays open while each new event arrives no more than
/// `config.gap_duration` after the latest timestamp seen in that session and,
/// when `config.max_session_duration` is set, no more than that long after the
/// session's first event. Closed sessions pile up internally until they are
/// collected with [`drain_sessions`](Self::drain_sessions).
///
/// The payload type carries no trait bounds: events are moved, never cloned.
pub struct SessionWindowProcessor<T> {
    /// Windowing parameters supplied at construction.
    config: SessionWindowConfig,
    /// Events of the session currently being built, if one is open.
    current_session: Option<Vec<StreamEvent<T>>>,
    /// Timestamp of the first event put into the open session.
    session_start: Option<SystemTime>,
    /// Latest timestamp seen in the open session; never moves backwards.
    last_event_time: Option<SystemTime>,
    /// Id that the next closed session will receive.
    next_session_id: u64,
    /// Closed sessions that have not been drained yet.
    closed_sessions: Vec<SessionWindow<T>>,
}

impl<T> SessionWindowProcessor<T> {
    /// Creates a processor with no open session and nothing closed yet.
    pub fn new(config: SessionWindowConfig) -> Self {
        Self {
            config,
            current_session: None,
            session_start: None,
            last_event_time: None,
            next_session_id: 0,
            closed_sessions: Vec::new(),
        }
    }

    /// Adds `event` to the open session, first closing that session when the
    /// event falls outside of it.
    ///
    /// The open session is closed when the event is more than
    /// `gap_duration` past the latest timestamp seen, or more than
    /// `max_session_duration` (if set) past the session's start. The event
    /// then becomes the first event of a fresh session.
    ///
    /// An event older than one already seen counts as zero elapsed time: it
    /// joins the open session without pulling the session's end backwards.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`.
    pub fn process(&mut self, event: StreamEvent<T>) -> Result<(), StreamingError> {
        let event_time = event.timestamp;
        // Late (out-of-order) events yield an `Err` from `duration_since`;
        // treat that as "no time elapsed".
        let elapsed_since =
            |earlier: SystemTime| event_time.duration_since(earlier).unwrap_or(Duration::ZERO);

        let gap_exceeded = match self.last_event_time {
            Some(latest) => elapsed_since(latest) > self.config.gap_duration,
            None => false,
        };
        let max_exceeded = match (self.session_start, self.config.max_session_duration) {
            (Some(start), Some(max)) => elapsed_since(start) > max,
            _ => false,
        };

        if gap_exceeded || max_exceeded {
            self.close_current_session();
        }

        if self.current_session.is_none() {
            self.session_start = Some(event_time);
        }

        // Keep the maximum timestamp rather than the most recently processed
        // one. Otherwise a late event would make the window end precede its
        // start and would skew the gap check for the next in-order event.
        self.last_event_time = Some(match self.last_event_time {
            Some(latest) if latest > event_time => latest,
            _ => event_time,
        });

        self.current_session
            .get_or_insert_with(Vec::new)
            .push(event);
        Ok(())
    }

    /// Closes the open session, if any, so it becomes available to
    /// [`drain_sessions`](Self::drain_sessions).
    pub fn flush(&mut self) {
        self.close_current_session();
    }

    /// Hands over every closed session accumulated so far, leaving the
    /// internal list empty.
    pub fn drain_sessions(&mut self) -> Vec<SessionWindow<T>> {
        std::mem::take(&mut self.closed_sessions)
    }

    /// Number of events sitting in the open session (0 when none is open).
    pub fn pending_event_count(&self) -> usize {
        self.current_session.as_ref().map_or(0, Vec::len)
    }

    /// Number of sessions closed so far.
    ///
    /// Sessions discarded for holding fewer than `min_events` events are
    /// counted too, because every closed session consumes an id.
    pub fn total_sessions_closed(&self) -> u64 {
        self.next_session_id
    }

    /// Closes the open session and resets the per-session tracking state.
    ///
    /// The session always consumes an id, but it is only stored when it holds
    /// at least `min_events` events. Does nothing beyond the reset when no
    /// session is open.
    fn close_current_session(&mut self) {
        let events = self.current_session.take();
        let start = self.session_start.take();
        // Read the end before clearing it below.
        let latest = self.last_event_time.take();

        if let (Some(events), Some(start)) = (events, start) {
            let session_id = self.next_session_id;
            self.next_session_id += 1;

            if events.len() >= self.config.min_events {
                self.closed_sessions.push(SessionWindow {
                    start,
                    end: latest.unwrap_or(start),
                    events,
                    session_id,
                });
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::UNIX_EPOCH;

    /// Timestamp lying `secs` seconds after the Unix epoch.
    fn ts(secs: u64) -> SystemTime {
        UNIX_EPOCH + Duration::from_secs(secs)
    }

    /// Event at `secs` whose payload and sequence number are both `seq`.
    fn event(secs: u64, seq: u64) -> StreamEvent<u32> {
        StreamEvent::new(ts(secs), seq as u32, seq)
    }

    /// Config with the given gap (in seconds) and minimum event count, and
    /// no maximum session duration.
    fn gap_config(gap_secs: u64, min_events: usize) -> SessionWindowConfig {
        SessionWindowConfig {
            gap_duration: Duration::from_secs(gap_secs),
            min_events,
            max_session_duration: None,
        }
    }

    #[test]
    fn test_single_session_from_close_events() {
        let mut processor = SessionWindowProcessor::new(gap_config(60, 1));
        processor.process(event(0, 0)).expect("process ok");
        processor.process(event(10, 1)).expect("process ok");
        processor.process(event(20, 2)).expect("process ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].event_count(), 3);
    }

    #[test]
    fn test_gap_detection_closes_session() {
        let mut processor = SessionWindowProcessor::new(gap_config(30, 1));
        processor.process(event(0, 0)).expect("process ok");
        processor.process(event(60, 1)).expect("process ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 2);
    }

    #[test]
    fn test_min_events_filter_drops_small_sessions() {
        let mut processor = SessionWindowProcessor::new(gap_config(5, 3));
        processor.process(event(0, 0)).expect("process ok");
        processor.process(event(1, 1)).expect("process ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 0);
    }

    #[test]
    fn test_max_session_duration_force_closes() {
        let capped = SessionWindowConfig {
            max_session_duration: Some(Duration::from_secs(50)),
            ..gap_config(100, 1)
        };
        let mut processor = SessionWindowProcessor::new(capped);
        processor.process(event(0, 0)).expect("process ok");
        processor.process(event(60, 1)).expect("process ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 2);
    }

    #[test]
    fn test_flush_closes_open_session() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        processor.process(event(0, 0)).expect("process ok");
        assert_eq!(processor.pending_event_count(), 1);

        processor.flush();
        assert_eq!(processor.pending_event_count(), 0);

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 1);
    }

    #[test]
    fn test_multiple_sessions_from_gapped_stream() {
        let mut processor = SessionWindowProcessor::new(gap_config(10, 1));
        processor.process(event(0, 0)).expect("ok");
        processor.process(event(5, 1)).expect("ok");
        processor.process(event(35, 2)).expect("ok");
        processor.process(event(40, 3)).expect("ok");
        processor.process(event(100, 4)).expect("ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 3);
    }

    #[test]
    fn test_session_id_increments() {
        let mut processor = SessionWindowProcessor::new(gap_config(5, 1));
        processor.process(event(0, 0)).expect("ok");
        processor.process(event(20, 1)).expect("ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed[0].session_id, 0);
        assert_eq!(closed[1].session_id, 1);
    }

    #[test]
    fn test_session_duration_computation() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        processor.process(event(100, 0)).expect("ok");
        processor.process(event(110, 1)).expect("ok");
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed[0].duration(), Duration::from_secs(10));
    }

    #[test]
    fn test_empty_processor_has_no_sessions() {
        let mut processor: SessionWindowProcessor<u32> =
            SessionWindowProcessor::new(SessionWindowConfig::default());
        processor.flush();
        assert_eq!(processor.drain_sessions().len(), 0);
    }

    #[test]
    fn test_events_within_gap_stay_in_same_session() {
        let mut processor = SessionWindowProcessor::new(gap_config(60, 1));
        // Ten events spaced five seconds apart, well inside the 60s gap.
        for step in 0..10u64 {
            processor.process(event(step * 5, step)).expect("ok");
        }
        processor.flush();

        let closed = processor.drain_sessions();
        assert_eq!(closed.len(), 1);
        assert_eq!(closed[0].event_count(), 10);
    }

    #[test]
    fn test_pending_event_count_resets_after_flush() {
        let mut processor = SessionWindowProcessor::new(SessionWindowConfig::default());
        processor.process(event(0, 0)).expect("ok");
        processor.process(event(1, 1)).expect("ok");
        assert_eq!(processor.pending_event_count(), 2);

        processor.flush();
        assert_eq!(processor.pending_event_count(), 0);
    }
}