rust_rule_engine/streaming/
operators.rs

//! Stream Operators - Fluent API for Stream Processing
//!
//! This module provides a fluent, composable API for building stream processing pipelines.
//! Inspired by Apache Flink, Kafka Streams, and Rust iterators.
//!
//! ## Example
//!
//! ```rust,ignore
//! use rust_rule_engine::streaming::*;
//!
//! let result = DataStream::from_events(events)
//!     .filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
//!     .map(|e| enhance_event(e))
//!     .key_by(|e| e.get_string("user_id").unwrap_or("unknown").to_string())
//!     .window(WindowConfig::sliding(Duration::from_secs(60)))
//!     .reduce(|acc, e| {
//!         let sum = acc.get_numeric("total").unwrap_or(0.0);
//!         let amount = e.get_numeric("amount").unwrap_or(0.0);
//!         acc.data.insert("total".to_string(), Value::Number(sum + amount));
//!         acc
//!     })
//!     .collect();
//! ```

25use crate::streaming::event::StreamEvent;
26use crate::streaming::window::{TimeWindow, WindowType};
27use crate::types::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::Duration;
31
/// A stream of events with chainable operators
///
/// `DataStream` is an eager, in-memory wrapper around a `Vec<StreamEvent>`:
/// each operator consumes the stream and immediately produces a new one
/// (unlike lazy Rust iterators).
#[derive(Clone)]
pub struct DataStream {
    // Events held eagerly, in arrival order.
    events: Vec<StreamEvent>,
}
37
38impl DataStream {
39    /// Create a new data stream from events
40    pub fn from_events(events: Vec<StreamEvent>) -> Self {
41        Self { events }
42    }
43
44    /// Create an empty data stream
45    pub fn new() -> Self {
46        Self { events: Vec::new() }
47    }
48
49    /// Add an event to the stream
50    pub fn push(&mut self, event: StreamEvent) {
51        self.events.push(event);
52    }
53
54    /// Get the number of events in the stream
55    pub fn len(&self) -> usize {
56        self.events.len()
57    }
58
59    /// Check if stream is empty
60    pub fn is_empty(&self) -> bool {
61        self.events.is_empty()
62    }
63
64    /// Filter events based on a predicate
65    ///
66    /// # Example
67    /// ```rust,ignore
68    /// stream.filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
69    /// ```
70    pub fn filter<F>(self, predicate: F) -> Self
71    where
72        F: Fn(&StreamEvent) -> bool,
73    {
74        let filtered_events = self.events.into_iter().filter(predicate).collect();
75        Self {
76            events: filtered_events,
77        }
78    }
79
80    /// Transform each event using a mapping function
81    ///
82    /// # Example
83    /// ```rust,ignore
84    /// stream.map(|mut e| {
85    ///     e.add_tag("processed", "true");
86    ///     e
87    /// })
88    /// ```
89    pub fn map<F>(self, mapper: F) -> Self
90    where
91        F: Fn(StreamEvent) -> StreamEvent,
92    {
93        let mapped_events = self.events.into_iter().map(mapper).collect();
94        Self {
95            events: mapped_events,
96        }
97    }
98
99    /// Transform each event into multiple events (flatMap)
100    ///
101    /// # Example
102    /// ```rust,ignore
103    /// stream.flat_map(|e| {
104    ///     // Split event into multiple events
105    ///     vec![e.clone(), e]
106    /// })
107    /// ```
108    pub fn flat_map<F>(self, mapper: F) -> Self
109    where
110        F: Fn(StreamEvent) -> Vec<StreamEvent>,
111    {
112        let flat_mapped_events = self.events.into_iter().flat_map(mapper).collect();
113        Self {
114            events: flat_mapped_events,
115        }
116    }
117
118    /// Key events by a specific field or function
119    ///
120    /// # Example
121    /// ```rust,ignore
122    /// stream.key_by(|e| e.get_string("user_id").unwrap_or("default").to_string())
123    /// ```
124    pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
125    where
126        F: Fn(&StreamEvent) -> K,
127        K: std::hash::Hash + Eq + Clone,
128    {
129        let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();
130
131        for event in self.events {
132            let key = key_selector(&event);
133            keyed_events.entry(key).or_default().push(event);
134        }
135
136        KeyedStream { keyed_events }
137    }
138
139    /// Apply a window to the stream
140    ///
141    /// # Example
142    /// ```rust,ignore
143    /// stream.window(WindowConfig::sliding(Duration::from_secs(60)))
144    /// ```
145    pub fn window(self, config: WindowConfig) -> WindowedStream {
146        WindowedStream::new(self.events, config)
147    }
148
149    /// Reduce events to a single result
150    ///
151    /// # Example
152    /// ```rust,ignore
153    /// stream.reduce(|acc, e| {
154    ///     // Accumulate values
155    ///     acc
156    /// })
157    /// ```
158    pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
159    where
160        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
161    {
162        self.events.into_iter().reduce(reducer)
163    }
164
165    /// Count the number of events
166    pub fn count(self) -> usize {
167        self.events.len()
168    }
169
170    /// Collect events into a vector
171    pub fn collect(self) -> Vec<StreamEvent> {
172        self.events
173    }
174
175    /// Take only the first n events
176    pub fn take(self, n: usize) -> Self {
177        Self {
178            events: self.events.into_iter().take(n).collect(),
179        }
180    }
181
182    /// Skip the first n events
183    pub fn skip(self, n: usize) -> Self {
184        Self {
185            events: self.events.into_iter().skip(n).collect(),
186        }
187    }
188
189    /// Process each event with a side effect (doesn't modify the stream)
190    ///
191    /// # Example
192    /// ```rust,ignore
193    /// stream.for_each(|e| {
194    ///     println!("Processing: {:?}", e);
195    /// })
196    /// ```
197    pub fn for_each<F>(self, action: F) -> Self
198    where
199        F: Fn(&StreamEvent),
200    {
201        for event in &self.events {
202            action(event);
203        }
204        self
205    }
206
207    /// Union with another stream
208    pub fn union(mut self, other: DataStream) -> Self {
209        self.events.extend(other.events);
210        Self {
211            events: self.events,
212        }
213    }
214
215    /// Find events matching a pattern
216    pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
217    where
218        F: Fn(&StreamEvent) -> bool,
219    {
220        self.events.into_iter().find(predicate)
221    }
222
223    /// Check if any event matches the predicate
224    pub fn any<F>(&self, predicate: F) -> bool
225    where
226        F: Fn(&StreamEvent) -> bool,
227    {
228        self.events.iter().any(predicate)
229    }
230
231    /// Check if all events match the predicate
232    pub fn all<F>(&self, predicate: F) -> bool
233    where
234        F: Fn(&StreamEvent) -> bool,
235    {
236        self.events.iter().all(predicate)
237    }
238
239    /// Sort events by a key function
240    pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
241    where
242        F: Fn(&StreamEvent) -> K,
243        K: Ord,
244    {
245        self.events.sort_by_key(key_fn);
246        Self {
247            events: self.events,
248        }
249    }
250
251    /// Group events by a key and apply aggregation
252    pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
253    where
254        F: Fn(&StreamEvent) -> K,
255        K: std::hash::Hash + Eq + Clone,
256    {
257        let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();
258
259        for event in self.events {
260            let key = key_selector(&event);
261            grouped.entry(key).or_default().push(event);
262        }
263
264        GroupedStream { groups: grouped }
265    }
266
267    /// Apply an aggregation function
268    pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
269    where
270        A: Aggregation,
271    {
272        aggregator.aggregate(&self.events)
273    }
274}
275
276impl Default for DataStream {
277    fn default() -> Self {
278        Self::new()
279    }
280}
281
/// A stream of events keyed by a specific field
///
/// Events are bucketed into a `HashMap` by key; per-key operators
/// (reduce / aggregate / window) then act on each bucket independently.
pub struct KeyedStream<K>
where
    K: std::hash::Hash + Eq,
{
    // Key -> all events that mapped to that key, in arrival order.
    keyed_events: HashMap<K, Vec<StreamEvent>>,
}
289
290impl<K> KeyedStream<K>
291where
292    K: std::hash::Hash + Eq + Clone,
293{
294    /// Reduce events within each key
295    pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
296    where
297        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
298    {
299        self.keyed_events
300            .into_iter()
301            .filter_map(|(key, events)| {
302                events
303                    .into_iter()
304                    .reduce(&reducer)
305                    .map(|result| (key, result))
306            })
307            .collect()
308    }
309
310    /// Apply aggregation to each key group
311    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
312    where
313        A: Aggregation + Clone,
314    {
315        self.keyed_events
316            .into_iter()
317            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
318            .collect()
319    }
320
321    /// Apply a window to each key group
322    pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
323        KeyedWindowedStream {
324            keyed_events: self.keyed_events,
325            config,
326        }
327    }
328
329    /// Count events per key
330    pub fn count(self) -> HashMap<K, usize> {
331        self.keyed_events
332            .into_iter()
333            .map(|(key, events)| (key, events.len()))
334            .collect()
335    }
336
337    /// Get all keys
338    pub fn keys(&self) -> Vec<K> {
339        self.keyed_events.keys().cloned().collect()
340    }
341
342    /// Flatten back to a regular stream
343    pub fn flatten(self) -> DataStream {
344        let events: Vec<StreamEvent> = self
345            .keyed_events
346            .into_iter()
347            .flat_map(|(_, events)| events)
348            .collect();
349
350        DataStream { events }
351    }
352}
353
/// Window configuration for stream processing
#[derive(Debug, Clone)]
pub struct WindowConfig {
    /// How windows are laid out over event time (tumbling / sliding / session).
    pub window_type: WindowType,
    /// Width of each window (for session windows this doubles as the timeout).
    pub duration: Duration,
    /// Upper bound on events stored per window.
    pub max_events: usize,
}
361
362impl WindowConfig {
363    /// Create a sliding window configuration
364    pub fn sliding(duration: Duration) -> Self {
365        Self {
366            window_type: WindowType::Sliding,
367            duration,
368            max_events: 10000,
369        }
370    }
371
372    /// Create a tumbling window configuration
373    pub fn tumbling(duration: Duration) -> Self {
374        Self {
375            window_type: WindowType::Tumbling,
376            duration,
377            max_events: 10000,
378        }
379    }
380
381    /// Create a session window configuration
382    pub fn session(timeout: Duration) -> Self {
383        Self {
384            window_type: WindowType::Session { timeout },
385            duration: timeout,
386            max_events: 10000,
387        }
388    }
389
390    /// Set maximum events per window
391    pub fn with_max_events(mut self, max_events: usize) -> Self {
392        self.max_events = max_events;
393        self
394    }
395}
396
/// A stream with windowing applied
///
/// Holds the materialized windows produced from a `DataStream` plus a
/// `WindowConfig`; aggregation/reduction then runs per window.
pub struct WindowedStream {
    // One entry per non-empty window, in construction order.
    windows: Vec<TimeWindow>,
}
401
402impl WindowedStream {
403    /// Create a new windowed stream
404    pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
405        let mut windows = Vec::new();
406
407        if events.is_empty() {
408            return Self { windows };
409        }
410
411        // Group events into windows based on configuration
412        match config.window_type {
413            WindowType::Tumbling => {
414                // Calculate window boundaries
415                let window_ms = config.duration.as_millis() as u64;
416                let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();
417
418                for event in events {
419                    let window_start = (event.metadata.timestamp / window_ms) * window_ms;
420                    window_map.entry(window_start).or_default().push(event);
421                }
422
423                // Create windows
424                for (start_time, mut window_events) in window_map {
425                    let mut window = TimeWindow::new(
426                        config.window_type.clone(),
427                        config.duration,
428                        start_time,
429                        config.max_events,
430                    );
431
432                    for event in window_events.drain(..) {
433                        window.add_event(event);
434                    }
435
436                    windows.push(window);
437                }
438            }
439            WindowType::Sliding | WindowType::Session { .. } => {
440                // For sliding windows, create overlapping windows
441                // Simplified implementation: create one window per unique timestamp
442                let window_ms = config.duration.as_millis() as u64;
443
444                if !events.is_empty() {
445                    let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
446                    let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();
447
448                    let mut current_start = min_time;
449
450                    while current_start <= max_time {
451                        let mut window = TimeWindow::new(
452                            config.window_type.clone(),
453                            config.duration,
454                            current_start,
455                            config.max_events,
456                        );
457
458                        for event in &events {
459                            if event.metadata.timestamp >= current_start
460                                && event.metadata.timestamp < current_start + window_ms
461                            {
462                                window.add_event(event.clone());
463                            }
464                        }
465
466                        if window.count() > 0 {
467                            windows.push(window);
468                        }
469
470                        // Slide forward (overlap 50%)
471                        current_start += window_ms / 2;
472                    }
473                }
474            }
475        }
476
477        Self { windows }
478    }
479
480    /// Apply aggregation to each window
481    pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
482    where
483        A: Aggregation,
484    {
485        self.windows
486            .iter()
487            .map(|window| {
488                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
489                aggregator.aggregate(&events)
490            })
491            .collect()
492    }
493
494    /// Reduce events within each window
495    pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
496    where
497        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
498    {
499        self.windows
500            .into_iter()
501            .filter_map(|window| {
502                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
503                events.into_iter().reduce(&reducer)
504            })
505            .collect()
506    }
507
508    /// Get all windows
509    pub fn windows(&self) -> &[TimeWindow] {
510        &self.windows
511    }
512
513    /// Count events in each window
514    pub fn counts(self) -> Vec<usize> {
515        self.windows.iter().map(|w| w.count()).collect()
516    }
517
518    /// Flatten all windows back into a stream
519    pub fn flatten(self) -> DataStream {
520        let events: Vec<StreamEvent> = self
521            .windows
522            .into_iter()
523            .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
524            .collect();
525
526        DataStream { events }
527    }
528}
529
/// Keyed stream with windowing
///
/// Defers window construction: windows are built per key when an
/// aggregation or reduction is requested.
pub struct KeyedWindowedStream<K>
where
    K: std::hash::Hash + Eq,
{
    // Key -> events for that key, windowed lazily on demand.
    keyed_events: HashMap<K, Vec<StreamEvent>>,
    // Window layout applied identically to every key group.
    config: WindowConfig,
}
538
539impl<K> KeyedWindowedStream<K>
540where
541    K: std::hash::Hash + Eq + Clone,
542{
543    /// Apply aggregation to each key's window
544    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
545    where
546        A: Aggregation + Clone,
547    {
548        self.keyed_events
549            .into_iter()
550            .map(|(key, events)| {
551                let windowed = WindowedStream::new(events, self.config.clone());
552                let results = windowed.aggregate(aggregator.clone());
553                (key, results)
554            })
555            .collect()
556    }
557
558    /// Reduce events within each key's window
559    pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
560    where
561        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
562    {
563        self.keyed_events
564            .into_iter()
565            .map(|(key, events)| {
566                let windowed = WindowedStream::new(events, self.config.clone());
567                let results = windowed.reduce(reducer.clone());
568                (key, results)
569            })
570            .collect()
571    }
572}
573
/// Grouped stream for aggregations
///
/// Like [`KeyedStream`] but oriented toward terminal, per-group
/// aggregation operations (count / first / last / aggregate).
pub struct GroupedStream<K>
where
    K: std::hash::Hash + Eq,
{
    // Key -> events in that group, in arrival order.
    groups: HashMap<K, Vec<StreamEvent>>,
}
581
582impl<K> GroupedStream<K>
583where
584    K: std::hash::Hash + Eq + Clone,
585{
586    /// Apply aggregation to each group
587    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
588    where
589        A: Aggregation + Clone,
590    {
591        self.groups
592            .into_iter()
593            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
594            .collect()
595    }
596
597    /// Count events in each group
598    pub fn count(self) -> HashMap<K, usize> {
599        self.groups
600            .into_iter()
601            .map(|(key, events)| (key, events.len()))
602            .collect()
603    }
604
605    /// Get the first event in each group
606    pub fn first(self) -> HashMap<K, StreamEvent> {
607        self.groups
608            .into_iter()
609            .filter_map(|(key, mut events)| {
610                if !events.is_empty() {
611                    Some((key, events.remove(0)))
612                } else {
613                    None
614                }
615            })
616            .collect()
617    }
618
619    /// Get the last event in each group
620    pub fn last(self) -> HashMap<K, StreamEvent> {
621        self.groups
622            .into_iter()
623            .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
624            .collect()
625    }
626}
627
/// Trait for aggregation functions
///
/// Implementors fold a batch of events into a single [`AggregateResult`].
/// The `Send + Sync` supertraits allow aggregators to be shared across threads.
pub trait Aggregation: Send + Sync {
    /// Aggregate a slice of events into a single result.
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
}
632
/// Result of an aggregation operation
#[derive(Debug, Clone)]
pub enum AggregateResult {
    /// Numeric result (counts, sums, averages, min/max).
    Number(f64),
    /// Textual result.
    String(String),
    /// Structured result keyed by field name.
    Map(HashMap<String, Value>),
    /// Ordered list result.
    List(Vec<Value>),
    /// No result (e.g. aggregating an empty batch or a missing field).
    None,
}
642
643impl AggregateResult {
644    pub fn as_number(&self) -> Option<f64> {
645        match self {
646            AggregateResult::Number(n) => Some(*n),
647            _ => None,
648        }
649    }
650
651    pub fn as_string(&self) -> Option<&str> {
652        match self {
653            AggregateResult::String(s) => Some(s),
654            _ => None,
655        }
656    }
657
658    pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
659        match self {
660            AggregateResult::Map(m) => Some(m),
661            _ => None,
662        }
663    }
664}
665
666// Built-in aggregators
667
668/// Count aggregator
669#[derive(Clone)]
670pub struct Count;
671
672impl Aggregation for Count {
673    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
674        AggregateResult::Number(events.len() as f64)
675    }
676}
677
678/// Sum aggregator
679#[derive(Clone)]
680pub struct Sum {
681    pub field: String,
682}
683
684impl Sum {
685    pub fn new(field: impl Into<String>) -> Self {
686        Self {
687            field: field.into(),
688        }
689    }
690}
691
692impl Aggregation for Sum {
693    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
694        let sum: f64 = events
695            .iter()
696            .filter_map(|e| e.get_numeric(&self.field))
697            .sum();
698        AggregateResult::Number(sum)
699    }
700}
701
702/// Average aggregator
703#[derive(Clone)]
704pub struct Average {
705    pub field: String,
706}
707
708impl Average {
709    pub fn new(field: impl Into<String>) -> Self {
710        Self {
711            field: field.into(),
712        }
713    }
714}
715
716impl Aggregation for Average {
717    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
718        let values: Vec<f64> = events
719            .iter()
720            .filter_map(|e| e.get_numeric(&self.field))
721            .collect();
722
723        if values.is_empty() {
724            AggregateResult::None
725        } else {
726            let avg = values.iter().sum::<f64>() / values.len() as f64;
727            AggregateResult::Number(avg)
728        }
729    }
730}
731
732/// Min aggregator
733#[derive(Clone)]
734pub struct Min {
735    pub field: String,
736}
737
738impl Min {
739    pub fn new(field: impl Into<String>) -> Self {
740        Self {
741            field: field.into(),
742        }
743    }
744}
745
746impl Aggregation for Min {
747    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
748        events
749            .iter()
750            .filter_map(|e| e.get_numeric(&self.field))
751            .min_by(|a, b| a.partial_cmp(b).unwrap())
752            .map(AggregateResult::Number)
753            .unwrap_or(AggregateResult::None)
754    }
755}
756
757/// Max aggregator
758#[derive(Clone)]
759pub struct Max {
760    pub field: String,
761}
762
763impl Max {
764    pub fn new(field: impl Into<String>) -> Self {
765        Self {
766            field: field.into(),
767        }
768    }
769}
770
771impl Aggregation for Max {
772    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
773        events
774            .iter()
775            .filter_map(|e| e.get_numeric(&self.field))
776            .max_by(|a, b| a.partial_cmp(b).unwrap())
777            .map(AggregateResult::Number)
778            .unwrap_or(AggregateResult::None)
779    }
780}
781
782/// Custom aggregator using a closure
783pub struct CustomAggregator<F>
784where
785    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
786{
787    func: Arc<F>,
788}
789
790impl<F> CustomAggregator<F>
791where
792    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
793{
794    pub fn new(func: F) -> Self {
795        Self {
796            func: Arc::new(func),
797        }
798    }
799}
800
801impl<F> Clone for CustomAggregator<F>
802where
803    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
804{
805    fn clone(&self) -> Self {
806        Self {
807            func: Arc::clone(&self.func),
808        }
809    }
810}
811
812impl<F> Aggregation for CustomAggregator<F>
813where
814    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
815{
816    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
817        (self.func)(events)
818    }
819}
820
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    // Builds `count` events with a numeric "value" field (0..count) and a
    // "user_id" cycling through user_0 / user_1 / user_2.
    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                data.insert(
                    "user_id".to_string(),
                    Value::String(format!("user_{}", i % 3)),
                );
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }

    #[test]
    fn test_filter_operator() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        // Strictly greater than 5.0, so 5 itself is excluded.
        let filtered = stream.filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        assert_eq!(filtered.len(), 4); // 6, 7, 8, 9
    }

    #[test]
    fn test_map_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        // Adds a derived "doubled" field to every event.
        let mapped = stream.map(|mut e| {
            if let Some(value) = e.get_numeric("value") {
                e.data
                    .insert("doubled".to_string(), Value::Number(value * 2.0));
            }
            e
        });

        let collected = mapped.collect();
        assert_eq!(collected[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(collected[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        // 9 events cycle through 3 user ids -> 3 events per key.
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let keyed = stream.key_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = keyed.count();
        assert_eq!(counts.len(), 3); // 3 unique users
        assert_eq!(*counts.get("user_0").unwrap(), 3);
        assert_eq!(*counts.get("user_1").unwrap(), 3);
        assert_eq!(*counts.get("user_2").unwrap(), 3);
    }

    #[test]
    fn test_reduce_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        // Sums the "value" field into the accumulator event.
        let result = stream.reduce(|mut acc, e| {
            let acc_val = acc.get_numeric("value").unwrap_or(0.0);
            let e_val = e.get_numeric("value").unwrap_or(0.0);
            acc.data
                .insert("value".to_string(), Value::Number(acc_val + e_val));
            acc
        });

        assert!(result.is_some());
        assert_eq!(result.unwrap().get_numeric("value"), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_count_aggregator() {
        let events = create_test_events(10);
        let result = Count.aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        let events = create_test_events(5);
        let result = Sum::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_average_aggregator() {
        let events = create_test_events(5);
        let result = Average::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(2.0)); // (0+1+2+3+4)/5
    }

    #[test]
    fn test_group_by() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let grouped = stream.group_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = grouped.count();
        assert_eq!(counts.len(), 3);
    }

    #[test]
    fn test_chaining_operators() {
        let events = create_test_events(20);
        let stream = DataStream::from_events(events);

        // filter (>= 5.0) -> map (add "doubled") -> take first 5.
        let result = stream
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(result.len(), 5);
        assert_eq!(result[0].get_numeric("doubled"), Some(10.0)); // 5 * 2
    }

    #[test]
    fn test_windowed_stream() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        // Events are created "now", so they all land in at least one window.
        let windowed = stream.window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
    }
}