mockforge_analytics/
aggregator.rs

//! Metrics aggregation service
//!
//! This module provides background services that:
//! - Query Prometheus metrics at regular intervals
//! - Aggregate and store metrics in the analytics database
//! - Roll up minute data to hour/day granularity
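//!
//! A minimal wiring sketch (illustrative only: `AnalyticsDatabase::open` and
//! `AnalyticsConfig::default()` are assumed here, and errors are `?`-propagated):
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! let db = AnalyticsDatabase::open("analytics.db").await?;
//! let aggregator = Arc::new(MetricsAggregator::new(
//!     db,
//!     "http://localhost:9090", // Prometheus base URL
//!     AnalyticsConfig::default(),
//! ));
//! aggregator.start().await; // spawns the background loops and returns
//! ```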

use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
};
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

/// Prometheus query client
#[derive(Clone)]
pub struct PrometheusClient {
    base_url: String,
    client: Client,
}

impl PrometheusClient {
    /// Create a new Prometheus client
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
        }
    }

    /// Execute a Prometheus instant query
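    ///
    /// A usage sketch (`ignore`d doctest; assumes a Prometheus server is
    /// reachable at the given base URL):
    ///
    /// ```rust,ignore
    /// let client = PrometheusClient::new("http://localhost:9090");
    /// // `None` evaluates the query at the current time.
    /// let resp = client.query("up", None).await?;
    /// assert_eq!(resp.status, "success");
    /// ```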
    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
        let mut url = format!("{}/api/v1/query", self.base_url);
        url.push_str(&format!("?query={}", urlencoding::encode(query)));

        if let Some(t) = time {
            url.push_str(&format!("&time={t}"));
        }

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }

    /// Execute a Prometheus range query
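    ///
    /// A usage sketch (`ignore`d doctest). `start` and `end` are Unix
    /// timestamps and `step` uses Prometheus duration syntax, e.g. `"60s"`:
    ///
    /// ```rust,ignore
    /// let resp = client
    ///     .query_range("sum(rate(mockforge_requests_by_path_total[5m]))", start, end, "60s")
    ///     .await?;
    /// ```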
    pub async fn query_range(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: &str,
    ) -> Result<PrometheusResponse> {
        let url = format!(
            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
            self.base_url,
            urlencoding::encode(query),
            start,
            end,
            step
        );

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }
}

/// Prometheus API response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    pub status: String,
    pub data: PrometheusData,
}

/// Prometheus data payload
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

/// Prometheus query result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    pub metric: HashMap<String, String>,
    pub value: Option<PrometheusValue>,
    pub values: Option<Vec<PrometheusValue>>,
}

/// Prometheus metric value (timestamp, value)
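///
/// The Prometheus API encodes each sample as a JSON array of
/// `[unix_seconds, "<value as string>"]`, which serde deserializes into
/// this tuple.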
pub type PrometheusValue = (f64, String);

/// Metrics aggregation service
pub struct MetricsAggregator {
    db: AnalyticsDatabase,
    prom_client: PrometheusClient,
    config: AnalyticsConfig,
}

impl MetricsAggregator {
    /// Create a new metrics aggregator
    pub fn new(
        db: AnalyticsDatabase,
        prometheus_url: impl Into<String>,
        config: AnalyticsConfig,
    ) -> Self {
        Self {
            db,
            prom_client: PrometheusClient::new(prometheus_url),
            config,
        }
    }

    /// Start the aggregation service
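    ///
    /// Spawns the minute-aggregation, hourly-rollup, and daily-rollup loops
    /// onto the current Tokio runtime and returns immediately; the tasks run
    /// until the runtime shuts down.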
    pub async fn start(self: Arc<Self>) {
        info!("Starting metrics aggregation service");

        // Spawn minute aggregation task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_minute_aggregation().await;
        });

        // Spawn hourly rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_hourly_rollup().await;
        });

        // Spawn daily rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_daily_rollup().await;
        });
    }

    /// Run minute-level aggregation loop
    async fn run_minute_aggregation(&self) {
        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));

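        // Note: Tokio intervals yield immediately on the first tick, so an
        // aggregation pass runs as soon as the service starts.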
        loop {
            interval.tick().await;

            if let Err(e) = self.aggregate_minute_metrics().await {
                error!("Error aggregating minute metrics: {}", e);
            }
        }
    }

    /// Aggregate metrics for the last minute
    async fn aggregate_minute_metrics(&self) -> Result<()> {
        let now = Utc::now();
        let minute_start = now
            .with_second(0)
            .expect("0 is valid for seconds")
            .with_nanosecond(0)
            .expect("0 is valid for nanoseconds")
            - chrono::Duration::minutes(1);
        let timestamp = minute_start.timestamp();

        debug!("Aggregating metrics for minute: {}", minute_start);

        // Query request counts by protocol, method, path
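        // `increase(...[1m])` estimates the per-minute delta of each counter;
        // the `> 0` filter drops series with no traffic in the window.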
172        let query = r"sum by (protocol, method, path, status) (
173                increase(mockforge_requests_by_path_total{}[1m]) > 0
174            )"
175        .to_string();
176
177        let response = self.prom_client.query(&query, Some(timestamp)).await?;
178
179        let mut aggregates = Vec::new();
180
181        for result in response.data.result {
182            let protocol = result
183                .metric
184                .get("protocol")
185                .map_or_else(|| "unknown".to_string(), ToString::to_string);
186            let method = result.metric.get("method").cloned();
187            let endpoint = result.metric.get("path").cloned();
188            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
189
190            let request_count = if let Some((_, value)) = result.value {
191                value.parse::<f64>().unwrap_or(0.0) as i64
192            } else {
193                0
194            };
195
196            // Query latency metrics for this combination
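            // histogram_quantile(0.95, ...) computes the p95 from the
            // per-path duration histogram; `* 1000` converts seconds to ms.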
            // Skip series that lack a method or path label; the protocol
            // label always carries a value ("unknown" at worst).
            let (Some(m), Some(e)) = (&method, &endpoint) else {
                continue;
            };
            let latency_query = format!(
                r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{protocol}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
            );

            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
                Ok(resp) => resp
                    .data
                    .result
                    .first()
                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
                Err(e) => {
                    warn!("Failed to query latency: {}", e);
                    None
                }
            };

            let agg = MetricsAggregate {
                id: None,
                timestamp,
                protocol: protocol.clone(),
                method: method.clone(),
                endpoint: endpoint.clone(),
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count: if let Some(sc) = status_code {
                    if sc >= 400 {
                        request_count
                    } else {
                        0
                    }
                } else {
                    0
                },
                latency_sum: 0.0,
                latency_min: None,
                latency_max: None,
                latency_p50: None,
                latency_p95,
                latency_p99: None,
                bytes_sent: 0,
                bytes_received: 0,
                active_connections: None,
                created_at: None,
            };

            aggregates.push(agg);
        }

        if !aggregates.is_empty() {
            self.db.insert_minute_aggregates_batch(&aggregates).await?;
            info!("Stored {} minute aggregates", aggregates.len());

            // Also update endpoint stats
            for agg in &aggregates {
                let stats = EndpointStats {
                    id: None,
                    endpoint: agg.endpoint.clone().unwrap_or_default(),
                    protocol: agg.protocol.clone(),
                    method: agg.method.clone(),
                    workspace_id: agg.workspace_id.clone(),
                    environment: agg.environment.clone(),
                    total_requests: agg.request_count,
                    total_errors: agg.error_count,
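                    // No per-minute average is tracked, so the p95 value
                    // stands in for avg_latency_ms as an approximation.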
                    avg_latency_ms: agg.latency_p95,
                    min_latency_ms: agg.latency_min,
                    max_latency_ms: agg.latency_max,
                    p95_latency_ms: agg.latency_p95,
                    status_codes: None,
                    total_bytes_sent: agg.bytes_sent,
                    total_bytes_received: agg.bytes_received,
                    first_seen: timestamp,
                    last_seen: timestamp,
                    updated_at: None,
                };

                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
                    warn!("Failed to update endpoint stats: {}", e);
                }
            }
        }

        Ok(())
    }

    /// Run hourly rollup loop
    async fn run_hourly_rollup(&self) {
        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_hour().await {
                error!("Error rolling up to hourly metrics: {}", e);
            }
        }
    }

    /// Roll up minute data to hour-level aggregates
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        let hour_start = now
            .with_minute(0)
            .expect("0 is valid for minutes")
            .with_second(0)
            .expect("0 is valid for seconds")
            .with_nanosecond(0)
            .expect("0 is valid for nanoseconds")
            - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        // Group by protocol, method, endpoint, status_code
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&MetricsAggregate>,
        > = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }

    /// Run daily rollup loop
    async fn run_daily_rollup(&self) {
        let mut interval = interval(Duration::from_secs(86400)); // Daily

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_day().await {
                error!("Error rolling up to daily metrics: {}", e);
            }
        }
    }

    /// Roll up hour data to day-level aggregates
    async fn rollup_to_day(&self) -> Result<()> {
        let now = Utc::now();
        let day_start = now
            .with_hour(0)
            .expect("0 is valid for hours")
            .with_minute(0)
            .expect("0 is valid for minutes")
            .with_second(0)
            .expect("0 is valid for seconds")
            .with_nanosecond(0)
            .expect("0 is valid for nanoseconds")
            - chrono::Duration::days(1);
        let day_end = day_start + chrono::Duration::days(1);

        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));

        let filter = AnalyticsFilter {
            start_time: Some(day_start.timestamp()),
            end_time: Some(day_end.timestamp()),
            ..Default::default()
        };

        let hour_data = self.db.get_hour_aggregates(&filter).await?;

        if hour_data.is_empty() {
            debug!("No hour data to roll up");
            return Ok(());
        }

        // Group by protocol, method, endpoint, status_code
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&HourMetricsAggregate>,
        > = HashMap::new();

        for agg in &hour_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        // Find peak hour (hour with max request count)
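        // The peak is computed across all series, so every per-endpoint day
        // aggregate below shares the same peak_hour value.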
446        let mut peak_hour: Option<i32> = None;
447        let mut max_requests = 0i64;
448        for agg in &hour_data {
449            if agg.request_count > max_requests {
450                max_requests = agg.request_count;
451                // Extract hour from timestamp
452                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
453                    peak_hour = Some(dt.hour() as i32);
454                }
455            }
456        }
457
458        for ((protocol, method, endpoint, status_code), group) in groups {
459            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
460            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
461            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
462            let latency_min =
463                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
464            let latency_max =
465                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
466
            // Approximate the day's percentiles by averaging the hour-level
            // percentiles. Averaging percentiles is not statistically exact,
            // but it avoids having to store raw histograms.
            let avg_of = |values: Vec<f64>| -> Option<f64> {
                if values.is_empty() {
                    None
                } else {
                    Some(values.iter().sum::<f64>() / values.len() as f64)
                }
            };

            let latency_p50_avg = avg_of(group.iter().filter_map(|a| a.latency_p50).collect());
            let latency_p95_avg = avg_of(group.iter().filter_map(|a| a.latency_p95).collect());
            let latency_p99_avg = avg_of(group.iter().filter_map(|a| a.latency_p99).collect());

            // Average active connections across the hours that reported them
            let active_connections_avg =
                avg_of(group.iter().filter_map(|a| a.active_connections_avg).collect());

            // Max active connections
            let active_connections_max =
                group.iter().filter_map(|a| a.active_connections_max).max();

            let day_agg = DayMetricsAggregate {
                id: None,
                date: day_start.format("%Y-%m-%d").to_string(),
                timestamp: day_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
                environment: group.first().and_then(|a| a.environment.clone()),
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: latency_p50_avg,
                latency_p95: latency_p95_avg,
                latency_p99: latency_p99_avg,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg,
                active_connections_max,
                unique_clients: None, // Would need to track unique clients separately
                peak_hour,
                created_at: None,
            };

            self.db.insert_day_aggregate(&day_agg).await?;
        }

        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_client_creation() {
        let client = PrometheusClient::new("http://localhost:9090");
        assert_eq!(client.base_url, "http://localhost:9090");
    }
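
    // A deserialization check against Prometheus's documented instant-query
    // response shape. The payload below is made up for the test, and
    // `serde_json` is assumed to be available as a (dev-)dependency.
    #[test]
    fn test_prometheus_response_deserialization() {
        let json = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    {
                        "metric": {"protocol": "http", "path": "/users"},
                        "value": [1700000000.0, "42"]
                    }
                ]
            }
        }"#;

        let resp: PrometheusResponse = serde_json::from_str(json).expect("payload should parse");
        assert_eq!(resp.status, "success");
        assert_eq!(resp.data.result_type, "vector");

        let first = &resp.data.result[0];
        assert_eq!(first.metric.get("path").map(String::as_str), Some("/users"));

        // Instant queries return a single `[timestamp, "value"]` sample.
        let (ts, value) = first.value.clone().expect("instant query returns a value");
        assert!((ts - 1_700_000_000.0).abs() < f64::EPSILON);
        assert_eq!(value, "42");
        assert!(first.values.is_none());
    }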
}