oxirs_stream/processing/
processor.rs

1//! Main event processor implementation
2//!
3//! This module provides the core event processing functionality including:
4//! - Event processor management
5//! - Window lifecycle management
6//! - Watermark handling
7//! - Late event processing
8
9use super::{
10    aggregation::AggregationManager,
11    window::{EventWindow, Watermark, WindowConfig, WindowResult},
12};
13use crate::StreamEvent;
14use anyhow::{anyhow, Result};
15use chrono::{DateTime, Duration as ChronoDuration, Utc};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, VecDeque};
18use tracing::{debug, info, warn};
19
20/// Processor configuration
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct ProcessorConfig {
23    /// Maximum number of windows to maintain
24    pub max_windows: usize,
25    /// Maximum late event buffer size
26    pub max_late_events: usize,
27    /// Watermark advancement interval
28    pub watermark_interval: ChronoDuration,
29    /// Enable statistics collection
30    pub enable_stats: bool,
31    /// Memory limit for event storage (bytes)
32    pub memory_limit: Option<usize>,
33}
34
35impl Default for ProcessorConfig {
36    fn default() -> Self {
37        Self {
38            max_windows: 1000,
39            max_late_events: 10000,
40            watermark_interval: ChronoDuration::seconds(1),
41            enable_stats: true,
42            memory_limit: Some(1024 * 1024 * 100), // 100MB
43        }
44    }
45}
46
47/// Processing statistics
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ProcessorStats {
50    /// Total events processed
51    pub events_processed: u64,
52    /// Total windows created
53    pub windows_created: u64,
54    /// Total windows triggered
55    pub windows_triggered: u64,
56    /// Late events received
57    pub late_events: u64,
58    /// Dropped events (due to memory limits)
59    pub dropped_events: u64,
60    /// Processing start time
61    pub start_time: DateTime<Utc>,
62    /// Last processing time
63    pub last_processing_time: DateTime<Utc>,
64    /// Average processing latency (milliseconds)
65    pub avg_latency_ms: f64,
66    /// Peak memory usage (bytes)
67    pub peak_memory_usage: usize,
68}
69
70impl Default for ProcessorStats {
71    fn default() -> Self {
72        let now = Utc::now();
73        Self {
74            events_processed: 0,
75            windows_created: 0,
76            windows_triggered: 0,
77            late_events: 0,
78            dropped_events: 0,
79            start_time: now,
80            last_processing_time: now,
81            avg_latency_ms: 0.0,
82            peak_memory_usage: 0,
83        }
84    }
85}
86
87/// Advanced event processor with windowing and aggregations
88pub struct EventProcessor {
89    windows: HashMap<String, EventWindow>,
90    watermark: DateTime<Utc>,
91    late_events: VecDeque<(StreamEvent, DateTime<Utc>)>,
92    stats: ProcessorStats,
93    config: ProcessorConfig,
94    watermark_manager: Watermark,
95    aggregation_manager: AggregationManager,
96}
97
98impl EventProcessor {
99    /// Create a new event processor
100    pub fn new(config: ProcessorConfig) -> Self {
101        Self {
102            windows: HashMap::new(),
103            watermark: Utc::now(),
104            late_events: VecDeque::new(),
105            stats: ProcessorStats::default(),
106            config,
107            watermark_manager: Watermark::default(),
108            aggregation_manager: AggregationManager::default(),
109        }
110    }
111
112    /// Create a new window with the given configuration
113    pub fn create_window(&mut self, config: WindowConfig) -> Result<String> {
114        let window = EventWindow::new(config);
115        let window_id = window.id().to_string();
116
117        // Check memory limits
118        if let Some(limit) = self.config.memory_limit {
119            if self.estimate_memory_usage() > limit {
120                return Err(anyhow!("Memory limit exceeded, cannot create new window"));
121            }
122        }
123
124        // Check window count limits
125        if self.windows.len() >= self.config.max_windows {
126            warn!("Maximum number of windows reached, removing oldest window");
127            self.remove_oldest_window();
128        }
129
130        self.windows.insert(window_id.clone(), window);
131        self.stats.windows_created += 1;
132
133        info!("Created new window: {}", window_id);
134        Ok(window_id)
135    }
136
137    /// Process an event through all windows
138    pub fn process_event(&mut self, event: StreamEvent) -> Result<Vec<WindowResult>> {
139        let start_time = std::time::Instant::now();
140        let mut results = Vec::new();
141
142        // Update watermark
143        self.update_watermark(&event)?;
144
145        // Check if event is late
146        if self.is_late_event(&event) {
147            self.handle_late_event(event)?;
148            return Ok(results);
149        }
150
151        // Process event through all windows
152        let mut windows_to_trigger = Vec::new();
153
154        for (window_id, window) in &mut self.windows {
155            if let Err(e) = window.add_event(event.clone()) {
156                warn!("Failed to add event to window {}: {}", window_id, e);
157                continue;
158            }
159
160            // Check if window should trigger
161            if window.should_trigger(self.watermark) {
162                windows_to_trigger.push(window_id.clone());
163            }
164        }
165
166        // Trigger windows that need to be triggered
167        for window_id in windows_to_trigger {
168            let result = self.trigger_window(&window_id)?;
169            results.push(result);
170        }
171
172        // Update statistics
173        self.update_stats(start_time);
174
175        Ok(results)
176    }
177
178    /// Trigger a window and produce results
179    fn trigger_window(&mut self, window_id: &str) -> Result<WindowResult> {
180        let window = self
181            .windows
182            .get(window_id)
183            .ok_or_else(|| anyhow!("Window not found: {}", window_id))?;
184
185        // Calculate aggregations
186        let aggregations = self.aggregation_manager.results()?;
187
188        let result = WindowResult {
189            window_id: window_id.to_string(),
190            window_start: window
191                .config()
192                .window_type
193                .start_time()
194                .unwrap_or(Utc::now()),
195            window_end: Utc::now(),
196            event_count: window.event_count(),
197            aggregations,
198            trigger_reason: "Window trigger condition met".to_string(),
199            processing_time: Utc::now(),
200        };
201
202        self.stats.windows_triggered += 1;
203        info!("Triggered window: {}", window_id);
204
205        Ok(result)
206    }
207
208    /// Update watermark based on event
209    fn update_watermark(&mut self, event: &StreamEvent) -> Result<()> {
210        let event_time = event.timestamp();
211        self.watermark_manager.update(event_time);
212        self.watermark = self.watermark_manager.current();
213        Ok(())
214    }
215
216    /// Check if event is late
217    fn is_late_event(&self, event: &StreamEvent) -> bool {
218        let event_time = event.timestamp();
219        let allowed_lateness = self.watermark_manager.allowed_lateness;
220        event_time < self.watermark - allowed_lateness
221    }
222
223    /// Handle late events
224    fn handle_late_event(&mut self, event: StreamEvent) -> Result<()> {
225        if self.late_events.len() >= self.config.max_late_events {
226            self.late_events.pop_front();
227            self.stats.dropped_events += 1;
228        }
229
230        self.late_events.push_back((event, Utc::now()));
231        self.stats.late_events += 1;
232
233        Ok(())
234    }
235
236    /// Remove oldest window to make room for new ones
237    fn remove_oldest_window(&mut self) {
238        if let Some((oldest_id, _)) = self.windows.iter().min_by_key(|(_, window)| {
239            window
240                .config()
241                .window_type
242                .start_time()
243                .unwrap_or(Utc::now())
244        }) {
245            let oldest_id = oldest_id.clone();
246            self.windows.remove(&oldest_id);
247            debug!("Removed oldest window: {}", oldest_id);
248        }
249    }
250
251    /// Estimate current memory usage
252    fn estimate_memory_usage(&self) -> usize {
253        // Rough estimation of memory usage
254        let window_size = std::mem::size_of::<EventWindow>();
255        let late_event_size = std::mem::size_of::<(StreamEvent, DateTime<Utc>)>();
256
257        self.windows.len() * window_size + self.late_events.len() * late_event_size
258    }
259
260    /// Update processing statistics
261    fn update_stats(&mut self, start_time: std::time::Instant) {
262        self.stats.events_processed += 1;
263        self.stats.last_processing_time = Utc::now();
264
265        let elapsed = start_time.elapsed();
266        let latency_ms = elapsed.as_secs_f64() * 1000.0;
267
268        // Update average latency (exponential moving average)
269        let alpha = 0.1;
270        self.stats.avg_latency_ms = alpha * latency_ms + (1.0 - alpha) * self.stats.avg_latency_ms;
271
272        // Update peak memory usage
273        let current_memory = self.estimate_memory_usage();
274        if current_memory > self.stats.peak_memory_usage {
275            self.stats.peak_memory_usage = current_memory;
276        }
277    }
278
279    /// Get processing statistics
280    pub fn stats(&self) -> &ProcessorStats {
281        &self.stats
282    }
283
284    /// Get active windows
285    pub fn active_windows(&self) -> Vec<String> {
286        self.windows.keys().cloned().collect()
287    }
288
289    /// Get window by ID
290    pub fn get_window(&self, window_id: &str) -> Option<&EventWindow> {
291        self.windows.get(window_id)
292    }
293
294    /// Remove window by ID
295    pub fn remove_window(&mut self, window_id: &str) -> Result<()> {
296        if self.windows.remove(window_id).is_some() {
297            info!("Removed window: {}", window_id);
298            Ok(())
299        } else {
300            Err(anyhow!("Window not found: {}", window_id))
301        }
302    }
303
304    /// Clear all windows
305    pub fn clear_windows(&mut self) {
306        self.windows.clear();
307        info!("Cleared all windows");
308    }
309
310    /// Get current watermark
311    pub fn current_watermark(&self) -> DateTime<Utc> {
312        self.watermark
313    }
314
315    /// Get late events
316    pub fn late_events(&self) -> &VecDeque<(StreamEvent, DateTime<Utc>)> {
317        &self.late_events
318    }
319}
320
321impl Default for EventProcessor {
322    fn default() -> Self {
323        Self::new(ProcessorConfig::default())
324    }
325}
326
327// Helper trait for window types
328trait WindowTypeExt {
329    fn start_time(&self) -> Option<DateTime<Utc>>;
330}
331
332impl WindowTypeExt for super::window::WindowType {
333    fn start_time(&self) -> Option<DateTime<Utc>> {
334        // This would need to be implemented based on the actual window type
335        Some(Utc::now())
336    }
337}