1use crate::config::AnalyticsConfig;
9use crate::database::AnalyticsDatabase;
10use crate::error::Result;
11use crate::models::*;
12use chrono::{Timelike, Utc};
13use reqwest::Client;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::time::{interval, Duration};
18use tracing::{debug, error, info, warn};
19
/// Thin HTTP client for the Prometheus query API.
#[derive(Clone)]
pub struct PrometheusClient {
    // Root URL of the Prometheus server (e.g. "http://localhost:9090"),
    // expected without a trailing slash — paths below are appended verbatim.
    base_url: String,
    // Reusable HTTP client for all queries.
    client: Client,
}
26
27impl PrometheusClient {
28 pub fn new(base_url: impl Into<String>) -> Self {
30 Self {
31 base_url: base_url.into(),
32 client: Client::new(),
33 }
34 }
35
36 pub async fn query(&self, query: &str, time: Option<i64>) -> Result<PrometheusResponse> {
38 let mut url = format!("{}/api/v1/query", self.base_url);
39 url.push_str(&format!("?query={}", urlencoding::encode(query)));
40
41 if let Some(t) = time {
42 url.push_str(&format!("&time={}", t));
43 }
44
45 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
46
47 Ok(response)
48 }
49
50 pub async fn query_range(
52 &self,
53 query: &str,
54 start: i64,
55 end: i64,
56 step: &str,
57 ) -> Result<PrometheusResponse> {
58 let url = format!(
59 "{}/api/v1/query_range?query={}&start={}&end={}&step={}",
60 self.base_url,
61 urlencoding::encode(query),
62 start,
63 end,
64 step
65 );
66
67 let response = self.client.get(&url).send().await?.json::<PrometheusResponse>().await?;
68
69 Ok(response)
70 }
71}
72
/// Top-level envelope returned by the Prometheus HTTP API.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResponse {
    /// API status string — "success" or "error" per the Prometheus HTTP API.
    pub status: String,
    /// The query result payload.
    pub data: PrometheusData,
}
79
/// Query result payload.
///
/// `rename_all = "camelCase"` maps `result_type` to the API's `resultType`
/// JSON field.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusData {
    /// Result kind reported by the server (e.g. "vector", "matrix").
    pub result_type: String,
    /// One entry per matching time series.
    pub result: Vec<PrometheusResult>,
}
87
/// A single time series in a query result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrometheusResult {
    /// Label set identifying the series (label name -> label value).
    pub metric: HashMap<String, String>,
    /// Single sample — populated for instant queries.
    pub value: Option<PrometheusValue>,
    /// Sample series — populated for range queries.
    pub values: Option<Vec<PrometheusValue>>,
}
95
/// One sample as serialized by Prometheus: (Unix timestamp in seconds,
/// sample value encoded as a string).
pub type PrometheusValue = (f64, String);
98
/// Background service that periodically pulls metrics from Prometheus and
/// persists minute / hourly / daily aggregates to the analytics database.
pub struct MetricsAggregator {
    // Destination store for aggregated rows and endpoint stats.
    db: AnalyticsDatabase,
    // Source of raw metric data.
    prom_client: PrometheusClient,
    // Supplies the tick intervals for the aggregation loops.
    config: AnalyticsConfig,
}
105
106impl MetricsAggregator {
107 pub fn new(
109 db: AnalyticsDatabase,
110 prometheus_url: impl Into<String>,
111 config: AnalyticsConfig,
112 ) -> Self {
113 Self {
114 db,
115 prom_client: PrometheusClient::new(prometheus_url),
116 config,
117 }
118 }
119
120 pub async fn start(self: Arc<Self>) {
122 info!("Starting metrics aggregation service");
123
124 let self_clone = Arc::clone(&self);
126 tokio::spawn(async move {
127 self_clone.run_minute_aggregation().await;
128 });
129
130 let self_clone = Arc::clone(&self);
132 tokio::spawn(async move {
133 self_clone.run_hourly_rollup().await;
134 });
135
136 let self_clone = Arc::clone(&self);
138 tokio::spawn(async move {
139 self_clone.run_daily_rollup().await;
140 });
141 }
142
143 async fn run_minute_aggregation(&self) {
145 let mut interval = interval(Duration::from_secs(self.config.aggregation_interval_seconds));
146
147 loop {
148 interval.tick().await;
149
150 if let Err(e) = self.aggregate_minute_metrics().await {
151 error!("Error aggregating minute metrics: {}", e);
152 }
153 }
154 }
155
156 async fn aggregate_minute_metrics(&self) -> Result<()> {
158 let now = Utc::now();
159 let minute_start =
160 now.with_second(0).unwrap().with_nanosecond(0).unwrap() - chrono::Duration::minutes(1);
161 let timestamp = minute_start.timestamp();
162
163 debug!("Aggregating metrics for minute: {}", minute_start);
164
165 let query = format!(
167 r"sum by (protocol, method, path, status) (
168 increase(mockforge_requests_by_path_total{{}}[1m]) > 0
169 )"
170 );
171
172 let response = self.prom_client.query(&query, Some(timestamp)).await?;
173
174 let mut aggregates = Vec::new();
175
176 for result in response.data.result {
177 let protocol = result
178 .metric
179 .get("protocol")
180 .map(|s| s.to_string())
181 .unwrap_or_else(|| "unknown".to_string());
182 let method = result.metric.get("method").cloned();
183 let endpoint = result.metric.get("path").cloned();
184 let status_code = result.metric.get("status").and_then(|s| s.parse::<i32>().ok());
185
186 let request_count = if let Some((_, value)) = result.value {
187 value.parse::<f64>().unwrap_or(0.0) as i64
188 } else {
189 0
190 };
191
192 let latency_query = if let (Some(ref p), Some(ref m), Some(ref e)) =
194 (&Some(protocol.clone()), &method, &endpoint)
195 {
196 format!(
197 r#"histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{protocol="{}",method="{}",path="{}"}}[1m])) by (le)) * 1000"#,
198 p, m, e
199 )
200 } else {
201 continue;
202 };
203
204 let latency_p95 = match self.prom_client.query(&latency_query, Some(timestamp)).await {
205 Ok(resp) => resp
206 .data
207 .result
208 .first()
209 .and_then(|r| r.value.as_ref().and_then(|(_, v)| v.parse::<f64>().ok())),
210 Err(e) => {
211 warn!("Failed to query latency: {}", e);
212 None
213 }
214 };
215
216 let agg = MetricsAggregate {
217 id: None,
218 timestamp,
219 protocol: protocol.clone(),
220 method: method.clone(),
221 endpoint: endpoint.clone(),
222 status_code,
223 workspace_id: None,
224 environment: None,
225 request_count,
226 error_count: if let Some(sc) = status_code {
227 if sc >= 400 {
228 request_count
229 } else {
230 0
231 }
232 } else {
233 0
234 },
235 latency_sum: 0.0,
236 latency_min: None,
237 latency_max: None,
238 latency_p50: None,
239 latency_p95,
240 latency_p99: None,
241 bytes_sent: 0,
242 bytes_received: 0,
243 active_connections: None,
244 created_at: None,
245 };
246
247 aggregates.push(agg);
248 }
249
250 if !aggregates.is_empty() {
251 self.db.insert_minute_aggregates_batch(&aggregates).await?;
252 info!("Stored {} minute aggregates", aggregates.len());
253
254 for agg in &aggregates {
256 let stats = EndpointStats {
257 id: None,
258 endpoint: agg.endpoint.clone().unwrap_or_default(),
259 protocol: agg.protocol.clone(),
260 method: agg.method.clone(),
261 workspace_id: agg.workspace_id.clone(),
262 environment: agg.environment.clone(),
263 total_requests: agg.request_count,
264 total_errors: agg.error_count,
265 avg_latency_ms: agg.latency_p95,
266 min_latency_ms: agg.latency_min,
267 max_latency_ms: agg.latency_max,
268 p95_latency_ms: agg.latency_p95,
269 status_codes: None,
270 total_bytes_sent: agg.bytes_sent,
271 total_bytes_received: agg.bytes_received,
272 first_seen: timestamp,
273 last_seen: timestamp,
274 updated_at: None,
275 };
276
277 if let Err(e) = self.db.upsert_endpoint_stats(&stats).await {
278 warn!("Failed to update endpoint stats: {}", e);
279 }
280 }
281 }
282
283 Ok(())
284 }
285
286 async fn run_hourly_rollup(&self) {
288 let mut interval = interval(Duration::from_secs(self.config.rollup_interval_hours * 3600));
289
290 loop {
291 interval.tick().await;
292
293 if let Err(e) = self.rollup_to_hour().await {
294 error!("Error rolling up to hourly metrics: {}", e);
295 }
296 }
297 }
298
299 async fn rollup_to_hour(&self) -> Result<()> {
301 let now = Utc::now();
302 let hour_start =
303 now.with_minute(0).unwrap().with_second(0).unwrap().with_nanosecond(0).unwrap()
304 - chrono::Duration::hours(1);
305 let hour_end = hour_start + chrono::Duration::hours(1);
306
307 info!("Rolling up metrics to hour: {}", hour_start);
308
309 let filter = AnalyticsFilter {
310 start_time: Some(hour_start.timestamp()),
311 end_time: Some(hour_end.timestamp()),
312 ..Default::default()
313 };
314
315 let minute_data = self.db.get_minute_aggregates(&filter).await?;
316
317 if minute_data.is_empty() {
318 debug!("No minute data to roll up");
319 return Ok(());
320 }
321
322 let mut groups: HashMap<
324 (String, Option<String>, Option<String>, Option<i32>),
325 Vec<&MetricsAggregate>,
326 > = HashMap::new();
327
328 for agg in &minute_data {
329 let key =
330 (agg.protocol.clone(), agg.method.clone(), agg.endpoint.clone(), agg.status_code);
331 groups.entry(key).or_default().push(agg);
332 }
333
334 for ((protocol, method, endpoint, status_code), group) in groups {
335 let request_count: i64 = group.iter().map(|a| a.request_count).sum();
336 let error_count: i64 = group.iter().map(|a| a.error_count).sum();
337 let latency_sum: f64 = group.iter().map(|a| a.latency_sum).sum();
338 let latency_min =
339 group.iter().filter_map(|a| a.latency_min).fold(f64::INFINITY, f64::min);
340 let latency_max =
341 group.iter().filter_map(|a| a.latency_max).fold(f64::NEG_INFINITY, f64::max);
342
343 let hour_agg = HourMetricsAggregate {
344 id: None,
345 timestamp: hour_start.timestamp(),
346 protocol,
347 method,
348 endpoint,
349 status_code,
350 workspace_id: None,
351 environment: None,
352 request_count,
353 error_count,
354 latency_sum,
355 latency_min: if latency_min.is_finite() {
356 Some(latency_min)
357 } else {
358 None
359 },
360 latency_max: if latency_max.is_finite() {
361 Some(latency_max)
362 } else {
363 None
364 },
365 latency_p50: None,
366 latency_p95: None,
367 latency_p99: None,
368 bytes_sent: group.iter().map(|a| a.bytes_sent).sum(),
369 bytes_received: group.iter().map(|a| a.bytes_received).sum(),
370 active_connections_avg: None,
371 active_connections_max: group.iter().filter_map(|a| a.active_connections).max(),
372 created_at: None,
373 };
374
375 self.db.insert_hour_aggregate(&hour_agg).await?;
376 }
377
378 info!("Rolled up {} minute aggregates into hour aggregates", minute_data.len());
379 Ok(())
380 }
381
382 async fn run_daily_rollup(&self) {
384 let mut interval = interval(Duration::from_secs(86400)); loop {
387 interval.tick().await;
388
389 if let Err(e) = self.rollup_to_day().await {
390 error!("Error rolling up to daily metrics: {}", e);
391 }
392 }
393 }
394
395 async fn rollup_to_day(&self) -> Result<()> {
397 let now = Utc::now();
398 let day_start = now
399 .with_hour(0)
400 .unwrap()
401 .with_minute(0)
402 .unwrap()
403 .with_second(0)
404 .unwrap()
405 .with_nanosecond(0)
406 .unwrap()
407 - chrono::Duration::days(1);
408 let _day_end = day_start + chrono::Duration::days(1);
409
410 info!("Rolling up metrics to day: {}", day_start.format("%Y-%m-%d"));
411
412 Ok(())
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;

    /// The constructor must store the base URL exactly as given.
    #[test]
    fn test_prometheus_client_creation() {
        let prom = PrometheusClient::new("http://localhost:9090");
        assert_eq!(prom.base_url, "http://localhost:9090");
    }
}