// Source: mockforge_analytics/aggregator.rs
//! Metrics aggregation service
//!
//! This module provides background services that:
//! - Query Prometheus metrics at regular intervals
//! - Aggregate and store metrics in the analytics database
//! - Roll up minute data to hour/day granularity

use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
};
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::Write as _;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

/// Key type for grouping metrics by protocol, method, endpoint, and status code.
///
/// Tuple order is (protocol, method, endpoint, status_code); `None` entries
/// mean the corresponding label was absent on the source aggregate.
type MetricsGroupKey = (String, Option<String>, Option<String>, Option<i32>);

/// Prometheus query client
///
/// Thin wrapper over the Prometheus HTTP query API. Cloning is cheap:
/// `reqwest::Client` is documented to share its connection pool across clones.
#[derive(Clone)]
pub struct PrometheusClient {
    /// Base URL of the Prometheus server, e.g. `http://localhost:9090`
    /// (no trailing slash — paths like `/api/v1/query` are appended directly).
    base_url: String,
    /// Shared HTTP client used for all requests.
    client: Client,
}

33impl PrometheusClient {
34    /// Create a new Prometheus client
35    pub fn new(base_url: impl Into<String>) -> Self {
36        Self {
37            base_url: base_url.into(),
38            client: Client::new(),
39        }
40    }
41
42    /// Execute a Prometheus instant query
43    ///
44    /// # Errors
45    ///
46    /// Returns an error if the HTTP request or JSON deserialization fails.
47    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
48        let mut url = format!("{}/api/v1/query", self.base_url);
49        let _ = write!(url, "?query={}", urlencoding::encode(query));
50
51        if let Some(t) = time {
52            let _ = write!(url, "&time={t}");
53        }
54
55        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
56
57        Ok(response)
58    }
59
60    /// Execute a Prometheus range query
61    ///
62    /// # Errors
63    ///
64    /// Returns an error if the HTTP request or JSON deserialization fails.
65    pub async fn query_range(
66        &self,
67        query: &str,
68        start: i64,
69        end: i64,
70        step: &str,
71    ) -> Result<PrometheusResponse> {
72        let url = format!(
73            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
74            self.base_url,
75            urlencoding::encode(query),
76            start,
77            end,
78            step
79        );
80
81        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
82
83        Ok(response)
84    }
85}
86
/// Prometheus API response envelope
///
/// Shared by instant and range queries; the payload shape is distinguished
/// inside [`PrometheusData`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    /// Response status ("success" or "error")
    pub status: String,
    /// Response data payload
    pub data: PrometheusData,
}

/// Prometheus data payload
///
/// `rename_all = "camelCase"` maps `result_type` to the wire field name
/// `resultType` used by the Prometheus HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    /// Result type (e.g., "vector", "matrix")
    pub result_type: String,
    /// Query results
    pub result: Vec<PrometheusResult>,
}

/// Prometheus query result
///
/// Both `value` and `values` are optional so that either response shape
/// deserializes; presumably exactly one is populated depending on whether an
/// instant or range query was issued — callers should not rely on both.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    /// Metric labels (label name -> label value)
    pub metric: HashMap<String, String>,
    /// Instant query value
    pub value: Option<PrometheusValue>,
    /// Range query values
    pub values: Option<Vec<PrometheusValue>>,
}

/// Prometheus metric value (timestamp, value)
///
/// The timestamp is Unix epoch seconds (possibly fractional); the sample value
/// is kept as the string Prometheus returns and parsed by callers as needed.
pub type PrometheusValue = (f64, String);

/// Metrics aggregation service
///
/// Owns the handles needed by the background aggregation/rollup tasks.
pub struct MetricsAggregator {
    /// Analytics database used to persist aggregates and endpoint stats.
    db: AnalyticsDatabase,
    /// Client for querying the Prometheus HTTP API.
    prom_client: PrometheusClient,
    /// Settings controlling aggregation and rollup cadence.
    config: AnalyticsConfig,
}

127impl MetricsAggregator {
128    /// Create a new metrics aggregator
129    pub fn new(
130        db: AnalyticsDatabase,
131        prometheus_url: impl Into<String>,
132        config: AnalyticsConfig,
133    ) -> Self {
134        Self {
135            db,
136            prom_client: PrometheusClient::new(prometheus_url),
137            config,
138        }
139    }
140
141    /// Start the aggregation service
142    #[allow(clippy::unused_async)]
143    pub async fn start(self: Arc<Self>) {
144        info!("Starting metrics aggregation service");
145
146        // Spawn minute aggregation task
147        let self_clone = Arc::clone(&self);
148        tokio::spawn(async move {
149            self_clone.run_minute_aggregation().await;
150        });
151
152        // Spawn hourly rollup task
153        let self_clone = Arc::clone(&self);
154        tokio::spawn(async move {
155            self_clone.run_hourly_rollup().await;
156        });
157
158        // Spawn daily rollup task
159        let self_clone = Arc::clone(&self);
160        tokio::spawn(async move {
161            self_clone.run_daily_rollup().await;
162        });
163    }
164
165    /// Run minute-level aggregation loop
166    async fn run_minute_aggregation(&self) {
167        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));
168
169        loop {
170            interval.tick().await;
171
172            if let Err(e) = self.aggregate_minute_metrics().await {
173                error!("Error aggregating minute metrics: {}", e);
174            }
175        }
176    }
177
178    /// Aggregate metrics for the last minute
179    #[allow(clippy::too_many_lines)]
180    async fn aggregate_minute_metrics(&self) -> Result<()> {
181        let now = Utc::now();
182        let minute_start = now
183            .with_second(0)
184            .expect("0 is valid for seconds")
185            .with_nanosecond(0)
186            .expect("0 is valid for nanoseconds")
187            - chrono::Duration::minutes(1);
188        let timestamp = minute_start.timestamp();
189
190        debug!("Aggregating metrics for minute: {}", minute_start);
191
192        // Query request counts by protocol, method, path
193        let query = r"sum by (protocol, method, path, status) (
194                increase(mockforge_requests_by_path_total{}[1m]) > 0
195            )"
196        .to_string();
197
198        let response = self.prom_client.query(&query, Some(timestamp)).await?;
199
200        let mut aggregates = Vec::new();
201
202        for result in response.data.result {
203            let protocol = result
204                .metric
205                .get("protocol")
206                .map_or_else(|| "unknown".to_string(), ToString::to_string);
207            let method = result.metric.get("method").cloned();
208            let endpoint = result.metric.get("path").cloned();
209            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
210
211            #[allow(clippy::cast_possible_truncation)]
212            let request_count = if let Some((_, value)) = result.value {
213                value.parse::<f64>().unwrap_or(0.0) as i64
214            } else {
215                0
216            };
217
218            // Query latency metrics for this combination
219            let latency_query = if let (Some(ref p), Some(ref m), Some(ref e)) =
220                (&Some(protocol.clone()), &method, &endpoint)
221            {
222                format!(
223                    r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{p}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
224                )
225            } else {
226                continue;
227            };
228
229            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
230                Ok(resp) => resp
231                    .data
232                    .result
233                    .first()
234                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
235                Err(e) => {
236                    warn!("Failed to query latency: {}", e);
237                    None
238                }
239            };
240
241            let agg = MetricsAggregate {
242                id: None,
243                timestamp,
244                protocol: protocol.clone(),
245                method: method.clone(),
246                endpoint: endpoint.clone(),
247                status_code,
248                workspace_id: None,
249                environment: None,
250                request_count,
251                error_count: status_code.map_or(0, |sc| if sc >= 400 { request_count } else { 0 }),
252                latency_sum: 0.0,
253                latency_min: None,
254                latency_max: None,
255                latency_p50: None,
256                latency_p95,
257                latency_p99: None,
258                bytes_sent: 0,
259                bytes_received: 0,
260                active_connections: None,
261                created_at: None,
262            };
263
264            aggregates.push(agg);
265        }
266
267        if !aggregates.is_empty() {
268            self.db.insert_minute_aggregates_batch(&aggregates).await?;
269            info!("Stored {} minute aggregates", aggregates.len());
270
271            // Also update endpoint stats
272            for agg in &aggregates {
273                let stats = EndpointStats {
274                    id: None,
275                    endpoint: agg.endpoint.clone().unwrap_or_default(),
276                    protocol: agg.protocol.clone(),
277                    method: agg.method.clone(),
278                    workspace_id: agg.workspace_id.clone(),
279                    environment: agg.environment.clone(),
280                    total_requests: agg.request_count,
281                    total_errors: agg.error_count,
282                    avg_latency_ms: agg.latency_p95,
283                    min_latency_ms: agg.latency_min,
284                    max_latency_ms: agg.latency_max,
285                    p95_latency_ms: agg.latency_p95,
286                    status_codes: None,
287                    total_bytes_sent: agg.bytes_sent,
288                    total_bytes_received: agg.bytes_received,
289                    first_seen: timestamp,
290                    last_seen: timestamp,
291                    updated_at: None,
292                };
293
294                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
295                    warn!("Failed to update endpoint stats: {}", e);
296                }
297            }
298        }
299
300        Ok(())
301    }
302
303    /// Run hourly rollup loop
304    async fn run_hourly_rollup(&self) {
305        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));
306
307        loop {
308            interval.tick().await;
309
310            if let Err(e) = self.rollup_to_hour().await {
311                error!("Error rolling up to hourly metrics: {}", e);
312            }
313        }
314    }
315
316    /// Roll up minute data to hour-level aggregates
317    async fn rollup_to_hour(&self) -> Result<()> {
318        let now = Utc::now();
319        let hour_start = now
320            .with_minute(0)
321            .expect("0 is valid for minutes")
322            .with_second(0)
323            .expect("0 is valid for seconds")
324            .with_nanosecond(0)
325            .expect("0 is valid for nanoseconds")
326            - chrono::Duration::hours(1);
327        let hour_end = hour_start + chrono::Duration::hours(1);
328
329        info!("Rolling up metrics to hour: {}", hour_start);
330
331        let filter = AnalyticsFilter {
332            start_time: Some(hour_start.timestamp()),
333            end_time: Some(hour_end.timestamp()),
334            ..Default::default()
335        };
336
337        let minute_data = self.db.get_minute_aggregates(&filter).await?;
338
339        if minute_data.is_empty() {
340            debug!("No minute data to roll up");
341            return Ok(());
342        }
343
344        // Group by protocol, method, endpoint, status_code
345        let mut groups: HashMap<MetricsGroupKey, Vec<&MetricsAggregate>> = HashMap::new();
346
347        for agg in &minute_data {
348            let key =
349                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
350            groups.entry(key).or_default().push(agg);
351        }
352
353        for ((protocol, method, endpoint, status_code), group) in groups {
354            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
355            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
356            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
357            let latency_min =
358                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
359            let latency_max =
360                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
361
362            let hour_agg = HourMetricsAggregate {
363                id: None,
364                timestamp: hour_start.timestamp(),
365                protocol,
366                method,
367                endpoint,
368                status_code,
369                workspace_id: None,
370                environment: None,
371                request_count,
372                error_count,
373                latency_sum,
374                latency_min: if latency_min.is_finite() {
375                    Some(latency_min)
376                } else {
377                    None
378                },
379                latency_max: if latency_max.is_finite() {
380                    Some(latency_max)
381                } else {
382                    None
383                },
384                latency_p50: None,
385                latency_p95: None,
386                latency_p99: None,
387                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
388                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
389                active_connections_avg: None,
390                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
391                created_at: None,
392            };
393
394            self.db.insert_hour_aggregate(&hour_agg).await?;
395        }
396
397        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
398        Ok(())
399    }
400
401    /// Run daily rollup loop
402    async fn run_daily_rollup(&self) {
403        let mut interval = interval(Duration::from_secs(86400)); // Daily
404
405        loop {
406            interval.tick().await;
407
408            if let Err(e) = self.rollup_to_day().await {
409                error!("Error rolling up to daily metrics: {}", e);
410            }
411        }
412    }
413
414    /// Roll up hour data to day-level aggregates
415    #[allow(clippy::too_many_lines)]
416    async fn rollup_to_day(&self) -> Result<()> {
417        let now = Utc::now();
418        let day_start = now
419            .with_hour(0)
420            .expect("0 is valid for hours")
421            .with_minute(0)
422            .expect("0 is valid for minutes")
423            .with_second(0)
424            .expect("0 is valid for seconds")
425            .with_nanosecond(0)
426            .expect("0 is valid for nanoseconds")
427            - chrono::Duration::days(1);
428        let day_end = day_start + chrono::Duration::days(1);
429
430        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));
431
432        let filter = AnalyticsFilter {
433            start_time: Some(day_start.timestamp()),
434            end_time: Some(day_end.timestamp()),
435            ..Default::default()
436        };
437
438        let hour_data = self.db.get_hour_aggregates(&filter).await?;
439
440        if hour_data.is_empty() {
441            debug!("No hour data to roll up");
442            return Ok(());
443        }
444
445        // Group by protocol, method, endpoint, status_code
446        let mut groups: HashMap<MetricsGroupKey, Vec<&HourMetricsAggregate>> = HashMap::new();
447
448        for agg in &hour_data {
449            let key =
450                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
451            groups.entry(key).or_default().push(agg);
452        }
453
454        // Find peak hour (hour with max request count)
455        let mut peak_hour: Option<i32> = None;
456        let mut max_requests = 0i64;
457        for agg in &hour_data {
458            if agg.request_count > max_requests {
459                max_requests = agg.request_count;
460                // Extract hour from timestamp
461                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
462                    #[allow(clippy::cast_possible_wrap)]
463                    {
464                        peak_hour = Some(dt.hour() as i32);
465                    }
466                }
467            }
468        }
469
470        for ((protocol, method, endpoint, status_code), group) in groups {
471            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
472            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
473            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
474            let latency_min =
475                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
476            let latency_max =
477                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
478
479            // Calculate percentiles from hour aggregates (average of hour percentiles)
480            #[allow(clippy::cast_precision_loss)]
481            let latency_p50_avg: Option<f64> = {
482                let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
483                if p50_values.is_empty() {
484                    None
485                } else {
486                    Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
487                }
488            };
489            #[allow(clippy::cast_precision_loss)]
490            let latency_p95_avg: Option<f64> = {
491                let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
492                if p95_values.is_empty() {
493                    None
494                } else {
495                    Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
496                }
497            };
498            #[allow(clippy::cast_precision_loss)]
499            let latency_p99_avg: Option<f64> = {
500                let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
501                if p99_values.is_empty() {
502                    None
503                } else {
504                    Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
505                }
506            };
507
508            // Average active connections
509            #[allow(clippy::cast_precision_loss)]
510            let active_connections_avg: Option<f64> = {
511                let avg_values: Vec<f64> =
512                    group.iter().filter_map(|a| a.active_connections_avg).collect();
513                if avg_values.is_empty() {
514                    None
515                } else {
516                    Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
517                }
518            };
519
520            // Max active connections
521            let active_connections_max =
522                group.iter().filter_map(|a| a.active_connections_max).max();
523
524            let day_agg = DayMetricsAggregate {
525                id: None,
526                date: day_start.format("%Y-%m-%d").to_string(),
527                timestamp: day_start.timestamp(),
528                protocol,
529                method,
530                endpoint,
531                status_code,
532                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
533                environment: group.first().and_then(|a| a.environment.clone()),
534                request_count,
535                error_count,
536                latency_sum,
537                latency_min: if latency_min.is_finite() {
538                    Some(latency_min)
539                } else {
540                    None
541                },
542                latency_max: if latency_max.is_finite() {
543                    Some(latency_max)
544                } else {
545                    None
546                },
547                latency_p50: latency_p50_avg,
548                latency_p95: latency_p95_avg,
549                latency_p99: latency_p99_avg,
550                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
551                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
552                active_connections_avg,
553                active_connections_max,
554                unique_clients: None, // Would need to track unique clients separately
555                peak_hour,
556                created_at: None,
557            };
558
559            self.db.insert_day_aggregate(&day_agg).await?;
560        }
561
562        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
563        Ok(())
564    }
565}
566
#[cfg(test)]
mod tests {
    use super::*;

    /// `new` must store the base URL verbatim (no trailing-slash handling).
    #[test]
    fn test_prometheus_client_creation() {
        let base = "http://localhost:9090";
        let client = PrometheusClient::new(String::from(base));
        assert_eq!(client.base_url, base);
    }
}