llm_analytics_hub/analytics/
aggregation.rs

1//! Aggregation Engine
2//!
3//! Real-time metrics aggregation with multiple time windows and statistical measures.
4
5use crate::models::metrics::{
6    AggregatedMetric, MetricValues, StatisticalMeasures, TimeWindow,
7};
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use dashmap::DashMap;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::debug;
14
15use super::AnalyticsConfig;
16
/// Real-time aggregation engine
///
/// Holds, for each configured time window, a concurrent map from metric name
/// to that metric's running aggregation state. All maps are shared via `Arc`
/// so the engine can be cloned into / referenced from concurrent tasks.
pub struct AggregationEngine {
    // Retained after construction but not read again in this file; kept for
    // future use (hence the dead_code allowance).
    #[allow(dead_code)]
    config: Arc<AnalyticsConfig>,
    // Window -> Metric Name -> Aggregation State
    aggregations: Arc<DashMap<TimeWindow, DashMap<String, AggregationState>>>,
}
24
25impl AggregationEngine {
26    /// Create a new aggregation engine
27    pub async fn new(config: Arc<AnalyticsConfig>) -> Result<Self> {
28        let aggregations = Arc::new(DashMap::new());
29
30        // Initialize aggregation windows
31        for &window_secs in &config.aggregation_windows {
32            let window = Self::seconds_to_window(window_secs);
33            aggregations.insert(window, DashMap::new());
34        }
35
36        Ok(Self {
37            config,
38            aggregations,
39        })
40    }
41
42    /// Convert seconds to TimeWindow enum
43    fn seconds_to_window(seconds: u64) -> TimeWindow {
44        match seconds {
45            60 => TimeWindow::OneMinute,
46            300 => TimeWindow::FiveMinutes,
47            900 => TimeWindow::FifteenMinutes,
48            3600 => TimeWindow::OneHour,
49            21600 => TimeWindow::SixHours,
50            86400 => TimeWindow::OneDay,
51            604800 => TimeWindow::OneWeek,
52            2592000 => TimeWindow::OneMonth,
53            _ => TimeWindow::FiveMinutes, // default
54        }
55    }
56
57    /// Add a data point to aggregation
58    pub fn add_point(
59        &self,
60        metric_name: &str,
61        value: f64,
62        timestamp: DateTime<Utc>,
63        _tags: HashMap<String, String>,
64    ) -> Result<()> {
65        for window_map in self.aggregations.iter() {
66            let _window = *window_map.key();
67            let metrics = window_map.value();
68
69            metrics
70                .entry(metric_name.to_string())
71                .or_insert_with(AggregationState::new)
72                .add_value(value, timestamp);
73        }
74
75        debug!(
76            "Added data point: {} = {} at {}",
77            metric_name, value, timestamp
78        );
79        Ok(())
80    }
81
82    /// Get aggregated metrics for a time window
83    pub fn get_aggregated(
84        &self,
85        metric_name: &str,
86        window: TimeWindow,
87    ) -> Option<AggregatedMetric> {
88        let window_map = self.aggregations.get(&window)?;
89        let state = window_map.get(metric_name)?;
90
91        let stats = state.calculate_statistics();
92        let (window_start, window_end) = state.get_time_bounds();
93
94        Some(AggregatedMetric {
95            name: metric_name.to_string(),
96            window,
97            window_start,
98            window_end,
99            values: MetricValues::Stats(stats),
100            tags: HashMap::new(),
101        })
102    }
103
104    /// Get all aggregated metrics for a window
105    pub fn get_all_aggregated(&self, window: TimeWindow) -> Vec<AggregatedMetric> {
106        let mut results = Vec::new();
107
108        if let Some(window_map) = self.aggregations.get(&window) {
109            for entry in window_map.iter() {
110                let metric_name = entry.key();
111                if let Some(metric) = self.get_aggregated(metric_name, window) {
112                    results.push(metric);
113                }
114            }
115        }
116
117        results
118    }
119
120    /// Reset aggregation state for a metric
121    pub fn reset_metric(&self, metric_name: &str) {
122        for window_map in self.aggregations.iter() {
123            window_map.value().remove(metric_name);
124        }
125    }
126
127    /// Clear all aggregation data
128    pub fn clear_all(&self) {
129        for window_map in self.aggregations.iter() {
130            window_map.value().clear();
131        }
132    }
133
134    /// Get aggregation statistics
135    pub fn get_stats(&self) -> AggregationStats {
136        let mut total_metrics = 0;
137        let mut total_data_points = 0;
138
139        for window_map in self.aggregations.iter() {
140            total_metrics += window_map.value().len();
141            for state_entry in window_map.value().iter() {
142                total_data_points += state_entry.value().values.len();
143            }
144        }
145
146        AggregationStats {
147            total_metrics,
148            total_data_points,
149            active_windows: self.aggregations.len(),
150        }
151    }
152}
153
/// Aggregation state for a single metric
///
/// NOTE(review): raw samples are retained without any visible eviction in
/// this file, so memory grows with the number of points until
/// `reset_metric`/`clear_all` is called — confirm a retention policy exists
/// elsewhere.
struct AggregationState {
    // All sample values, in insertion order.
    values: Vec<f64>,
    // Timestamp of each sample, parallel to `values`. Only appended to here;
    // readers in this file use the cached min/max below instead.
    timestamps: Vec<DateTime<Utc>>,
    // Earliest sample timestamp seen, if any sample was recorded.
    min_timestamp: Option<DateTime<Utc>>,
    // Latest sample timestamp seen, if any sample was recorded.
    max_timestamp: Option<DateTime<Utc>>,
}
161
162impl AggregationState {
163    fn new() -> Self {
164        Self {
165            values: Vec::new(),
166            timestamps: Vec::new(),
167            min_timestamp: None,
168            max_timestamp: None,
169        }
170    }
171
172    fn add_value(&mut self, value: f64, timestamp: DateTime<Utc>) {
173        self.values.push(value);
174        self.timestamps.push(timestamp);
175
176        if self.min_timestamp.is_none() || timestamp < self.min_timestamp.unwrap() {
177            self.min_timestamp = Some(timestamp);
178        }
179        if self.max_timestamp.is_none() || timestamp > self.max_timestamp.unwrap() {
180            self.max_timestamp = Some(timestamp);
181        }
182    }
183
184    fn calculate_statistics(&self) -> StatisticalMeasures {
185        if self.values.is_empty() {
186            return StatisticalMeasures::default();
187        }
188
189        let count = self.values.len() as u64;
190        let sum: f64 = self.values.iter().sum();
191        let avg = sum / count as f64;
192
193        let mut sorted = self.values.clone();
194        sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
195
196        let min = sorted[0];
197        let max = sorted[sorted.len() - 1];
198
199        let p50_idx = (count as f64 * 0.50) as usize;
200        let p95_idx = (count as f64 * 0.95) as usize;
201        let p99_idx = (count as f64 * 0.99) as usize;
202
203        let p50 = sorted[p50_idx.min(sorted.len() - 1)];
204        let p95 = sorted[p95_idx.min(sorted.len() - 1)];
205        let p99 = sorted[p99_idx.min(sorted.len() - 1)];
206
207        // Calculate standard deviation
208        let variance: f64 = self
209            .values
210            .iter()
211            .map(|v| {
212                let diff = v - avg;
213                diff * diff
214            })
215            .sum::<f64>()
216            / count as f64;
217        let stddev = variance.sqrt();
218
219        StatisticalMeasures {
220            avg,
221            min,
222            max,
223            p50,
224            p95,
225            p99,
226            stddev: Some(stddev),
227            count,
228            sum,
229        }
230    }
231
232    fn get_time_bounds(&self) -> (DateTime<Utc>, DateTime<Utc>) {
233        (
234            self.min_timestamp.unwrap_or_else(Utc::now),
235            self.max_timestamp.unwrap_or_else(Utc::now),
236        )
237    }
238}
239
/// Aggregation statistics
#[derive(Debug, Clone)]
pub struct AggregationStats {
    // Number of (window, metric) aggregation states summed across all windows.
    pub total_metrics: usize,
    // Total retained samples across all aggregation states.
    pub total_data_points: usize,
    // Number of configured time-window buckets.
    pub active_windows: usize,
}