Skip to main content

aegis_streaming/
stream.rs

1//! Aegis Streaming Stream Processing
2//!
3//! Stream processing utilities for event transformation.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventData, EventFilter};
9use std::collections::VecDeque;
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12
13// =============================================================================
14// Event Stream
15// =============================================================================
16
17/// A stream of events with processing capabilities.
18pub struct EventStream {
19    events: VecDeque<Event>,
20    max_size: usize,
21}
22
23impl EventStream {
24    /// Create a new event stream.
25    pub fn new() -> Self {
26        Self {
27            events: VecDeque::new(),
28            max_size: 10_000,
29        }
30    }
31
32    /// Create a stream with a maximum size.
33    pub fn with_max_size(max_size: usize) -> Self {
34        Self {
35            events: VecDeque::new(),
36            max_size,
37        }
38    }
39
40    /// Push an event to the stream.
41    pub fn push(&mut self, event: Event) {
42        if self.events.len() >= self.max_size {
43            self.events.pop_front();
44        }
45        self.events.push_back(event);
46    }
47
48    /// Pop the next event from the stream.
49    pub fn pop(&mut self) -> Option<Event> {
50        self.events.pop_front()
51    }
52
53    /// Peek at the next event.
54    pub fn peek(&self) -> Option<&Event> {
55        self.events.front()
56    }
57
58    /// Get the number of events in the stream.
59    pub fn len(&self) -> usize {
60        self.events.len()
61    }
62
63    /// Check if the stream is empty.
64    pub fn is_empty(&self) -> bool {
65        self.events.is_empty()
66    }
67
68    /// Clear all events.
69    pub fn clear(&mut self) {
70        self.events.clear();
71    }
72
73    /// Filter events in the stream.
74    pub fn filter(&self, filter: &EventFilter) -> Vec<&Event> {
75        self.events.iter().filter(|e| e.matches(filter)).collect()
76    }
77
78    /// Take events matching a filter.
79    pub fn take(&mut self, filter: &EventFilter) -> Vec<Event> {
80        let matching: Vec<Event> = self
81            .events
82            .iter()
83            .filter(|e| e.matches(filter))
84            .cloned()
85            .collect();
86
87        self.events.retain(|e| !e.matches(filter));
88
89        matching
90    }
91
92    /// Get events as a slice.
93    pub fn as_slice(&self) -> impl Iterator<Item = &Event> {
94        self.events.iter()
95    }
96}
97
98impl Default for EventStream {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104// =============================================================================
105// Stream Processor
106// =============================================================================
107
108/// Processes events in a stream.
109pub struct StreamProcessor {
110    pipeline: Vec<Box<dyn ProcessingStep + Send + Sync>>,
111}
112
113impl StreamProcessor {
114    /// Create a new stream processor.
115    pub fn new() -> Self {
116        Self {
117            pipeline: Vec::new(),
118        }
119    }
120
121    /// Add a processing step.
122    pub fn add_step(mut self, step: impl ProcessingStep + Send + Sync + 'static) -> Self {
123        self.pipeline.push(Box::new(step));
124        self
125    }
126
127    /// Add a filter step.
128    pub fn filter(self, filter: EventFilter) -> Self {
129        self.add_step(FilterStep { filter })
130    }
131
132    /// Add a map step.
133    pub fn map(self, mapper: impl Fn(Event) -> Event + Send + Sync + 'static) -> Self {
134        self.add_step(MapStep {
135            mapper: Arc::new(mapper),
136        })
137    }
138
139    /// Add a transform step on event data.
140    pub fn transform_data(
141        self,
142        transformer: impl Fn(EventData) -> EventData + Send + Sync + 'static,
143    ) -> Self {
144        self.add_step(TransformDataStep {
145            transformer: Arc::new(transformer),
146        })
147    }
148
149    /// Process an event through the pipeline.
150    pub fn process(&self, event: Event) -> Option<Event> {
151        let mut current = Some(event);
152
153        for step in &self.pipeline {
154            if let Some(e) = current {
155                current = step.process(e);
156            } else {
157                break;
158            }
159        }
160
161        current
162    }
163
164    /// Process a batch of events.
165    pub fn process_batch(&self, events: Vec<Event>) -> Vec<Event> {
166        events.into_iter().filter_map(|e| self.process(e)).collect()
167    }
168}
169
170impl Default for StreamProcessor {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176// =============================================================================
177// Processing Step
178// =============================================================================
179
180/// A step in the processing pipeline.
181pub trait ProcessingStep {
182    fn process(&self, event: Event) -> Option<Event>;
183}
184
185/// Filter step that filters events.
186struct FilterStep {
187    filter: EventFilter,
188}
189
190impl ProcessingStep for FilterStep {
191    fn process(&self, event: Event) -> Option<Event> {
192        if event.matches(&self.filter) {
193            Some(event)
194        } else {
195            None
196        }
197    }
198}
199
200/// Map step that transforms events.
201struct MapStep {
202    mapper: Arc<dyn Fn(Event) -> Event + Send + Sync>,
203}
204
205impl ProcessingStep for MapStep {
206    fn process(&self, event: Event) -> Option<Event> {
207        Some((self.mapper)(event))
208    }
209}
210
211/// Transform step that transforms event data.
212struct TransformDataStep {
213    transformer: Arc<dyn Fn(EventData) -> EventData + Send + Sync>,
214}
215
216impl ProcessingStep for TransformDataStep {
217    fn process(&self, mut event: Event) -> Option<Event> {
218        event.data = (self.transformer)(event.data);
219        Some(event)
220    }
221}
222
223// =============================================================================
224// Windowed Stream
225// =============================================================================
226
227/// A time-windowed event stream.
228pub struct WindowedStream {
229    window_size: Duration,
230    windows: RwLock<Vec<Window>>,
231}
232
233impl WindowedStream {
234    /// Create a windowed stream.
235    pub fn new(window_size: Duration) -> Self {
236        Self {
237            window_size,
238            windows: RwLock::new(Vec::new()),
239        }
240    }
241
242    /// Add an event to the appropriate window.
243    pub fn push(&self, event: Event) {
244        let window_start = self.window_start(event.timestamp);
245        let mut windows = self
246            .windows
247            .write()
248            .expect("windows RwLock poisoned in push");
249
250        if let Some(window) = windows.iter_mut().find(|w| w.start == window_start) {
251            window.events.push(event);
252        } else {
253            let mut window = Window::new(window_start, self.window_size.as_millis() as u64);
254            window.events.push(event);
255            windows.push(window);
256        }
257    }
258
259    /// Get completed windows.
260    pub fn completed_windows(&self) -> Vec<Window> {
261        let now = current_timestamp_millis();
262        let windows = self
263            .windows
264            .read()
265            .expect("windows RwLock poisoned in completed_windows");
266
267        windows.iter().filter(|w| w.end() <= now).cloned().collect()
268    }
269
270    /// Remove completed windows and return them.
271    pub fn flush_completed(&self) -> Vec<Window> {
272        let now = current_timestamp_millis();
273        let mut windows = self
274            .windows
275            .write()
276            .expect("windows RwLock poisoned in flush_completed");
277
278        let completed: Vec<Window> = windows.iter().filter(|w| w.end() <= now).cloned().collect();
279
280        windows.retain(|w| w.end() > now);
281
282        completed
283    }
284
285    fn window_start(&self, timestamp: u64) -> u64 {
286        let window_ms = self.window_size.as_millis() as u64;
287        (timestamp / window_ms) * window_ms
288    }
289}
290
291/// A time window of events.
292#[derive(Debug, Clone)]
293pub struct Window {
294    pub start: u64,
295    pub duration: u64,
296    pub events: Vec<Event>,
297}
298
299impl Window {
300    pub fn new(start: u64, duration: u64) -> Self {
301        Self {
302            start,
303            duration,
304            events: Vec::new(),
305        }
306    }
307
308    pub fn end(&self) -> u64 {
309        self.start + self.duration
310    }
311
312    pub fn len(&self) -> usize {
313        self.events.len()
314    }
315
316    pub fn is_empty(&self) -> bool {
317        self.events.is_empty()
318    }
319}
320
321// =============================================================================
322// Aggregation
323// =============================================================================
324
/// Numeric reductions that can be applied to a collection of values.
#[derive(Debug, Clone, Copy)]
pub enum AggregateFunction {
    /// Number of values.
    Count,
    /// Sum of all values.
    Sum,
    /// Arithmetic mean of the values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
}

impl AggregateFunction {
    /// Reduce `values` with this aggregate.
    ///
    /// Returns `None` for an empty slice, for every variant including
    /// `Count`.
    pub fn apply(&self, values: &[f64]) -> Option<f64> {
        if values.is_empty() {
            return None;
        }

        let total = || values.iter().sum::<f64>();

        let aggregated = match *self {
            Self::Count => values.len() as f64,
            Self::Sum => total(),
            Self::Avg => total() / values.len() as f64,
            Self::Min => {
                let mut lowest = f64::INFINITY;
                for &value in values {
                    lowest = lowest.min(value);
                }
                lowest
            }
            Self::Max => {
                let mut highest = f64::NEG_INFINITY;
                for &value in values {
                    highest = highest.max(value);
                }
                highest
            }
        };

        Some(aggregated)
    }
}
351
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn current_timestamp_millis() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
358
359// =============================================================================
360// Tests
361// =============================================================================
362
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;

    /// Build a `Created` event from `source` carrying an integer payload.
    fn create_test_event(source: &str, value: i64) -> Event {
        Event::new(EventType::Created, source, EventData::Int(value))
    }

    #[test]
    fn test_event_stream() {
        let mut stream = EventStream::new();

        stream.push(create_test_event("test", 1));
        stream.push(create_test_event("test", 2));
        stream.push(create_test_event("test", 3));

        assert_eq!(stream.len(), 3);

        let event = stream.pop().unwrap();
        assert_eq!(event.data.as_i64(), Some(1));
    }

    #[test]
    fn test_stream_filter() {
        let mut stream = EventStream::new();

        stream.push(create_test_event("users", 1));
        stream.push(create_test_event("orders", 2));
        stream.push(create_test_event("users", 3));

        let filter = EventFilter::new().with_source("users");
        let filtered = stream.filter(&filter);

        assert_eq!(filtered.len(), 2);
    }

    #[test]
    fn test_stream_take() {
        let mut stream = EventStream::new();

        stream.push(create_test_event("users", 1));
        stream.push(create_test_event("orders", 2));
        stream.push(create_test_event("users", 3));

        let filter = EventFilter::new().with_source("users");
        let taken = stream.take(&filter);

        // Matching events come out in their original order...
        assert_eq!(taken.len(), 2);
        assert_eq!(taken[0].data.as_i64(), Some(1));
        assert_eq!(taken[1].data.as_i64(), Some(3));

        // ...and only the non-matching event is left behind.
        assert_eq!(stream.len(), 1);
        assert_eq!(stream.peek().and_then(|e| e.data.as_i64()), Some(2));
    }

    #[test]
    fn test_stream_processor() {
        let processor = StreamProcessor::new()
            .filter(EventFilter::new().with_type(EventType::Created))
            .map(|mut e| {
                e.metadata
                    .insert("processed".to_string(), "true".to_string());
                e
            });

        let event = create_test_event("test", 42);
        let processed = processor.process(event).unwrap();

        assert_eq!(
            processed.get_metadata("processed"),
            Some(&"true".to_string())
        );
    }

    #[test]
    fn test_process_batch_drops_filtered_events() {
        let processor = StreamProcessor::new().filter(EventFilter::new().with_source("users"));

        let processed = processor.process_batch(vec![
            create_test_event("users", 1),
            create_test_event("orders", 2),
            create_test_event("users", 3),
        ]);

        assert_eq!(processed.len(), 2);
        assert_eq!(processed[0].data.as_i64(), Some(1));
        assert_eq!(processed[1].data.as_i64(), Some(3));
    }

    #[test]
    fn test_stream_max_size() {
        let mut stream = EventStream::with_max_size(3);

        for i in 0..5 {
            stream.push(create_test_event("test", i));
        }

        assert_eq!(stream.len(), 3);

        let first = stream.pop().unwrap();
        assert_eq!(first.data.as_i64(), Some(2));
    }

    #[test]
    fn test_aggregate_functions() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(AggregateFunction::Count.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Sum.apply(&values), Some(15.0));
        assert_eq!(AggregateFunction::Avg.apply(&values), Some(3.0));
        assert_eq!(AggregateFunction::Min.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Max.apply(&values), Some(5.0));
    }

    #[test]
    fn test_aggregate_empty_input() {
        assert_eq!(AggregateFunction::Count.apply(&[]), None);
        assert_eq!(AggregateFunction::Sum.apply(&[]), None);
        assert_eq!(AggregateFunction::Avg.apply(&[]), None);
        assert_eq!(AggregateFunction::Min.apply(&[]), None);
        assert_eq!(AggregateFunction::Max.apply(&[]), None);
    }

    #[test]
    fn test_window() {
        let window = Window::new(1000, 100);
        assert_eq!(window.start, 1000);
        assert_eq!(window.end(), 1100);
        assert!(window.is_empty());
    }

    #[test]
    fn test_windowed_stream_flush() {
        let stream = WindowedStream::new(Duration::from_millis(100));

        // Timestamps far in the past, so both windows are already complete.
        for (value, timestamp) in vec![(1, 1_010), (2, 1_090), (3, 1_150)] {
            let mut event = create_test_event("test", value);
            event.timestamp = timestamp;
            stream.push(event);
        }

        assert_eq!(stream.completed_windows().len(), 2);

        let flushed = stream.flush_completed();
        assert_eq!(flushed.len(), 2);
        assert_eq!(flushed[0].start, 1_000);
        assert_eq!(flushed[0].len(), 2);
        assert_eq!(flushed[1].start, 1_100);
        assert_eq!(flushed[1].len(), 1);

        // Flushing removes the windows from the stream.
        assert!(stream.flush_completed().is_empty());
        assert!(stream.completed_windows().is_empty());
    }
}