rust_rule_engine/streaming/aggregator.rs

use crate::streaming::event::StreamEvent;
use crate::streaming::window::TimeWindow;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

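/// The aggregation to compute over a window of events; field-based
/// variants carry the name of the field to read from each event.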
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum AggregationType {
    Count,
    Sum { field: String },
    Average { field: String },
    Min { field: String },
    Max { field: String },
    CountDistinct { field: String },
    StdDev { field: String },
    Percentile { field: String, percentile: f64 },
    First,
    Last,
    CountBy { field: String },
}

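/// The value produced by an aggregation; `None` is returned when the
/// window holds no usable data for the requested computation.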
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AggregationResult {
    Number(f64),
    Text(String),
    Boolean(bool),
    CountMap(HashMap<String, usize>),
    None,
}

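// Convenience accessors: each returns `Some` only when the result is the
// matching variant.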
impl AggregationResult {
    pub fn as_number(&self) -> Option<f64> {
        match self {
            AggregationResult::Number(n) => Some(*n),
            _ => None,
        }
    }

    pub fn as_string(&self) -> Option<&str> {
        match self {
            AggregationResult::Text(s) => Some(s),
            _ => None,
        }
    }

    pub fn as_boolean(&self) -> Option<bool> {
        match self {
            AggregationResult::Boolean(b) => Some(*b),
            _ => None,
        }
    }
}

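/// Computes a single configured aggregation over the events in a window.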
#[derive(Debug)]
pub struct Aggregator {
    /// The aggregation to perform.
    aggregation_type: AggregationType,
    /// The target field name, when the aggregation reads one.
    field: Option<String>,
}

impl Aggregator {
    pub fn new(aggregation_type: AggregationType) -> Self {
        let field = match &aggregation_type {
            AggregationType::Sum { field }
            | AggregationType::Average { field }
            | AggregationType::Min { field }
            | AggregationType::Max { field }
            | AggregationType::CountDistinct { field }
            | AggregationType::StdDev { field }
            | AggregationType::Percentile { field, .. }
            | AggregationType::CountBy { field } => Some(field.clone()),
            _ => None,
        };

        Self {
            aggregation_type,
            field,
        }
    }

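    /// Runs the configured aggregation over every event in the window.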
    pub fn aggregate(&self, window: &TimeWindow) -> AggregationResult {
        let events = window.events();

        match &self.aggregation_type {
            AggregationType::Count => AggregationResult::Number(events.len() as f64),

            AggregationType::Sum { field } => {
                let sum = window.sum(field);
                AggregationResult::Number(sum)
            }

            AggregationType::Average { field } => match window.average(field) {
                Some(avg) => AggregationResult::Number(avg),
                None => AggregationResult::None,
            },

            AggregationType::Min { field } => match window.min(field) {
                Some(min) => AggregationResult::Number(min),
                None => AggregationResult::None,
            },

            AggregationType::Max { field } => match window.max(field) {
                Some(max) => AggregationResult::Number(max),
                None => AggregationResult::None,
            },

            AggregationType::CountDistinct { field } => {
                let distinct_count = self.count_distinct_values(events, field);
                AggregationResult::Number(distinct_count as f64)
            }

            AggregationType::StdDev { field } => {
                match self.calculate_std_dev(events, field) {
                    Some(sd) => AggregationResult::Number(sd),
                    None => AggregationResult::None,
                }
            }

            AggregationType::Percentile { field, percentile } => {
                match self.calculate_percentile(events, field, *percentile) {
                    Some(v) => AggregationResult::Number(v),
                    None => AggregationResult::None,
                }
            }

            AggregationType::First => match events.front() {
                Some(event) => AggregationResult::Text(event.id.clone()),
                None => AggregationResult::None,
            },

            AggregationType::Last => match events.back() {
                Some(event) => AggregationResult::Text(event.id.clone()),
                None => AggregationResult::None,
            },

            AggregationType::CountBy { field } => {
                let counts = self.count_by_field(events, field);
                AggregationResult::CountMap(counts)
            }
        }
    }

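    /// Aggregates a plain slice of events. Only `Count`, `Sum`, and
    /// `Average` are supported here; other types fall through to `None`.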
    pub fn aggregate_events(&self, events: &[StreamEvent]) -> AggregationResult {
        match &self.aggregation_type {
            AggregationType::Count => AggregationResult::Number(events.len() as f64),

            AggregationType::Sum { field } => {
                let sum: f64 = events.iter().filter_map(|e| e.get_numeric(field)).sum();
                AggregationResult::Number(sum)
            }

            AggregationType::Average { field } => {
                let values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

                if values.is_empty() {
                    AggregationResult::None
                } else {
                    let avg = values.iter().sum::<f64>() / values.len() as f64;
                    AggregationResult::Number(avg)
                }
            }

            _ => AggregationResult::None,
        }
    }

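    /// Counts distinct values of `field`, using the debug representation of
    /// each value as the deduplication key.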
    fn count_distinct_values(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> usize {
        let mut seen = std::collections::HashSet::new();

        for event in events {
            if let Some(value) = event.data.get(field) {
                seen.insert(format!("{:?}", value));
            }
        }

        seen.len()
    }

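    /// Population standard deviation of `field`; returns `None` when fewer
    /// than two numeric values are present.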
    fn calculate_std_dev(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> Option<f64> {
        let values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

        if values.len() < 2 {
            return None;
        }

        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;

        Some(variance.sqrt())
    }

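    /// Nearest-rank percentile of `field` (e.g. `percentile = 95.0` for p95);
    /// returns `None` when no numeric values are present.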
    fn calculate_percentile(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
        percentile: f64,
    ) -> Option<f64> {
        let mut values: Vec<f64> = events.iter().filter_map(|e| e.get_numeric(field)).collect();

        if values.is_empty() {
            return None;
        }

        // `total_cmp` gives a total order over f64, so a NaN value cannot
        // panic the sort the way `partial_cmp(...).unwrap()` would.
        values.sort_by(|a, b| a.total_cmp(b));

        let index = (percentile / 100.0 * (values.len() - 1) as f64).round() as usize;
        values.get(index).copied()
    }

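    /// Counts events grouped by the stringified value of `field`.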
    fn count_by_field(
        &self,
        events: &std::collections::VecDeque<StreamEvent>,
        field: &str,
    ) -> HashMap<String, usize> {
        let mut counts = HashMap::new();

        for event in events {
            if let Some(value) = event.data.get(field) {
                let key = match value {
                    crate::types::Value::String(s) => s.clone(),
                    crate::types::Value::Number(n) => n.to_string(),
                    crate::types::Value::Integer(i) => i.to_string(),
                    crate::types::Value::Boolean(b) => b.to_string(),
                    _ => format!("{:?}", value),
                };

                *counts.entry(key).or_insert(0) += 1;
            }
        }

        counts
    }
}

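/// Higher-level analytics over windows, with a small TTL cache so repeated
/// aggregations of the same window are not recomputed within `cache_ttl`
/// milliseconds.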
#[derive(Debug)]
pub struct StreamAnalytics {
    /// Cached results keyed by caller-supplied name: (computed_at, result).
    cache: HashMap<String, (u64, AggregationResult)>,
    /// Cache time-to-live in milliseconds.
    cache_ttl: u64,
}

impl StreamAnalytics {
    pub fn new(cache_ttl_ms: u64) -> Self {
        Self {
            cache: HashMap::new(),
            cache_ttl: cache_ttl_ms,
        }
    }

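    /// Returns the cached result for `key` if it is still fresh; otherwise
    /// recomputes the aggregation, caches it, and evicts stale entries.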
    pub fn aggregate_cached(
        &mut self,
        key: &str,
        window: &TimeWindow,
        aggregator: &Aggregator,
        current_time: u64,
    ) -> AggregationResult {
        if let Some((timestamp, result)) = self.cache.get(key) {
            // `saturating_sub` guards against underflow if a caller passes a
            // `current_time` older than the cached timestamp.
            if current_time.saturating_sub(*timestamp) < self.cache_ttl {
                return result.clone();
            }
        }

        let result = aggregator.aggregate(window);
        self.cache
            .insert(key.to_string(), (current_time, result.clone()));

        self.cache
            .retain(|_, (timestamp, _)| current_time.saturating_sub(*timestamp) < self.cache_ttl);

        result
    }

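    /// Average of `field` across the most recent `window_count` windows,
    /// weighted by event count rather than averaging per-window averages.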
    pub fn moving_average(
        &self,
        windows: &[TimeWindow],
        field: &str,
        window_count: usize,
    ) -> Option<f64> {
        if windows.is_empty() {
            return None;
        }

        let recent_windows = if windows.len() > window_count {
            &windows[windows.len() - window_count..]
        } else {
            windows
        };

        let total_sum: f64 = recent_windows.iter().map(|w| w.sum(field)).sum();
        let total_count: usize = recent_windows.iter().map(|w| w.count()).sum();

        if total_count == 0 {
            None
        } else {
            Some(total_sum / total_count as f64)
        }
    }

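    /// Flags events in the newest window whose z-score against the
    /// historical mean and standard deviation exceeds `threshold`.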
    pub fn detect_anomalies(
        &self,
        windows: &[TimeWindow],
        field: &str,
        threshold: f64,
    ) -> Vec<String> {
        if windows.len() < 3 {
            return Vec::new();
        }

        let historical_windows = &windows[..windows.len() - 1];
        let values: Vec<f64> = historical_windows
            .iter()
            .flat_map(|w| w.events().iter().filter_map(|e| e.get_numeric(field)))
            .collect();

        if values.len() < 10 {
            return Vec::new();
        }

        let mean = values.iter().sum::<f64>() / values.len() as f64;
        let variance = values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
        let std_dev = variance.sqrt();

        // A zero standard deviation would make every z-score infinite (or
        // NaN), so treat a constant history as anomaly-free.
        if std_dev == 0.0 {
            return Vec::new();
        }

        let current_window = windows.last().unwrap();
        let mut anomalies = Vec::new();

        for event in current_window.events() {
            if let Some(value) = event.get_numeric(field) {
                let z_score = (value - mean) / std_dev;
                if z_score.abs() > threshold {
                    anomalies.push(event.id.clone());
                }
            }
        }

        anomalies
    }

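    /// Compares the average of the first half of the windows with the
    /// second half; a change of more than 5% either way marks a trend.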
    pub fn calculate_trend(&self, windows: &[TimeWindow], field: &str) -> TrendDirection {
        if windows.len() < 2 {
            return TrendDirection::Stable;
        }

        let averages: Vec<f64> = windows.iter().filter_map(|w| w.average(field)).collect();

        if averages.len() < 2 {
            return TrendDirection::Stable;
        }

        let first_half = &averages[..averages.len() / 2];
        let second_half = &averages[averages.len() / 2..];

        let first_avg = first_half.iter().sum::<f64>() / first_half.len() as f64;
        let second_avg = second_half.iter().sum::<f64>() / second_half.len() as f64;

        // Guard against dividing by zero when the early averages are all zero.
        if first_avg == 0.0 {
            return TrendDirection::Stable;
        }

        let change_percent = ((second_avg - first_avg) / first_avg) * 100.0;

        if change_percent > 5.0 {
            TrendDirection::Increasing
        } else if change_percent < -5.0 {
            TrendDirection::Decreasing
        } else {
            TrendDirection::Stable
        }
    }
}

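/// Direction of change in a field's average across consecutive windows.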
#[derive(Debug, Clone, PartialEq)]
pub enum TrendDirection {
    Increasing,
    Decreasing,
    Stable,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::streaming::event::StreamEvent;
    use crate::types::Value;
    use std::collections::HashMap;

    #[test]
    fn test_count_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Count);
        let events = create_test_events(5);

        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(5.0));
    }

    #[test]
    fn test_sum_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Sum {
            field: "value".to_string(),
        });
        let events = create_test_events(5);

        // Events carry values 0.0 through 4.0, so the sum is 10.0.
        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(10.0));
    }

    #[test]
    fn test_average_aggregation() {
        let aggregator = Aggregator::new(AggregationType::Average {
            field: "value".to_string(),
        });
        let events = create_test_events(5);

        let result = aggregator.aggregate_events(&events);
        assert_eq!(result.as_number(), Some(2.0));
    }

    // Builds `count` events whose "value" field runs 0.0, 1.0, ...
    fn create_test_events(count: usize) -> Vec<StreamEvent> {
        (0..count)
            .map(|i| {
                let mut data = HashMap::new();
                data.insert("value".to_string(), Value::Number(i as f64));
                StreamEvent::new("TestEvent", data, "test")
            })
            .collect()
    }
}