mockforge_analytics/
aggregator.rs

//! Metrics aggregation service
//!
//! This module provides background services that:
//! - Query Prometheus metrics at regular intervals
//! - Aggregate and store metrics in the analytics database
//! - Roll up minute data to hour/day granularity
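//!
//! # Example
//!
//! A minimal sketch of wiring the aggregator into a Tokio runtime; the
//! `AnalyticsDatabase::connect` constructor and `AnalyticsConfig::default()`
//! calls shown here are assumptions, not necessarily this crate's real API.
//!
//! ```rust,ignore
//! use std::sync::Arc;
//!
//! // Hypothetical constructors; substitute the crate's actual ones.
//! let db = AnalyticsDatabase::connect("analytics.db").await?;
//! let config = AnalyticsConfig::default();
//!
//! let aggregator = Arc::new(MetricsAggregator::new(db, "http://localhost:9090", config));
//! aggregator.start().await; // spawns background tasks and returns immediately
//! ```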

use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::*;
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

/// Prometheus query client
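///
/// # Example
///
/// A minimal sketch of an instant query against a local Prometheus server
/// (the URL and metric are illustrative):
///
/// ```rust,ignore
/// let client = PrometheusClient::new("http://localhost:9090");
/// let resp = client.query("up", None).await?;
/// for result in resp.data.result {
///     println!("{:?} => {:?}", result.metric, result.value);
/// }
/// ```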
#[derive(Clone)]
pub struct PrometheusClient {
    base_url: String,
    client: Client,
}

impl PrometheusClient {
    /// Create a new Prometheus client
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
        }
    }

    /// Execute a Prometheus instant query
    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
        let mut url =
            format!("{}/api/v1/query?query={}", self.base_url, urlencoding::encode(query));

        if let Some(t) = time {
            url.push_str(&format!("&time={}", t));
        }

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }

    /// Execute a Prometheus range query
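    ///
    /// A minimal sketch (the URL and metric are illustrative):
    ///
    /// ```rust,ignore
    /// let client = PrometheusClient::new("http://localhost:9090");
    /// let end = chrono::Utc::now().timestamp();
    /// // One sample per minute over the last hour.
    /// let resp = client.query_range("up", end - 3600, end, "60s").await?;
    /// ```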
    pub async fn query_range(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: &str,
    ) -> Result<PrometheusResponse> {
        let url = format!(
            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
            self.base_url,
            urlencoding::encode(query),
            start,
            end,
            step
        );

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }
}

/// Prometheus API response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    pub status: String,
    pub data: PrometheusData,
}

/// Prometheus data payload
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

/// Prometheus query result
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    pub metric: HashMap<String, String>,
    pub value: Option<PrometheusValue>,
    pub values: Option<Vec<PrometheusValue>>,
}

/// Prometheus metric value: a `(timestamp, value)` pair. Prometheus encodes
/// sample values as strings on the wire, so callers must parse them to `f64`.
pub type PrometheusValue = (f64, String);

/// Metrics aggregation service
pub struct MetricsAggregator {
    db: AnalyticsDatabase,
    prom_client: PrometheusClient,
    config: AnalyticsConfig,
}

impl MetricsAggregator {
    /// Create a new metrics aggregator
    pub fn new(
        db: AnalyticsDatabase,
        prometheus_url: impl Into<String>,
        config: AnalyticsConfig,
    ) -> Self {
        Self {
            db,
            prom_client: PrometheusClient::new(prometheus_url),
            config,
        }
    }

    /// Start the aggregation service
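    ///
    /// Spawns three detached background tasks (minute aggregation, hourly
    /// rollup, daily rollup) and returns immediately. The tasks loop for the
    /// lifetime of the process; no `JoinHandle`s are retained, so they cannot
    /// be awaited or cancelled individually.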
    pub async fn start(self: Arc<Self>) {
        info!("Starting metrics aggregation service");

        // Spawn minute aggregation task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_minute_aggregation().await;
        });

        // Spawn hourly rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_hourly_rollup().await;
        });

        // Spawn daily rollup task
        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_daily_rollup().await;
        });
    }

    /// Run minute-level aggregation loop
    async fn run_minute_aggregation(&self) {
        // Note: tokio's `interval` fires immediately on its first tick, then
        // every `aggregation_interval_seconds` thereafter.
        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));

        loop {
            interval.tick().await;

            if let Err(e) = self.aggregate_minute_metrics().await {
                error!("Error aggregating minute metrics: {}", e);
            }
        }
    }

    /// Aggregate metrics for the last minute
    async fn aggregate_minute_metrics(&self) -> Result<()> {
        let now = Utc::now();
        let minute_start =
            now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
        let timestamp = minute_start.timestamp();

        debug!("Aggregating metrics for minute: {}", minute_start);

        // Query request counts by protocol, method, path; `> 0` filters out
        // series that saw no traffic in the window.
        let query = r"sum by (protocol, method, path, status) (
                increase(mockforge_requests_by_path_total{}[1m]) > 0
            )";

        let response = self.prom_client.query(query, Some(timestamp)).await?;

        let mut aggregates = Vec::new();

        for result in response.data.result {
            let protocol = result
                .metric
                .get("protocol")
                .map(|s| s.to_string())
                .unwrap_or_else(|| "unknown".to_string());
            let method = result.metric.get("method").cloned();
            let endpoint = result.metric.get("path").cloned();
            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());

            let request_count = if let Some((_, value)) = result.value {
                value.parse::<f64>().unwrap_or(0.0) as i64
            } else {
                0
            };

            // Query latency metrics for this combination; series missing a
            // method or path label cannot be matched, so skip them.
            let (Some(m), Some(e)) = (&method, &endpoint) else {
                continue;
            };
            let latency_query = format!(
                r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{}",method="{}",path="{}"}}[1m])) by (le)) * 1000"#,
                protocol, m, e
            );

            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
                Ok(resp) => resp
                    .data
                    .result
                    .first()
                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
                Err(e) => {
                    warn!("Failed to query latency: {}", e);
                    None
                }
            };

            let agg = MetricsAggregate {
                id: None,
                timestamp,
                protocol: protocol.clone(),
                method: method.clone(),
                endpoint: endpoint.clone(),
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                // Treat 4xx/5xx responses as errors.
                error_count: match status_code {
                    Some(sc) if sc >= 400 => request_count,
                    _ => 0,
                },
                // Only the p95 is queried per minute; the other latency
                // fields are left unset at this granularity.
                latency_sum: 0.0,
                latency_min: None,
                latency_max: None,
                latency_p50: None,
                latency_p95,
                latency_p99: None,
                bytes_sent: 0,
                bytes_received: 0,
                active_connections: None,
                created_at: None,
            };

            aggregates.push(agg);
        }

        if !aggregates.is_empty() {
            self.db.insert_minute_aggregates_batch(&aggregates).await?;
            info!("Stored {} minute aggregates", aggregates.len());

            // Also update endpoint stats
            for agg in &aggregates {
                let stats = EndpointStats {
                    id: None,
                    endpoint: agg.endpoint.clone().unwrap_or_default(),
                    protocol: agg.protocol.clone(),
                    method: agg.method.clone(),
                    workspace_id: agg.workspace_id.clone(),
                    environment: agg.environment.clone(),
                    total_requests: agg.request_count,
                    total_errors: agg.error_count,
                    // No per-request average is computed at this granularity;
                    // the p95 sample stands in for the average.
                    avg_latency_ms: agg.latency_p95,
                    min_latency_ms: agg.latency_min,
                    max_latency_ms: agg.latency_max,
                    p95_latency_ms: agg.latency_p95,
                    status_codes: None,
                    total_bytes_sent: agg.bytes_sent,
                    total_bytes_received: agg.bytes_received,
                    first_seen: timestamp,
                    last_seen: timestamp,
                    updated_at: None,
                };

                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
                    warn!("Failed to update endpoint stats: {}", e);
                }
            }
        }

        Ok(())
    }

    /// Run hourly rollup loop
    async fn run_hourly_rollup(&self) {
        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_hour().await {
                error!("Error rolling up to hourly metrics: {}", e);
            }
        }
    }

    /// Roll up minute data to hour-level aggregates
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        let hour_start =
            now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
                - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        // Group by protocol, method, endpoint, status_code
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&MetricsAggregate>,
        > = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                // The folds above start from +/-infinity; a non-finite result
                // means no minute aggregate carried a latency sample.
                latency_min: latency_min.is_finite().then_some(latency_min),
                latency_max: latency_max.is_finite().then_some(latency_max),
                // Percentiles cannot be combined from per-minute percentiles,
                // so they are left unset at hour granularity.
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }

    /// Run daily rollup loop
    async fn run_daily_rollup(&self) {
        let mut interval = interval(Duration::from_secs(86400)); // Daily

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_day().await {
                error!("Error rolling up to daily metrics: {}", e);
            }
        }
    }

    /// Roll up hour data to day-level aggregates
    async fn rollup_to_day(&self) -> Result<()> {
        let now = Utc::now();
        let day_start = now
            .with_hour(0)
            .unwrap()
            .with_minute(0)
            .unwrap()
            .with_second(0)
            .unwrap()
            .with_nanosecond(0)
            .unwrap()
            - chrono::Duration::days(1);
        let _day_end = day_start + chrono::Duration::days(1);

        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));

        // TODO: Query hour aggregates and roll up to day

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_client_creation() {
        let client = PrometheusClient::new("http://localhost:9090");
        assert_eq!(client.base_url, "http://localhost:9090");
    }
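
    /// Deserializes a representative instant-query payload, shaped per the
    /// Prometheus HTTP API docs. This sketch assumes `serde_json` is
    /// available as a dev-dependency.
    #[test]
    fn test_prometheus_response_deserialization() {
        let json = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    { "metric": { "protocol": "http" }, "value": [1700000000.0, "42"] }
                ]
            }
        }"#;

        let resp: PrometheusResponse = serde_json::from_str(json).unwrap();
        assert_eq!(resp.status, "success");
        assert_eq!(resp.data.result_type, "vector");
        assert_eq!(resp.data.result[0].value.as_ref().unwrap().1, "42");
    }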
}