1use crate::event::{Event, EventData, EventFilter};
9use std::collections::VecDeque;
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12
13pub struct EventStream {
19 events: VecDeque<Event>,
20 max_size: usize,
21}
22
23impl EventStream {
24 pub fn new() -> Self {
26 Self {
27 events: VecDeque::new(),
28 max_size: 10_000,
29 }
30 }
31
32 pub fn with_max_size(max_size: usize) -> Self {
34 Self {
35 events: VecDeque::new(),
36 max_size,
37 }
38 }
39
40 pub fn push(&mut self, event: Event) {
42 if self.events.len() >= self.max_size {
43 self.events.pop_front();
44 }
45 self.events.push_back(event);
46 }
47
48 pub fn pop(&mut self) -> Option<Event> {
50 self.events.pop_front()
51 }
52
53 pub fn peek(&self) -> Option<&Event> {
55 self.events.front()
56 }
57
58 pub fn len(&self) -> usize {
60 self.events.len()
61 }
62
63 pub fn is_empty(&self) -> bool {
65 self.events.is_empty()
66 }
67
68 pub fn clear(&mut self) {
70 self.events.clear();
71 }
72
73 pub fn filter(&self, filter: &EventFilter) -> Vec<&Event> {
75 self.events.iter().filter(|e| e.matches(filter)).collect()
76 }
77
78 pub fn take(&mut self, filter: &EventFilter) -> Vec<Event> {
80 let matching: Vec<Event> = self
81 .events
82 .iter()
83 .filter(|e| e.matches(filter))
84 .cloned()
85 .collect();
86
87 self.events.retain(|e| !e.matches(filter));
88
89 matching
90 }
91
92 pub fn as_slice(&self) -> impl Iterator<Item = &Event> {
94 self.events.iter()
95 }
96}
97
98impl Default for EventStream {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104pub struct StreamProcessor {
110 pipeline: Vec<Box<dyn ProcessingStep + Send + Sync>>,
111}
112
113impl StreamProcessor {
114 pub fn new() -> Self {
116 Self {
117 pipeline: Vec::new(),
118 }
119 }
120
121 pub fn add_step(mut self, step: impl ProcessingStep + Send + Sync + 'static) -> Self {
123 self.pipeline.push(Box::new(step));
124 self
125 }
126
127 pub fn filter(self, filter: EventFilter) -> Self {
129 self.add_step(FilterStep { filter })
130 }
131
132 pub fn map(self, mapper: impl Fn(Event) -> Event + Send + Sync + 'static) -> Self {
134 self.add_step(MapStep {
135 mapper: Arc::new(mapper),
136 })
137 }
138
139 pub fn transform_data(
141 self,
142 transformer: impl Fn(EventData) -> EventData + Send + Sync + 'static,
143 ) -> Self {
144 self.add_step(TransformDataStep {
145 transformer: Arc::new(transformer),
146 })
147 }
148
149 pub fn process(&self, event: Event) -> Option<Event> {
151 let mut current = Some(event);
152
153 for step in &self.pipeline {
154 if let Some(e) = current {
155 current = step.process(e);
156 } else {
157 break;
158 }
159 }
160
161 current
162 }
163
164 pub fn process_batch(&self, events: Vec<Event>) -> Vec<Event> {
166 events.into_iter().filter_map(|e| self.process(e)).collect()
167 }
168}
169
170impl Default for StreamProcessor {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176pub trait ProcessingStep {
182 fn process(&self, event: Event) -> Option<Event>;
183}
184
185struct FilterStep {
187 filter: EventFilter,
188}
189
190impl ProcessingStep for FilterStep {
191 fn process(&self, event: Event) -> Option<Event> {
192 if event.matches(&self.filter) {
193 Some(event)
194 } else {
195 None
196 }
197 }
198}
199
200struct MapStep {
202 mapper: Arc<dyn Fn(Event) -> Event + Send + Sync>,
203}
204
205impl ProcessingStep for MapStep {
206 fn process(&self, event: Event) -> Option<Event> {
207 Some((self.mapper)(event))
208 }
209}
210
211struct TransformDataStep {
213 transformer: Arc<dyn Fn(EventData) -> EventData + Send + Sync>,
214}
215
216impl ProcessingStep for TransformDataStep {
217 fn process(&self, mut event: Event) -> Option<Event> {
218 event.data = (self.transformer)(event.data);
219 Some(event)
220 }
221}
222
223pub struct WindowedStream {
229 window_size: Duration,
230 windows: RwLock<Vec<Window>>,
231}
232
233impl WindowedStream {
234 pub fn new(window_size: Duration) -> Self {
236 Self {
237 window_size,
238 windows: RwLock::new(Vec::new()),
239 }
240 }
241
242 pub fn push(&self, event: Event) {
244 let window_start = self.window_start(event.timestamp);
245 let mut windows = self
246 .windows
247 .write()
248 .expect("windows RwLock poisoned in push");
249
250 if let Some(window) = windows.iter_mut().find(|w| w.start == window_start) {
251 window.events.push(event);
252 } else {
253 let mut window = Window::new(window_start, self.window_size.as_millis() as u64);
254 window.events.push(event);
255 windows.push(window);
256 }
257 }
258
259 pub fn completed_windows(&self) -> Vec<Window> {
261 let now = current_timestamp_millis();
262 let windows = self
263 .windows
264 .read()
265 .expect("windows RwLock poisoned in completed_windows");
266
267 windows
268 .iter()
269 .filter(|w| w.end() <= now)
270 .cloned()
271 .collect()
272 }
273
274 pub fn flush_completed(&self) -> Vec<Window> {
276 let now = current_timestamp_millis();
277 let mut windows = self
278 .windows
279 .write()
280 .expect("windows RwLock poisoned in flush_completed");
281
282 let completed: Vec<Window> = windows
283 .iter()
284 .filter(|w| w.end() <= now)
285 .cloned()
286 .collect();
287
288 windows.retain(|w| w.end() > now);
289
290 completed
291 }
292
293 fn window_start(&self, timestamp: u64) -> u64 {
294 let window_ms = self.window_size.as_millis() as u64;
295 (timestamp / window_ms) * window_ms
296 }
297}
298
299#[derive(Debug, Clone)]
301pub struct Window {
302 pub start: u64,
303 pub duration: u64,
304 pub events: Vec<Event>,
305}
306
307impl Window {
308 pub fn new(start: u64, duration: u64) -> Self {
309 Self {
310 start,
311 duration,
312 events: Vec::new(),
313 }
314 }
315
316 pub fn end(&self) -> u64 {
317 self.start + self.duration
318 }
319
320 pub fn len(&self) -> usize {
321 self.events.len()
322 }
323
324 pub fn is_empty(&self) -> bool {
325 self.events.is_empty()
326 }
327}
328
329#[derive(Debug, Clone, Copy)]
335pub enum AggregateFunction {
336 Count,
337 Sum,
338 Avg,
339 Min,
340 Max,
341}
342
343impl AggregateFunction {
344 pub fn apply(&self, values: &[f64]) -> Option<f64> {
346 if values.is_empty() {
347 return None;
348 }
349
350 Some(match self {
351 Self::Count => values.len() as f64,
352 Self::Sum => values.iter().sum(),
353 Self::Avg => values.iter().sum::<f64>() / values.len() as f64,
354 Self::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
355 Self::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
356 })
357 }
358}
359
360fn current_timestamp_millis() -> u64 {
361 std::time::SystemTime::now()
362 .duration_since(std::time::UNIX_EPOCH)
363 .map(|d| d.as_millis() as u64)
364 .unwrap_or(0)
365}
366
367#[cfg(test)]
372mod tests {
373 use super::*;
374 use crate::event::EventType;
375
376 fn create_test_event(source: &str, value: i64) -> Event {
377 Event::new(EventType::Created, source, EventData::Int(value))
378 }
379
380 #[test]
381 fn test_event_stream() {
382 let mut stream = EventStream::new();
383
384 stream.push(create_test_event("test", 1));
385 stream.push(create_test_event("test", 2));
386 stream.push(create_test_event("test", 3));
387
388 assert_eq!(stream.len(), 3);
389
390 let event = stream.pop().unwrap();
391 assert_eq!(event.data.as_i64(), Some(1));
392 }
393
394 #[test]
395 fn test_stream_filter() {
396 let mut stream = EventStream::new();
397
398 stream.push(create_test_event("users", 1));
399 stream.push(create_test_event("orders", 2));
400 stream.push(create_test_event("users", 3));
401
402 let filter = EventFilter::new().with_source("users");
403 let filtered = stream.filter(&filter);
404
405 assert_eq!(filtered.len(), 2);
406 }
407
408 #[test]
409 fn test_stream_processor() {
410 let processor = StreamProcessor::new()
411 .filter(EventFilter::new().with_type(EventType::Created))
412 .map(|mut e| {
413 e.metadata.insert("processed".to_string(), "true".to_string());
414 e
415 });
416
417 let event = create_test_event("test", 42);
418 let processed = processor.process(event).unwrap();
419
420 assert_eq!(processed.get_metadata("processed"), Some(&"true".to_string()));
421 }
422
423 #[test]
424 fn test_stream_max_size() {
425 let mut stream = EventStream::with_max_size(3);
426
427 for i in 0..5 {
428 stream.push(create_test_event("test", i));
429 }
430
431 assert_eq!(stream.len(), 3);
432
433 let first = stream.pop().unwrap();
434 assert_eq!(first.data.as_i64(), Some(2));
435 }
436
437 #[test]
438 fn test_aggregate_functions() {
439 let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];
440
441 assert_eq!(AggregateFunction::Count.apply(&values), Some(5.0));
442 assert_eq!(AggregateFunction::Sum.apply(&values), Some(15.0));
443 assert_eq!(AggregateFunction::Avg.apply(&values), Some(3.0));
444 assert_eq!(AggregateFunction::Min.apply(&values), Some(1.0));
445 assert_eq!(AggregateFunction::Max.apply(&values), Some(5.0));
446 }
447
448 #[test]
449 fn test_window() {
450 let window = Window::new(1000, 100);
451 assert_eq!(window.start, 1000);
452 assert_eq!(window.end(), 1100);
453 assert!(window.is_empty());
454 }
455}