Skip to main content

aegis_streaming/
stream.rs

1//! Aegis Streaming Stream Processing
2//!
3//! Stream processing utilities for event transformation.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::event::{Event, EventData, EventFilter};
9use std::collections::VecDeque;
10use std::sync::{Arc, RwLock};
11use std::time::Duration;
12
13// =============================================================================
14// Event Stream
15// =============================================================================
16
17/// A stream of events with processing capabilities.
18pub struct EventStream {
19    events: VecDeque<Event>,
20    max_size: usize,
21}
22
23impl EventStream {
24    /// Create a new event stream.
25    pub fn new() -> Self {
26        Self {
27            events: VecDeque::new(),
28            max_size: 10_000,
29        }
30    }
31
32    /// Create a stream with a maximum size.
33    pub fn with_max_size(max_size: usize) -> Self {
34        Self {
35            events: VecDeque::new(),
36            max_size,
37        }
38    }
39
40    /// Push an event to the stream.
41    pub fn push(&mut self, event: Event) {
42        if self.events.len() >= self.max_size {
43            self.events.pop_front();
44        }
45        self.events.push_back(event);
46    }
47
48    /// Pop the next event from the stream.
49    pub fn pop(&mut self) -> Option<Event> {
50        self.events.pop_front()
51    }
52
53    /// Peek at the next event.
54    pub fn peek(&self) -> Option<&Event> {
55        self.events.front()
56    }
57
58    /// Get the number of events in the stream.
59    pub fn len(&self) -> usize {
60        self.events.len()
61    }
62
63    /// Check if the stream is empty.
64    pub fn is_empty(&self) -> bool {
65        self.events.is_empty()
66    }
67
68    /// Clear all events.
69    pub fn clear(&mut self) {
70        self.events.clear();
71    }
72
73    /// Filter events in the stream.
74    pub fn filter(&self, filter: &EventFilter) -> Vec<&Event> {
75        self.events.iter().filter(|e| e.matches(filter)).collect()
76    }
77
78    /// Take events matching a filter.
79    pub fn take(&mut self, filter: &EventFilter) -> Vec<Event> {
80        let matching: Vec<Event> = self
81            .events
82            .iter()
83            .filter(|e| e.matches(filter))
84            .cloned()
85            .collect();
86
87        self.events.retain(|e| !e.matches(filter));
88
89        matching
90    }
91
92    /// Get events as a slice.
93    pub fn as_slice(&self) -> impl Iterator<Item = &Event> {
94        self.events.iter()
95    }
96}
97
98impl Default for EventStream {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104// =============================================================================
105// Stream Processor
106// =============================================================================
107
108/// Processes events in a stream.
109pub struct StreamProcessor {
110    pipeline: Vec<Box<dyn ProcessingStep + Send + Sync>>,
111}
112
113impl StreamProcessor {
114    /// Create a new stream processor.
115    pub fn new() -> Self {
116        Self {
117            pipeline: Vec::new(),
118        }
119    }
120
121    /// Add a processing step.
122    pub fn add_step(mut self, step: impl ProcessingStep + Send + Sync + 'static) -> Self {
123        self.pipeline.push(Box::new(step));
124        self
125    }
126
127    /// Add a filter step.
128    pub fn filter(self, filter: EventFilter) -> Self {
129        self.add_step(FilterStep { filter })
130    }
131
132    /// Add a map step.
133    pub fn map(self, mapper: impl Fn(Event) -> Event + Send + Sync + 'static) -> Self {
134        self.add_step(MapStep {
135            mapper: Arc::new(mapper),
136        })
137    }
138
139    /// Add a transform step on event data.
140    pub fn transform_data(
141        self,
142        transformer: impl Fn(EventData) -> EventData + Send + Sync + 'static,
143    ) -> Self {
144        self.add_step(TransformDataStep {
145            transformer: Arc::new(transformer),
146        })
147    }
148
149    /// Process an event through the pipeline.
150    pub fn process(&self, event: Event) -> Option<Event> {
151        let mut current = Some(event);
152
153        for step in &self.pipeline {
154            if let Some(e) = current {
155                current = step.process(e);
156            } else {
157                break;
158            }
159        }
160
161        current
162    }
163
164    /// Process a batch of events.
165    pub fn process_batch(&self, events: Vec<Event>) -> Vec<Event> {
166        events.into_iter().filter_map(|e| self.process(e)).collect()
167    }
168}
169
170impl Default for StreamProcessor {
171    fn default() -> Self {
172        Self::new()
173    }
174}
175
176// =============================================================================
177// Processing Step
178// =============================================================================
179
180/// A step in the processing pipeline.
181pub trait ProcessingStep {
182    fn process(&self, event: Event) -> Option<Event>;
183}
184
185/// Filter step that filters events.
186struct FilterStep {
187    filter: EventFilter,
188}
189
190impl ProcessingStep for FilterStep {
191    fn process(&self, event: Event) -> Option<Event> {
192        if event.matches(&self.filter) {
193            Some(event)
194        } else {
195            None
196        }
197    }
198}
199
200/// Map step that transforms events.
201struct MapStep {
202    mapper: Arc<dyn Fn(Event) -> Event + Send + Sync>,
203}
204
205impl ProcessingStep for MapStep {
206    fn process(&self, event: Event) -> Option<Event> {
207        Some((self.mapper)(event))
208    }
209}
210
211/// Transform step that transforms event data.
212struct TransformDataStep {
213    transformer: Arc<dyn Fn(EventData) -> EventData + Send + Sync>,
214}
215
216impl ProcessingStep for TransformDataStep {
217    fn process(&self, mut event: Event) -> Option<Event> {
218        event.data = (self.transformer)(event.data);
219        Some(event)
220    }
221}
222
223// =============================================================================
224// Windowed Stream
225// =============================================================================
226
227/// A time-windowed event stream.
228pub struct WindowedStream {
229    window_size: Duration,
230    windows: RwLock<Vec<Window>>,
231}
232
233impl WindowedStream {
234    /// Create a windowed stream.
235    pub fn new(window_size: Duration) -> Self {
236        Self {
237            window_size,
238            windows: RwLock::new(Vec::new()),
239        }
240    }
241
242    /// Add an event to the appropriate window.
243    pub fn push(&self, event: Event) {
244        let window_start = self.window_start(event.timestamp);
245        let mut windows = self
246            .windows
247            .write()
248            .expect("windows RwLock poisoned in push");
249
250        if let Some(window) = windows.iter_mut().find(|w| w.start == window_start) {
251            window.events.push(event);
252        } else {
253            let mut window = Window::new(window_start, self.window_size.as_millis() as u64);
254            window.events.push(event);
255            windows.push(window);
256        }
257    }
258
259    /// Get completed windows.
260    pub fn completed_windows(&self) -> Vec<Window> {
261        let now = current_timestamp_millis();
262        let windows = self
263            .windows
264            .read()
265            .expect("windows RwLock poisoned in completed_windows");
266
267        windows
268            .iter()
269            .filter(|w| w.end() <= now)
270            .cloned()
271            .collect()
272    }
273
274    /// Remove completed windows and return them.
275    pub fn flush_completed(&self) -> Vec<Window> {
276        let now = current_timestamp_millis();
277        let mut windows = self
278            .windows
279            .write()
280            .expect("windows RwLock poisoned in flush_completed");
281
282        let completed: Vec<Window> = windows
283            .iter()
284            .filter(|w| w.end() <= now)
285            .cloned()
286            .collect();
287
288        windows.retain(|w| w.end() > now);
289
290        completed
291    }
292
293    fn window_start(&self, timestamp: u64) -> u64 {
294        let window_ms = self.window_size.as_millis() as u64;
295        (timestamp / window_ms) * window_ms
296    }
297}
298
299/// A time window of events.
300#[derive(Debug, Clone)]
301pub struct Window {
302    pub start: u64,
303    pub duration: u64,
304    pub events: Vec<Event>,
305}
306
307impl Window {
308    pub fn new(start: u64, duration: u64) -> Self {
309        Self {
310            start,
311            duration,
312            events: Vec::new(),
313        }
314    }
315
316    pub fn end(&self) -> u64 {
317        self.start + self.duration
318    }
319
320    pub fn len(&self) -> usize {
321        self.events.len()
322    }
323
324    pub fn is_empty(&self) -> bool {
325        self.events.is_empty()
326    }
327}
328
329// =============================================================================
330// Aggregation
331// =============================================================================
332
/// Aggregate function for stream processing.
#[derive(Debug, Clone, Copy)]
pub enum AggregateFunction {
    /// Number of values.
    Count,
    /// Sum of all values.
    Sum,
    /// Arithmetic mean of the values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
}

impl AggregateFunction {
    /// Apply the aggregation to a slice of numeric values.
    ///
    /// Returns `None` for an empty slice, whichever function is selected.
    pub fn apply(&self, values: &[f64]) -> Option<f64> {
        let count = values.len();
        if count == 0 {
            return None;
        }

        let aggregated = match *self {
            AggregateFunction::Count => count as f64,
            AggregateFunction::Sum => values.iter().sum::<f64>(),
            AggregateFunction::Avg => {
                let total: f64 = values.iter().sum();
                total / count as f64
            }
            AggregateFunction::Min => values
                .iter()
                .fold(f64::INFINITY, |lowest, &value| lowest.min(value)),
            AggregateFunction::Max => values
                .iter()
                .fold(f64::NEG_INFINITY, |highest, &value| highest.max(value)),
        };

        Some(aggregated)
    }
}
359
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn current_timestamp_millis() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
366
367// =============================================================================
368// Tests
369// =============================================================================
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::EventType;

    /// Build a `Created` event from `source` carrying an integer payload.
    fn create_test_event(source: &str, value: i64) -> Event {
        let payload = EventData::Int(value);
        Event::new(EventType::Created, source, payload)
    }

    /// Events come back out of the stream in the order they went in.
    #[test]
    fn test_event_stream() {
        let mut stream = EventStream::new();
        for value in 1..=3 {
            stream.push(create_test_event("test", value));
        }

        assert_eq!(stream.len(), 3);

        let oldest = stream.pop().unwrap();
        assert_eq!(oldest.data.as_i64(), Some(1));
    }

    /// Filtering by source borrows only the events from that source.
    #[test]
    fn test_stream_filter() {
        let mut stream = EventStream::new();
        let inputs = [("users", 1), ("orders", 2), ("users", 3)];
        for &(source, value) in inputs.iter() {
            stream.push(create_test_event(source, value));
        }

        let users_only = EventFilter::new().with_source("users");
        let matches = stream.filter(&users_only);

        assert_eq!(matches.len(), 2);
    }

    /// A filter step followed by a map step tags matching events.
    #[test]
    fn test_stream_processor() {
        let created_only = EventFilter::new().with_type(EventType::Created);
        let processor = StreamProcessor::new().filter(created_only).map(|mut event| {
            event
                .metadata
                .insert("processed".to_string(), "true".to_string());
            event
        });

        let processed = processor.process(create_test_event("test", 42)).unwrap();

        assert_eq!(processed.get_metadata("processed"), Some(&"true".to_string()));
    }

    /// A bounded stream evicts its oldest events once it is full.
    #[test]
    fn test_stream_max_size() {
        let mut stream = EventStream::with_max_size(3);
        (0..5).for_each(|value| stream.push(create_test_event("test", value)));

        assert_eq!(stream.len(), 3);

        let oldest_kept = stream.pop().unwrap();
        assert_eq!(oldest_kept.data.as_i64(), Some(2));
    }

    /// Every aggregate function produces the expected value for 1..=5.
    #[test]
    fn test_aggregate_functions() {
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        let expectations = [
            (AggregateFunction::Count, 5.0),
            (AggregateFunction::Sum, 15.0),
            (AggregateFunction::Avg, 3.0),
            (AggregateFunction::Min, 1.0),
            (AggregateFunction::Max, 5.0),
        ];

        for &(function, expected) in expectations.iter() {
            assert_eq!(function.apply(&values), Some(expected));
        }
    }

    /// A fresh window is empty and ends `duration` after its start.
    #[test]
    fn test_window() {
        let window = Window::new(1000, 100);
        assert!(window.is_empty());
        assert_eq!(window.start, 1000);
        assert_eq!(window.end(), 1100);
    }
}