use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
};
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

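/// Minimal HTTP client for the Prometheus query API (`/api/v1/query` and `/api/v1/query_range`).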
#[derive(Clone)]
pub struct PrometheusClient {
    base_url: String,
    client: Client,
}

impl PrometheusClient {
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
        }
    }

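    /// Runs an instant query, optionally evaluated at the given Unix timestamp.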
    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
        let mut url = format!("{}/api/v1/query", self.base_url);
        url.push_str(&format!("?query={}", urlencoding::encode(query)));

        if let Some(t) = time {
            url.push_str(&format!("&time={t}"));
        }

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }

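    /// Runs a range query between `start` and `end` (Unix timestamps) at the given `step` resolution.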
    pub async fn query_range(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: &str,
    ) -> Result<PrometheusResponse> {
        let url = format!(
            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
            self.base_url,
            urlencoding::encode(query),
            start,
            end,
            step
        );

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }
}

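/// Top-level response envelope returned by the Prometheus HTTP query API.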
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    pub status: String,
    pub data: PrometheusData,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    pub metric: HashMap<String, String>,
    pub value: Option<PrometheusValue>,
    pub values: Option<Vec<PrometheusValue>>,
}

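/// A single Prometheus sample: `(unix_timestamp, value_as_string)`, as returned by the HTTP API.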
pub type PrometheusValue = (f64, String);

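/// Pulls request metrics from Prometheus on a schedule and rolls them up into
/// minute, hour, and day aggregates in the analytics database.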
pub struct MetricsAggregator {
    db: AnalyticsDatabase,
    prom_client: PrometheusClient,
    config: AnalyticsConfig,
}

impl MetricsAggregator {
    pub fn new(
        db: AnalyticsDatabase,
        prometheus_url: impl Into<String>,
        config: AnalyticsConfig,
    ) -> Self {
        Self {
            db,
            prom_client: PrometheusClient::new(prometheus_url),
            config,
        }
    }

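    /// Spawns the minute aggregation, hourly rollup, and daily rollup loops as background tasks.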
    pub async fn start(self: Arc<Self>) {
        info!("Starting metrics aggregation service");

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_minute_aggregation().await;
        });

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_hourly_rollup().await;
        });

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_daily_rollup().await;
        });
    }

    async fn run_minute_aggregation(&self) {
        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));

        loop {
            interval.tick().await;

            if let Err(e) = self.aggregate_minute_metrics().await {
                error!("Error aggregating minute metrics: {}", e);
            }
        }
    }

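    /// Queries Prometheus for the previous minute's request counts per
    /// (protocol, method, path, status) series, fetches the p95 latency for each series,
    /// and stores the resulting minute aggregates and per-endpoint stats.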
    async fn aggregate_minute_metrics(&self) -> Result<()> {
        let now = Utc::now();
        let minute_start =
            now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
        let timestamp = minute_start.timestamp();

        debug!("Aggregating metrics for minute: {}", minute_start);

        let query = r"sum by (protocol, method, path, status) (
            increase(mockforge_requests_by_path_total{}[1m]) > 0
        )"
        .to_string();

        let response = self.prom_client.query(&query, Some(timestamp)).await?;

        let mut aggregates = Vec::new();

        for result in response.data.result {
            let protocol = result
                .metric
                .get("protocol")
                .map_or_else(|| "unknown".to_string(), ToString::to_string);
            let method = result.metric.get("method").cloned();
            let endpoint = result.metric.get("path").cloned();
            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());

            let request_count = if let Some((_, value)) = result.value {
                value.parse::<f64>().unwrap_or(0.0) as i64
            } else {
                0
            };

            let latency_query = if let (Some(m), Some(e)) = (&method, &endpoint) {
                format!(
                    r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{protocol}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
                )
            } else {
                continue;
            };

            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
                Ok(resp) => resp
                    .data
                    .result
                    .first()
                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
                Err(e) => {
                    warn!("Failed to query latency: {}", e);
                    None
                }
            };

            let agg = MetricsAggregate {
                id: None,
                timestamp,
                protocol: protocol.clone(),
                method: method.clone(),
                endpoint: endpoint.clone(),
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count: if let Some(sc) = status_code {
                    if sc >= 400 {
                        request_count
                    } else {
                        0
                    }
                } else {
                    0
                },
                latency_sum: 0.0,
                latency_min: None,
                latency_max: None,
                latency_p50: None,
                latency_p95,
                latency_p99: None,
                bytes_sent: 0,
                bytes_received: 0,
                active_connections: None,
                created_at: None,
            };

            aggregates.push(agg);
        }

        if !aggregates.is_empty() {
            self.db.insert_minute_aggregates_batch(&aggregates).await?;
            info!("Stored {} minute aggregates", aggregates.len());

            for agg in &aggregates {
                let stats = EndpointStats {
                    id: None,
                    endpoint: agg.endpoint.clone().unwrap_or_default(),
                    protocol: agg.protocol.clone(),
                    method: agg.method.clone(),
                    workspace_id: agg.workspace_id.clone(),
                    environment: agg.environment.clone(),
                    total_requests: agg.request_count,
                    total_errors: agg.error_count,
                    avg_latency_ms: agg.latency_p95,
                    min_latency_ms: agg.latency_min,
                    max_latency_ms: agg.latency_max,
                    p95_latency_ms: agg.latency_p95,
                    status_codes: None,
                    total_bytes_sent: agg.bytes_sent,
                    total_bytes_received: agg.bytes_received,
                    first_seen: timestamp,
                    last_seen: timestamp,
                    updated_at: None,
                };

                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
                    warn!("Failed to update endpoint stats: {}", e);
                }
            }
        }

        Ok(())
    }

    async fn run_hourly_rollup(&self) {
        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_hour().await {
                error!("Error rolling up to hourly metrics: {}", e);
            }
        }
    }

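    /// Rolls the previous hour's minute aggregates up into hour aggregates,
    /// grouped by (protocol, method, endpoint, status code).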
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        let hour_start =
            now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
                - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&MetricsAggregate>,
        > = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }

    async fn run_daily_rollup(&self) {
        let mut interval = interval(Duration::from_secs(86400));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_day().await {
                error!("Error rolling up to daily metrics: {}", e);
            }
        }
    }

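    /// Rolls the previous day's hour aggregates up into day aggregates, grouped by
    /// (protocol, method, endpoint, status code), and records the busiest hour as the peak hour.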
    async fn rollup_to_day(&self) -> Result<()> {
        let now = Utc::now();
        let day_start = now
            .with_hour(0)
            .unwrap()
            .with_minute(0)
            .unwrap()
            .with_second(0)
            .unwrap()
            .with_nanosecond(0)
            .unwrap()
            - chrono::Duration::days(1);
        let day_end = day_start + chrono::Duration::days(1);

        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));

        let filter = AnalyticsFilter {
            start_time: Some(day_start.timestamp()),
            end_time: Some(day_end.timestamp()),
            ..Default::default()
        };

        let hour_data = self.db.get_hour_aggregates(&filter).await?;

        if hour_data.is_empty() {
            debug!("No hour data to roll up");
            return Ok(());
        }

        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&HourMetricsAggregate>,
        > = HashMap::new();

        for agg in &hour_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        let mut peak_hour: Option<i32> = None;
        let mut max_requests = 0i64;
        for agg in &hour_data {
            if agg.request_count > max_requests {
                max_requests = agg.request_count;
                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
                    peak_hour = Some(dt.hour() as i32);
                }
            }
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let latency_p50_avg: Option<f64> = {
                let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
                if !p50_values.is_empty() {
                    Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p95_avg: Option<f64> = {
                let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
                if !p95_values.is_empty() {
                    Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p99_avg: Option<f64> = {
                let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
                if !p99_values.is_empty() {
                    Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
                } else {
                    None
                }
            };

            let active_connections_avg: Option<f64> = {
                let avg_values: Vec<f64> =
                    group.iter().filter_map(|a| a.active_connections_avg).collect();
                if !avg_values.is_empty() {
                    Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
                } else {
                    None
                }
            };

            let active_connections_max =
                group.iter().filter_map(|a| a.active_connections_max).max();

            let day_agg = DayMetricsAggregate {
                id: None,
                date: day_start.format("%Y-%m-%d").to_string(),
                timestamp: day_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
                environment: group.first().and_then(|a| a.environment.clone()),
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: latency_p50_avg,
                latency_p95: latency_p95_avg,
                latency_p99: latency_p99_avg,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg,
                active_connections_max,
                unique_clients: None,
                peak_hour,
                created_at: None,
            };

            self.db.insert_day_aggregate(&day_agg).await?;
        }

        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_client_creation() {
        let client = PrometheusClient::new("http://localhost:9090");
        assert_eq!(client.base_url, "http://localhost:9090");
    }
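
    // A minimal sketch of a deserialization check for the instant-query response shape,
    // assuming `serde_json` is available as a (dev-)dependency of this crate.
    #[test]
    fn test_prometheus_response_deserialization() {
        let body = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    {
                        "metric": {"protocol": "http", "method": "GET", "path": "/health", "status": "200"},
                        "value": [1700000000.0, "42"]
                    }
                ]
            }
        }"#;

        let parsed: PrometheusResponse = serde_json::from_str(body).expect("valid response JSON");
        assert_eq!(parsed.status, "success");
        assert_eq!(parsed.data.result_type, "vector");
        assert_eq!(parsed.data.result.len(), 1);

        let first = &parsed.data.result[0];
        assert_eq!(first.metric.get("path").map(String::as_str), Some("/health"));
        assert_eq!(first.value.as_ref().map(|(_, v)| v.as_str()), Some("42"));
        assert!(first.values.is_none());
    }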
}