oxirs_stream/processing/
mod.rs

1//! Advanced Event Processing Module
2//!
3//! This module provides sophisticated event processing capabilities including:
4//! - Time-based and count-based windowing
5//! - Streaming aggregations (count, sum, average, min, max)
6//! - Complex event pattern detection
7//! - Event correlation and causality tracking
8//! - Real-time analytics and metrics computation
9//!
10//! The module is organized into several sub-modules:
11//! - `window`: Window management and windowing logic
12//! - `aggregation`: Aggregation functions and state management
13//! - `processor`: Main event processing engine
14//! - `complex`: Complex event processing patterns
15//! - `analytics`: Real-time analytics and metrics
16//! - `anomaly`: Anomaly detection capabilities
17//! - `temporal`: Advanced temporal processing
18//! - `causality`: Causality analysis and correlation
19
20pub mod aggregation;
21pub mod processor;
22pub mod window;
23
24// Re-export commonly used types
25pub use aggregation::{AggregateFunction, AggregationManager, AggregationState};
26pub use processor::{EventProcessor, ProcessorConfig, ProcessorStats};
27pub use window::{EventWindow, Watermark, WindowConfig, WindowResult, WindowTrigger, WindowType};
28
29// TODO: The following modules would be created in subsequent refactoring steps:
30// pub mod complex;      // Complex event processing
31// pub mod analytics;    // Real-time analytics
32// pub mod anomaly;      // Anomaly detection
33// pub mod temporal;     // Temporal processing
34// pub mod causality;    // Causality analysis
35
36#[cfg(test)]
37mod tests {
38    use super::*;
39    use crate::StreamEvent;
40    use chrono::{Duration as ChronoDuration, Utc};
41
42    #[test]
43    fn test_window_creation() {
44        let config = WindowConfig {
45            window_type: WindowType::Tumbling {
46                duration: ChronoDuration::seconds(10),
47            },
48            aggregates: vec![AggregateFunction::Count],
49            group_by: vec![],
50            filter: None,
51            allow_lateness: None,
52            trigger: WindowTrigger::OnTime,
53        };
54
55        let window = EventWindow::new(config);
56        assert!(!window.id().is_empty());
57        assert_eq!(window.event_count(), 0);
58    }
59
60    #[test]
61    fn test_processor_creation() {
62        let processor = EventProcessor::default();
63        assert_eq!(processor.active_windows().len(), 0);
64    }
65
66    #[test]
67    fn test_aggregation_manager() {
68        let mut manager = AggregationManager::new();
69        manager.add_aggregation("count".to_string(), AggregateFunction::Count);
70
71        let event = StreamEvent::TripleAdded {
72            subject: "test".to_string(),
73            predicate: "hasValue".to_string(),
74            object: "42".to_string(),
75            graph: None,
76            metadata: crate::event::EventMetadata::default(),
77        };
78
79        assert!(manager.update(&event).is_ok());
80        let results = manager.results().unwrap();
81        assert!(results.contains_key("count"));
82    }
83
84    #[test]
85    fn test_watermark_update() {
86        let mut watermark = Watermark::new();
87        let initial_time = watermark.current();
88
89        let future_time = initial_time + ChronoDuration::seconds(10);
90        watermark.update(future_time);
91
92        assert_eq!(watermark.current(), future_time);
93    }
94
95    #[test]
96    fn test_window_trigger_conditions() {
97        let config = WindowConfig {
98            window_type: WindowType::CountBased { size: 5 },
99            aggregates: vec![AggregateFunction::Count],
100            group_by: vec![],
101            filter: None,
102            allow_lateness: None,
103            trigger: WindowTrigger::OnCount(3),
104        };
105
106        let mut window = EventWindow::new(config);
107
108        // Add events
109        for i in 0..3 {
110            let event = StreamEvent::TripleAdded {
111                subject: format!("test_{i}"),
112                predicate: "hasValue".to_string(),
113                object: i.to_string(),
114                graph: None,
115                metadata: crate::event::EventMetadata::default(),
116            };
117            window.add_event(event).unwrap();
118        }
119
120        assert!(window.should_trigger(Utc::now()));
121    }
122}