1use super::{
10 aggregation::AggregationManager,
11 window::{EventWindow, Watermark, WindowConfig, WindowResult},
12};
13use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, VecDeque};
18use tracing::{debug, info, warn};
19
/// Tuning knobs for an [`EventProcessor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorConfig {
    /// Maximum number of windows tracked at once; creating a window beyond
    /// this count evicts an existing one.
    pub max_windows: usize,
    /// Capacity of the late-event buffer; the oldest buffered late events
    /// are discarded to make room for new ones.
    pub max_late_events: usize,
    /// Interval between watermark updates.
    /// NOTE(review): not read anywhere in this file — confirm whether it is
    /// meant to drive watermark emission.
    pub watermark_interval: ChronoDuration,
    /// Whether processing statistics should be collected.
    pub enable_stats: bool,
    /// Optional upper bound, in bytes, on the processor's *estimated* memory
    /// usage; consulted when new windows are created. `None` disables it.
    pub memory_limit: Option<usize>,
}
34
35impl Default for ProcessorConfig {
36 fn default() -> Self {
37 Self {
38 max_windows: 1000,
39 max_late_events: 10000,
40 watermark_interval: ChronoDuration::seconds(1),
41 enable_stats: true,
42 memory_limit: Some(1024 * 1024 * 100), }
44 }
45}
46
/// Runtime counters and gauges maintained by an [`EventProcessor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcessorStats {
    /// Number of non-late events that completed `process_event`.
    pub events_processed: u64,
    /// Number of windows registered through `create_window`.
    pub windows_created: u64,
    /// Number of window results emitted.
    pub windows_triggered: u64,
    /// Number of events classified as late.
    pub late_events: u64,
    /// Number of late events discarded because the late-event buffer was full.
    pub dropped_events: u64,
    /// Wall-clock time at which these statistics were created.
    pub start_time: DateTime<Utc>,
    /// Wall-clock time of the most recent stats update.
    pub last_processing_time: DateTime<Utc>,
    /// Exponentially weighted moving average (smoothing factor 0.1) of the
    /// per-event processing latency, in milliseconds.
    pub avg_latency_ms: f64,
    /// Highest estimated memory usage observed, in bytes (shallow estimate).
    pub peak_memory_usage: usize,
}
69
70impl Default for ProcessorStats {
71 fn default() -> Self {
72 let now = Utc::now();
73 Self {
74 events_processed: 0,
75 windows_created: 0,
76 windows_triggered: 0,
77 late_events: 0,
78 dropped_events: 0,
79 start_time: now,
80 last_processing_time: now,
81 avg_latency_ms: 0.0,
82 peak_memory_usage: 0,
83 }
84 }
85}
86
/// Routes stream events into a set of windows, tracks the watermark, buffers
/// late events and emits [`WindowResult`]s when windows trigger.
pub struct EventProcessor {
    /// Active windows keyed by their id.
    windows: HashMap<String, EventWindow>,
    /// Cached copy of `watermark_manager.current()`, refreshed on every event.
    watermark: DateTime<Utc>,
    /// Late events, each paired with the wall-clock time it was buffered.
    late_events: VecDeque<(StreamEvent, DateTime<Utc>)>,
    /// Runtime statistics.
    stats: ProcessorStats,
    /// Processor configuration.
    config: ProcessorConfig,
    /// Source of the watermark and the allowed-lateness threshold.
    watermark_manager: Watermark,
    /// Aggregation state queried whenever a window triggers.
    aggregation_manager: AggregationManager,
}
97
98impl EventProcessor {
99 pub fn new(config: ProcessorConfig) -> Self {
101 Self {
102 windows: HashMap::new(),
103 watermark: Utc::now(),
104 late_events: VecDeque::new(),
105 stats: ProcessorStats::default(),
106 config,
107 watermark_manager: Watermark::default(),
108 aggregation_manager: AggregationManager::default(),
109 }
110 }
111
112 pub fn create_window(&mut self, config: WindowConfig) -> Result<String> {
114 let window = EventWindow::new(config);
115 let window_id = window.id().to_string();
116
117 if let Some(limit) = self.config.memory_limit {
119 if self.estimate_memory_usage() > limit {
120 return Err(anyhow!("Memory limit exceeded, cannot create new window"));
121 }
122 }
123
124 if self.windows.len() >= self.config.max_windows {
126 warn!("Maximum number of windows reached, removing oldest window");
127 self.remove_oldest_window();
128 }
129
130 self.windows.insert(window_id.clone(), window);
131 self.stats.windows_created += 1;
132
133 info!("Created new window: {}", window_id);
134 Ok(window_id)
135 }
136
137 pub fn process_event(&mut self, event: StreamEvent) -> Result<Vec<WindowResult>> {
139 let start_time = std::time::Instant::now();
140 let mut results = Vec::new();
141
142 self.update_watermark(&event)?;
144
145 if self.is_late_event(&event) {
147 self.handle_late_event(event)?;
148 return Ok(results);
149 }
150
151 let mut windows_to_trigger = Vec::new();
153
154 for (window_id, window) in &mut self.windows {
155 if let Err(e) = window.add_event(event.clone()) {
156 warn!("Failed to add event to window {}: {}", window_id, e);
157 continue;
158 }
159
160 if window.should_trigger(self.watermark) {
162 windows_to_trigger.push(window_id.clone());
163 }
164 }
165
166 for window_id in windows_to_trigger {
168 let result = self.trigger_window(&window_id)?;
169 results.push(result);
170 }
171
172 self.update_stats(start_time);
174
175 Ok(results)
176 }
177
178 fn trigger_window(&mut self, window_id: &str) -> Result<WindowResult> {
180 let window = self
181 .windows
182 .get(window_id)
183 .ok_or_else(|| anyhow!("Window not found: {}", window_id))?;
184
185 let aggregations = self.aggregation_manager.results()?;
187
188 let result = WindowResult {
189 window_id: window_id.to_string(),
190 window_start: window
191 .config()
192 .window_type
193 .start_time()
194 .unwrap_or(Utc::now()),
195 window_end: Utc::now(),
196 event_count: window.event_count(),
197 aggregations,
198 trigger_reason: "Window trigger condition met".to_string(),
199 processing_time: Utc::now(),
200 };
201
202 self.stats.windows_triggered += 1;
203 info!("Triggered window: {}", window_id);
204
205 Ok(result)
206 }
207
208 fn update_watermark(&mut self, event: &StreamEvent) -> Result<()> {
210 let event_time = event.timestamp();
211 self.watermark_manager.update(event_time);
212 self.watermark = self.watermark_manager.current();
213 Ok(())
214 }
215
216 fn is_late_event(&self, event: &StreamEvent) -> bool {
218 let event_time = event.timestamp();
219 let allowed_lateness = self.watermark_manager.allowed_lateness;
220 event_time < self.watermark - allowed_lateness
221 }
222
223 fn handle_late_event(&mut self, event: StreamEvent) -> Result<()> {
225 if self.late_events.len() >= self.config.max_late_events {
226 self.late_events.pop_front();
227 self.stats.dropped_events += 1;
228 }
229
230 self.late_events.push_back((event, Utc::now()));
231 self.stats.late_events += 1;
232
233 Ok(())
234 }
235
236 fn remove_oldest_window(&mut self) {
238 if let Some((oldest_id, _)) = self.windows.iter().min_by_key(|(_, window)| {
239 window
240 .config()
241 .window_type
242 .start_time()
243 .unwrap_or(Utc::now())
244 }) {
245 let oldest_id = oldest_id.clone();
246 self.windows.remove(&oldest_id);
247 debug!("Removed oldest window: {}", oldest_id);
248 }
249 }
250
251 fn estimate_memory_usage(&self) -> usize {
253 let window_size = std::mem::size_of::<EventWindow>();
255 let late_event_size = std::mem::size_of::<(StreamEvent, DateTime<Utc>)>();
256
257 self.windows.len() * window_size + self.late_events.len() * late_event_size
258 }
259
260 fn update_stats(&mut self, start_time: std::time::Instant) {
262 self.stats.events_processed += 1;
263 self.stats.last_processing_time = Utc::now();
264
265 let elapsed = start_time.elapsed();
266 let latency_ms = elapsed.as_secs_f64() * 1000.0;
267
268 let alpha = 0.1;
270 self.stats.avg_latency_ms = alpha * latency_ms + (1.0 - alpha) * self.stats.avg_latency_ms;
271
272 let current_memory = self.estimate_memory_usage();
274 if current_memory > self.stats.peak_memory_usage {
275 self.stats.peak_memory_usage = current_memory;
276 }
277 }
278
279 pub fn stats(&self) -> &ProcessorStats {
281 &self.stats
282 }
283
284 pub fn active_windows(&self) -> Vec<String> {
286 self.windows.keys().cloned().collect()
287 }
288
289 pub fn get_window(&self, window_id: &str) -> Option<&EventWindow> {
291 self.windows.get(window_id)
292 }
293
294 pub fn remove_window(&mut self, window_id: &str) -> Result<()> {
296 if self.windows.remove(window_id).is_some() {
297 info!("Removed window: {}", window_id);
298 Ok(())
299 } else {
300 Err(anyhow!("Window not found: {}", window_id))
301 }
302 }
303
304 pub fn clear_windows(&mut self) {
306 self.windows.clear();
307 info!("Cleared all windows");
308 }
309
310 pub fn current_watermark(&self) -> DateTime<Utc> {
312 self.watermark
313 }
314
315 pub fn late_events(&self) -> &VecDeque<(StreamEvent, DateTime<Utc>)> {
317 &self.late_events
318 }
319}
320
321impl Default for EventProcessor {
322 fn default() -> Self {
323 Self::new(ProcessorConfig::default())
324 }
325}
326
/// Private extension trait exposing a start time for a window type.
trait WindowTypeExt {
    /// Returns the window's start time; callers in this file fall back to
    /// "now" on `None` (presumably meaning the start time is unknown).
    fn start_time(&self) -> Option<DateTime<Utc>>;
}
331
332impl WindowTypeExt for super::window::WindowType {
333 fn start_time(&self) -> Option<DateTime<Utc>> {
334 Some(Utc::now())
336 }
337}