use crate::streaming::event::StreamEvent;
use std::collections::VecDeque;
use std::time::Duration;
#[derive(Debug, Clone, PartialEq)]
pub enum WindowType {
Sliding,
Tumbling,
Session { timeout: Duration },
}
#[derive(Debug)]
pub struct TimeWindow {
pub window_type: WindowType,
pub duration: Duration,
events: VecDeque<StreamEvent>,
pub start_time: u64,
pub end_time: u64,
max_events: usize,
}
impl TimeWindow {
pub fn new(
window_type: WindowType,
duration: Duration,
start_time: u64,
max_events: usize,
) -> Self {
let end_time = start_time + duration.as_millis() as u64;
Self {
window_type,
duration,
events: VecDeque::new(),
start_time,
end_time,
max_events,
}
}
pub fn add_event(&mut self, event: StreamEvent) -> bool {
if self.contains_timestamp(event.metadata.timestamp) {
self.events.push_back(event);
while self.events.len() > self.max_events {
self.events.pop_front();
}
true
} else {
false
}
}
pub fn contains_timestamp(&self, timestamp: u64) -> bool {
timestamp >= self.start_time && timestamp < self.end_time
}
pub fn events(&self) -> &VecDeque<StreamEvent> {
&self.events
}
pub fn count(&self) -> usize {
self.events.len()
}
pub fn is_expired(&self, current_time: u64) -> bool {
current_time >= self.end_time
}
pub fn duration_ms(&self) -> u64 {
self.duration.as_millis() as u64
}
pub fn clear(&mut self) {
self.events.clear();
}
pub fn events_by_type(&self, event_type: &str) -> Vec<&StreamEvent> {
self.events
.iter()
.filter(|e| e.event_type == event_type)
.collect()
}
pub fn sum(&self, field: &str) -> f64 {
self.events
.iter()
.filter_map(|e| e.get_numeric(field))
.sum()
}
pub fn average(&self, field: &str) -> Option<f64> {
let values: Vec<f64> = self
.events
.iter()
.filter_map(|e| e.get_numeric(field))
.collect();
if values.is_empty() {
None
} else {
Some(values.iter().sum::<f64>() / values.len() as f64)
}
}
pub fn min(&self, field: &str) -> Option<f64> {
self.events
.iter()
.filter_map(|e| e.get_numeric(field))
.fold(None, |acc, x| match acc {
None => Some(x),
Some(min) => Some(min.min(x)),
})
}
pub fn max(&self, field: &str) -> Option<f64> {
self.events
.iter()
.filter_map(|e| e.get_numeric(field))
.fold(None, |acc, x| match acc {
None => Some(x),
Some(max) => Some(max.max(x)),
})
}
pub fn latest_timestamp(&self) -> Option<u64> {
self.events.iter().map(|e| e.metadata.timestamp).max()
}
pub fn events_in_range(&self, start: u64, end: u64) -> Vec<&StreamEvent> {
self.events
.iter()
.filter(|e| e.metadata.timestamp >= start && e.metadata.timestamp < end)
.collect()
}
}
#[derive(Debug)]
pub struct WindowManager {
windows: Vec<TimeWindow>,
window_type: WindowType,
duration: Duration,
max_events_per_window: usize,
max_windows: usize,
}
impl WindowManager {
pub fn new(
window_type: WindowType,
duration: Duration,
max_events_per_window: usize,
max_windows: usize,
) -> Self {
Self {
windows: Vec::new(),
window_type,
duration,
max_events_per_window,
max_windows,
}
}
pub fn process_event(&mut self, event: StreamEvent) {
let event_time = event.metadata.timestamp;
let mut added = false;
for window in &mut self.windows {
if window.add_event(event.clone()) {
added = true;
break;
}
}
if !added {
let window_start = self.calculate_window_start(event_time);
let mut new_window = TimeWindow::new(
self.window_type.clone(),
self.duration,
window_start,
self.max_events_per_window,
);
new_window.add_event(event);
self.windows.push(new_window);
}
self.cleanup_expired_windows(event_time);
while self.windows.len() > self.max_windows {
self.windows.remove(0);
}
self.windows.sort_by_key(|w| w.start_time);
}
fn calculate_window_start(&self, event_time: u64) -> u64 {
match self.window_type {
WindowType::Tumbling => {
let window_ms = self.duration.as_millis() as u64;
(event_time / window_ms) * window_ms
}
WindowType::Sliding | WindowType::Session { .. } => event_time,
}
}
fn cleanup_expired_windows(&mut self, current_time: u64) {
self.windows
.retain(|window| !window.is_expired(current_time));
}
pub fn active_windows(&self) -> &[TimeWindow] {
&self.windows
}
pub fn latest_window(&self) -> Option<&TimeWindow> {
self.windows.last()
}
pub fn total_event_count(&self) -> usize {
self.windows.iter().map(|w| w.count()).sum()
}
pub fn windows_with_event_type(&self, event_type: &str) -> Vec<&TimeWindow> {
self.windows
.iter()
.filter(|w| w.events().iter().any(|e| e.event_type == event_type))
.collect()
}
pub fn aggregate_across_windows<F>(&self, aggregator: F) -> f64
where
F: Fn(&TimeWindow) -> f64,
{
self.windows.iter().map(aggregator).sum()
}
pub fn get_statistics(&self) -> WindowStatistics {
WindowStatistics {
total_windows: self.windows.len(),
total_events: self.total_event_count(),
oldest_window_start: self.windows.first().map(|w| w.start_time),
newest_window_start: self.windows.last().map(|w| w.start_time),
average_events_per_window: if self.windows.is_empty() {
0.0
} else {
self.total_event_count() as f64 / self.windows.len() as f64
},
}
}
}
#[derive(Debug, Clone)]
pub struct WindowStatistics {
pub total_windows: usize,
pub total_events: usize,
pub oldest_window_start: Option<u64>,
pub newest_window_start: Option<u64>,
pub average_events_per_window: f64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::Value;
use std::collections::HashMap;
#[test]
fn test_time_window_creation() {
let window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
assert_eq!(window.start_time, 1000);
assert_eq!(window.end_time, 61000);
assert_eq!(window.count(), 0);
}
#[test]
fn test_window_event_addition() {
let mut window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
let mut data = HashMap::new();
data.insert("value".to_string(), Value::Number(10.0));
let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000);
assert!(window.add_event(event));
assert_eq!(window.count(), 1);
}
#[test]
fn test_window_aggregations() {
let mut window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
for i in 0..5 {
let mut data = HashMap::new();
data.insert("value".to_string(), Value::Number(i as f64));
let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000 + i);
window.add_event(event);
}
assert_eq!(window.sum("value"), 10.0); assert_eq!(window.average("value"), Some(2.0));
assert_eq!(window.min("value"), Some(0.0));
assert_eq!(window.max("value"), Some(4.0));
}
#[test]
fn test_window_manager() {
let mut manager = WindowManager::new(WindowType::Sliding, Duration::from_secs(60), 100, 10);
let mut data = HashMap::new();
data.insert("value".to_string(), Value::Number(1.0));
let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000);
manager.process_event(event);
assert_eq!(manager.active_windows().len(), 1);
assert_eq!(manager.total_event_count(), 1);
}
}