1use crate::config::AnalyticsConfig;
9use crate::database::AnalyticsDatabase;
10use crate::error::Result;
11use crate::models::{
12 AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
13};
14use chrono::{Timelike, Utc};
15use reqwest::Client;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::fmt::Write as _;
19use std::sync::Arc;
20use tokio::time::{interval, Duration};
21use tracing::{debug, error, info, warn};
22
/// Grouping key used by the hourly/daily rollups:
/// (protocol, method, endpoint, status code).
type MetricsGroupKey = (String, Option<String>, Option<String>, Option<i32>);
25
26#[derive(Clone)]
28pub struct PrometheusClient {
29 base_url: String,
30 client: Client,
31}
32
33impl PrometheusClient {
34 pub fn new(base_url: impl Into<String>) -> Self {
36 Self {
37 base_url: base_url.into(),
38 client: Client::new(),
39 }
40 }
41
42 pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
48 let mut url = format!("{}/api/v1/query", self.base_url);
49 let _ = write!(url, "?query={}", urlencoding::encode(query));
50
51 if let Some(t) = time {
52 let _ = write!(url, "&time={t}");
53 }
54
55 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
56
57 Ok(response)
58 }
59
60 pub async fn query_range(
66 &self,
67 query: &str,
68 start: i64,
69 end: i64,
70 step: &str,
71 ) -> Result<PrometheusResponse> {
72 let url = format!(
73 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
74 self.base_url,
75 urlencoding::encode(query),
76 start,
77 end,
78 step
79 );
80
81 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
82
83 Ok(response)
84 }
85}
86
/// Top-level envelope returned by the Prometheus HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    /// Request status reported by Prometheus (e.g. "success").
    pub status: String,
    /// Payload carrying the query result.
    pub data: PrometheusData,
}
95
/// Result payload of a Prometheus query.
///
/// `rename_all = "camelCase"` maps `result_type` onto the API's
/// `resultType` field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    /// Result type reported by Prometheus (e.g. "vector" or "matrix").
    pub result_type: String,
    /// One entry per matching time series.
    pub result: Vec<PrometheusResult>,
}
105
/// A single time series in a Prometheus query result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    /// Label set identifying the series (label name -> label value).
    pub metric: HashMap<String, String>,
    /// Sample for instant queries; absent for range queries.
    pub value: Option<PrometheusValue>,
    /// Samples for range queries; absent for instant queries.
    pub values: Option<Vec<PrometheusValue>>,
}

/// A Prometheus sample: (Unix timestamp in seconds, value rendered as a string).
pub type PrometheusValue = (f64, String);
119
/// Background service that periodically scrapes Prometheus and persists
/// minute, hour, and day aggregates to the analytics database.
pub struct MetricsAggregator {
    /// Destination store for aggregates and per-endpoint stats.
    db: AnalyticsDatabase,
    /// Client for the Prometheus server being scraped.
    prom_client: PrometheusClient,
    /// Scheduling configuration (aggregation and rollup intervals).
    config: AnalyticsConfig,
}
126
127impl MetricsAggregator {
128 pub fn new(
130 db: AnalyticsDatabase,
131 prometheus_url: impl Into<String>,
132 config: AnalyticsConfig,
133 ) -> Self {
134 Self {
135 db,
136 prom_client: PrometheusClient::new(prometheus_url),
137 config,
138 }
139 }
140
141 #[allow(clippy::unused_async)]
143 pub async fn start(self: Arc<Self>) {
144 info!("Starting metrics aggregation service");
145
146 let self_clone = Arc::clone(&self);
148 tokio::spawn(async move {
149 self_clone.run_minute_aggregation().await;
150 });
151
152 let self_clone = Arc::clone(&self);
154 tokio::spawn(async move {
155 self_clone.run_hourly_rollup().await;
156 });
157
158 let self_clone = Arc::clone(&self);
160 tokio::spawn(async move {
161 self_clone.run_daily_rollup().await;
162 });
163 }
164
165 async fn run_minute_aggregation(&self) {
167 let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));
168
169 loop {
170 interval.tick().await;
171
172 if let Err(e) = self.aggregate_minute_metrics().await {
173 error!("Error aggregating minute metrics: {}", e);
174 }
175 }
176 }
177
178 #[allow(clippy::too_many_lines)]
180 async fn aggregate_minute_metrics(&self) -> Result<()> {
181 let now = Utc::now();
182 let minute_start = now
183 .with_second(0)
184 .expect("0 is valid for seconds")
185 .with_nanosecond(0)
186 .expect("0 is valid for nanoseconds")
187 - chrono::Duration::minutes(1);
188 let timestamp = minute_start.timestamp();
189
190 debug!("Aggregating metrics for minute: {}", minute_start);
191
192 let query = r"sum by (protocol, method, path, status) (
194 increase(mockforge_requests_by_path_total{}[1m]) > 0
195 )"
196 .to_string();
197
198 let response = self.prom_client.query(&query, Some(timestamp)).await?;
199
200 let mut aggregates = Vec::new();
201
202 for result in response.data.result {
203 let protocol = result
204 .metric
205 .get("protocol")
206 .map_or_else(|| "unknown".to_string(), ToString::to_string);
207 let method = result.metric.get("method").cloned();
208 let endpoint = result.metric.get("path").cloned();
209 let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
210
211 #[allow(clippy::cast_possible_truncation)]
212 let request_count = if let Some((_, value)) = result.value {
213 value.parse::<f64>().unwrap_or(0.0) as i64
214 } else {
215 0
216 };
217
218 let latency_query = if let (Some(ref p), Some(ref m), Some(ref e)) =
220 (&Some(protocol.clone()), &method, &endpoint)
221 {
222 format!(
223 r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{p}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
224 )
225 } else {
226 continue;
227 };
228
229 let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
230 Ok(resp) => resp
231 .data
232 .result
233 .first()
234 .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
235 Err(e) => {
236 warn!("Failed to query latency: {}", e);
237 None
238 }
239 };
240
241 let agg = MetricsAggregate {
242 id: None,
243 timestamp,
244 protocol: protocol.clone(),
245 method: method.clone(),
246 endpoint: endpoint.clone(),
247 status_code,
248 workspace_id: None,
249 environment: None,
250 request_count,
251 error_count: status_code.map_or(0, |sc| if sc >= 400 { request_count } else { 0 }),
252 latency_sum: 0.0,
253 latency_min: None,
254 latency_max: None,
255 latency_p50: None,
256 latency_p95,
257 latency_p99: None,
258 bytes_sent: 0,
259 bytes_received: 0,
260 active_connections: None,
261 created_at: None,
262 };
263
264 aggregates.push(agg);
265 }
266
267 if !aggregates.is_empty() {
268 self.db.insert_minute_aggregates_batch(&aggregates).await?;
269 info!("Stored {} minute aggregates", aggregates.len());
270
271 for agg in &aggregates {
273 let stats = EndpointStats {
274 id: None,
275 endpoint: agg.endpoint.clone().unwrap_or_default(),
276 protocol: agg.protocol.clone(),
277 method: agg.method.clone(),
278 workspace_id: agg.workspace_id.clone(),
279 environment: agg.environment.clone(),
280 total_requests: agg.request_count,
281 total_errors: agg.error_count,
282 avg_latency_ms: agg.latency_p95,
283 min_latency_ms: agg.latency_min,
284 max_latency_ms: agg.latency_max,
285 p95_latency_ms: agg.latency_p95,
286 status_codes: None,
287 total_bytes_sent: agg.bytes_sent,
288 total_bytes_received: agg.bytes_received,
289 first_seen: timestamp,
290 last_seen: timestamp,
291 updated_at: None,
292 };
293
294 if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
295 warn!("Failed to update endpoint stats: {}", e);
296 }
297 }
298 }
299
300 Ok(())
301 }
302
303 async fn run_hourly_rollup(&self) {
305 let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));
306
307 loop {
308 interval.tick().await;
309
310 if let Err(e) = self.rollup_to_hour().await {
311 error!("Error rolling up to hourly metrics: {}", e);
312 }
313 }
314 }
315
    /// Rolls the previous hour's minute-level aggregates up into
    /// `HourMetricsAggregate` rows, grouped by
    /// (protocol, method, endpoint, status code).
    ///
    /// # Errors
    /// Returns an error if reading minute aggregates or inserting an hour
    /// aggregate fails.
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        // Truncate to the top of the current hour, then step back one hour
        // so only a complete hour window is rolled up.
        let hour_start = now
            .with_minute(0)
            .expect("0 is valid for minutes")
            .with_second(0)
            .expect("0 is valid for seconds")
            .with_nanosecond(0)
            .expect("0 is valid for nanoseconds")
            - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        // Group minute rows by series identity so each distinct
        // (protocol, method, endpoint, status) yields one hourly row.
        let mut groups: HashMap<MetricsGroupKey, Vec<&MetricsAggregate>> = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            // Fold against +/-infinity; a non-finite result below means
            // "no latency samples were present in this group".
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                // Percentiles are not combined from minute rows here; they
                // are left unset at the hourly level.
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }
400
401 async fn run_daily_rollup(&self) {
403 let mut interval = interval(Duration::from_secs(86400)); loop {
406 interval.tick().await;
407
408 if let Err(e) = self.rollup_to_day().await {
409 error!("Error rolling up to daily metrics: {}", e);
410 }
411 }
412 }
413
414 #[allow(clippy::too_many_lines)]
416 async fn rollup_to_day(&self) -> Result<()> {
417 let now = Utc::now();
418 let day_start = now
419 .with_hour(0)
420 .expect("0 is valid for hours")
421 .with_minute(0)
422 .expect("0 is valid for minutes")
423 .with_second(0)
424 .expect("0 is valid for seconds")
425 .with_nanosecond(0)
426 .expect("0 is valid for nanoseconds")
427 - chrono::Duration::days(1);
428 let day_end = day_start + chrono::Duration::days(1);
429
430 info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));
431
432 let filter = AnalyticsFilter {
433 start_time: Some(day_start.timestamp()),
434 end_time: Some(day_end.timestamp()),
435 ..Default::default()
436 };
437
438 let hour_data = self.db.get_hour_aggregates(&filter).await?;
439
440 if hour_data.is_empty() {
441 debug!("No hour data to roll up");
442 return Ok(());
443 }
444
445 let mut groups: HashMap<MetricsGroupKey, Vec<&HourMetricsAggregate>> = HashMap::new();
447
448 for agg in &hour_data {
449 let key =
450 (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
451 groups.entry(key).or_default().push(agg);
452 }
453
454 let mut peak_hour: Option<i32> = None;
456 let mut max_requests = 0i64;
457 for agg in &hour_data {
458 if agg.request_count > max_requests {
459 max_requests = agg.request_count;
460 if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
462 #[allow(clippy::cast_possible_wrap)]
463 {
464 peak_hour = Some(dt.hour() as i32);
465 }
466 }
467 }
468 }
469
470 for ((protocol, method, endpoint, status_code), group) in groups {
471 let request_count: i64 = group.iter().map(|a| a.request_count).sum();
472 let error_count: i64 = group.iter().map(|a| a.error_count).sum();
473 let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
474 let latency_min =
475 group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
476 let latency_max =
477 group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
478
479 #[allow(clippy::cast_precision_loss)]
481 let latency_p50_avg: Option<f64> = {
482 let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
483 if p50_values.is_empty() {
484 None
485 } else {
486 Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
487 }
488 };
489 #[allow(clippy::cast_precision_loss)]
490 let latency_p95_avg: Option<f64> = {
491 let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
492 if p95_values.is_empty() {
493 None
494 } else {
495 Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
496 }
497 };
498 #[allow(clippy::cast_precision_loss)]
499 let latency_p99_avg: Option<f64> = {
500 let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
501 if p99_values.is_empty() {
502 None
503 } else {
504 Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
505 }
506 };
507
508 #[allow(clippy::cast_precision_loss)]
510 let active_connections_avg: Option<f64> = {
511 let avg_values: Vec<f64> =
512 group.iter().filter_map(|a| a.active_connections_avg).collect();
513 if avg_values.is_empty() {
514 None
515 } else {
516 Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
517 }
518 };
519
520 let active_connections_max =
522 group.iter().filter_map(|a| a.active_connections_max).max();
523
524 let day_agg = DayMetricsAggregate {
525 id: None,
526 date: day_start.format("%Y-%m-%d").to_string(),
527 timestamp: day_start.timestamp(),
528 protocol,
529 method,
530 endpoint,
531 status_code,
532 workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
533 environment: group.first().and_then(|a| a.environment.clone()),
534 request_count,
535 error_count,
536 latency_sum,
537 latency_min: if latency_min.is_finite() {
538 Some(latency_min)
539 } else {
540 None
541 },
542 latency_max: if latency_max.is_finite() {
543 Some(latency_max)
544 } else {
545 None
546 },
547 latency_p50: latency_p50_avg,
548 latency_p95: latency_p95_avg,
549 latency_p99: latency_p99_avg,
550 bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
551 bytes_received: group.iter().map(|a| a.bytes_received).sum(),
552 active_connections_avg,
553 active_connections_max,
554 unique_clients: None, peak_hour,
556 created_at: None,
557 };
558
559 self.db.insert_day_aggregate(&day_agg).await?;
560 }
561
562 info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
563 Ok(())
564 }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must store the base URL exactly as given.
    #[test]
    fn test_prometheus_client_creation() {
        let prom = PrometheusClient::new("http://localhost:9090");
        assert_eq!(prom.base_url, "http://localhost:9090");
    }
}