1use crate::event::{Event, EventData, EventFilter};
9use std::collections::VecDeque;
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12
13pub struct EventStream {
19 events: VecDeque<Event>,
20 max_size: usize,
21}
22
23impl EventStream {
24 pub fn new() -> Self {
26 Self {
27 events: VecDeque::new(),
28 max_size: 10_000,
29 }
30 }
31
32 pub fn with_max_size(max_size: usize) -> Self {
34 Self {
35 events: VecDeque::new(),
36 max_size,
37 }
38 }
39
40 pub fn push(&mut self, event: Event) {
42 if self.events.len() >= self.max_size {
43 self.events.pop_front();
44 }
45 self.events.push_back(event);
46 }
47
48 pub fn pop(&mut self) -> Option<Event> {
50 self.events.pop_front()
51 }
52
53 pub fn peek(&self) -> Option<&Event> {
55 self.events.front()
56 }
57
58 pub fn len(&self) -> usize {
60 self.events.len()
61 }
62
63 pub fn is_empty(&self) -> bool {
65 self.events.is_empty()
66 }
67
68 pub fn clear(&mut self) {
70 self.events.clear();
71 }
72
73 pub fn filter(&self, filter: &EventFilter) -> Vec<&Event> {
75 self.events.iter().filter(|e| e.matches(filter)).collect()
76 }
77
78 pub fn take(&mut self, filter: &EventFilter) -> Vec<Event> {
80 let matching: Vec<Event> = self
81 .events
82 .iter()
83 .filter(|e| e.matches(filter))
84 .cloned()
85 .collect();
86
87 self.events.retain(|e| !e.matches(filter));
88
89 matching
90 }
91
92 pub fn as_slice(&self) -> impl Iterator<Item = &Event> {
94 self.events.iter()
95 }
96}
97
98impl Default for EventStream {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104pub struct StreamProcessor {
110 pipeline: Vec<Box<dyn ProcessingStep + Send + Sync>>,
111}
112
113impl StreamProcessor {
114 pub fn new() -> Self {
116 Self {
117 pipeline: Vec::new(),
118 }
119 }
120
121 pub fn add_step(mut self, step: impl ProcessingStep + Send + Sync + 'static) -> Self {
123 self.pipeline.push(Box::new(step));
124 self
125 }
126
127 pub fn filter(self, filter: EventFilter) -> Self {
129 self.add_step(FilterStep { filter })
130 }
131
132 pub fn map(self, mapper: impl Fn(Event) -> Event + Send + Sync + 'static) -> Self {
134 self.add_step(MapStep {
135 mapper: Arc::new(mapper),
136 })
137 }
138
139 pub fn transform_data(
141 self,
142 transformer: impl Fn(EventData) -> EventData + Send + Sync + 'static,
143 ) -> Self {
144 self.add_step(TransformDataStep {
145 transformer: Arc::new(transformer),
146 })
147 }
148
149 pub fn process(&self, event: Event) -> Option<Event> {
151 let mut current = Some(event);
152
153 for step in &self.pipeline {
154 if let Some(e) = current {
155 current = step.process(e);
156 } else {
157 break;
158 }
159 }
160
161 current
162 }
163
164 pub fn process_batch(&self, events: Vec<Event>) -> Vec<Event> {
166 events.into_iter().filter_map(|e| self.process(e)).collect()
167 }
168}
169
170impl Default for StreamProcessor {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176pub trait ProcessingStep {
182 fn process(&self, event: Event) -> Option<Event>;
183}
184
185struct FilterStep {
187 filter: EventFilter,
188}
189
190impl ProcessingStep for FilterStep {
191 fn process(&self, event: Event) -> Option<Event> {
192 if event.matches(&self.filter) {
193 Some(event)
194 } else {
195 None
196 }
197 }
198}
199
200struct MapStep {
202 mapper: Arc<dyn Fn(Event) -> Event + Send + Sync>,
203}
204
205impl ProcessingStep for MapStep {
206 fn process(&self, event: Event) -> Option<Event> {
207 Some((self.mapper)(event))
208 }
209}
210
211struct TransformDataStep {
213 transformer: Arc<dyn Fn(EventData) -> EventData + Send + Sync>,
214}
215
216impl ProcessingStep for TransformDataStep {
217 fn process(&self, mut event: Event) -> Option<Event> {
218 event.data = (self.transformer)(event.data);
219 Some(event)
220 }
221}
222
223pub struct WindowedStream {
229 window_size: Duration,
230 windows: RwLock<Vec<Window>>,
231}
232
233impl WindowedStream {
234 pub fn new(window_size: Duration) -> Self {
236 Self {
237 window_size,
238 windows: RwLock::new(Vec::new()),
239 }
240 }
241
242 pub fn push(&self, event: Event) {
244 let window_start = self.window_start(event.timestamp);
245 let mut windows = self
246 .windows
247 .write()
248 .expect("windows RwLock poisoned in push");
249
250 if let Some(window) = windows.iter_mut().find(|w| w.start == window_start) {
251 window.events.push(event);
252 } else {
253 let mut window = Window::new(window_start, self.window_size.as_millis() as u64);
254 window.events.push(event);
255 windows.push(window);
256 }
257 }
258
259 pub fn completed_windows(&self) -> Vec<Window> {
261 let now = current_timestamp_millis();
262 let windows = self
263 .windows
264 .read()
265 .expect("windows RwLock poisoned in completed_windows");
266
267 windows.iter().filter(|w| w.end() <= now).cloned().collect()
268 }
269
270 pub fn flush_completed(&self) -> Vec<Window> {
272 let now = current_timestamp_millis();
273 let mut windows = self
274 .windows
275 .write()
276 .expect("windows RwLock poisoned in flush_completed");
277
278 let completed: Vec<Window> = windows.iter().filter(|w| w.end() <= now).cloned().collect();
279
280 windows.retain(|w| w.end() > now);
281
282 completed
283 }
284
285 fn window_start(&self, timestamp: u64) -> u64 {
286 let window_ms = self.window_size.as_millis() as u64;
287 (timestamp / window_ms) * window_ms
288 }
289}
290
291#[derive(Debug, Clone)]
293pub struct Window {
294 pub start: u64,
295 pub duration: u64,
296 pub events: Vec<Event>,
297}
298
299impl Window {
300 pub fn new(start: u64, duration: u64) -> Self {
301 Self {
302 start,
303 duration,
304 events: Vec::new(),
305 }
306 }
307
308 pub fn end(&self) -> u64 {
309 self.start + self.duration
310 }
311
312 pub fn len(&self) -> usize {
313 self.events.len()
314 }
315
316 pub fn is_empty(&self) -> bool {
317 self.events.is_empty()
318 }
319}
320
/// Numeric reductions that can be applied to a slice of values.
#[derive(Debug, Clone, Copy)]
pub enum AggregateFunction {
    /// Number of values.
    Count,
    /// Sum of all values.
    Sum,
    /// Arithmetic mean of all values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
}

impl AggregateFunction {
    /// Applies the aggregate to `values`.
    ///
    /// Returns `None` when `values` is empty (for every variant, including
    /// `Count`), otherwise `Some(result)`.
    ///
    /// `Min` and `Max` skip NaN entries when at least one non-NaN value is
    /// present. If every entry is NaN the result is NaN, instead of the
    /// infinity a fold seeded with `f64::INFINITY` / `f64::NEG_INFINITY`
    /// would produce.
    pub fn apply(&self, values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        match self {
            Self::Count => Some(values.len() as f64),
            Self::Sum => Some(values.iter().sum::<f64>()),
            Self::Avg => Some(values.iter().sum::<f64>() / values.len() as f64),
            // `reduce` starts from the first element, so the result is always
            // derived from the input rather than from a sentinel.
            Self::Min => values.iter().copied().reduce(f64::min),
            Self::Max => values.iter().copied().reduce(f64::max),
        }
    }
}
351
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
358
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;

    /// Builds a `Created` event from `source` carrying an integer payload.
    fn make_event(source: &str, value: i64) -> Event {
        let payload = EventData::Int(value);
        Event::new(EventType::Created, source, payload)
    }

    /// Events come back out of the stream in the order they were pushed.
    #[test]
    fn test_event_stream() {
        let mut stream = EventStream::new();
        for value in 1..=3 {
            stream.push(make_event("test", value));
        }

        assert_eq!(stream.len(), 3);

        let oldest = stream.pop().unwrap();
        assert_eq!(oldest.data.as_i64(), Some(1));
    }

    /// Filtering by source returns only the events from that source.
    #[test]
    fn test_stream_filter() {
        let mut stream = EventStream::new();
        for (source, value) in vec![("users", 1), ("orders", 2), ("users", 3)] {
            stream.push(make_event(source, value));
        }

        let users_only = EventFilter::new().with_source("users");

        assert_eq!(stream.filter(&users_only).len(), 2);
    }

    /// A filter step followed by a map step passes and annotates a matching event.
    #[test]
    fn test_stream_processor() {
        let tag_processed = |mut event: Event| {
            event
                .metadata
                .insert("processed".to_string(), "true".to_string());
            event
        };
        let processor = StreamProcessor::new()
            .filter(EventFilter::new().with_type(EventType::Created))
            .map(tag_processed);

        let processed = processor.process(make_event("test", 42)).unwrap();

        assert_eq!(
            processed.get_metadata("processed"),
            Some(&"true".to_string())
        );
    }

    /// A full stream evicts its oldest events to stay within `max_size`.
    #[test]
    fn test_stream_max_size() {
        let mut stream = EventStream::with_max_size(3);
        (0..5).for_each(|i| stream.push(make_event("test", i)));

        assert_eq!(stream.len(), 3);

        let first = stream.pop().unwrap();
        assert_eq!(first.data.as_i64(), Some(2));
    }

    /// Every aggregate produces the expected value for 1..=5.
    #[test]
    fn test_aggregate_functions() {
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let expectations = [
            (AggregateFunction::Count, 5.0),
            (AggregateFunction::Sum, 15.0),
            (AggregateFunction::Avg, 3.0),
            (AggregateFunction::Min, 1.0),
            (AggregateFunction::Max, 5.0),
        ];

        for (function, expected) in expectations.iter() {
            assert_eq!(function.apply(&values), Some(*expected));
        }
    }

    /// A new window is empty and ends `duration` after its start.
    #[test]
    fn test_window() {
        let window = Window::new(1000, 100);

        assert_eq!((window.start, window.end()), (1000, 1100));
        assert!(window.is_empty());
    }
}