use crate::config::AnalyticsConfig;
use crate::database::AnalyticsDatabase;
use crate::error::Result;
use crate::models::{
    AnalyticsFilter, DayMetricsAggregate, EndpointStats, HourMetricsAggregate, MetricsAggregate,
};
use chrono::{Timelike, Utc};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

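/// Thin HTTP client for the Prometheus HTTP API (`/api/v1/query` and
/// `/api/v1/query_range`).
///
/// A minimal usage sketch (marked `ignore` so it is not compiled as a doc
/// test; the base URL is only an illustrative assumption):
///
/// ```ignore
/// let prom = PrometheusClient::new("http://localhost:9090");
/// // Instant query evaluated at "now" (no explicit evaluation timestamp).
/// let resp = prom.query("up", None).await?;
/// assert_eq!(resp.status, "success");
/// ```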
#[derive(Clone)]
pub struct PrometheusClient {
    base_url: String,
    client: Client,
}

impl PrometheusClient {
    /// Creates a client that talks to the Prometheus server at `base_url`.
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: Client::new(),
        }
    }

    /// Runs an instant query, optionally evaluated at the given Unix timestamp.
    pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
        let mut url = format!("{}/api/v1/query", self.base_url);
        url.push_str(&format!("?query={}", urlencoding::encode(query)));

        if let Some(t) = time {
            url.push_str(&format!("&time={t}"));
        }

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }

    /// Runs a range query between `start` and `end` (Unix timestamps) with the
    /// given `step` (e.g. `"60s"`).
    pub async fn query_range(
        &self,
        query: &str,
        start: i64,
        end: i64,
        step: &str,
    ) -> Result<PrometheusResponse> {
        let url = format!(
            "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
            self.base_url,
            urlencoding::encode(query),
            start,
            end,
            step
        );

        let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;

        Ok(response)
    }
}

/// Top-level envelope returned by the Prometheus HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    pub status: String,
    pub data: PrometheusData,
}

/// The `data` payload of a Prometheus response; `result_type` is serialized
/// as `resultType` by Prometheus, hence the camelCase rename.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    pub result_type: String,
    pub result: Vec<PrometheusResult>,
}

/// A single series in the result set. Instant queries populate `value`;
/// range queries populate `values`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    pub metric: HashMap<String, String>,
    pub value: Option<PrometheusValue>,
    pub values: Option<Vec<PrometheusValue>>,
}

/// A Prometheus sample: `(unix_timestamp, value_as_string)`.
pub type PrometheusValue = (f64, String);

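/// Periodically pulls request metrics from Prometheus and rolls them up into
/// minute, hourly, and daily aggregates in the analytics database.
///
/// A minimal startup sketch (marked `ignore` so it is not compiled as a doc
/// test; how `db` and `config` are obtained is elided and assumed to come
/// from the surrounding application):
///
/// ```ignore
/// let aggregator = Arc::new(MetricsAggregator::new(db, "http://localhost:9090", config));
/// aggregator.start().await; // spawns the minute, hourly, and daily loops
/// ```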
pub struct MetricsAggregator {
    db: AnalyticsDatabase,
    prom_client: PrometheusClient,
    config: AnalyticsConfig,
}

impl MetricsAggregator {
    pub fn new(
        db: AnalyticsDatabase,
        prometheus_url: impl Into<String>,
        config: AnalyticsConfig,
    ) -> Self {
        Self {
            db,
            prom_client: PrometheusClient::new(prometheus_url),
            config,
        }
    }

    /// Spawns the three background aggregation loops (minute, hourly, daily).
    pub async fn start(self: Arc<Self>) {
        info!("Starting metrics aggregation service");

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_minute_aggregation().await;
        });

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_hourly_rollup().await;
        });

        let self_clone = Arc::clone(&self);
        tokio::spawn(async move {
            self_clone.run_daily_rollup().await;
        });
    }

    /// Runs the per-minute aggregation loop at the configured interval.
    async fn run_minute_aggregation(&self) {
        let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));

        loop {
            interval.tick().await;

            if let Err(e) = self.aggregate_minute_metrics().await {
                error!("Error aggregating minute metrics: {}", e);
            }
        }
    }

    /// Queries Prometheus for the previous minute and writes per-endpoint
    /// minute aggregates plus rolling endpoint stats to the database.
    async fn aggregate_minute_metrics(&self) -> Result<()> {
        let now = Utc::now();
        let minute_start =
            now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
        let timestamp = minute_start.timestamp();

        debug!("Aggregating metrics for minute: {}", minute_start);

        let query = r"sum by (protocol, method, path, status) (
            increase(mockforge_requests_by_path_total{}[1m]) > 0
        )"
        .to_string();

        let response = self.prom_client.query(&query, Some(timestamp)).await?;

        let mut aggregates = Vec::new();

        for result in response.data.result {
            let protocol = result
                .metric
                .get("protocol")
                .map_or_else(|| "unknown".to_string(), ToString::to_string);
            let method = result.metric.get("method").cloned();
            let endpoint = result.metric.get("path").cloned();
            let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());

            let request_count = if let Some((_, value)) = result.value {
                value.parse::<f64>().unwrap_or(0.0) as i64
            } else {
                0
            };

            // Skip series that are missing a method or path label; the latency
            // query needs both to scope the histogram.
            let latency_query = if let (Some(m), Some(e)) = (&method, &endpoint) {
                format!(
                    r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{protocol}",method="{m}",path="{e}"}}[1m])) by (le)) * 1000"#
                )
            } else {
                continue;
            };

            let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
                Ok(resp) => resp
                    .data
                    .result
                    .first()
                    .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
                Err(e) => {
                    warn!("Failed to query latency: {}", e);
                    None
                }
            };

            let agg = MetricsAggregate {
                id: None,
                timestamp,
                protocol: protocol.clone(),
                method: method.clone(),
                endpoint: endpoint.clone(),
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                // Treat every request in a 4xx/5xx series as an error.
                error_count: if let Some(sc) = status_code {
                    if sc >= 400 {
                        request_count
                    } else {
                        0
                    }
                } else {
                    0
                },
                latency_sum: 0.0,
                latency_min: None,
                latency_max: None,
                latency_p50: None,
                latency_p95,
                latency_p99: None,
                bytes_sent: 0,
                bytes_received: 0,
                active_connections: None,
                created_at: None,
            };

            aggregates.push(agg);
        }

        if !aggregates.is_empty() {
            self.db.insert_minute_aggregates_batch(&aggregates).await?;
            info!("Stored {} minute aggregates", aggregates.len());

            for agg in &aggregates {
                let stats = EndpointStats {
                    id: None,
                    endpoint: agg.endpoint.clone().unwrap_or_default(),
                    protocol: agg.protocol.clone(),
                    method: agg.method.clone(),
                    workspace_id: agg.workspace_id.clone(),
                    environment: agg.environment.clone(),
                    total_requests: agg.request_count,
                    total_errors: agg.error_count,
                    avg_latency_ms: agg.latency_p95,
                    min_latency_ms: agg.latency_min,
                    max_latency_ms: agg.latency_max,
                    p95_latency_ms: agg.latency_p95,
                    status_codes: None,
                    total_bytes_sent: agg.bytes_sent,
                    total_bytes_received: agg.bytes_received,
                    first_seen: timestamp,
                    last_seen: timestamp,
                    updated_at: None,
                };

                if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
                    warn!("Failed to update endpoint stats: {}", e);
                }
            }
        }

        Ok(())
    }

    /// Runs the hourly rollup loop at the configured interval.
    async fn run_hourly_rollup(&self) {
        let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_hour().await {
                error!("Error rolling up to hourly metrics: {}", e);
            }
        }
    }

    /// Rolls the previous hour's minute aggregates up into hour aggregates.
    async fn rollup_to_hour(&self) -> Result<()> {
        let now = Utc::now();
        let hour_start =
            now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
                - chrono::Duration::hours(1);
        let hour_end = hour_start + chrono::Duration::hours(1);

        info!("Rolling up metrics to hour: {}", hour_start);

        let filter = AnalyticsFilter {
            start_time: Some(hour_start.timestamp()),
            end_time: Some(hour_end.timestamp()),
            ..Default::default()
        };

        let minute_data = self.db.get_minute_aggregates(&filter).await?;

        if minute_data.is_empty() {
            debug!("No minute data to roll up");
            return Ok(());
        }

        // Group minute aggregates by (protocol, method, endpoint, status_code).
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&MetricsAggregate>,
        > = HashMap::new();

        for agg in &minute_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            let hour_agg = HourMetricsAggregate {
                id: None,
                timestamp: hour_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: None,
                environment: None,
                request_count,
                error_count,
                latency_sum,
                // Infinite folds mean no minute in the group carried a latency sample.
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: None,
                latency_p95: None,
                latency_p99: None,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg: None,
                active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
                created_at: None,
            };

            self.db.insert_hour_aggregate(&hour_agg).await?;
        }

        info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
        Ok(())
    }

    /// Runs the daily rollup loop once every 24 hours.
    async fn run_daily_rollup(&self) {
        let mut interval = interval(Duration::from_secs(86400));

        loop {
            interval.tick().await;

            if let Err(e) = self.rollup_to_day().await {
                error!("Error rolling up to daily metrics: {}", e);
            }
        }
    }

    /// Rolls the previous day's hour aggregates up into day aggregates.
    async fn rollup_to_day(&self) -> Result<()> {
        let now = Utc::now();
        let day_start = now
            .with_hour(0)
            .unwrap()
            .with_minute(0)
            .unwrap()
            .with_second(0)
            .unwrap()
            .with_nanosecond(0)
            .unwrap()
            - chrono::Duration::days(1);
        let day_end = day_start + chrono::Duration::days(1);

        info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));

        let filter = AnalyticsFilter {
            start_time: Some(day_start.timestamp()),
            end_time: Some(day_end.timestamp()),
            ..Default::default()
        };

        let hour_data = self.db.get_hour_aggregates(&filter).await?;

        if hour_data.is_empty() {
            debug!("No hour data to roll up");
            return Ok(());
        }

        // Group hour aggregates by (protocol, method, endpoint, status_code).
        let mut groups: HashMap<
            (String, Option<String>, Option<String>, Option<i32>),
            Vec<&HourMetricsAggregate>,
        > = HashMap::new();

        for agg in &hour_data {
            let key =
                (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
            groups.entry(key).or_default().push(agg);
        }

        // Find the hour of day whose aggregate row saw the most requests.
        let mut peak_hour: Option<i32> = None;
        let mut max_requests = 0i64;
        for agg in &hour_data {
            if agg.request_count > max_requests {
                max_requests = agg.request_count;
                if let Some(dt) = chrono::DateTime::from_timestamp(agg.timestamp, 0) {
                    peak_hour = Some(dt.hour() as i32);
                }
            }
        }

        for ((protocol, method, endpoint, status_code), group) in groups {
            let request_count: i64 = group.iter().map(|a| a.request_count).sum();
            let error_count: i64 = group.iter().map(|a| a.error_count).sum();
            let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
            let latency_min =
                group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
            let latency_max =
                group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);

            // Percentiles cannot be combined exactly, so approximate the daily
            // values by averaging the available hourly percentiles.
            let latency_p50_avg: Option<f64> = {
                let p50_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p50).collect();
                if !p50_values.is_empty() {
                    Some(p50_values.iter().sum::<f64>() / p50_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p95_avg: Option<f64> = {
                let p95_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p95).collect();
                if !p95_values.is_empty() {
                    Some(p95_values.iter().sum::<f64>() / p95_values.len() as f64)
                } else {
                    None
                }
            };
            let latency_p99_avg: Option<f64> = {
                let p99_values: Vec<f64> = group.iter().filter_map(|a| a.latency_p99).collect();
                if !p99_values.is_empty() {
                    Some(p99_values.iter().sum::<f64>() / p99_values.len() as f64)
                } else {
                    None
                }
            };

            let active_connections_avg: Option<f64> = {
                let avg_values: Vec<f64> =
                    group.iter().filter_map(|a| a.active_connections_avg).collect();
                if !avg_values.is_empty() {
                    Some(avg_values.iter().sum::<f64>() / avg_values.len() as f64)
                } else {
                    None
                }
            };

            let active_connections_max =
                group.iter().filter_map(|a| a.active_connections_max).max();

            let day_agg = DayMetricsAggregate {
                id: None,
                date: day_start.format("%Y-%m-%d").to_string(),
                timestamp: day_start.timestamp(),
                protocol,
                method,
                endpoint,
                status_code,
                workspace_id: group.first().and_then(|a| a.workspace_id.clone()),
                environment: group.first().and_then(|a| a.environment.clone()),
                request_count,
                error_count,
                latency_sum,
                latency_min: if latency_min.is_finite() {
                    Some(latency_min)
                } else {
                    None
                },
                latency_max: if latency_max.is_finite() {
                    Some(latency_max)
                } else {
                    None
                },
                latency_p50: latency_p50_avg,
                latency_p95: latency_p95_avg,
                latency_p99: latency_p99_avg,
                bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
                bytes_received: group.iter().map(|a| a.bytes_received).sum(),
                active_connections_avg,
                active_connections_max,
                unique_clients: None,
                peak_hour,
                created_at: None,
            };

            self.db.insert_day_aggregate(&day_agg).await?;
        }

        info!("Rolled up {} hour aggregates into day aggregates", hour_data.len());
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_prometheus_client_creation() {
        let client = PrometheusClient::new("http://localhost:9090");
        assert_eq!(client.base_url, "http://localhost:9090");
    }
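
    // A sketch of how the response types map onto the Prometheus JSON shape,
    // using a hand-written sample payload (not captured from a live server).
    // Assumes `serde_json` is available as a dev-dependency.
    #[test]
    fn test_prometheus_response_deserialization() {
        let body = r#"{
            "status": "success",
            "data": {
                "resultType": "vector",
                "result": [
                    {
                        "metric": {"protocol": "http", "method": "GET", "path": "/users", "status": "200"},
                        "value": [1700000000.0, "42"]
                    }
                ]
            }
        }"#;

        let resp: PrometheusResponse = serde_json::from_str(body).expect("valid sample payload");
        assert_eq!(resp.status, "success");
        assert_eq!(resp.data.result_type, "vector");

        let series = &resp.data.result[0];
        assert_eq!(series.metric.get("path").map(String::as_str), Some("/users"));
        // Instant queries carry a single (timestamp, value) sample in `value`.
        let (_, value) = series.value.as_ref().expect("instant query sample");
        assert_eq!(value, "42");
        assert!(series.values.is_none());
    }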
}