mockforge_analytics/
aggregator.rs

//! Metrics aggregation service
//!
//! This module provides background services that:
//! - Query Prometheus metrics at regular intervals
//! - Aggregate and store metrics in the analytics database
//! - Roll up minute data to hour/day granularity
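//!
//! # Example
//!
//! A rough sketch of wiring the aggregator into an application, inside an async
//! context. The `AnalyticsDatabase` and `AnalyticsConfig` values are assumed to
//! be constructed elsewhere in this crate (their constructors are not shown);
//! only types defined in this module are exercised.
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! // `db: AnalyticsDatabase` and `config: AnalyticsConfig` built elsewhere.
//! let aggregator = Arc::new(MetricsAggregator::new(db, "http://localhost:9090", config));
//! aggregator.start().await; // spawns the minute/hour/day background tasks
//! ```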

use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
};
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

/// Prometheus query client
#[derive(Clone)]
pub struct PrometheusClient {
    base_url: String,
    client: Client,
}

impl PrometheusClient {
    /// Create a new Prometheus client
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
        }
    }

    /// Execute a Prometheus instant query
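    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a Prometheus
    /// server is reachable at the configured base URL:
    ///
    /// ```ignore
    /// let client = PrometheusClient::new("http://localhost:9090");
    /// // Instant query evaluated at the current time (`time: None`).
    /// let resp = client.query("up", None).await?;
    /// for result in resp.data.result {
    ///     println!("{:?} => {:?}", result.metric, result.value);
    /// }
    /// ```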
    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
        let mut url = format!("{}/api/v1/query", self.base_url);
        url.push_str(&format!("?query={}", urlencoding::encode(query)));

        if let Some(t) = time {
            url.push_str(&format!("&time={t}"));
        }

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }

    /// Execute a Prometheus range query
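    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest). `start` and `end` are Unix
    /// timestamps in seconds and `step` is a Prometheus duration string such as
    /// `"60s"`:
    ///
    /// ```ignore
    /// let client = PrometheusClient::new("http://localhost:9090");
    /// let end = chrono::Utc::now().timestamp();
    /// let start = end - 3600;
    /// let resp = client
    ///     .query_range("sum(rate(mockforge_requests_by_path_total[1m]))", start, end, "60s")
    ///     .await?;
    /// ```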
    pub async fn query_range(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: &str,
    ) -> Result<PrometheusResponse> {
        let url = format!(
            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
            self.base_url,
            urlencoding::encode(query),
            start,
            end,
            step
        );

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }
}

/// Prometheus API response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    pub status: String,
    pub data: PrometheusData,
}

/// Prometheus data payload
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

/// Prometheus query result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    pub metric: HashMap<String, String>,
    pub value: Option<PrometheusValue>,
    pub values: Option<Vec<PrometheusValue>>,
}

/// Prometheus metric value (timestamp, value)
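///
/// The first element is the evaluation timestamp in seconds (with a fractional
/// part) and the second is the sample value encoded as a string, matching the
/// shape of the Prometheus HTTP API. Callers parse the string when they need a
/// number, e.g.:
///
/// ```ignore
/// let (_ts, raw) = some_value; // `some_value: PrometheusValue` (illustrative binding)
/// let sample = raw.parse::<f64>().unwrap_or(0.0);
/// ```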
pub type PrometheusValue = (f64, String);

/// Metrics aggregation service
pub struct MetricsAggregator {
    db: AnalyticsDatabase,
    prom_client: PrometheusClient,
    config: AnalyticsConfig,
}

impl MetricsAggregator {
    /// Create a new metrics aggregator
    pub fn new(
        db: AnalyticsDatabase,
        prometheus_url: impl Into<String>,
        config: AnalyticsConfig,
    ) -> Self {
        Self {
            db,
            prom_client: PrometheusClient::new(prometheus_url),
            config,
        }
    }

    /// Start the aggregation service
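    ///
    /// Spawns the minute aggregation, hourly rollup, and daily rollup loops as
    /// background Tokio tasks and returns once they are scheduled; the loops run
    /// for the lifetime of the process.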
    pub async fn start(self: Arc<Self>) {
        info!("Starting metrics aggregation service");

        // Spawn minute aggregation task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_minute_aggregation().await;
        });

        // Spawn hourly rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_hourly_rollup().await;
        });

        // Spawn daily rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_daily_rollup().await;
        });
    }

    /// Run minute-level aggregation loop
    async fn run_minute_aggregation(&self) {
        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));

        loop {
            interval.tick().await;

            if let Err(e) = self.aggregate_minute_metrics().await {
                error!("Error aggregating minute metrics: {}", e);
            }
        }
    }

    /// Aggregate metrics for the last minute
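    ///
    /// Queries Prometheus for the per-path request increase over the previous
    /// whole minute, grouped by protocol, method, path, and status, then issues
    /// a follow-up `histogram_quantile` query per combination to capture p95
    /// latency before writing the rows to the analytics database.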
    async fn aggregate_minute_metrics(&self) -> Result<()> {
        let now = Utc::now();
        let minute_start =
            now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
        let timestamp = minute_start.timestamp();

        debug!("Aggregating metrics for minute: {}", minute_start);

        // Query request counts by protocol, method, path, and status
        let query = r"sum by (protocol, method, path, status) (
                increase(mockforge_requests_by_path_total{}[1m]) > 0
            )"
        .to_string();

        let response = self.prom_client.query(&query, Some(timestamp)).await?;

        let mut aggregates = Vec::new();

        for result in response.data.result {
            let protocol = result
                .metric
                .get("protocol")
                .map_or_else(|| "unknown".to_string(), ToString::to_string);
            let method = result.metric.get("method").cloned();
            let endpoint = result.metric.get("path").cloned();
            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());

            let request_count = if let Some((_, value)) = result.value {
                value.parse::<f64>().unwrap_or(0.0) as i64
            } else {
                0
            };

            // Query p95 latency for this combination; skip series that are
            // missing a method or path label.
            let latency_query = match (&method, &endpoint) {
                (Some(m), Some(e)) => format!(
                    r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{protocol}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
                ),
                _ => continue,
            };

            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
                Ok(resp) => resp
                    .data
                    .result
                    .first()
                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
                Err(e) => {
                    warn!("Failed to query latency: {}", e);
                    None
                }
            };

            let agg = MetricsAggregate {
                id: None,
                timestamp,
                protocol: protocol.clone(),
                method: method.clone(),
                endpoint: endpoint.clone(),
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count: if let Some(sc) = status_code {
                    if sc >= 400 {
                        request_count
                    } else {
                        0
                    }
                } else {
                    0
                },
                latency_sum: 0.0,
                latency_min: None,
                latency_max: None,
                latency_p50: None,
                latency_p95,
                latency_p99: None,
                bytes_sent: 0,
                bytes_received: 0,
                active_connections: None,
                created_at: None,
            };

            aggregates.push(agg);
        }

        if !aggregates.is_empty() {
            self.db.insert_minute_aggregates_batch(&aggregates).await?;
            info!("Stored {} minute aggregates", aggregates.len());

            // Also update endpoint stats
            for agg in &aggregates {
                let stats = EndpointStats {
                    id: None,
                    endpoint: agg.endpoint.clone().unwrap_or_default(),
                    protocol: agg.protocol.clone(),
                    method: agg.method.clone(),
                    workspace_id: agg.workspace_id.clone(),
                    environment: agg.environment.clone(),
                    total_requests: agg.request_count,
                    total_errors: agg.error_count,
                    // p95 is used as a stand-in here; no true average latency is
                    // computed at minute granularity.
                    avg_latency_ms: agg.latency_p95,
                    min_latency_ms: agg.latency_min,
                    max_latency_ms: agg.latency_max,
                    p95_latency_ms: agg.latency_p95,
                    status_codes: None,
                    total_bytes_sent: agg.bytes_sent,
                    total_bytes_received: agg.bytes_received,
                    first_seen: timestamp,
                    last_seen: timestamp,
                    updated_at: None,
                };

                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
                    warn!("Failed to update endpoint stats: {}", e);
                }
            }
        }

        Ok(())
    }

    /// Run hourly rollup loop
    async fn run_hourly_rollup(&self) {
        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_hour().await {
                error!("Error rolling up to hourly metrics: {}", e);
            }
        }
    }

    /// Roll up minute data to hour-level aggregates
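    ///
    /// Reads the minute aggregates for the previous whole hour, groups them by
    /// (protocol, method, endpoint, status_code), sums counts and byte totals,
    /// folds min/max latencies, and writes one hour-level row per group.
    /// Percentiles are left unset at this granularity.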
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        let hour_start =
            now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
                - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        // Group by protocol, method, endpoint, status_code
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&MetricsAggregate>,
        > = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }

    /// Run daily rollup loop
    async fn run_daily_rollup(&self) {
        let mut interval = interval(Duration::from_secs(86400)); // Daily

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_day().await {
                error!("Error rolling up to daily metrics: {}", e);
            }
        }
    }

    /// Roll up hour data to day-level aggregates
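    ///
    /// Reads the hour aggregates for the previous calendar day (UTC), groups
    /// them by (protocol, method, endpoint, status_code), and writes one
    /// day-level row per group. Daily p50/p95/p99 are the mean of the hourly
    /// percentiles, which is an approximation rather than a true daily
    /// percentile, and `peak_hour` is the hour with the highest request count.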
    async fn rollup_to_day(&self) -> Result<()> {
        let now = Utc::now();
        let day_start = now
            .with_hour(0)
            .unwrap()
            .with_minute(0)
            .unwrap()
            .with_second(0)
            .unwrap()
            .with_nanosecond(0)
            .unwrap()
            - chrono::Duration::days(1);
        let day_end = day_start + chrono::Duration::days(1);

        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));

        let filter = AnalyticsFilter {
            start_time: Some(day_start.timestamp()),
            end_time: Some(day_end.timestamp()),
            ..Default::default()
        };

        let hour_data = self.db.get_hour_aggregates(&filter).await?;

        if hour_data.is_empty() {
            debug!("No hour data to roll up");
            return Ok(());
        }

        // Group by protocol, method, endpoint, status_code
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&HourMetricsAggregate>,
        > = HashMap::new();

        for agg in &hour_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        // Find peak hour (hour with max request count)
        let mut peak_hour: Option<i32> = None;
        let mut max_requests = 0i64;
        for agg in &hour_data {
            if agg.request_count > max_requests {
                max_requests = agg.request_count;
                // Extract hour from timestamp
                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
                    peak_hour = Some(dt.hour() as i32);
                }
            }
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            // Approximate daily percentiles by averaging the hourly percentiles
            // (not a true daily percentile; the underlying samples are no longer available).
            let latency_p50_avg: Option<f64> = {
                let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
                if !p50_values.is_empty() {
                    Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p95_avg: Option<f64> = {
                let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
                if !p95_values.is_empty() {
                    Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p99_avg: Option<f64> = {
                let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
                if !p99_values.is_empty() {
                    Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
                } else {
                    None
                }
            };

            // Average active connections
            let active_connections_avg: Option<f64> = {
                let avg_values: Vec<f64> =
                    group.iter().filter_map(|a| a.active_connections_avg).collect();
                if !avg_values.is_empty() {
                    Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
                } else {
                    None
                }
            };

            // Max active connections
            let active_connections_max =
                group.iter().filter_map(|a| a.active_connections_max).max();

            let day_agg = DayMetricsAggregate {
                id: None,
                date: day_start.format("%Y-%m-%d").to_string(),
                timestamp: day_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
                environment: group.first().and_then(|a| a.environment.clone()),
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: latency_p50_avg,
                latency_p95: latency_p95_avg,
                latency_p99: latency_p99_avg,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg,
                active_connections_max,
                unique_clients: None, // Would need to track unique clients separately
                peak_hour,
                created_at: None,
            };

            self.db.insert_day_aggregate(&day_agg).await?;
        }

        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_client_creation() {
        let client = PrometheusClient::new("http://localhost:9090");
        assert_eq!(client.base_url, "http://localhost:9090");
    }
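
    // A sketch of a deserialization test against a hand-written payload shaped
    // like the Prometheus instant-query response; it assumes `serde_json` is
    // available as a dev-dependency and is not derived from real server output.
    #[test]
    fn test_prometheus_response_deserialization() {
        let json = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    {"metric": {"protocol": "http"}, "value": [1712345678.0, "42"]}
                ]
            }
        }"#;

        let resp: PrometheusResponse =
            serde_json::from_str(json).expect("sample payload should deserialize");
        assert_eq!(resp.status, "success");
        assert_eq!(resp.data.result_type, "vector");
        let value = resp.data.result[0].value.as_ref().expect("instant value present");
        assert_eq!(value.1, "42");
    }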
}