Skip to main content

oxirs_stream/processing/
mod.rs

//! Advanced Event Processing Module
//!
//! This module provides sophisticated event processing capabilities including:
//! - Time-based and count-based windowing
//! - Streaming aggregations (count, sum, average, min, max)
//! - Complex event pattern detection
//! - Event correlation and causality tracking
//! - Real-time analytics and metrics computation
//!
//! The module is currently organized into the following sub-modules:
//! - `aggregation`: Aggregation functions and state management
//! - `joins`: Stream joins (join types, join conditions, window strategies)
//! - `operators`: Stream operators (map, filter, flat-map, reduce, ...) and operator pipelines
//! - `pattern`: Pattern definitions and pattern matching
//! - `processor`: Main event processing engine
//! - `simd_ops`: SIMD batch processing and event filtering
//! - `window`: Window management and windowing logic
//!
//! Planned sub-modules that are not implemented yet:
//! - `complex`: Complex event processing patterns
//! - `analytics`: Real-time analytics and metrics
//! - `anomaly`: Anomaly detection capabilities
//! - `temporal`: Advanced temporal processing
//! - `causality`: Causality analysis and correlation
19
20pub mod aggregation;
21pub mod joins;
22pub mod operators;
23pub mod pattern;
24pub mod processor;
25pub mod simd_ops;
26pub mod window;
27
28// Re-export commonly used types
29pub use aggregation::{AggregateFunction, AggregationManager, AggregationState};
30pub use joins::{
31    JoinCondition, JoinConfig, JoinStats, JoinType, JoinWindowStrategy, JoinedEvent, StreamJoiner,
32};
33pub use operators::{
34    DebounceOperator, DistinctOperator, FilterOperator, FlatMapOperator, MapOperator,
35    OperatorPipeline, OperatorStats, PartitionOperator, PipelineBuilder, PipelineStats,
36    ReduceOperator, StreamOperator, ThrottleOperator,
37};
38pub use pattern::{
39    Pattern, PatternMatch, PatternMatchStrategy, PatternMatcher, PatternMatcherStats,
40    StatisticalPatternType,
41};
42pub use processor::{EventProcessor, ProcessorConfig, ProcessorStats};
43pub use simd_ops::{
44    SimdAggregateResult, SimdBatchConfig, SimdBatchProcessor, SimdEventFilter, SimdProcessorStats,
45};
46pub use window::{EventWindow, Watermark, WindowConfig, WindowResult, WindowTrigger, WindowType};
47
48// TODO: The following modules would be created in subsequent refactoring steps:
49// pub mod complex;      // Complex event processing
50// pub mod analytics;    // Real-time analytics
51// pub mod anomaly;      // Anomaly detection
52// pub mod temporal;     // Temporal processing
53// pub mod causality;    // Causality analysis
54
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamEvent;
    use chrono::{Duration as ChronoDuration, Utc};

    /// Builds a `TripleAdded` event with the fixed predicate `hasValue`,
    /// no graph, and default metadata, so each test only spells out the
    /// fields it actually varies.
    fn triple_added(subject: String, object: String) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject,
            predicate: String::from("hasValue"),
            object,
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        }
    }

    /// A freshly constructed tumbling window has a non-empty id and holds no events.
    #[test]
    fn test_window_creation() {
        let tumbling = WindowType::Tumbling {
            duration: ChronoDuration::seconds(10),
        };
        let window = EventWindow::new(WindowConfig {
            window_type: tumbling,
            aggregates: vec![AggregateFunction::Count],
            group_by: Vec::new(),
            filter: None,
            allow_lateness: None,
            trigger: WindowTrigger::OnTime,
        });

        assert!(!window.id().is_empty());
        assert_eq!(window.event_count(), 0);
    }

    /// A default processor starts without any active windows.
    #[test]
    fn test_processor_creation() {
        let engine = EventProcessor::default();
        let open_windows = engine.active_windows().len();
        assert_eq!(open_windows, 0);
    }

    /// After registering a `Count` aggregation under the key `count` and
    /// feeding one event, the results contain that key.
    #[test]
    fn test_aggregation_manager() {
        let mut manager = AggregationManager::new();
        manager.add_aggregation(String::from("count"), AggregateFunction::Count);

        let event = triple_added(String::from("test"), String::from("42"));
        let outcome = manager.update(&event);
        assert!(outcome.is_ok());

        let aggregated = manager.results().unwrap();
        assert!(aggregated.contains_key("count"));
    }

    /// Updating a watermark to a later timestamp makes `current()` report it.
    #[test]
    fn test_watermark_update() {
        let mut watermark = Watermark::new();
        let target = watermark.current() + ChronoDuration::seconds(10);

        watermark.update(target);

        assert_eq!(watermark.current(), target);
    }

    /// A window configured with `OnCount(3)` reports that it should trigger
    /// once three events have been added.
    #[test]
    fn test_window_trigger_conditions() {
        let mut window = EventWindow::new(WindowConfig {
            window_type: WindowType::CountBased { size: 5 },
            aggregates: vec![AggregateFunction::Count],
            group_by: Vec::new(),
            filter: None,
            allow_lateness: None,
            trigger: WindowTrigger::OnCount(3),
        });

        // Feed exactly as many events as the count trigger requires.
        (0..3)
            .map(|i| triple_added(format!("test_{i}"), i.to_string()))
            .for_each(|event| {
                window.add_event(event).unwrap();
            });

        assert!(window.should_trigger(Utc::now()));
    }
}
141}