rust_rule_engine/streaming/operators.rs

//! Stream Operators - Fluent API for Stream Processing
//!
//! This module provides a fluent, composable API for building stream processing pipelines.
//! Inspired by Apache Flink, Kafka Streams, and Rust iterators.
//!
//! ## Example
//!
//! ```rust,ignore
//! use rust_rule_engine::streaming::*;
//!
//! // Yields a HashMap<String, Vec<StreamEvent>>: one reduced event per window, per user_id.
//! let result = DataStream::from_events(events)
//!     .filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
//!     .map(|e| enhance_event(e))
//!     .key_by(|e| e.get_string("user_id").unwrap_or("unknown").to_string())
//!     .window(WindowConfig::sliding(Duration::from_secs(60)))
//!     .reduce(|mut acc, e| {
//!         let sum = acc.get_numeric("total").unwrap_or(0.0);
//!         let amount = e.get_numeric("amount").unwrap_or(0.0);
//!         acc.data.insert("total".to_string(), Value::Number(sum + amount));
//!         acc
//!     });
//! ```

use crate::streaming::event::StreamEvent;
use crate::streaming::window::{TimeWindow, WindowType};
use crate::types::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

/// A stream of events with chainable operators
#[derive(Clone)]
pub struct DataStream {
    events: Vec<StreamEvent>,
}

impl DataStream {
    /// Create a new data stream from events
    pub fn from_events(events: Vec<StreamEvent>) -> Self {
        Self { events }
    }

    /// Create an empty data stream
    pub fn new() -> Self {
        Self { events: Vec::new() }
    }

    /// Add an event to the stream
    pub fn push(&mut self, event: StreamEvent) {
        self.events.push(event);
    }

    /// Get the number of events in the stream
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// Check if stream is empty
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    /// Filter events based on a predicate
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.filter(|e| e.get_numeric("amount").unwrap_or(0.0) > 100.0)
    /// ```
    pub fn filter<F>(self, predicate: F) -> Self
    where
        F: Fn(&StreamEvent) -> bool,
    {
        let filtered_events = self.events.into_iter().filter(predicate).collect();
        Self {
            events: filtered_events,
        }
    }

    /// Transform each event using a mapping function
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.map(|mut e| {
    ///     e.add_tag("processed", "true");
    ///     e
    /// })
    /// ```
    pub fn map<F>(self, mapper: F) -> Self
    where
        F: Fn(StreamEvent) -> StreamEvent,
    {
        let mapped_events = self.events.into_iter().map(mapper).collect();
        Self {
            events: mapped_events,
        }
    }

    /// Transform each event into multiple events (flatMap)
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.flat_map(|e| {
    ///     // Split event into multiple events
    ///     vec![e.clone(), e]
    /// })
    /// ```
    pub fn flat_map<F>(self, mapper: F) -> Self
    where
        F: Fn(StreamEvent) -> Vec<StreamEvent>,
    {
        let flat_mapped_events = self
            .events
            .into_iter()
            .flat_map(mapper)
            .collect();
        Self {
            events: flat_mapped_events,
        }
    }

    /// Key events by a specific field or function
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.key_by(|e| e.get_string("user_id").unwrap_or("default").to_string())
    /// ```
    pub fn key_by<F, K>(self, key_selector: F) -> KeyedStream<K>
    where
        F: Fn(&StreamEvent) -> K,
        K: std::hash::Hash + Eq + Clone,
    {
        let mut keyed_events: HashMap<K, Vec<StreamEvent>> = HashMap::new();

        for event in self.events {
            let key = key_selector(&event);
            keyed_events.entry(key).or_insert_with(Vec::new).push(event);
        }

        KeyedStream { keyed_events }
    }

    /// Apply a window to the stream
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.window(WindowConfig::sliding(Duration::from_secs(60)))
    /// ```
    pub fn window(self, config: WindowConfig) -> WindowedStream {
        WindowedStream::new(self.events, config)
    }

    /// Reduce events to a single result
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.reduce(|acc, e| {
    ///     // Accumulate values
    ///     acc
    /// })
    /// ```
    pub fn reduce<F>(self, reducer: F) -> Option<StreamEvent>
    where
        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
    {
        self.events.into_iter().reduce(reducer)
    }

    /// Count the number of events
    pub fn count(self) -> usize {
        self.events.len()
    }

    /// Collect events into a vector
    pub fn collect(self) -> Vec<StreamEvent> {
        self.events
    }

    /// Take only the first n events
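    ///
    /// # Example
    /// A minimal sketch (illustrative only):
    /// ```rust,ignore
    /// let first_ten = stream.take(10).collect();
    /// ```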
    pub fn take(self, n: usize) -> Self {
        Self {
            events: self.events.into_iter().take(n).collect(),
        }
    }

    /// Skip the first n events
    pub fn skip(self, n: usize) -> Self {
        Self {
            events: self.events.into_iter().skip(n).collect(),
        }
    }

    /// Process each event with a side effect (doesn't modify the stream)
    ///
    /// # Example
    /// ```rust,ignore
    /// stream.for_each(|e| {
    ///     println!("Processing: {:?}", e);
    /// })
    /// ```
    pub fn for_each<F>(self, action: F) -> Self
    where
        F: Fn(&StreamEvent),
    {
        for event in &self.events {
            action(event);
        }
        self
    }

    /// Union with another stream
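    ///
    /// # Example
    /// A minimal sketch; `stream_a` and `stream_b` are placeholder streams:
    /// ```rust,ignore
    /// let combined = stream_a.union(stream_b);
    /// ```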
    pub fn union(mut self, other: DataStream) -> Self {
        self.events.extend(other.events);
        Self {
            events: self.events,
        }
    }

    /// Find events matching a pattern
    pub fn find<F>(self, predicate: F) -> Option<StreamEvent>
    where
        F: Fn(&StreamEvent) -> bool,
    {
        self.events.into_iter().find(predicate)
    }

    /// Check if any event matches the predicate
    pub fn any<F>(&self, predicate: F) -> bool
    where
        F: Fn(&StreamEvent) -> bool,
    {
        self.events.iter().any(predicate)
    }

    /// Check if all events match the predicate
    pub fn all<F>(&self, predicate: F) -> bool
    where
        F: Fn(&StreamEvent) -> bool,
    {
        self.events.iter().all(predicate)
    }

    /// Sort events by a key function
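    ///
    /// # Example
    /// A minimal sketch ordering events by the same `metadata.timestamp` field the windowing code uses:
    /// ```rust,ignore
    /// let ordered = stream.sort_by(|e| e.metadata.timestamp);
    /// ```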
    pub fn sort_by<F, K>(mut self, key_fn: F) -> Self
    where
        F: Fn(&StreamEvent) -> K,
        K: Ord,
    {
        self.events.sort_by_key(key_fn);
        Self {
            events: self.events,
        }
    }

    /// Group events by a key and apply aggregation
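    ///
    /// # Example
    /// A minimal sketch, assuming events carry a `user_id` string (as in the tests below):
    /// ```rust,ignore
    /// let counts = stream
    ///     .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
    ///     .count();
    /// ```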
    pub fn group_by<F, K>(self, key_selector: F) -> GroupedStream<K>
    where
        F: Fn(&StreamEvent) -> K,
        K: std::hash::Hash + Eq + Clone,
    {
        let mut grouped: HashMap<K, Vec<StreamEvent>> = HashMap::new();

        for event in self.events {
            let key = key_selector(&event);
            grouped.entry(key).or_insert_with(Vec::new).push(event);
        }

        GroupedStream { groups: grouped }
    }

    /// Apply an aggregation function
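    ///
    /// # Example
    /// A minimal sketch using the built-in `Sum` aggregator, assuming a numeric `amount` field:
    /// ```rust,ignore
    /// let total = stream.aggregate(Sum::new("amount"));
    /// println!("total: {:?}", total.as_number());
    /// ```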
    pub fn aggregate<A>(self, aggregator: A) -> AggregateResult
    where
        A: Aggregation,
    {
        aggregator.aggregate(&self.events)
    }
}

impl Default for DataStream {
    fn default() -> Self {
        Self::new()
    }
}

/// A stream of events keyed by a specific field
pub struct KeyedStream<K>
where
    K: std::hash::Hash + Eq,
{
    keyed_events: HashMap<K, Vec<StreamEvent>>,
}

impl<K> KeyedStream<K>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Reduce events within each key
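    ///
    /// # Example
    /// A minimal sketch folding each key's events into one, assuming `user_id` and `amount` fields:
    /// ```rust,ignore
    /// let per_user = stream
    ///     .key_by(|e| e.get_string("user_id").unwrap_or("").to_string())
    ///     .reduce(|mut acc, e| {
    ///         let sum = acc.get_numeric("amount").unwrap_or(0.0)
    ///             + e.get_numeric("amount").unwrap_or(0.0);
    ///         acc.data.insert("amount".to_string(), Value::Number(sum));
    ///         acc
    ///     });
    /// ```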
    pub fn reduce<F>(self, reducer: F) -> HashMap<K, StreamEvent>
    where
        F: Fn(StreamEvent, StreamEvent) -> StreamEvent,
    {
        self.keyed_events
            .into_iter()
            .filter_map(|(key, events)| {
                events
                    .into_iter()
                    .reduce(|acc, e| reducer(acc, e))
                    .map(|result| (key, result))
            })
            .collect()
    }

    /// Apply aggregation to each key group
    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
    where
        A: Aggregation + Clone,
    {
        self.keyed_events
            .into_iter()
            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
            .collect()
    }

    /// Apply a window to each key group
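    ///
    /// # Example
    /// A minimal sketch counting events per key and per tumbling window:
    /// ```rust,ignore
    /// let counts = stream
    ///     .key_by(|e| e.get_string("user_id").unwrap_or("").to_string())
    ///     .window(WindowConfig::tumbling(Duration::from_secs(60)))
    ///     .aggregate(Count);
    /// ```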
    pub fn window(self, config: WindowConfig) -> KeyedWindowedStream<K> {
        KeyedWindowedStream {
            keyed_events: self.keyed_events,
            config,
        }
    }

    /// Count events per key
    pub fn count(self) -> HashMap<K, usize> {
        self.keyed_events
            .into_iter()
            .map(|(key, events)| (key, events.len()))
            .collect()
    }

    /// Get all keys
    pub fn keys(&self) -> Vec<K> {
        self.keyed_events.keys().cloned().collect()
    }

    /// Flatten back to a regular stream
    pub fn flatten(self) -> DataStream {
        let events: Vec<StreamEvent> = self
            .keyed_events
            .into_iter()
            .flat_map(|(_, events)| events)
            .collect();

        DataStream { events }
    }
}

/// Window configuration for stream processing
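///
/// # Example
/// A minimal sketch; the 10,000-event default cap can be overridden with `with_max_events`:
/// ```rust,ignore
/// let config = WindowConfig::sliding(Duration::from_secs(60)).with_max_events(500);
/// ```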
#[derive(Debug, Clone)]
pub struct WindowConfig {
    pub window_type: WindowType,
    pub duration: Duration,
    pub max_events: usize,
}

impl WindowConfig {
    /// Create a sliding window configuration
    pub fn sliding(duration: Duration) -> Self {
        Self {
            window_type: WindowType::Sliding,
            duration,
            max_events: 10000,
        }
    }

    /// Create a tumbling window configuration
    pub fn tumbling(duration: Duration) -> Self {
        Self {
            window_type: WindowType::Tumbling,
            duration,
            max_events: 10000,
        }
    }

    /// Create a session window configuration
    pub fn session(timeout: Duration) -> Self {
        Self {
            window_type: WindowType::Session { timeout },
            duration: timeout,
            max_events: 10000,
        }
    }

    /// Set maximum events per window
    pub fn with_max_events(mut self, max_events: usize) -> Self {
        self.max_events = max_events;
        self
    }
}

/// A stream with windowing applied
pub struct WindowedStream {
    windows: Vec<TimeWindow>,
}

impl WindowedStream {
    /// Create a new windowed stream
    pub fn new(events: Vec<StreamEvent>, config: WindowConfig) -> Self {
        let mut windows = Vec::new();

        if events.is_empty() {
            return Self { windows };
        }

        // Group events into windows based on configuration
        match config.window_type {
            WindowType::Tumbling => {
                // Calculate window boundaries
                let window_ms = config.duration.as_millis() as u64;
                let mut window_map: HashMap<u64, Vec<StreamEvent>> = HashMap::new();

                for event in events {
                    let window_start = (event.metadata.timestamp / window_ms) * window_ms;
                    window_map
                        .entry(window_start)
                        .or_insert_with(Vec::new)
                        .push(event);
                }

                // Create windows
                for (start_time, mut window_events) in window_map {
                    let mut window = TimeWindow::new(
                        config.window_type.clone(),
                        config.duration,
                        start_time,
                        config.max_events,
                    );

                    for event in window_events.drain(..) {
                        window.add_event(event);
                    }

                    windows.push(window);
                }
            }
            WindowType::Sliding | WindowType::Session { .. } => {
                // For sliding windows, create overlapping windows.
                // Simplified implementation: windows advance by half the window
                // size (50% overlap); session windows currently reuse this logic.
                let window_ms = config.duration.as_millis() as u64;

                if !events.is_empty() {
                    let min_time = events.iter().map(|e| e.metadata.timestamp).min().unwrap();
                    let max_time = events.iter().map(|e| e.metadata.timestamp).max().unwrap();

                    let mut current_start = min_time;

                    while current_start <= max_time {
                        let mut window = TimeWindow::new(
                            config.window_type.clone(),
                            config.duration,
                            current_start,
                            config.max_events,
                        );

                        for event in &events {
                            if event.metadata.timestamp >= current_start
                                && event.metadata.timestamp < current_start + window_ms
                            {
                                window.add_event(event.clone());
                            }
                        }

                        if window.count() > 0 {
                            windows.push(window);
                        }

                        // Slide forward by half the window (50% overlap); advance at
                        // least 1 ms so degenerate sub-2ms windows cannot loop forever
                        current_start += (window_ms / 2).max(1);
                    }
                }
            }
        }

        Self { windows }
    }

    /// Apply aggregation to each window
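    ///
    /// # Example
    /// A minimal sketch averaging a field per window, assuming a numeric `value` field:
    /// ```rust,ignore
    /// let averages = stream
    ///     .window(WindowConfig::tumbling(Duration::from_secs(60)))
    ///     .aggregate(Average::new("value"));
    /// ```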
    pub fn aggregate<A>(self, aggregator: A) -> Vec<AggregateResult>
    where
        A: Aggregation,
    {
        self.windows
            .iter()
            .map(|window| {
                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
                aggregator.aggregate(&events)
            })
            .collect()
    }

    /// Reduce events within each window
    pub fn reduce<F>(self, reducer: F) -> Vec<StreamEvent>
    where
        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
    {
        self.windows
            .into_iter()
            .filter_map(|window| {
                let events: Vec<StreamEvent> = window.events().iter().cloned().collect();
                events.into_iter().reduce(|acc, e| reducer(acc, e))
            })
            .collect()
    }

    /// Get all windows
    pub fn windows(&self) -> &[TimeWindow] {
        &self.windows
    }

    /// Count events in each window
    pub fn counts(self) -> Vec<usize> {
        self.windows.iter().map(|w| w.count()).collect()
    }

    /// Flatten all windows back into a stream
    pub fn flatten(self) -> DataStream {
        let events: Vec<StreamEvent> = self
            .windows
            .into_iter()
            .flat_map(|w| w.events().iter().cloned().collect::<Vec<_>>())
            .collect();

        DataStream { events }
    }
}

/// Keyed stream with windowing
pub struct KeyedWindowedStream<K>
where
    K: std::hash::Hash + Eq,
{
    keyed_events: HashMap<K, Vec<StreamEvent>>,
    config: WindowConfig,
}

impl<K> KeyedWindowedStream<K>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Apply aggregation to each key's window
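    ///
    /// # Example
    /// A minimal sketch of per-user, per-window totals, assuming `user_id` and `amount` fields:
    /// ```rust,ignore
    /// let totals: HashMap<String, Vec<AggregateResult>> = stream
    ///     .key_by(|e| e.get_string("user_id").unwrap_or("").to_string())
    ///     .window(WindowConfig::tumbling(Duration::from_secs(60)))
    ///     .aggregate(Sum::new("amount"));
    /// ```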
    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, Vec<AggregateResult>>
    where
        A: Aggregation + Clone,
    {
        self.keyed_events
            .into_iter()
            .map(|(key, events)| {
                let windowed = WindowedStream::new(events, self.config.clone());
                let results = windowed.aggregate(aggregator.clone());
                (key, results)
            })
            .collect()
    }

    /// Reduce events within each key's window
    pub fn reduce<F>(self, reducer: F) -> HashMap<K, Vec<StreamEvent>>
    where
        F: Fn(StreamEvent, StreamEvent) -> StreamEvent + Clone,
    {
        self.keyed_events
            .into_iter()
            .map(|(key, events)| {
                let windowed = WindowedStream::new(events, self.config.clone());
                let results = windowed.reduce(reducer.clone());
                (key, results)
            })
            .collect()
    }
}

/// Grouped stream for aggregations
pub struct GroupedStream<K>
where
    K: std::hash::Hash + Eq,
{
    groups: HashMap<K, Vec<StreamEvent>>,
}

impl<K> GroupedStream<K>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Apply aggregation to each group
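    ///
    /// # Example
    /// A minimal sketch taking the maximum `value` per group (field name as in the tests below):
    /// ```rust,ignore
    /// let max_per_user = stream
    ///     .group_by(|e| e.get_string("user_id").unwrap_or("").to_string())
    ///     .aggregate(Max::new("value"));
    /// ```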
    pub fn aggregate<A>(self, aggregator: A) -> HashMap<K, AggregateResult>
    where
        A: Aggregation + Clone,
    {
        self.groups
            .into_iter()
            .map(|(key, events)| (key, aggregator.clone().aggregate(&events)))
            .collect()
    }

    /// Count events in each group
    pub fn count(self) -> HashMap<K, usize> {
        self.groups
            .into_iter()
            .map(|(key, events)| (key, events.len()))
            .collect()
    }

    /// Get the first event in each group
    pub fn first(self) -> HashMap<K, StreamEvent> {
        self.groups
            .into_iter()
            .filter_map(|(key, mut events)| {
                if !events.is_empty() {
                    Some((key, events.remove(0)))
                } else {
                    None
                }
            })
            .collect()
    }

    /// Get the last event in each group
    pub fn last(self) -> HashMap<K, StreamEvent> {
        self.groups
            .into_iter()
            .filter_map(|(key, mut events)| events.pop().map(|e| (key, e)))
            .collect()
    }
}

/// Trait for aggregation functions
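///
/// # Example
/// A minimal sketch of a custom implementation (a hypothetical distinct-count over a string field):
/// ```rust,ignore
/// struct DistinctCount {
///     field: String,
/// }
///
/// impl Aggregation for DistinctCount {
///     fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
///         let distinct: std::collections::HashSet<String> = events
///             .iter()
///             .filter_map(|e| e.get_string(&self.field).map(|s| s.to_string()))
///             .collect();
///         AggregateResult::Number(distinct.len() as f64)
///     }
/// }
/// ```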
pub trait Aggregation: Send + Sync {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult;
}

/// Result of an aggregation operation
#[derive(Debug, Clone)]
pub enum AggregateResult {
    Number(f64),
    String(String),
    Map(HashMap<String, Value>),
    List(Vec<Value>),
    None,
}

impl AggregateResult {
    pub fn as_number(&self) -> Option<f64> {
        match self {
            AggregateResult::Number(n) => Some(*n),
            _ => None,
        }
    }

    pub fn as_string(&self) -> Option<&str> {
        match self {
            AggregateResult::String(s) => Some(s),
            _ => None,
        }
    }

    pub fn as_map(&self) -> Option<&HashMap<String, Value>> {
        match self {
            AggregateResult::Map(m) => Some(m),
            _ => None,
        }
    }
}

// Built-in aggregators

/// Count aggregator
#[derive(Clone)]
pub struct Count;

impl Aggregation for Count {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        AggregateResult::Number(events.len() as f64)
    }
}

/// Sum aggregator
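///
/// # Example
/// A minimal sketch, assuming events carry a numeric `amount` field:
/// ```rust,ignore
/// let total = Sum::new("amount").aggregate(&events);
/// println!("total: {:?}", total.as_number());
/// ```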
#[derive(Clone)]
pub struct Sum {
    pub field: String,
}

impl Sum {
    pub fn new(field: impl Into<String>) -> Self {
        Self {
            field: field.into(),
        }
    }
}

impl Aggregation for Sum {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        let sum: f64 = events
            .iter()
            .filter_map(|e| e.get_numeric(&self.field))
            .sum();
        AggregateResult::Number(sum)
    }
}

/// Average aggregator
#[derive(Clone)]
pub struct Average {
    pub field: String,
}

impl Average {
    pub fn new(field: impl Into<String>) -> Self {
        Self {
            field: field.into(),
        }
    }
}

impl Aggregation for Average {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        let values: Vec<f64> = events
            .iter()
            .filter_map(|e| e.get_numeric(&self.field))
            .collect();

        if values.is_empty() {
            AggregateResult::None
        } else {
            let avg = values.iter().sum::<f64>() / values.len() as f64;
            AggregateResult::Number(avg)
        }
    }
}

/// Min aggregator
#[derive(Clone)]
pub struct Min {
    pub field: String,
}

impl Min {
    pub fn new(field: impl Into<String>) -> Self {
        Self {
            field: field.into(),
        }
    }
}

impl Aggregation for Min {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        events
            .iter()
            .filter_map(|e| e.get_numeric(&self.field))
            // total_cmp avoids the panic that partial_cmp().unwrap() hits on NaN values
            .min_by(|a, b| a.total_cmp(b))
            .map(AggregateResult::Number)
            .unwrap_or(AggregateResult::None)
    }
}

/// Max aggregator
#[derive(Clone)]
pub struct Max {
    pub field: String,
}

impl Max {
    pub fn new(field: impl Into<String>) -> Self {
        Self {
            field: field.into(),
        }
    }
}

impl Aggregation for Max {
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        events
            .iter()
            .filter_map(|e| e.get_numeric(&self.field))
            // total_cmp avoids the panic that partial_cmp().unwrap() hits on NaN values
            .max_by(|a, b| a.total_cmp(b))
            .map(AggregateResult::Number)
            .unwrap_or(AggregateResult::None)
    }
}

/// Custom aggregator using a closure
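///
/// # Example
/// A minimal sketch wrapping a closure that computes the max-min spread of the
/// `value` field (field name as in the tests below):
/// ```rust,ignore
/// let spread = CustomAggregator::new(|events: &[StreamEvent]| {
///     let values: Vec<f64> = events
///         .iter()
///         .filter_map(|e| e.get_numeric("value"))
///         .collect();
///     if values.is_empty() {
///         return AggregateResult::None;
///     }
///     let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
///     let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
///     AggregateResult::Number(max - min)
/// });
/// let result = stream.aggregate(spread);
/// ```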
pub struct CustomAggregator<F>
where
    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
{
    func: Arc<F>,
}

impl<F> CustomAggregator<F>
where
    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
{
    pub fn new(func: F) -> Self {
        Self {
            func: Arc::new(func),
        }
    }
}

impl<F> Clone for CustomAggregator<F>
where
    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
{
    fn clone(&self) -> Self {
        Self {
            func: Arc::clone(&self.func),
        }
    }
}

impl<F> Aggregation for CustomAggregator<F>
where
    F: Fn(&[StreamEvent]) -> AggregateResult + Send + Sync,
{
    fn aggregate(&self, events: &[StreamEvent]) -> AggregateResult {
        (self.func)(events)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                data.insert("user_id".to_string(), Value::String(format!("user_{}", i % 3)));
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }

    #[test]
    fn test_filter_operator() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let filtered = stream.filter(|e| e.get_numeric("value").unwrap_or(0.0) > 5.0);

        assert_eq!(filtered.len(), 4); // 6, 7, 8, 9
    }

    #[test]
    fn test_map_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let mapped = stream.map(|mut e| {
            if let Some(value) = e.get_numeric("value") {
                e.data.insert("doubled".to_string(), Value::Number(value * 2.0));
            }
            e
        });

        let collected = mapped.collect();
        assert_eq!(collected[0].get_numeric("doubled"), Some(0.0));
        assert_eq!(collected[1].get_numeric("doubled"), Some(2.0));
    }

    #[test]
    fn test_key_by_operator() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let keyed = stream.key_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = keyed.count();
        assert_eq!(counts.len(), 3); // 3 unique users
        assert_eq!(*counts.get("user_0").unwrap(), 3);
        assert_eq!(*counts.get("user_1").unwrap(), 3);
        assert_eq!(*counts.get("user_2").unwrap(), 3);
    }

    #[test]
    fn test_reduce_operator() {
        let events = create_test_events(5);
        let stream = DataStream::from_events(events);

        let result = stream.reduce(|mut acc, e| {
            let acc_val = acc.get_numeric("value").unwrap_or(0.0);
            let e_val = e.get_numeric("value").unwrap_or(0.0);
            acc.data.insert("value".to_string(), Value::Number(acc_val + e_val));
            acc
        });

        assert!(result.is_some());
        assert_eq!(result.unwrap().get_numeric("value"), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_count_aggregator() {
        let events = create_test_events(10);
        let result = Count.aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_sum_aggregator() {
        let events = create_test_events(5);
        let result = Sum::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_average_aggregator() {
        let events = create_test_events(5);
        let result = Average::new("value").aggregate(&events);

        assert_eq!(result.as_number(), Some(2.0)); // (0+1+2+3+4)/5
    }

    #[test]
    fn test_group_by() {
        let events = create_test_events(9);
        let stream = DataStream::from_events(events);

        let grouped = stream.group_by(|e| e.get_string("user_id").unwrap_or("").to_string());

        let counts = grouped.count();
        assert_eq!(counts.len(), 3);
    }

    #[test]
    fn test_chaining_operators() {
        let events = create_test_events(20);
        let stream = DataStream::from_events(events);

        let result = stream
            .filter(|e| e.get_numeric("value").unwrap_or(0.0) >= 5.0)
            .map(|mut e| {
                if let Some(v) = e.get_numeric("value") {
                    e.data.insert("doubled".to_string(), Value::Number(v * 2.0));
                }
                e
            })
            .take(5)
            .collect();

        assert_eq!(result.len(), 5);
        assert_eq!(result[0].get_numeric("doubled"), Some(10.0)); // 5 * 2
    }

    #[test]
    fn test_windowed_stream() {
        let events = create_test_events(10);
        let stream = DataStream::from_events(events);

        let windowed = stream.window(WindowConfig::tumbling(Duration::from_secs(60)));

        assert!(!windowed.windows().is_empty());
    }
965    }
966}