oxirs_stream/processing/
mod.rs1pub mod aggregation;
21pub mod processor;
22pub mod window;
23
24pub use aggregation::{AggregateFunction, AggregationManager, AggregationState};
26pub use processor::{EventProcessor, ProcessorConfig, ProcessorStats};
27pub use window::{EventWindow, Watermark, WindowConfig, WindowResult, WindowTrigger, WindowType};
28
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StreamEvent;
    use chrono::{Duration as ChronoDuration, Utc};

    /// Builds a `WindowConfig` with a single `Count` aggregate, no grouping, no filter and
    /// no lateness allowance; only the window type and the trigger differ between tests.
    fn count_only_config(window_type: WindowType, trigger: WindowTrigger) -> WindowConfig {
        WindowConfig {
            window_type,
            aggregates: vec![AggregateFunction::Count],
            group_by: Vec::new(),
            filter: None,
            allow_lateness: None,
            trigger,
        }
    }

    /// Builds a `StreamEvent::TripleAdded` with the fixed predicate `hasValue`, no graph and
    /// default metadata.
    fn triple_added(subject: String, object: String) -> StreamEvent {
        StreamEvent::TripleAdded {
            subject,
            predicate: String::from("hasValue"),
            object,
            graph: None,
            metadata: crate::event::EventMetadata::default(),
        }
    }

    /// A window built from a 10-second tumbling config reports a non-empty id and zero events.
    #[test]
    fn test_window_creation() {
        let tumbling = WindowType::Tumbling {
            duration: ChronoDuration::seconds(10),
        };
        let fresh = EventWindow::new(count_only_config(tumbling, WindowTrigger::OnTime));

        assert!(!fresh.id().is_empty());
        assert_eq!(fresh.event_count(), 0);
    }

    /// A default-constructed processor reports no active windows.
    #[test]
    fn test_processor_creation() {
        let default_processor = EventProcessor::default();
        assert_eq!(default_processor.active_windows().len(), 0);
    }

    /// After registering a `Count` aggregation under the key `count` and feeding one event,
    /// the update succeeds and the results contain that key.
    #[test]
    fn test_aggregation_manager() {
        let mut aggregations = AggregationManager::new();
        aggregations.add_aggregation(String::from("count"), AggregateFunction::Count);

        let sample = triple_added(String::from("test"), String::from("42"));

        assert!(aggregations.update(&sample).is_ok());
        assert!(aggregations.results().unwrap().contains_key("count"));
    }

    /// Updating a watermark to a timestamp 10 seconds past its current value makes
    /// `current()` return that timestamp.
    #[test]
    fn test_watermark_update() {
        let mut mark = Watermark::new();
        let target = mark.current() + ChronoDuration::seconds(10);

        mark.update(target);

        assert_eq!(mark.current(), target);
    }

    /// With an `OnCount(3)` trigger, the window reports that it should trigger once three
    /// events have been added.
    #[test]
    fn test_window_trigger_conditions() {
        let mut counted = EventWindow::new(count_only_config(
            WindowType::CountBased { size: 5 },
            WindowTrigger::OnCount(3),
        ));

        // Feed exactly as many events as the trigger threshold.
        let events = (0..3).map(|i| triple_added(format!("test_{i}"), i.to_string()));
        for event in events {
            counted.add_event(event).unwrap();
        }

        assert!(counted.should_trigger(Utc::now()));
    }
}