llm_analytics_hub/analytics/
aggregation.rs1use crate::models::metrics::{
6 AggregatedMetric, MetricValues, StatisticalMeasures, TimeWindow,
7};
8use anyhow::Result;
9use chrono::{DateTime, Utc};
10use dashmap::DashMap;
11use std::collections::HashMap;
12use std::sync::Arc;
13use tracing::debug;
14
15use super::AnalyticsConfig;
16
17pub struct AggregationEngine {
19 #[allow(dead_code)]
20 config: Arc<AnalyticsConfig>,
21 aggregations: Arc<DashMap<TimeWindow, DashMap<String, AggregationState>>>,
23}
24
25impl AggregationEngine {
26 pub async fn new(config: Arc<AnalyticsConfig>) -> Result<Self> {
28 let aggregations = Arc::new(DashMap::new());
29
30 for &window_secs in &config.aggregation_windows {
32 let window = Self::seconds_to_window(window_secs);
33 aggregations.insert(window, DashMap::new());
34 }
35
36 Ok(Self {
37 config,
38 aggregations,
39 })
40 }
41
42 fn seconds_to_window(seconds: u64) -> TimeWindow {
44 match seconds {
45 60 => TimeWindow::OneMinute,
46 300 => TimeWindow::FiveMinutes,
47 900 => TimeWindow::FifteenMinutes,
48 3600 => TimeWindow::OneHour,
49 21600 => TimeWindow::SixHours,
50 86400 => TimeWindow::OneDay,
51 604800 => TimeWindow::OneWeek,
52 2592000 => TimeWindow::OneMonth,
53 _ => TimeWindow::FiveMinutes, }
55 }
56
57 pub fn add_point(
59 &self,
60 metric_name: &str,
61 value: f64,
62 timestamp: DateTime<Utc>,
63 _tags: HashMap<String, String>,
64 ) -> Result<()> {
65 for window_map in self.aggregations.iter() {
66 let _window = *window_map.key();
67 let metrics = window_map.value();
68
69 metrics
70 .entry(metric_name.to_string())
71 .or_insert_with(AggregationState::new)
72 .add_value(value, timestamp);
73 }
74
75 debug!(
76 "Added data point: {} = {} at {}",
77 metric_name, value, timestamp
78 );
79 Ok(())
80 }
81
82 pub fn get_aggregated(
84 &self,
85 metric_name: &str,
86 window: TimeWindow,
87 ) -> Option<AggregatedMetric> {
88 let window_map = self.aggregations.get(&window)?;
89 let state = window_map.get(metric_name)?;
90
91 let stats = state.calculate_statistics();
92 let (window_start, window_end) = state.get_time_bounds();
93
94 Some(AggregatedMetric {
95 name: metric_name.to_string(),
96 window,
97 window_start,
98 window_end,
99 values: MetricValues::Stats(stats),
100 tags: HashMap::new(),
101 })
102 }
103
104 pub fn get_all_aggregated(&self, window: TimeWindow) -> Vec<AggregatedMetric> {
106 let mut results = Vec::new();
107
108 if let Some(window_map) = self.aggregations.get(&window) {
109 for entry in window_map.iter() {
110 let metric_name = entry.key();
111 if let Some(metric) = self.get_aggregated(metric_name, window) {
112 results.push(metric);
113 }
114 }
115 }
116
117 results
118 }
119
120 pub fn reset_metric(&self, metric_name: &str) {
122 for window_map in self.aggregations.iter() {
123 window_map.value().remove(metric_name);
124 }
125 }
126
127 pub fn clear_all(&self) {
129 for window_map in self.aggregations.iter() {
130 window_map.value().clear();
131 }
132 }
133
134 pub fn get_stats(&self) -> AggregationStats {
136 let mut total_metrics = 0;
137 let mut total_data_points = 0;
138
139 for window_map in self.aggregations.iter() {
140 total_metrics += window_map.value().len();
141 for state_entry in window_map.value().iter() {
142 total_data_points += state_entry.value().values.len();
143 }
144 }
145
146 AggregationStats {
147 total_metrics,
148 total_data_points,
149 active_windows: self.aggregations.len(),
150 }
151 }
152}
153
154struct AggregationState {
156 values: Vec<f64>,
157 timestamps: Vec<DateTime<Utc>>,
158 min_timestamp: Option<DateTime<Utc>>,
159 max_timestamp: Option<DateTime<Utc>>,
160}
161
162impl AggregationState {
163 fn new() -> Self {
164 Self {
165 values: Vec::new(),
166 timestamps: Vec::new(),
167 min_timestamp: None,
168 max_timestamp: None,
169 }
170 }
171
172 fn add_value(&mut self, value: f64, timestamp: DateTime<Utc>) {
173 self.values.push(value);
174 self.timestamps.push(timestamp);
175
176 if self.min_timestamp.is_none() || timestamp < self.min_timestamp.unwrap() {
177 self.min_timestamp = Some(timestamp);
178 }
179 if self.max_timestamp.is_none() || timestamp > self.max_timestamp.unwrap() {
180 self.max_timestamp = Some(timestamp);
181 }
182 }
183
184 fn calculate_statistics(&self) -> StatisticalMeasures {
185 if self.values.is_empty() {
186 return StatisticalMeasures::default();
187 }
188
189 let count = self.values.len() as u64;
190 let sum: f64 = self.values.iter().sum();
191 let avg = sum / count as f64;
192
193 let mut sorted = self.values.clone();
194 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
195
196 let min = sorted[0];
197 let max = sorted[sorted.len() - 1];
198
199 let p50_idx = (count as f64 * 0.50) as usize;
200 let p95_idx = (count as f64 * 0.95) as usize;
201 let p99_idx = (count as f64 * 0.99) as usize;
202
203 let p50 = sorted[p50_idx.min(sorted.len() - 1)];
204 let p95 = sorted[p95_idx.min(sorted.len() - 1)];
205 let p99 = sorted[p99_idx.min(sorted.len() - 1)];
206
207 let variance: f64 = self
209 .values
210 .iter()
211 .map(|v| {
212 let diff = v - avg;
213 diff * diff
214 })
215 .sum::<f64>()
216 / count as f64;
217 let stddev = variance.sqrt();
218
219 StatisticalMeasures {
220 avg,
221 min,
222 max,
223 p50,
224 p95,
225 p99,
226 stddev: Some(stddev),
227 count,
228 sum,
229 }
230 }
231
232 fn get_time_bounds(&self) -> (DateTime<Utc>, DateTime<Utc>) {
233 (
234 self.min_timestamp.unwrap_or_else(Utc::now),
235 self.max_timestamp.unwrap_or_else(Utc::now),
236 )
237 }
238}
239
/// Snapshot of how much data an aggregation engine currently holds.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AggregationStats {
    /// Number of metric entries, summed over all windows.
    pub total_metrics: usize,
    /// Number of recorded samples, summed over all metrics and windows.
    pub total_data_points: usize,
    /// Number of time windows being tracked.
    pub active_windows: usize,
}