Skip to main content

rust_rule_engine/streaming/
operators.rs

1//! Stream Operators - Fluent API for Stream Processing
2//!
3//! This module provides a fluent, composable API for building stream processing pipelines.
4//! Inspired by Apache Flink, Kafka Streams, and Rust iterators.
5//!
6//! ## Example
7//!
8//! ```rust,ignore
9//! use rust_rule_engine::streaming::*;
10//!
11//! let result = DataStream::from_events(events)
12//!     .filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
13//!     .map(|e| enhance_event(e))
14//!     .key_by(|e| e.get_string("user_id").unwrap_or("unknown").to_string())
15//!     .window(WindowConfig::sliding(Duration::from_secs(60)))
//!     .reduce(|mut acc, e| {
//!         let sum = acc.get_numeric("total").unwrap_or(0.0);
//!         let amount = e.get_numeric("amount").unwrap_or(0.0);
//!         acc.data.insert("total".to_string(), Value::Number(sum + amount));
//!         acc
//!     });
//! // `result` maps each user_id to one reduced event per window.
23//! ```
24
25use crate::streaming::event::StreamEvent;
26use crate::streaming::window::{TimeWindow, WindowType};
27use crate::types::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31
32/// A stream of events with chainable operators
33#[derive(Clone)]
34pub struct DataStream {
35    events: Vec<StreamEvent>,
36}
37
38impl DataStream {
39    /// Create a new data stream from events
40    pub fn from_events(events: Vec<StreamEvent>) -> Self {
41        Self { events }
42    }
43
44    /// Create an empty data stream
45    pub fn new() -> Self {
46        Self { events: Vec::new() }
47    }
48
49    /// Add an event to the stream
50    pub fn push(&mut self, event: StreamEvent) {
51        self.events.push(event);
52    }
53
54    /// Get the number of events in the stream
55    pub fn len(&self) -> usize {
56        self.events.len()
57    }
58
59    /// Check if stream is empty
60    pub fn is_empty(&self) -> bool {
61        self.events.is_empty()
62    }
63
64    /// Filter events based on a predicate
65    ///
66    /// # Example
67    /// ```rust,ignore
68    /// stream.filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
69    /// ```
70    pub fn filter<F>(self, predicate: F) -> Self
71    where
72        F: Fn(&StreamEvent) -> bool,
73    {
74        let filtered_events = self.events.into_iter().filter(predicate).collect();
75        Self {
76            events: filtered_events,
77        }
78    }
79
80    /// Transform each event using a mapping function
81    ///
82    /// # Example
83    /// ```rust,ignore
84    /// stream.map(|mut e| {
85    ///     e.add_tag("processed", "true");
86    ///     e
87    /// })
88    /// ```
89    pub fn map<F>(self, mapper: F) -> Self
90    where
91        F: Fn(StreamEvent) -> StreamEvent,
92    {
93        let mapped_events = self.events.into_iter().map(mapper).collect();
94        Self {
95            events: mapped_events,
96        }
97    }
98
99    /// Transform each event into multiple events (flatMap)
100    ///
101    /// # Example
102    /// ```rust,ignore
103    /// stream.flat_map(|e| {
104    ///     // Split event into multiple events
105    ///     vec![e.clone(), e]
106    /// })
107    /// ```
108    pub fn flat_map<F>(self, mapper: F) -> Self
109    where
110        F: Fn(StreamEvent) -> Vec<StreamEvent>,
111    {
112        let flat_mapped_events = self.events.into_iter().flat_map(mapper).collect();
113        Self {
114            events: flat_mapped_events,
115        }
116    }
117
118    /// Key events by a specific field or function
119    ///
120    /// # Example
121    /// ```rust,ignore
122    /// stream.key_by(|e| e.get_string("user_id").unwrap_or("default").to_string())
123    /// ```
124    pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
125    where
126        F: Fn(&StreamEvent) -> K,
127        K: std::hash::Hash + Eq + Clone,
128    {
129        let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();
130
131        for event in self.events {
132            let key = key_selector(&event);
133            keyed_events.entry(key).or_default().push(event);
134        }
135
136        KeyedStream { keyed_events }
137    }
138
139    /// Apply a window to the stream
140    ///
141    /// # Example
142    /// ```rust,ignore
143    /// stream.window(WindowConfig::sliding(Duration::from_secs(60)))
144    /// ```
145    pub fn window(self, config: WindowConfig) -> WindowedStream {
146        WindowedStream::new(self.events, config)
147    }
148
149    /// Reduce events to a single result
150    ///
151    /// # Example
152    /// ```rust,ignore
153    /// stream.reduce(|acc, e| {
154    ///     // Accumulate values
155    ///     acc
156    /// })
157    /// ```
158    pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
159    where
160        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
161    {
162        self.events.into_iter().reduce(reducer)
163    }
164
165    /// Count the number of events
166    pub fn count(self) -> usize {
167        self.events.len()
168    }
169
170    /// Collect events into a vector
171    pub fn collect(self) -> Vec<StreamEvent> {
172        self.events
173    }
174
175    /// Take only the first n events
176    pub fn take(self, n: usize) -> Self {
177        Self {
178            events: self.events.into_iter().take(n).collect(),
179        }
180    }
181
182    /// Skip the first n events
183    pub fn skip(self, n: usize) -> Self {
184        Self {
185            events: self.events.into_iter().skip(n).collect(),
186        }
187    }
188
189    /// Process each event with a side effect (doesn't modify the stream)
190    ///
191    /// # Example
192    /// ```rust,ignore
193    /// stream.for_each(|e| {
194    ///     println!("Processing: {:?}", e);
195    /// })
196    /// ```
197    pub fn for_each<F>(self, action: F) -> Self
198    where
199        F: Fn(&StreamEvent),
200    {
201        for event in &self.events {
202            action(event);
203        }
204        self
205    }
206
207    /// Union with another stream
208    pub fn union(mut self, other: DataStream) -> Self {
209        self.events.extend(other.events);
210        Self {
211            events: self.events,
212        }
213    }
214
215    /// Find events matching a pattern
216    pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
217    where
218        F: Fn(&StreamEvent) -> bool,
219    {
220        self.events.into_iter().find(predicate)
221    }
222
223    /// Check if any event matches the predicate
224    pub fn any<F>(&self, predicate: F) -> bool
225    where
226        F: Fn(&StreamEvent) -> bool,
227    {
228        self.events.iter().any(predicate)
229    }
230
231    /// Check if all events match the predicate
232    pub fn all<F>(&self, predicate: F) -> bool
233    where
234        F: Fn(&StreamEvent) -> bool,
235    {
236        self.events.iter().all(predicate)
237    }
238
239    /// Sort events by a key function
240    pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
241    where
242        F: Fn(&StreamEvent) -> K,
243        K: Ord,
244    {
245        self.events.sort_by_key(key_fn);
246        Self {
247            events: self.events,
248        }
249    }
250
251    /// Group events by a key and apply aggregation
252    pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
253    where
254        F: Fn(&StreamEvent) -> K,
255        K: std::hash::Hash + Eq + Clone,
256    {
257        let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();
258
259        for event in self.events {
260            let key = key_selector(&event);
261            grouped.entry(key).or_default().push(event);
262        }
263
264        GroupedStream { groups: grouped }
265    }
266
267    /// Apply an aggregation function
268    pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
269    where
270        A: Aggregation,
271    {
272        aggregator.aggregate(&self.events)
273    }
274}
275
276impl Default for DataStream {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
282/// A stream of events keyed by a specific field
283pub struct KeyedStream<K>
284where
285    K: std::hash::Hash + Eq,
286{
287    keyed_events: HashMap<K, Vec<StreamEvent>>,
288}
289
290impl<K> KeyedStream<K>
291where
292    K: std::hash::Hash + Eq + Clone,
293{
294    /// Reduce events within each key
295    pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
296    where
297        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
298    {
299        self.keyed_events
300            .into_iter()
301            .filter_map(|(key, events)| {
302                events
303                    .into_iter()
304                    .reduce(&reducer)
305                    .map(|result| (key, result))
306            })
307            .collect()
308    }
309
310    /// Apply aggregation to each key group
311    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
312    where
313        A: Aggregation + Clone,
314    {
315        self.keyed_events
316            .into_iter()
317            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
318            .collect()
319    }
320
321    /// Apply a window to each key group
322    pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
323        KeyedWindowedStream {
324            keyed_events: self.keyed_events,
325            config,
326        }
327    }
328
329    /// Count events per key
330    pub fn count(self) -> HashMap<K, usize> {
331        self.keyed_events
332            .into_iter()
333            .map(|(key, events)| (key, events.len()))
334            .collect()
335    }
336
337    /// Get all keys
338    pub fn keys(&self) -> Vec<K> {
339        self.keyed_events.keys().cloned().collect()
340    }
341
342    /// Flatten back to a regular stream
343    pub fn flatten(self) -> DataStream {
344        // Prefer iterating values directly to avoid iterating over key/value pairs
345        // Flatten the vectors of events coming from each key directly
346        let events: Vec<StreamEvent> = self.keyed_events.into_values().flatten().collect();
347
348        DataStream { events }
349    }
350}
351
352/// Window configuration for stream processing
353#[derive(Debug, Clone)]
354pub struct WindowConfig {
355    pub window_type: WindowType,
356    pub duration: Duration,
357    pub max_events: usize,
358}
359
360impl WindowConfig {
361    /// Create a sliding window configuration
362    pub fn sliding(duration: Duration) -> Self {
363        Self {
364            window_type: WindowType::Sliding,
365            duration,
366            max_events: 10000,
367        }
368    }
369
370    /// Create a tumbling window configuration
371    pub fn tumbling(duration: Duration) -> Self {
372        Self {
373            window_type: WindowType::Tumbling,
374            duration,
375            max_events: 10000,
376        }
377    }
378
379    /// Create a session window configuration
380    pub fn session(timeout: Duration) -> Self {
381        Self {
382            window_type: WindowType::Session { timeout },
383            duration: timeout,
384            max_events: 10000,
385        }
386    }
387
388    /// Set maximum events per window
389    pub fn with_max_events(mut self, max_events: usize) -> Self {
390        self.max_events = max_events;
391        self
392    }
393}
394
395/// A stream with windowing applied
396pub struct WindowedStream {
397    windows: Vec<TimeWindow>,
398}
399
400impl WindowedStream {
401    /// Create a new windowed stream
402    pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
403        let mut windows = Vec::new();
404
405        if events.is_empty() {
406            return Self { windows };
407        }
408
409        // Group events into windows based on configuration
410        match config.window_type {
411            WindowType::Tumbling => {
412                // Calculate window boundaries
413                let window_ms = config.duration.as_millis() as u64;
414                let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();
415
416                for event in events {
417                    let window_start = (event.metadata.timestamp / window_ms) * window_ms;
418                    window_map.entry(window_start).or_default().push(event);
419                }
420
421                // Create windows
422                for (start_time, mut window_events) in window_map {
423                    let mut window = TimeWindow::new(
424                        config.window_type.clone(),
425                        config.duration,
426                        start_time,
427                        config.max_events,
428                    );
429
430                    for event in window_events.drain(..) {
431                        window.add_event(event);
432                    }
433
434                    windows.push(window);
435                }
436            }
437            WindowType::Sliding | WindowType::Session { .. } => {
438                // For sliding windows, create overlapping windows
439                // Simplified implementation: create one window per unique timestamp
440                let window_ms = config.duration.as_millis() as u64;
441
442                if !events.is_empty() {
443                    let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
444                    let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();
445
446                    let mut current_start = min_time;
447
448                    while current_start <= max_time {
449                        let mut window = TimeWindow::new(
450                            config.window_type.clone(),
451                            config.duration,
452                            current_start,
453                            config.max_events,
454                        );
455
456                        for event in &events {
457                            if event.metadata.timestamp >= current_start
458                                && event.metadata.timestamp < current_start + window_ms
459                            {
460                                window.add_event(event.clone());
461                            }
462                        }
463
464                        if window.count() > 0 {
465                            windows.push(window);
466                        }
467
468                        // Slide forward (overlap 50%)
469                        current_start += window_ms / 2;
470                    }
471                }
472            }
473        }
474
475        Self { windows }
476    }
477
478    /// Apply aggregation to each window
479    pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
480    where
481        A: Aggregation,
482    {
483        self.windows
484            .iter()
485            .map(|window| {
486                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
487                aggregator.aggregate(&events)
488            })
489            .collect()
490    }
491
492    /// Reduce events within each window
493    pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
494    where
495        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
496    {
497        self.windows
498            .into_iter()
499            .filter_map(|window| {
500                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
501                events.into_iter().reduce(&reducer)
502            })
503            .collect()
504    }
505
506    /// Get all windows
507    pub fn windows(&self) -> &[TimeWindow] {
508        &self.windows
509    }
510
511    /// Count events in each window
512    pub fn counts(self) -> Vec<usize> {
513        self.windows.iter().map(|w| w.count()).collect()
514    }
515
516    /// Flatten all windows back into a stream
517    pub fn flatten(self) -> DataStream {
518        let events: Vec<StreamEvent> = self
519            .windows
520            .into_iter()
521            .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
522            .collect();
523
524        DataStream { events }
525    }
526}
527
528/// Keyed stream with windowing
529pub struct KeyedWindowedStream<K>
530where
531    K: std::hash::Hash + Eq,
532{
533    keyed_events: HashMap<K, Vec<StreamEvent>>,
534    config: WindowConfig,
535}
536
537impl<K> KeyedWindowedStream<K>
538where
539    K: std::hash::Hash + Eq + Clone,
540{
541    /// Apply aggregation to each key's window
542    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
543    where
544        A: Aggregation + Clone,
545    {
546        self.keyed_events
547            .into_iter()
548            .map(|(key, events)| {
549                let windowed = WindowedStream::new(events, self.config.clone());
550                let results = windowed.aggregate(aggregator.clone());
551                (key, results)
552            })
553            .collect()
554    }
555
556    /// Reduce events within each key's window
557    pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
558    where
559        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
560    {
561        self.keyed_events
562            .into_iter()
563            .map(|(key, events)| {
564                let windowed = WindowedStream::new(events, self.config.clone());
565                let results = windowed.reduce(reducer.clone());
566                (key, results)
567            })
568            .collect()
569    }
570}
571
572/// Grouped stream for aggregations
573pub struct GroupedStream<K>
574where
575    K: std::hash::Hash + Eq,
576{
577    groups: HashMap<K, Vec<StreamEvent>>,
578}
579
580impl<K> GroupedStream<K>
581where
582    K: std::hash::Hash + Eq + Clone,
583{
584    /// Apply aggregation to each group
585    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
586    where
587        A: Aggregation + Clone,
588    {
589        self.groups
590            .into_iter()
591            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
592            .collect()
593    }
594
595    /// Count events in each group
596    pub fn count(self) -> HashMap<K, usize> {
597        self.groups
598            .into_iter()
599            .map(|(key, events)| (key, events.len()))
600            .collect()
601    }
602
603    /// Get the first event in each group
604    pub fn first(self) -> HashMap<K, StreamEvent> {
605        self.groups
606            .into_iter()
607            .filter_map(|(key, mut events)| {
608                if !events.is_empty() {
609                    Some((key, events.remove(0)))
610                } else {
611                    None
612                }
613            })
614            .collect()
615    }
616
617    /// Get the last event in each group
618    pub fn last(self) -> HashMap<K, StreamEvent> {
619        self.groups
620            .into_iter()
621            .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
622            .collect()
623    }
624}
625
/// Trait for aggregation functions.
///
/// An aggregation reduces a slice of events to a single [`AggregateResult`].
/// The stream operators in this module call it either once for a whole stream
/// or once per key, group, or window. Implementors must be `Send + Sync`.
pub trait Aggregation: Send + Sync {
    /// Compute the aggregate over `events` (which may be empty).
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
}
630
631/// Result of an aggregation operation
632#[derive(Debug, Clone)]
633pub enum AggregateResult {
634    Number(f64),
635    String(String),
636    Map(HashMap<String, Value>),
637    List(Vec<Value>),
638    None,
639}
640
641impl AggregateResult {
642    pub fn as_number(&self) -> Option<f64> {
643        match self {
644            AggregateResult::Number(n) => Some(*n),
645            _ => None,
646        }
647    }
648
649    pub fn as_string(&self) -> Option<&str> {
650        match self {
651            AggregateResult::String(s) => Some(s),
652            _ => None,
653        }
654    }
655
656    pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
657        match self {
658            AggregateResult::Map(m) => Some(m),
659            _ => None,
660        }
661    }
662}
663
664// Built-in aggregators
665
666/// Count aggregator
667#[derive(Clone)]
668pub struct Count;
669
670impl Aggregation for Count {
671    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
672        AggregateResult::Number(events.len() as f64)
673    }
674}
675
676/// Sum aggregator
677#[derive(Clone)]
678pub struct Sum {
679    pub field: String,
680}
681
682impl Sum {
683    pub fn new(field: impl Into<String>) -> Self {
684        Self {
685            field: field.into(),
686        }
687    }
688}
689
690impl Aggregation for Sum {
691    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
692        let sum: f64 = events
693            .iter()
694            .filter_map(|e| e.get_numeric(&self.field))
695            .sum();
696        AggregateResult::Number(sum)
697    }
698}
699
700/// Average aggregator
701#[derive(Clone)]
702pub struct Average {
703    pub field: String,
704}
705
706impl Average {
707    pub fn new(field: impl Into<String>) -> Self {
708        Self {
709            field: field.into(),
710        }
711    }
712}
713
714impl Aggregation for Average {
715    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
716        let values: Vec<f64> = events
717            .iter()
718            .filter_map(|e| e.get_numeric(&self.field))
719            .collect();
720
721        if values.is_empty() {
722            AggregateResult::None
723        } else {
724            let avg = values.iter().sum::<f64>() / values.len() as f64;
725            AggregateResult::Number(avg)
726        }
727    }
728}
729
730/// Min aggregator
731#[derive(Clone)]
732pub struct Min {
733    pub field: String,
734}
735
736impl Min {
737    pub fn new(field: impl Into<String>) -> Self {
738        Self {
739            field: field.into(),
740        }
741    }
742}
743
744impl Aggregation for Min {
745    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
746        events
747            .iter()
748            .filter_map(|e| e.get_numeric(&self.field))
749            .min_by(|a, b| a.partial_cmp(b).unwrap())
750            .map(AggregateResult::Number)
751            .unwrap_or(AggregateResult::None)
752    }
753}
754
755/// Max aggregator
756#[derive(Clone)]
757pub struct Max {
758    pub field: String,
759}
760
761impl Max {
762    pub fn new(field: impl Into<String>) -> Self {
763        Self {
764            field: field.into(),
765        }
766    }
767}
768
769impl Aggregation for Max {
770    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
771        events
772            .iter()
773            .filter_map(|e| e.get_numeric(&self.field))
774            .max_by(|a, b| a.partial_cmp(b).unwrap())
775            .map(AggregateResult::Number)
776            .unwrap_or(AggregateResult::None)
777    }
778}
779
780/// Custom aggregator using a closure
781pub struct CustomAggregator<F>
782where
783    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
784{
785    func: Arc<F>,
786}
787
788impl<F> CustomAggregator<F>
789where
790    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
791{
792    pub fn new(func: F) -> Self {
793        Self {
794            func: Arc::new(func),
795        }
796    }
797}
798
799impl<F> Clone for CustomAggregator<F>
800where
801    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
802{
803    fn clone(&self) -> Self {
804        Self {
805            func: Arc::clone(&self.func),
806        }
807    }
808}
809
810impl<F> Aggregation for CustomAggregator<F>
811where
812    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
813{
814    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
815        (self.func)(events)
816    }
817}
818
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Build `count` events with `value == i` and `user_id == "user_{i % 3}"`.
    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                data.insert(
                    "user_id".to_string(),
                    Value::String(format!("user_{}", i % 3)),
                );
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }

    /// Key selector shared by the grouping tests.
    fn user_key(e: &StreamEvent) -> String {
        e.get_string("user_id").unwrap_or("").to_string()
    }

    #[test]
    fn test_filter_operator() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let filtered = stream.filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        assert_eq!(filtered.len(), 4); // 6, 7, 8, 9
    }

    #[test]
    fn test_map_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let mapped = stream.map(|mut e| {
            if let Some(value) = e.get_numeric("value") {
                e.data
                    .insert("doubled".to_string(), Value::Number(value * 2.0));
            }
            e
        });

        let collected = mapped.collect();
        assert_eq!(collected[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(collected[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let keyed = stream.key_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = keyed.count();
        assert_eq!(counts.len(), 3); // 3 unique users
        assert_eq!(*counts.get("user_0").unwrap(), 3);
        assert_eq!(*counts.get("user_1").unwrap(), 3);
        assert_eq!(*counts.get("user_2").unwrap(), 3);
    }

    #[test]
    fn test_reduce_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let result = stream.reduce(|mut acc, e| {
            let acc_val = acc.get_numeric("value").unwrap_or(0.0);
            let e_val = e.get_numeric("value").unwrap_or(0.0);
            acc.data
                .insert("value".to_string(), Value::Number(acc_val + e_val));
            acc
        });

        assert!(result.is_some());
        assert_eq!(result.unwrap().get_numeric("value"), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_count_aggregator() {
        let events = create_test_events(10);
        let result = Count.aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        let events = create_test_events(5);
        let result = Sum::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_average_aggregator() {
        let events = create_test_events(5);
        let result = Average::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(2.0)); // (0+1+2+3+4)/5
    }

    #[test]
    fn test_min_max_aggregators() {
        let events = create_test_events(5);
        assert_eq!(Min::new("value").aggregate(&events).as_number(), Some(0.0));
        assert_eq!(Max::new("value").aggregate(&events).as_number(), Some(4.0));
        // A field no event has yields no result rather than a number.
        assert!(Min::new("missing").aggregate(&events).as_number().is_none());
        assert!(Max::new("missing").aggregate(&events).as_number().is_none());
    }

    #[test]
    fn test_group_by() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let grouped = stream.group_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = grouped.count();
        assert_eq!(counts.len(), 3);
    }

    #[test]
    fn test_group_first_and_last() {
        // user_0 holds values 0, 3, 6; user_1 holds 1, 4, 7.
        let firsts = DataStream::from_events(create_test_events(9))
            .group_by(user_key)
            .first();
        let lasts = DataStream::from_events(create_test_events(9))
            .group_by(user_key)
            .last();

        assert_eq!(firsts["user_0"].get_numeric("value"), Some(0.0));
        assert_eq!(firsts["user_1"].get_numeric("value"), Some(1.0));
        assert_eq!(lasts["user_0"].get_numeric("value"), Some(6.0));
        assert_eq!(lasts["user_1"].get_numeric("value"), Some(7.0));
    }

    #[test]
    fn test_keyed_sum() {
        let sums = DataStream::from_events(create_test_events(9))
            .key_by(user_key)
            .aggregate(Sum::new("value"));

        assert_eq!(sums["user_0"].as_number(), Some(9.0)); // 0+3+6
        assert_eq!(sums["user_2"].as_number(), Some(15.0)); // 2+5+8
    }

    #[test]
    fn test_take_and_skip() {
        let taken = DataStream::from_events(create_test_events(10))
            .take(3)
            .collect();
        assert_eq!(taken.len(), 3);
        assert_eq!(taken[0].get_numeric("value"), Some(0.0));

        let skipped = DataStream::from_events(create_test_events(10))
            .skip(8)
            .collect();
        assert_eq!(skipped.len(), 2);
        assert_eq!(skipped[0].get_numeric("value"), Some(8.0));

        // Taking or skipping past the end is not an error.
        assert_eq!(DataStream::from_events(create_test_events(3)).take(10).len(), 3);
        assert!(DataStream::from_events(create_test_events(3)).skip(10).is_empty());
    }

    #[test]
    fn test_union_and_sort_by() {
        let left = DataStream::from_events(create_test_events(3));
        let right = DataStream::from_events(create_test_events(2));

        let sorted = left
            .union(right)
            .sort_by(|e| std::cmp::Reverse(e.get_numeric("value").unwrap_or(0.0) as i64))
            .collect();

        let values: Vec<f64> = sorted
            .iter()
            .filter_map(|e| e.get_numeric("value"))
            .collect();
        assert_eq!(values, vec![2.0, 1.0, 1.0, 0.0, 0.0]);
    }

    #[test]
    fn test_chaining_operators() {
        let events = create_test_events(20);
        let stream = DataStream::from_events(events);

        let result = stream
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(result.len(), 5);
        assert_eq!(result[0].get_numeric("doubled"), Some(10.0)); // 5 * 2
    }

    #[test]
    fn test_windowed_stream() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let windowed = stream.window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
        // Tumbling windows partition the input, so no event is lost or duplicated.
        let total: usize = windowed.counts().iter().sum();
        assert_eq!(total, 10);
    }
}