oxirs_stream/processing/
mod.rs1pub mod aggregation;
21pub mod joins;
22pub mod operators;
23pub mod pattern;
24pub mod processor;
25pub mod simd_ops;
26pub mod window;
27
28pub use aggregation::{AggregateFunction, AggregationManager, AggregationState};
30pub use joins::{
31 JoinCondition, JoinConfig, JoinStats, JoinType, JoinWindowStrategy, JoinedEvent, StreamJoiner,
32};
33pub use operators::{
34 DebounceOperator, DistinctOperator, FilterOperator, FlatMapOperator, MapOperator,
35 OperatorPipeline, OperatorStats, PartitionOperator, PipelineBuilder, PipelineStats,
36 ReduceOperator, StreamOperator, ThrottleOperator,
37};
38pub use pattern::{
39 Pattern, PatternMatch, PatternMatchStrategy, PatternMatcher, PatternMatcherStats,
40 StatisticalPatternType,
41};
42pub use processor::{EventProcessor, ProcessorConfig, ProcessorStats};
43pub use simd_ops::{
44 SimdAggregateResult, SimdBatchConfig, SimdBatchProcessor, SimdEventFilter, SimdProcessorStats,
45};
46pub use window::{EventWindow, Watermark, WindowConfig, WindowResult, WindowTrigger, WindowType};
47
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamEvent;
    use chrono::{Duration as ChronoDuration, Utc};

    /// Builds a `TripleAdded` event with the fixed predicate `"hasValue"`,
    /// no graph, and default metadata; only the subject and object vary
    /// between the tests below.
    fn has_value_triple(subject: String, object: String) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject,
            predicate: "hasValue".to_string(),
            object,
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        }
    }

    /// Builds a `WindowConfig` that computes a single `Count` aggregate with
    /// no grouping, no filter and no lateness allowance, using the supplied
    /// window type and trigger.
    fn count_only_config(window_type: WindowType, trigger: WindowTrigger) -> WindowConfig {
        WindowConfig {
            window_type,
            aggregates: vec![AggregateFunction::Count],
            group_by: vec![],
            filter: None,
            allow_lateness: None,
            trigger,
        }
    }

    /// A freshly constructed window reports a non-empty id and zero events.
    #[test]
    fn test_window_creation() {
        let tumbling = WindowType::Tumbling {
            duration: ChronoDuration::seconds(10),
        };
        let fresh = EventWindow::new(count_only_config(tumbling, WindowTrigger::OnTime));

        assert!(!fresh.id().is_empty());
        assert_eq!(fresh.event_count(), 0);
    }

    /// A default-constructed processor reports no active windows.
    #[test]
    fn test_processor_creation() {
        let default_processor = EventProcessor::default();
        assert_eq!(default_processor.active_windows().len(), 0);
    }

    /// After registering a `Count` aggregation under the key `"count"` and
    /// feeding one event, the results contain that key.
    #[test]
    fn test_aggregation_manager() {
        let mut aggregations = AggregationManager::new();
        aggregations.add_aggregation("count".to_string(), AggregateFunction::Count);

        let sample = has_value_triple("test".to_string(), "42".to_string());

        assert!(aggregations.update(&sample).is_ok());
        assert!(aggregations.results().unwrap().contains_key("count"));
    }

    /// Updating a watermark to a timestamp ten seconds past its initial
    /// value makes `current()` return that timestamp.
    #[test]
    fn test_watermark_update() {
        let mut mark = Watermark::new();
        let advanced = mark.current() + ChronoDuration::seconds(10);

        mark.update(advanced);

        assert_eq!(mark.current(), advanced);
    }

    /// A window configured with `OnCount(3)` reports that it should trigger
    /// once three events have been added.
    #[test]
    fn test_window_trigger_conditions() {
        let mut counted = EventWindow::new(count_only_config(
            WindowType::CountBased { size: 5 },
            WindowTrigger::OnCount(3),
        ));

        for idx in 0..3 {
            counted
                .add_event(has_value_triple(format!("test_{idx}"), idx.to_string()))
                .unwrap();
        }

        assert!(counted.should_trigger(Utc::now()));
    }
}