rust_rule_engine/streaming/aggregator.rs

//! Stream Aggregation Functions
//!
//! Provides various aggregation operations for streaming data analysis.
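//!
//! A minimal usage sketch (marked `ignore`; the crate path and event field
//! below are illustrative, not a confirmed public API):
//!
//! ```ignore
//! use rust_rule_engine::streaming::aggregator::{AggregationType, Aggregator};
//!
//! let avg = Aggregator::new(AggregationType::Average {
//!     field: "temperature".to_string(),
//! });
//! let result = avg.aggregate_events(&events);
//! println!("average temperature: {:?}", result.as_number());
//! ```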

use crate::streaming::event::StreamEvent;
use crate::streaming::window::TimeWindow;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// Type of aggregation to perform
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AggregationType {
    /// Count number of events
    Count,
    /// Sum numeric values
    Sum { field: String },
    /// Calculate average
    Average { field: String },
    /// Find minimum value
    Min { field: String },
    /// Find maximum value
    Max { field: String },
    /// Count distinct values
    CountDistinct { field: String },
    /// Calculate standard deviation
    StdDev { field: String },
    /// Calculate percentile
    Percentile { field: String, percentile: f64 },
    /// First event in window
    First,
    /// Last event in window
    Last,
    /// Count by category
    CountBy { field: String },
}

/// Result of an aggregation operation
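///
/// A sketch of consuming a result (`ignore`d; `result` is assumed to come
/// from an `Aggregator` call such as the one in the module docs):
///
/// ```ignore
/// match result {
///     AggregationResult::Number(n) => println!("numeric result: {n}"),
///     AggregationResult::CountMap(counts) => println!("{} categories", counts.len()),
///     AggregationResult::None => println!("no data in window"),
///     _ => {}
/// }
/// ```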
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationResult {
    /// Numeric result
    Number(f64),
    /// String result
    Text(String),
    /// Boolean result
    Boolean(bool),
    /// Map of category counts
    CountMap(HashMap<String, usize>),
    /// No result (empty data)
    None,
}

impl AggregationResult {
    /// Convert to numeric value if possible
    pub fn as_number(&self) -> Option<f64> {
        match self {
            AggregationResult::Number(n) => Some(*n),
            _ => None,
        }
    }

    /// Convert to string if possible
    pub fn as_string(&self) -> Option<&str> {
        match self {
            AggregationResult::Text(s) => Some(s),
            _ => None,
        }
    }

    /// Convert to boolean if possible
    pub fn as_boolean(&self) -> Option<bool> {
        match self {
            AggregationResult::Boolean(b) => Some(*b),
            _ => None,
        }
    }
}

/// Aggregator for performing calculations on event streams
#[derive(Debug)]
pub struct Aggregator {
    /// Type of aggregation
    aggregation_type: AggregationType,
    /// Field to aggregate on (if applicable)
    field: Option<String>,
}

impl Aggregator {
    /// Create a new aggregator
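    ///
    /// Construction sketch (the field name is illustrative):
    ///
    /// ```ignore
    /// let p95 = Aggregator::new(AggregationType::Percentile {
    ///     field: "latency_ms".to_string(),
    ///     percentile: 95.0,
    /// });
    /// ```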
    pub fn new(aggregation_type: AggregationType) -> Self {
        let field = match &aggregation_type {
            AggregationType::Sum { field }
            | AggregationType::Average { field }
            | AggregationType::Min { field }
            | AggregationType::Max { field }
            | AggregationType::CountDistinct { field }
            | AggregationType::StdDev { field }
            | AggregationType::Percentile { field, .. }
            | AggregationType::CountBy { field } => Some(field.clone()),
            _ => None,
        };

        Self {
            aggregation_type,
            field,
        }
    }

    /// Perform aggregation on a time window
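    ///
    /// Sketch (`ignore`d; assumes a populated `TimeWindow` value named
    /// `window`):
    ///
    /// ```ignore
    /// let counter = Aggregator::new(AggregationType::Count);
    /// if let Some(n) = counter.aggregate(&window).as_number() {
    ///     println!("events in window: {n}");
    /// }
    /// ```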
    pub fn aggregate(&self, window: &TimeWindow) -> AggregationResult {
        let events = window.events();

        match &self.aggregation_type {
            AggregationType::Count => AggregationResult::Number(events.len() as f64),

            AggregationType::Sum { field } => {
                let sum = window.sum(field);
                AggregationResult::Number(sum)
            }

            AggregationType::Average { field } => match window.average(field) {
                Some(avg) => AggregationResult::Number(avg),
                None => AggregationResult::None,
            },

            AggregationType::Min { field } => match window.min(field) {
                Some(min) => AggregationResult::Number(min),
                None => AggregationResult::None,
            },

            AggregationType::Max { field } => match window.max(field) {
                Some(max) => AggregationResult::Number(max),
                None => AggregationResult::None,
            },

            AggregationType::CountDistinct { field } => {
                let distinct_count = self.count_distinct_values(events, field);
                AggregationResult::Number(distinct_count as f64)
            }

            AggregationType::StdDev { field } => {
                let std_dev = self.calculate_std_dev(events, field);
                match std_dev {
                    Some(sd) => AggregationResult::Number(sd),
                    None => AggregationResult::None,
                }
            }

            AggregationType::Percentile { field, percentile } => {
                let value = self.calculate_percentile(events, field, *percentile);
                match value {
                    Some(v) => AggregationResult::Number(v),
                    None => AggregationResult::None,
                }
            }

            AggregationType::First => match events.front() {
                Some(event) => AggregationResult::Text(event.id.clone()),
                None => AggregationResult::None,
            },

            AggregationType::Last => match events.back() {
                Some(event) => AggregationResult::Text(event.id.clone()),
                None => AggregationResult::None,
            },

            AggregationType::CountBy { field } => {
                let counts = self.count_by_field(events, field);
                AggregationResult::CountMap(counts)
            }
        }
    }

    /// Perform aggregation on a slice of events
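    ///
    /// Only `Count`, `Sum`, and `Average` are computed here; see the fallback
    /// arm below. Sketch (`ignore`d; `events` is a `Vec<StreamEvent>`):
    ///
    /// ```ignore
    /// let sum = Aggregator::new(AggregationType::Sum {
    ///     field: "value".to_string(),
    /// });
    /// let total = sum.aggregate_events(&events).as_number().unwrap_or(0.0);
    /// ```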
    pub fn aggregate_events(&self, events: &[StreamEvent]) -> AggregationResult {
        match &self.aggregation_type {
            AggregationType::Count => AggregationResult::Number(events.len() as f64),

            AggregationType::Sum { field } => {
                let sum: f64 = events.iter().filter_map(|e| e.get_numeric(field)).sum();
                AggregationResult::Number(sum)
            }

            AggregationType::Average { field } => {
                let values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

                if values.is_empty() {
                    AggregationResult::None
                } else {
                    let avg = values.iter().sum::<f64>() / values.len() as f64;
                    AggregationResult::Number(avg)
                }
            }
            _ => {
                // The remaining aggregation types rely on window context
                // (ordering, helper methods on `TimeWindow`), so they are not
                // computed from a bare slice; use `aggregate` instead.
                AggregationResult::None
            }
        }
    }

    /// Count distinct values in a field
    fn count_distinct_values(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> usize {
        let mut seen = std::collections::HashSet::new();

        for event in events {
            if let Some(value) = event.data.get(field) {
                seen.insert(format!("{:?}", value));
            }
        }

        seen.len()
    }

    /// Calculate the population standard deviation (returns `None` for fewer than two values)
    fn calculate_std_dev(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> Option<f64> {
        let values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

        if values.len() < 2 {
            return None;
        }

        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;

        Some(variance.sqrt())
    }

    /// Calculate a percentile using the nearest-rank method
    fn calculate_percentile(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
        percentile: f64,
    ) -> Option<f64> {
        let mut values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

        if values.is_empty() {
            return None;
        }

        // `total_cmp` imposes a total order on f64, so a stray NaN cannot
        // panic the sort the way `partial_cmp(..).unwrap()` would.
        values.sort_by(|a, b| a.total_cmp(b));

        // Nearest-rank index; `percentile` is on a 0..=100 scale.
        let index = (percentile / 100.0 * (values.len() - 1) as f64).round() as usize;
        values.get(index).copied()
    }

    /// Count occurrences by field value
    fn count_by_field(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> HashMap<String, usize> {
        let mut counts = HashMap::new();

        for event in events {
            if let Some(value) = event.data.get(field) {
                let key = match value {
                    crate::types::Value::String(s) => s.clone(),
                    crate::types::Value::Number(n) => n.to_string(),
                    crate::types::Value::Integer(i) => i.to_string(),
                    crate::types::Value::Boolean(b) => b.to_string(),
                    _ => format!("{:?}", value),
                };

                *counts.entry(key).or_insert(0) += 1;
            }
        }

        counts
    }
}

/// Stream analytics helper for complex aggregations
#[derive(Debug)]
pub struct StreamAnalytics {
    /// Cache of recent calculations
    cache: HashMap<String, (u64, AggregationResult)>,
    /// Cache TTL in milliseconds
    cache_ttl: u64,
}

impl StreamAnalytics {
    /// Create new stream analytics instance
    pub fn new(cache_ttl_ms: u64) -> Self {
        Self {
            cache: HashMap::new(),
            cache_ttl: cache_ttl_ms,
        }
    }

    /// Perform cached aggregation
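    ///
    /// Results are keyed by `key` and reused until `cache_ttl` elapses.
    /// Sketch (`ignore`d; `window` and `now_ms` are assumed to exist):
    ///
    /// ```ignore
    /// let mut analytics = StreamAnalytics::new(1_000); // 1s cache TTL
    /// let counter = Aggregator::new(AggregationType::Count);
    /// let count = analytics.aggregate_cached("orders.count", &window, &counter, now_ms);
    /// ```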
    pub fn aggregate_cached(
        &mut self,
        key: &str,
        window: &TimeWindow,
        aggregator: &Aggregator,
        current_time: u64,
    ) -> AggregationResult {
        // Check cache
        if let Some((timestamp, result)) = self.cache.get(key) {
            // `saturating_sub` avoids an underflow panic if the caller ever
            // passes a `current_time` older than a cached timestamp.
            if current_time.saturating_sub(*timestamp) < self.cache_ttl {
                return result.clone();
            }
        }

        // Calculate new result
        let result = aggregator.aggregate(window);
        self.cache
            .insert(key.to_string(), (current_time, result.clone()));

        // Clean old cache entries
        self.cache
            .retain(|_, (timestamp, _)| current_time.saturating_sub(*timestamp) < self.cache_ttl);

        result
    }

    /// Calculate moving average over multiple windows
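    ///
    /// Averages the most recent `window_count` windows, weighting each event
    /// equally. Sketch (`ignore`d; `windows` is a `Vec<TimeWindow>`):
    ///
    /// ```ignore
    /// if let Some(avg) = analytics.moving_average(&windows, "value", 5) {
    ///     println!("5-window moving average: {avg}");
    /// }
    /// ```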
    pub fn moving_average(
        &self,
        windows: &[TimeWindow],
        field: &str,
        window_count: usize,
    ) -> Option<f64> {
        if windows.is_empty() {
            return None;
        }

        let recent_windows = if windows.len() > window_count {
            &windows[windows.len() - window_count..]
        } else {
            windows
        };

        let total_sum: f64 = recent_windows.iter().map(|w| w.sum(field)).sum();

        let total_count: usize = recent_windows.iter().map(|w| w.count()).sum();

        if total_count == 0 {
            None
        } else {
            Some(total_sum / total_count as f64)
        }
    }

    /// Detect anomalies using z-score
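    ///
    /// An event is anomalous when `|value - mean| / std_dev > threshold`,
    /// with the mean and standard deviation taken from all windows except
    /// the last. Sketch (`ignore`d):
    ///
    /// ```ignore
    /// // Flag events more than 3 standard deviations from the baseline.
    /// let anomalous_ids = analytics.detect_anomalies(&windows, "value", 3.0);
    /// ```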
    pub fn detect_anomalies(
        &self,
        windows: &[TimeWindow],
        field: &str,
        threshold: f64,
    ) -> Vec<String> {
        if windows.len() < 3 {
            return Vec::new();
        }

        // Calculate baseline statistics from historical windows
        let historical_windows = &windows[..windows.len() - 1];
        let values: Vec<f64> = historical_windows
            .iter()
            .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
            .collect();

        if values.len() < 10 {
            return Vec::new();
        }

        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
        let std_dev = variance.sqrt();

        // A zero spread means every baseline value is identical; bail out
        // rather than divide by zero in the z-score below.
        if std_dev == 0.0 {
            return Vec::new();
        }

        // Check current window for anomalies
        let current_window = windows.last().unwrap();
        let mut anomalies = Vec::new();

        for event in current_window.events() {
            if let Some(value) = event.get_numeric(field) {
                let z_score = (value - mean) / std_dev;
                if z_score.abs() > threshold {
                    anomalies.push(event.id.clone());
                }
            }
        }

        anomalies
    }

    /// Calculate trend direction
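    ///
    /// Compares the average of the first half of the windows against the
    /// second half; a change of more than 5% in either direction counts as
    /// a trend. Sketch (`ignore`d):
    ///
    /// ```ignore
    /// match analytics.calculate_trend(&windows, "value") {
    ///     TrendDirection::Increasing => println!("values trending up"),
    ///     TrendDirection::Decreasing => println!("values trending down"),
    ///     TrendDirection::Stable => println!("no significant change"),
    /// }
    /// ```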
    pub fn calculate_trend(&self, windows: &[TimeWindow], field: &str) -> TrendDirection {
        if windows.len() < 2 {
            return TrendDirection::Stable;
        }

        let averages: Vec<f64> = windows.iter().filter_map(|w| w.average(field)).collect();

        if averages.len() < 2 {
            return TrendDirection::Stable;
        }

        let first_half = &averages[..averages.len() / 2];
        let second_half = &averages[averages.len() / 2..];

        let first_avg = first_half.iter().sum::<f64>() / first_half.len() as f64;
        let second_avg = second_half.iter().sum::<f64>() / second_half.len() as f64;

        // A zero baseline makes the percentage change undefined; treat it
        // as stable rather than dividing by zero.
        if first_avg == 0.0 {
            return TrendDirection::Stable;
        }

        let change_percent = ((second_avg - first_avg) / first_avg) * 100.0;

        if change_percent > 5.0 {
            TrendDirection::Increasing
        } else if change_percent < -5.0 {
            TrendDirection::Decreasing
        } else {
            TrendDirection::Stable
        }
    }
}

/// Direction of trend analysis
#[derive(Debug, Clone, PartialEq)]
pub enum TrendDirection {
    /// Values are increasing
    Increasing,
    /// Values are decreasing
    Decreasing,
    /// Values are stable
    Stable,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::event::StreamEvent;
    use crate::types::Value;
    use std::collections::HashMap;

    #[test]
    fn test_count_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Count);
        let events = create_test_events(5);

        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(5.0));
    }

    #[test]
    fn test_sum_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Sum {
            field: "value".to_string(),
        });
        let events = create_test_events(5);

        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(10.0)); // 0+1+2+3+4
    }

    #[test]
    fn test_average_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Average {
            field: "value".to_string(),
        });
        let events = create_test_events(5);

        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(2.0));
    }
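
    // Added checks: field-based aggregations yield `AggregationResult::None`
    // on empty input, and the conversion helpers reject mismatched variants.
    // These use only items defined in this file.
    #[test]
    fn test_average_of_empty_slice_is_none() {
        let aggregator = Aggregator::new(AggregationType::Average {
            field: "value".to_string(),
        });

        let result = aggregator.aggregate_events(&[]);
        assert_eq!(result.as_number(), None);
    }

    #[test]
    fn test_result_conversions() {
        assert_eq!(AggregationResult::Text("a".to_string()).as_string(), Some("a"));
        assert_eq!(AggregationResult::Boolean(true).as_boolean(), Some(true));
        assert_eq!(AggregationResult::Number(1.5).as_number(), Some(1.5));
        // Mismatched variants convert to `None`.
        assert_eq!(AggregationResult::Text("a".to_string()).as_number(), None);
    }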

    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }
}