// mockforge_analytics/aggregator.rs

1//! Metrics aggregation service
2//!
3//! This module provides background services that:
4//! - Query Prometheus metrics at regular intervals
5//! - Aggregate and store metrics in the analytics database
6//! - Roll up minute data to hour/day granularity
7
8use crate::config::AnalyticsConfig;
9use crate::database::AnalyticsDatabase;
10use crate::error::Result;
11use crate::models::{AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate};
12use chrono::{Timelike, Utc};
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::time::{interval, Duration};
18use tracing::{debug, error, info, warn};
19
/// Lightweight client for the Prometheus HTTP API.
///
/// Holds the server base URL plus a reusable `reqwest::Client`; `Clone` is
/// derived so the client can be shared across tasks.
#[derive(Clone)]
pub struct PrometheusClient {
    // Base URL of the Prometheus server, e.g. "http://localhost:9090".
    base_url: String,
    // Reusable HTTP client.
    client: Client,
}
26
27impl PrometheusClient {
28    /// Create a new Prometheus client
29    pub fn new(base_url: impl Into<String>) -> Self {
30        Self {
31            base_url: base_url.into(),
32            client: Client::new(),
33        }
34    }
35
36    /// Execute a Prometheus instant query
37    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
38        let mut url = format!("{}/api/v1/query", self.base_url);
39        url.push_str(&format!("?query={}", urlencoding::encode(query)));
40
41        if let Some(t) = time {
42            url.push_str(&format!("&time={t}"));
43        }
44
45        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
46
47        Ok(response)
48    }
49
50    /// Execute a Prometheus range query
51    pub async fn query_range(
52        &self,
53        query: &str,
54        start: i64,
55        end: i64,
56        step: &str,
57    ) -> Result<PrometheusResponse> {
58        let url = format!(
59            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
60            self.base_url,
61            urlencoding::encode(query),
62            start,
63            end,
64            step
65        );
66
67        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
68
69        Ok(response)
70    }
71}
72
/// Top-level envelope of a Prometheus HTTP API response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    /// Outcome reported by Prometheus ("success" or "error").
    pub status: String,
    /// The query result payload.
    pub data: PrometheusData,
}
79
/// Result payload of a Prometheus query.
///
/// `rename_all = "camelCase"` maps `result_type` to the API's `resultType`
/// JSON field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    /// Result kind, e.g. "vector" (instant query) or "matrix" (range query).
    pub result_type: String,
    /// One entry per returned time series.
    pub result: Vec<PrometheusResult>,
}
87
/// A single series in a Prometheus query result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    /// Label name -> label value for this series.
    pub metric: HashMap<String, String>,
    /// Single sample — present for instant-query ("vector") results.
    pub value: Option<PrometheusValue>,
    /// Sample per step — present for range-query ("matrix") results.
    pub values: Option<Vec<PrometheusValue>>,
}

/// Prometheus sample as `(timestamp_seconds, value)`; the sample value is
/// JSON-encoded as a string by Prometheus and must be parsed by callers.
pub type PrometheusValue = (f64, String);
98
/// Background service that periodically pulls metrics from Prometheus,
/// stores minute-level aggregates in the analytics database, and rolls
/// them up to hour- and day-level granularity.
pub struct MetricsAggregator {
    // Analytics storage backend.
    db: AnalyticsDatabase,
    // Client for the Prometheus HTTP API.
    prom_client: PrometheusClient,
    // Aggregation/rollup cadence settings.
    config: AnalyticsConfig,
}
105
106impl MetricsAggregator {
107    /// Create a new metrics aggregator
108    pub fn new(
109        db: AnalyticsDatabase,
110        prometheus_url: impl Into<String>,
111        config: AnalyticsConfig,
112    ) -> Self {
113        Self {
114            db,
115            prom_client: PrometheusClient::new(prometheus_url),
116            config,
117        }
118    }
119
120    /// Start the aggregation service
121    pub async fn start(self: Arc<Self>) {
122        info!("Starting metrics aggregation service");
123
124        // Spawn minute aggregation task
125        let self_clone = Arc::clone(&self);
126        tokio::spawn(async move {
127            self_clone.run_minute_aggregation().await;
128        });
129
130        // Spawn hourly rollup task
131        let self_clone = Arc::clone(&self);
132        tokio::spawn(async move {
133            self_clone.run_hourly_rollup().await;
134        });
135
136        // Spawn daily rollup task
137        let self_clone = Arc::clone(&self);
138        tokio::spawn(async move {
139            self_clone.run_daily_rollup().await;
140        });
141    }
142
143    /// Run minute-level aggregation loop
144    async fn run_minute_aggregation(&self) {
145        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));
146
147        loop {
148            interval.tick().await;
149
150            if let Err(e) = self.aggregate_minute_metrics().await {
151                error!("Error aggregating minute metrics: {}", e);
152            }
153        }
154    }
155
156    /// Aggregate metrics for the last minute
157    async fn aggregate_minute_metrics(&self) -> Result<()> {
158        let now = Utc::now();
159        let minute_start =
160            now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
161        let timestamp = minute_start.timestamp();
162
163        debug!("Aggregating metrics for minute: {}", minute_start);
164
165        // Query request counts by protocol, method, path
166        let query = r"sum by (protocol, method, path, status) (
167                increase(mockforge_requests_by_path_total{}[1m]) > 0
168            )"
169        .to_string();
170
171        let response = self.prom_client.query(&query, Some(timestamp)).await?;
172
173        let mut aggregates = Vec::new();
174
175        for result in response.data.result {
176            let protocol = result
177                .metric
178                .get("protocol")
179                .map_or_else(|| "unknown".to_string(), ToString::to_string);
180            let method = result.metric.get("method").cloned();
181            let endpoint = result.metric.get("path").cloned();
182            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
183
184            let request_count = if let Some((_, value)) = result.value {
185                value.parse::<f64>().unwrap_or(0.0) as i64
186            } else {
187                0
188            };
189
190            // Query latency metrics for this combination
191            let latency_query = if let (Some(ref p), Some(ref m), Some(ref e)) =
192                (&Some(protocol.clone()), &method, &endpoint)
193            {
194                format!(
195                    r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{p}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
196                )
197            } else {
198                continue;
199            };
200
201            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
202                Ok(resp) => resp
203                    .data
204                    .result
205                    .first()
206                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
207                Err(e) => {
208                    warn!("Failed to query latency: {}", e);
209                    None
210                }
211            };
212
213            let agg = MetricsAggregate {
214                id: None,
215                timestamp,
216                protocol: protocol.clone(),
217                method: method.clone(),
218                endpoint: endpoint.clone(),
219                status_code,
220                workspace_id: None,
221                environment: None,
222                request_count,
223                error_count: if let Some(sc) = status_code {
224                    if sc >= 400 {
225                        request_count
226                    } else {
227                        0
228                    }
229                } else {
230                    0
231                },
232                latency_sum: 0.0,
233                latency_min: None,
234                latency_max: None,
235                latency_p50: None,
236                latency_p95,
237                latency_p99: None,
238                bytes_sent: 0,
239                bytes_received: 0,
240                active_connections: None,
241                created_at: None,
242            };
243
244            aggregates.push(agg);
245        }
246
247        if !aggregates.is_empty() {
248            self.db.insert_minute_aggregates_batch(&aggregates).await?;
249            info!("Stored {} minute aggregates", aggregates.len());
250
251            // Also update endpoint stats
252            for agg in &aggregates {
253                let stats = EndpointStats {
254                    id: None,
255                    endpoint: agg.endpoint.clone().unwrap_or_default(),
256                    protocol: agg.protocol.clone(),
257                    method: agg.method.clone(),
258                    workspace_id: agg.workspace_id.clone(),
259                    environment: agg.environment.clone(),
260                    total_requests: agg.request_count,
261                    total_errors: agg.error_count,
262                    avg_latency_ms: agg.latency_p95,
263                    min_latency_ms: agg.latency_min,
264                    max_latency_ms: agg.latency_max,
265                    p95_latency_ms: agg.latency_p95,
266                    status_codes: None,
267                    total_bytes_sent: agg.bytes_sent,
268                    total_bytes_received: agg.bytes_received,
269                    first_seen: timestamp,
270                    last_seen: timestamp,
271                    updated_at: None,
272                };
273
274                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
275                    warn!("Failed to update endpoint stats: {}", e);
276                }
277            }
278        }
279
280        Ok(())
281    }
282
283    /// Run hourly rollup loop
284    async fn run_hourly_rollup(&self) {
285        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));
286
287        loop {
288            interval.tick().await;
289
290            if let Err(e) = self.rollup_to_hour().await {
291                error!("Error rolling up to hourly metrics: {}", e);
292            }
293        }
294    }
295
296    /// Roll up minute data to hour-level aggregates
297    async fn rollup_to_hour(&self) -> Result<()> {
298        let now = Utc::now();
299        let hour_start =
300            now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
301                - chrono::Duration::hours(1);
302        let hour_end = hour_start + chrono::Duration::hours(1);
303
304        info!("Rolling up metrics to hour: {}", hour_start);
305
306        let filter = AnalyticsFilter {
307            start_time: Some(hour_start.timestamp()),
308            end_time: Some(hour_end.timestamp()),
309            ..Default::default()
310        };
311
312        let minute_data = self.db.get_minute_aggregates(&filter).await?;
313
314        if minute_data.is_empty() {
315            debug!("No minute data to roll up");
316            return Ok(());
317        }
318
319        // Group by protocol, method, endpoint, status_code
320        let mut groups: HashMap<
321            (String, Option<String>, Option<String>, Option<i32>),
322            Vec<&MetricsAggregate>,
323        > = HashMap::new();
324
325        for agg in &minute_data {
326            let key =
327                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
328            groups.entry(key).or_default().push(agg);
329        }
330
331        for ((protocol, method, endpoint, status_code), group) in groups {
332            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
333            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
334            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
335            let latency_min =
336                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
337            let latency_max =
338                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
339
340            let hour_agg = HourMetricsAggregate {
341                id: None,
342                timestamp: hour_start.timestamp(),
343                protocol,
344                method,
345                endpoint,
346                status_code,
347                workspace_id: None,
348                environment: None,
349                request_count,
350                error_count,
351                latency_sum,
352                latency_min: if latency_min.is_finite() {
353                    Some(latency_min)
354                } else {
355                    None
356                },
357                latency_max: if latency_max.is_finite() {
358                    Some(latency_max)
359                } else {
360                    None
361                },
362                latency_p50: None,
363                latency_p95: None,
364                latency_p99: None,
365                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
366                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
367                active_connections_avg: None,
368                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
369                created_at: None,
370            };
371
372            self.db.insert_hour_aggregate(&hour_agg).await?;
373        }
374
375        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
376        Ok(())
377    }
378
379    /// Run daily rollup loop
380    async fn run_daily_rollup(&self) {
381        let mut interval = interval(Duration::from_secs(86400)); // Daily
382
383        loop {
384            interval.tick().await;
385
386            if let Err(e) = self.rollup_to_day().await {
387                error!("Error rolling up to daily metrics: {}", e);
388            }
389        }
390    }
391
392    /// Roll up hour data to day-level aggregates
393    async fn rollup_to_day(&self) -> Result<()> {
394        let now = Utc::now();
395        let day_start = now
396            .with_hour(0)
397            .unwrap()
398            .with_minute(0)
399            .unwrap()
400            .with_second(0)
401            .unwrap()
402            .with_nanosecond(0)
403            .unwrap()
404            - chrono::Duration::days(1);
405        let day_end = day_start + chrono::Duration::days(1);
406
407        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));
408
409        let filter = AnalyticsFilter {
410            start_time: Some(day_start.timestamp()),
411            end_time: Some(day_end.timestamp()),
412            ..Default::default()
413        };
414
415        let hour_data = self.db.get_hour_aggregates(&filter).await?;
416
417        if hour_data.is_empty() {
418            debug!("No hour data to roll up");
419            return Ok(());
420        }
421
422        // Group by protocol, method, endpoint, status_code
423        let mut groups: HashMap<
424            (String, Option<String>, Option<String>, Option<i32>),
425            Vec<&HourMetricsAggregate>,
426        > = HashMap::new();
427
428        for agg in &hour_data {
429            let key =
430                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
431            groups.entry(key).or_default().push(agg);
432        }
433
434        // Find peak hour (hour with max request count)
435        let mut peak_hour: Option<i32> = None;
436        let mut max_requests = 0i64;
437        for agg in &hour_data {
438            if agg.request_count > max_requests {
439                max_requests = agg.request_count;
440                // Extract hour from timestamp
441                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
442                    peak_hour = Some(dt.hour() as i32);
443                }
444            }
445        }
446
447        for ((protocol, method, endpoint, status_code), group) in groups {
448            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
449            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
450            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
451            let latency_min =
452                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
453            let latency_max =
454                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
455
456            // Calculate percentiles from hour aggregates (average of hour percentiles)
457            let latency_p50_avg: Option<f64> = {
458                let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
459                if !p50_values.is_empty() {
460                    Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
461                } else {
462                    None
463                }
464            };
465            let latency_p95_avg: Option<f64> = {
466                let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
467                if !p95_values.is_empty() {
468                    Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
469                } else {
470                    None
471                }
472            };
473            let latency_p99_avg: Option<f64> = {
474                let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
475                if !p99_values.is_empty() {
476                    Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
477                } else {
478                    None
479                }
480            };
481
482            // Average active connections
483            let active_connections_avg: Option<f64> = {
484                let avg_values: Vec<f64> =
485                    group.iter().filter_map(|a| a.active_connections_avg).collect();
486                if !avg_values.is_empty() {
487                    Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
488                } else {
489                    None
490                }
491            };
492
493            // Max active connections
494            let active_connections_max = group.iter().filter_map(|a| a.active_connections_max).max();
495
496            let day_agg = DayMetricsAggregate {
497                id: None,
498                date: day_start.format("%Y-%m-%d").to_string(),
499                timestamp: day_start.timestamp(),
500                protocol,
501                method,
502                endpoint,
503                status_code,
504                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
505                environment: group.first().and_then(|a| a.environment.clone()),
506                request_count,
507                error_count,
508                latency_sum,
509                latency_min: if latency_min.is_finite() {
510                    Some(latency_min)
511                } else {
512                    None
513                },
514                latency_max: if latency_max.is_finite() {
515                    Some(latency_max)
516                } else {
517                    None
518                },
519                latency_p50: latency_p50_avg,
520                latency_p95: latency_p95_avg,
521                latency_p99: latency_p99_avg,
522                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
523                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
524                active_connections_avg,
525                active_connections_max,
526                unique_clients: None, // Would need to track unique clients separately
527                peak_hour,
528                created_at: None,
529            };
530
531            self.db.insert_day_aggregate(&day_agg).await?;
532        }
533
534        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
535        Ok(())
536    }
537}
538
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor should store the base URL verbatim.
    #[test]
    fn test_prometheus_client_creation() {
        let url = "http://localhost:9090";
        let client = PrometheusClient::new(url.to_string());
        assert_eq!(client.base_url, url);
    }
}