1use crate::config::AnalyticsConfig;
9use crate::database::AnalyticsDatabase;
10use crate::error::Result;
11use crate::models::{
12 AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
13};
14use chrono::{Timelike, Utc};
15use reqwest::Client;
16use serde::{Deserialize, Serialize};
17use std::collections::HashMap;
18use std::sync::Arc;
19use tokio::time::{interval, Duration};
20use tracing::{debug, error, info, warn};
21
/// Thin HTTP client for the Prometheus query API.
#[derive(Clone)]
pub struct PrometheusClient {
    /// Base URL of the Prometheus server, e.g. "http://localhost:9090".
    base_url: String,
    /// Reusable reqwest client (keeps connection pooling across queries).
    client: Client,
}
28
29impl PrometheusClient {
30 pub fn new(base_url: impl Into<String>) -> Self {
32 Self {
33 base_url: base_url.into(),
34 client: Client::new(),
35 }
36 }
37
38 pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
40 let mut url = format!("{}/api/v1/query", self.base_url);
41 url.push_str(&format!("?query={}", urlencoding::encode(query)));
42
43 if let Some(t) = time {
44 url.push_str(&format!("&time={t}"));
45 }
46
47 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
48
49 Ok(response)
50 }
51
52 pub async fn query_range(
54 &self,
55 query: &str,
56 start: i64,
57 end: i64,
58 step: &str,
59 ) -> Result<PrometheusResponse> {
60 let url = format!(
61 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
62 self.base_url,
63 urlencoding::encode(query),
64 start,
65 end,
66 step
67 );
68
69 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
70
71 Ok(response)
72 }
73}
74
/// Top-level envelope of a Prometheus HTTP API response.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    /// Response status string (per the Prometheus API, "success" or "error").
    pub status: String,
    pub data: PrometheusData,
}

/// Payload of a Prometheus response.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] // Prometheus sends `resultType`, not `result_type`
pub struct PrometheusData {
    /// Result kind reported by Prometheus (e.g. "vector" for instant
    /// queries, "matrix" for range queries).
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

/// One series in a Prometheus result set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    /// Label set identifying the series.
    pub metric: HashMap<String, String>,
    /// Single sample — populated for instant-query ("vector") results.
    pub value: Option<PrometheusValue>,
    /// Sample series — populated for range-query ("matrix") results.
    pub values: Option<Vec<PrometheusValue>>,
}

/// A Prometheus sample: `(unix_timestamp, value_as_string)`. Prometheus
/// serializes the numeric value as a string to preserve precision.
pub type PrometheusValue = (f64, String);
100
/// Background service that periodically pulls request metrics from Prometheus
/// and persists minute-, hour-, and day-level aggregates into the analytics
/// database.
pub struct MetricsAggregator {
    /// Destination store for all aggregate rows.
    db: AnalyticsDatabase,
    /// Client used for all PromQL queries.
    prom_client: PrometheusClient,
    /// Controls aggregation/rollup intervals.
    config: AnalyticsConfig,
}
107
108impl MetricsAggregator {
109 pub fn new(
111 db: AnalyticsDatabase,
112 prometheus_url: impl Into<String>,
113 config: AnalyticsConfig,
114 ) -> Self {
115 Self {
116 db,
117 prom_client: PrometheusClient::new(prometheus_url),
118 config,
119 }
120 }
121
122 pub async fn start(self: Arc<Self>) {
124 info!("Starting metrics aggregation service");
125
126 let self_clone = Arc::clone(&self);
128 tokio::spawn(async move {
129 self_clone.run_minute_aggregation().await;
130 });
131
132 let self_clone = Arc::clone(&self);
134 tokio::spawn(async move {
135 self_clone.run_hourly_rollup().await;
136 });
137
138 let self_clone = Arc::clone(&self);
140 tokio::spawn(async move {
141 self_clone.run_daily_rollup().await;
142 });
143 }
144
145 async fn run_minute_aggregation(&self) {
147 let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));
148
149 loop {
150 interval.tick().await;
151
152 if let Err(e) = self.aggregate_minute_metrics().await {
153 error!("Error aggregating minute metrics: {}", e);
154 }
155 }
156 }
157
158 async fn aggregate_minute_metrics(&self) -> Result<()> {
160 let now = Utc::now();
161 let minute_start = now
162 .with_second(0)
163 .expect("0 is valid for seconds")
164 .with_nanosecond(0)
165 .expect("0 is valid for nanoseconds")
166 - chrono::Duration::minutes(1);
167 let timestamp = minute_start.timestamp();
168
169 debug!("Aggregating metrics for minute: {}", minute_start);
170
171 let query = r"sum by (protocol, method, path, status) (
173 increase(mockforge_requests_by_path_total{}[1m]) > 0
174 )"
175 .to_string();
176
177 let response = self.prom_client.query(&query, Some(timestamp)).await?;
178
179 let mut aggregates = Vec::new();
180
181 for result in response.data.result {
182 let protocol = result
183 .metric
184 .get("protocol")
185 .map_or_else(|| "unknown".to_string(), ToString::to_string);
186 let method = result.metric.get("method").cloned();
187 let endpoint = result.metric.get("path").cloned();
188 let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
189
190 let request_count = if let Some((_, value)) = result.value {
191 value.parse::<f64>().unwrap_or(0.0) as i64
192 } else {
193 0
194 };
195
196 let latency_query = if let (Some(ref p), Some(ref m), Some(ref e)) =
198 (&Some(protocol.clone()), &method, &endpoint)
199 {
200 format!(
201 r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{p}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
202 )
203 } else {
204 continue;
205 };
206
207 let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
208 Ok(resp) => resp
209 .data
210 .result
211 .first()
212 .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
213 Err(e) => {
214 warn!("Failed to query latency: {}", e);
215 None
216 }
217 };
218
219 let agg = MetricsAggregate {
220 id: None,
221 timestamp,
222 protocol: protocol.clone(),
223 method: method.clone(),
224 endpoint: endpoint.clone(),
225 status_code,
226 workspace_id: None,
227 environment: None,
228 request_count,
229 error_count: if let Some(sc) = status_code {
230 if sc >= 400 {
231 request_count
232 } else {
233 0
234 }
235 } else {
236 0
237 },
238 latency_sum: 0.0,
239 latency_min: None,
240 latency_max: None,
241 latency_p50: None,
242 latency_p95,
243 latency_p99: None,
244 bytes_sent: 0,
245 bytes_received: 0,
246 active_connections: None,
247 created_at: None,
248 };
249
250 aggregates.push(agg);
251 }
252
253 if !aggregates.is_empty() {
254 self.db.insert_minute_aggregates_batch(&aggregates).await?;
255 info!("Stored {} minute aggregates", aggregates.len());
256
257 for agg in &aggregates {
259 let stats = EndpointStats {
260 id: None,
261 endpoint: agg.endpoint.clone().unwrap_or_default(),
262 protocol: agg.protocol.clone(),
263 method: agg.method.clone(),
264 workspace_id: agg.workspace_id.clone(),
265 environment: agg.environment.clone(),
266 total_requests: agg.request_count,
267 total_errors: agg.error_count,
268 avg_latency_ms: agg.latency_p95,
269 min_latency_ms: agg.latency_min,
270 max_latency_ms: agg.latency_max,
271 p95_latency_ms: agg.latency_p95,
272 status_codes: None,
273 total_bytes_sent: agg.bytes_sent,
274 total_bytes_received: agg.bytes_received,
275 first_seen: timestamp,
276 last_seen: timestamp,
277 updated_at: None,
278 };
279
280 if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
281 warn!("Failed to update endpoint stats: {}", e);
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 async fn run_hourly_rollup(&self) {
291 let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));
292
293 loop {
294 interval.tick().await;
295
296 if let Err(e) = self.rollup_to_hour().await {
297 error!("Error rolling up to hourly metrics: {}", e);
298 }
299 }
300 }
301
302 async fn rollup_to_hour(&self) -> Result<()> {
304 let now = Utc::now();
305 let hour_start = now
306 .with_minute(0)
307 .expect("0 is valid for minutes")
308 .with_second(0)
309 .expect("0 is valid for seconds")
310 .with_nanosecond(0)
311 .expect("0 is valid for nanoseconds")
312 - chrono::Duration::hours(1);
313 let hour_end = hour_start + chrono::Duration::hours(1);
314
315 info!("Rolling up metrics to hour: {}", hour_start);
316
317 let filter = AnalyticsFilter {
318 start_time: Some(hour_start.timestamp()),
319 end_time: Some(hour_end.timestamp()),
320 ..Default::default()
321 };
322
323 let minute_data = self.db.get_minute_aggregates(&filter).await?;
324
325 if minute_data.is_empty() {
326 debug!("No minute data to roll up");
327 return Ok(());
328 }
329
330 let mut groups: HashMap<
332 (String, Option<String>, Option<String>, Option<i32>),
333 Vec<&MetricsAggregate>,
334 > = HashMap::new();
335
336 for agg in &minute_data {
337 let key =
338 (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
339 groups.entry(key).or_default().push(agg);
340 }
341
342 for ((protocol, method, endpoint, status_code), group) in groups {
343 let request_count: i64 = group.iter().map(|a| a.request_count).sum();
344 let error_count: i64 = group.iter().map(|a| a.error_count).sum();
345 let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
346 let latency_min =
347 group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
348 let latency_max =
349 group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
350
351 let hour_agg = HourMetricsAggregate {
352 id: None,
353 timestamp: hour_start.timestamp(),
354 protocol,
355 method,
356 endpoint,
357 status_code,
358 workspace_id: None,
359 environment: None,
360 request_count,
361 error_count,
362 latency_sum,
363 latency_min: if latency_min.is_finite() {
364 Some(latency_min)
365 } else {
366 None
367 },
368 latency_max: if latency_max.is_finite() {
369 Some(latency_max)
370 } else {
371 None
372 },
373 latency_p50: None,
374 latency_p95: None,
375 latency_p99: None,
376 bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
377 bytes_received: group.iter().map(|a| a.bytes_received).sum(),
378 active_connections_avg: None,
379 active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
380 created_at: None,
381 };
382
383 self.db.insert_hour_aggregate(&hour_agg).await?;
384 }
385
386 info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
387 Ok(())
388 }
389
390 async fn run_daily_rollup(&self) {
392 let mut interval = interval(Duration::from_secs(86400)); loop {
395 interval.tick().await;
396
397 if let Err(e) = self.rollup_to_day().await {
398 error!("Error rolling up to daily metrics: {}", e);
399 }
400 }
401 }
402
403 async fn rollup_to_day(&self) -> Result<()> {
405 let now = Utc::now();
406 let day_start = now
407 .with_hour(0)
408 .expect("0 is valid for hours")
409 .with_minute(0)
410 .expect("0 is valid for minutes")
411 .with_second(0)
412 .expect("0 is valid for seconds")
413 .with_nanosecond(0)
414 .expect("0 is valid for nanoseconds")
415 - chrono::Duration::days(1);
416 let day_end = day_start + chrono::Duration::days(1);
417
418 info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));
419
420 let filter = AnalyticsFilter {
421 start_time: Some(day_start.timestamp()),
422 end_time: Some(day_end.timestamp()),
423 ..Default::default()
424 };
425
426 let hour_data = self.db.get_hour_aggregates(&filter).await?;
427
428 if hour_data.is_empty() {
429 debug!("No hour data to roll up");
430 return Ok(());
431 }
432
433 let mut groups: HashMap<
435 (String, Option<String>, Option<String>, Option<i32>),
436 Vec<&HourMetricsAggregate>,
437 > = HashMap::new();
438
439 for agg in &hour_data {
440 let key =
441 (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
442 groups.entry(key).or_default().push(agg);
443 }
444
445 let mut peak_hour: Option<i32> = None;
447 let mut max_requests = 0i64;
448 for agg in &hour_data {
449 if agg.request_count > max_requests {
450 max_requests = agg.request_count;
451 if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
453 peak_hour = Some(dt.hour() as i32);
454 }
455 }
456 }
457
458 for ((protocol, method, endpoint, status_code), group) in groups {
459 let request_count: i64 = group.iter().map(|a| a.request_count).sum();
460 let error_count: i64 = group.iter().map(|a| a.error_count).sum();
461 let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
462 let latency_min =
463 group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
464 let latency_max =
465 group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
466
467 let latency_p50_avg: Option<f64> = {
469 let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
470 if !p50_values.is_empty() {
471 Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
472 } else {
473 None
474 }
475 };
476 let latency_p95_avg: Option<f64> = {
477 let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
478 if !p95_values.is_empty() {
479 Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
480 } else {
481 None
482 }
483 };
484 let latency_p99_avg: Option<f64> = {
485 let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
486 if !p99_values.is_empty() {
487 Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
488 } else {
489 None
490 }
491 };
492
493 let active_connections_avg: Option<f64> = {
495 let avg_values: Vec<f64> =
496 group.iter().filter_map(|a| a.active_connections_avg).collect();
497 if !avg_values.is_empty() {
498 Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
499 } else {
500 None
501 }
502 };
503
504 let active_connections_max =
506 group.iter().filter_map(|a| a.active_connections_max).max();
507
508 let day_agg = DayMetricsAggregate {
509 id: None,
510 date: day_start.format("%Y-%m-%d").to_string(),
511 timestamp: day_start.timestamp(),
512 protocol,
513 method,
514 endpoint,
515 status_code,
516 workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
517 environment: group.first().and_then(|a| a.environment.clone()),
518 request_count,
519 error_count,
520 latency_sum,
521 latency_min: if latency_min.is_finite() {
522 Some(latency_min)
523 } else {
524 None
525 },
526 latency_max: if latency_max.is_finite() {
527 Some(latency_max)
528 } else {
529 None
530 },
531 latency_p50: latency_p50_avg,
532 latency_p95: latency_p95_avg,
533 latency_p99: latency_p99_avg,
534 bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
535 bytes_received: group.iter().map(|a| a.bytes_received).sum(),
536 active_connections_avg,
537 active_connections_max,
538 unique_clients: None, peak_hour,
540 created_at: None,
541 };
542
543 self.db.insert_day_aggregate(&day_agg).await?;
544 }
545
546 info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
547 Ok(())
548 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor must store the base URL verbatim.
    #[test]
    fn test_prometheus_client_creation() {
        let prom = PrometheusClient::new(String::from("http://localhost:9090"));
        assert_eq!(prom.base_url, "http://localhost:9090");
    }
}