use super::{
aggregation::AggregationManager,
window::{EventWindow, Watermark, WindowConfig, WindowResult},
};
use crate::StreamEvent;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use tracing::{debug, info, warn};
/// Tunable limits and switches for an `EventProcessor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorConfig {
/// Upper bound on the number of windows held at once. When the bound is
/// reached, `create_window` evicts one existing window before inserting.
pub max_windows: usize,
/// Upper bound on the late-event buffer; once reached, the oldest buffered
/// late event is discarded to make room for the new one.
pub max_late_events: usize,
/// NOTE(review): not read anywhere in this file — presumably the intended
/// spacing between watermark advances; confirm against the rest of the crate.
pub watermark_interval: ChronoDuration,
/// NOTE(review): not consulted in this file; `update_stats` runs regardless
/// of this flag — confirm whether it is meant to gate statistics collection.
pub enable_stats: bool,
/// Optional ceiling compared against the processor's estimated memory usage
/// when creating a window; `None` disables that check.
pub memory_limit: Option<usize>,
}
impl Default for ProcessorConfig {
fn default() -> Self {
Self {
max_windows: 1000,
max_late_events: 10000,
watermark_interval: ChronoDuration::seconds(1),
enable_stats: true,
memory_limit: Some(1024 * 1024 * 100), }
}
}
/// Running counters and gauges maintained by an `EventProcessor`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorStats {
/// Incremented once per call to `update_stats`, which `process_event`
/// reaches only for events that were not classified as late.
pub events_processed: u64,
/// Incremented each time `create_window` inserts a window.
pub windows_created: u64,
/// Incremented each time `trigger_window` produces a result.
pub windows_triggered: u64,
/// Incremented for every event routed to the late-event handler.
pub late_events: u64,
/// Late events discarded because of the `max_late_events` cap.
pub dropped_events: u64,
/// Wall-clock time at which this stats value was constructed.
pub start_time: DateTime<Utc>,
/// Wall-clock time of the most recent `update_stats` call.
pub last_processing_time: DateTime<Utc>,
/// Exponentially weighted average of per-event processing latency, in
/// milliseconds (weight 0.1 on the newest sample).
pub avg_latency_ms: f64,
/// Largest value of the processor's memory estimate observed by `update_stats`.
pub peak_memory_usage: usize,
}
impl Default for ProcessorStats {
fn default() -> Self {
let now = Utc::now();
Self {
events_processed: 0,
windows_created: 0,
windows_triggered: 0,
late_events: 0,
dropped_events: 0,
start_time: now,
last_processing_time: now,
avg_latency_ms: 0.0,
peak_memory_usage: 0,
}
}
}
/// Routes stream events into windows, tracks the watermark, buffers late
/// events and records processing statistics.
pub struct EventProcessor {
// Live windows keyed by the string form of each window's id.
windows: HashMap<String, EventWindow>,
// Copy of `watermark_manager.current()` taken after the most recent update.
watermark: DateTime<Utc>,
// FIFO buffer of late events, each paired with the wall-clock time it was buffered.
late_events: VecDeque<(StreamEvent, DateTime<Utc>)>,
// Counters and gauges exposed through `stats()`.
stats: ProcessorStats,
// Limits supplied at construction time.
config: ProcessorConfig,
// Advanced with each event's timestamp; also supplies `allowed_lateness`.
watermark_manager: Watermark,
// Source of the aggregation results attached to every triggered window.
aggregation_manager: AggregationManager,
}
impl EventProcessor {
pub fn new(config: ProcessorConfig) -> Self {
Self {
windows: HashMap::new(),
watermark: Utc::now(),
late_events: VecDeque::new(),
stats: ProcessorStats::default(),
config,
watermark_manager: Watermark::default(),
aggregation_manager: AggregationManager::default(),
}
}
/// Builds a window from `config`, registers it and returns its id.
///
/// If the processor already holds `max_windows` windows, one existing
/// window is evicted first (see `remove_oldest_window`).
///
/// # Errors
///
/// Fails when a memory limit is configured and the current memory estimate
/// already exceeds it; nothing is inserted in that case.
pub fn create_window(&mut self, config: WindowConfig) -> Result<String> {
    let window = EventWindow::new(config);
    let window_id = window.id().to_string();

    // Refuse to grow when the estimate is already over the configured budget.
    match self.config.memory_limit {
        Some(limit) if self.estimate_memory_usage() > limit => {
            return Err(anyhow!("Memory limit exceeded, cannot create new window"));
        }
        _ => {}
    }

    // Make room before inserting so the map does not grow past the cap.
    let at_capacity = self.windows.len() >= self.config.max_windows;
    if at_capacity {
        warn!("Maximum number of windows reached, removing oldest window");
        self.remove_oldest_window();
    }

    self.stats.windows_created += 1;
    self.windows.insert(window_id.clone(), window);
    info!("Created new window: {}", window_id);

    Ok(window_id)
}
/// Feeds one event through the processor and returns the results of every
/// window that triggered because of it.
///
/// The watermark is advanced first. An event classified as late is handed
/// to the late-event buffer and an empty result list is returned. Otherwise
/// a clone of the event is offered to every window; windows that reject it
/// are logged and skipped, and windows that report `should_trigger` for the
/// current watermark are triggered afterwards.
///
/// # Errors
///
/// Propagates failures from the watermark update, the late-event handler
/// and `trigger_window`. The first trigger failure aborts the call and
/// discards results already collected.
pub fn process_event(&mut self, event: StreamEvent) -> Result<Vec<WindowResult>> {
    let started = std::time::Instant::now();

    self.update_watermark(&event)?;

    // NOTE(review): this early return bypasses `update_stats`, so late events
    // are not reflected in `events_processed` or the latency average.
    if self.is_late_event(&event) {
        self.handle_late_event(event)?;
        return Ok(Vec::new());
    }

    // First pass: distribute the event and remember which windows are ready.
    // Triggering needs `&mut self`, so it cannot happen inside this loop.
    let watermark = self.watermark;
    let mut ready = Vec::new();
    for (id, window) in self.windows.iter_mut() {
        match window.add_event(event.clone()) {
            Ok(_) => {
                if window.should_trigger(watermark) {
                    ready.push(id.clone());
                }
            }
            Err(e) => {
                warn!("Failed to add event to window {}: {}", id, e);
            }
        }
    }

    // Second pass: trigger in collection order, stopping at the first error.
    let results = ready
        .iter()
        .map(|id| self.trigger_window(id))
        .collect::<Result<Vec<_>>>()?;

    self.update_stats(started);
    Ok(results)
}
/// Produces a `WindowResult` snapshot for the window named `window_id`.
///
/// The result carries the window's event count and the aggregation
/// manager's current results. `window_end` and `processing_time` are set to
/// the same clock reading, taken after the start time has been resolved so
/// that `window_start` is not later than `window_end`.
///
/// # Errors
///
/// Fails when no window with that id exists or when the aggregation
/// manager cannot produce results.
fn trigger_window(&mut self, window_id: &str) -> Result<WindowResult> {
    let window = self
        .windows
        .get(window_id)
        .ok_or_else(|| anyhow!("Window not found: {}", window_id))?;
    let aggregations = self.aggregation_manager.results()?;

    // Only read the clock for the fallback when there is no start time.
    let window_start = window
        .config()
        .window_type
        .start_time()
        .unwrap_or_else(Utc::now);

    // One clock read shared by both fields keeps the result self-consistent.
    let now = Utc::now();

    let result = WindowResult {
        window_id: window_id.to_string(),
        window_start,
        window_end: now,
        event_count: window.event_count(),
        aggregations,
        trigger_reason: "Window trigger condition met".to_string(),
        processing_time: now,
    };

    self.stats.windows_triggered += 1;
    info!("Triggered window: {}", window_id);
    Ok(result)
}
/// Feeds the event's timestamp to the watermark manager and caches the
/// manager's resulting watermark on the processor.
///
/// Always returns `Ok(())` as written.
fn update_watermark(&mut self, event: &StreamEvent) -> Result<()> {
    let manager = &mut self.watermark_manager;
    manager.update(event.timestamp());
    let advanced = manager.current();

    self.watermark = advanced;
    Ok(())
}
/// Reports whether `event` is late: its timestamp is strictly earlier than
/// the cached watermark minus the watermark manager's `allowed_lateness`.
fn is_late_event(&self, event: &StreamEvent) -> bool {
    let observed_at = event.timestamp();
    // Anything stamped before this instant falls outside the grace period.
    let cutoff = self.watermark - self.watermark_manager.allowed_lateness;
    observed_at < cutoff
}
/// Records a late event in the bounded late-event buffer.
///
/// Every call increments `stats.late_events`. When the buffer is at
/// `max_late_events`, the oldest entry is evicted to make room and
/// `stats.dropped_events` is incremented for it. With a cap of zero nothing
/// can be retained, so the incoming event itself is counted as dropped and
/// not stored.
///
/// Always returns `Ok(())` as written.
fn handle_late_event(&mut self, event: StreamEvent) -> Result<()> {
    self.stats.late_events += 1;

    let capacity = self.config.max_late_events;
    if capacity == 0 {
        // Honor the cap: buffering even one event would exceed it.
        self.stats.dropped_events += 1;
        return Ok(());
    }

    // Count a drop only when an entry was really evicted.
    if self.late_events.len() >= capacity && self.late_events.pop_front().is_some() {
        self.stats.dropped_events += 1;
    }

    // Pair the event with the wall-clock time at which it was buffered.
    self.late_events.push_back((event, Utc::now()));
    Ok(())
}
/// Evicts the window whose start time is smallest; does nothing when there
/// are no windows.
///
/// Windows without a start time fall back to the current time, which ranks
/// them as the newest. The fallback clock read is lazy, so it only happens
/// for windows that need it.
///
/// NOTE(review): if `start_time` resolves to the `WindowTypeExt` impl in
/// this file, every key is a fresh `Utc::now()` reading and the evicted
/// window depends on `HashMap` iteration order, not window age — confirm.
fn remove_oldest_window(&mut self) {
    // Clone the id so the shared borrow of the map ends before the removal.
    let oldest_id = self
        .windows
        .iter()
        .min_by_key(|(_, window)| {
            window
                .config()
                .window_type
                .start_time()
                .unwrap_or_else(Utc::now)
        })
        .map(|(id, _)| id.clone());

    if let Some(oldest_id) = oldest_id {
        self.windows.remove(&oldest_id);
        debug!("Removed oldest window: {}", oldest_id);
    }
}
/// Returns a rough byte estimate: entry counts multiplied by the
/// `size_of` of each entry type.
///
/// `size_of` reports only the inline size of a value, so heap data owned
/// by windows or events is not included in the figure.
fn estimate_memory_usage(&self) -> usize {
    use std::mem::size_of;

    let window_bytes = self.windows.len() * size_of::<EventWindow>();
    let late_event_bytes = self.late_events.len() * size_of::<(StreamEvent, DateTime<Utc>)>();

    window_bytes + late_event_bytes
}
/// Folds one processed event into the statistics.
///
/// Bumps `events_processed`, stamps `last_processing_time`, blends the time
/// elapsed since `start_time` into the latency average and raises
/// `peak_memory_usage` if the current estimate is higher.
///
/// NOTE(review): `config.enable_stats` is not consulted here, and the
/// average starts from 0.0, so early values sit below the real latency —
/// confirm both are intended.
fn update_stats(&mut self, start_time: std::time::Instant) {
    // Weight given to the newest latency sample.
    const SMOOTHING: f64 = 0.1;

    self.stats.events_processed += 1;
    self.stats.last_processing_time = Utc::now();

    let sample_ms = start_time.elapsed().as_secs_f64() * 1000.0;
    let previous_avg = self.stats.avg_latency_ms;
    self.stats.avg_latency_ms = SMOOTHING * sample_ms + (1.0 - SMOOTHING) * previous_avg;

    let observed = self.estimate_memory_usage();
    self.stats.peak_memory_usage = self.stats.peak_memory_usage.max(observed);
}
/// Returns a shared reference to the processor's statistics.
pub fn stats(&self) -> &ProcessorStats {
&self.stats
}
/// Returns the ids of all windows currently held, as owned strings.
///
/// The order follows `HashMap` iteration and is therefore unspecified.
pub fn active_windows(&self) -> Vec<String> {
    let mut ids = Vec::with_capacity(self.windows.len());
    for id in self.windows.keys() {
        ids.push(id.clone());
    }
    ids
}
/// Looks up a window by id, returning `None` when no such window is held.
pub fn get_window(&self, window_id: &str) -> Option<&EventWindow> {
self.windows.get(window_id)
}
/// Removes the window with the given id.
///
/// # Errors
///
/// Fails when no window with that id is held.
pub fn remove_window(&mut self, window_id: &str) -> Result<()> {
    match self.windows.remove(window_id) {
        Some(_) => {
            info!("Removed window: {}", window_id);
            Ok(())
        }
        None => Err(anyhow!("Window not found: {}", window_id)),
    }
}
/// Drops every window held by the processor and logs that it did so.
/// Statistics and the late-event buffer are left untouched.
pub fn clear_windows(&mut self) {
self.windows.clear();
info!("Cleared all windows");
}
/// Returns the watermark cached by the most recent watermark update
/// (or the construction-time value if no event has been processed yet).
pub fn current_watermark(&self) -> DateTime<Utc> {
self.watermark
}
/// Returns the buffered late events, oldest first, each paired with the
/// wall-clock time at which it was buffered.
pub fn late_events(&self) -> &VecDeque<(StreamEvent, DateTime<Utc>)> {
&self.late_events
}
}
impl Default for EventProcessor {
fn default() -> Self {
Self::new(ProcessorConfig::default())
}
}
/// Module-private extension trait that provides the `start_time` accessor
/// called on `window_type` by `trigger_window` and `remove_oldest_window`.
trait WindowTypeExt {
/// Returns the start time associated with the window type, if any.
fn start_time(&self) -> Option<DateTime<Utc>>;
}
impl WindowTypeExt for super::window::WindowType {
/// Always returns `Some` with the current wall-clock time, regardless of
/// the window type's contents.
///
/// NOTE(review): this looks like a placeholder — callers that use it as a
/// window start (result `window_start`, oldest-window eviction) receive the
/// time of the call rather than a property of the window. Confirm and
/// replace with the real start time once `WindowType` exposes one.
fn start_time(&self) -> Option<DateTime<Utc>> {
Some(Utc::now())
}
}