1use axum::{
6 extract::{Query, State},
7 http::StatusCode,
8 Json,
9};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use tracing::{debug, error};
14
15use crate::models::ApiResponse;
16use crate::prometheus_client::PrometheusClient;
17
/// Shared state that the analytics handlers in this file receive through axum's `State`
/// extractor.
#[derive(Clone)]
pub struct AnalyticsState {
    /// Client the handlers call (`query` / `query_range`) to fetch metrics from Prometheus.
    pub prometheus_client: PrometheusClient,
}
23
24impl AnalyticsState {
25 pub fn new(prometheus_url: String) -> Self {
26 Self {
27 prometheus_client: PrometheusClient::new(prometheus_url),
28 }
29 }
30}
31
/// Query-string parameters accepted by the time-ranged analytics endpoints.
#[derive(Debug, Deserialize)]
pub struct TimeRangeQuery {
    /// Requested look-back window as a short label; falls back to `default_range()` ("1h")
    /// when the parameter is omitted. `parse_time_range` recognises "5m", "15m", "1h", "6h"
    /// and "24h" and treats anything else like "1h".
    #[serde(default = "default_range")]
    pub range: String,
}
38
/// Serde default for `TimeRangeQuery::range`: a one-hour window.
fn default_range() -> String {
    String::from("1h")
}
42
/// Headline traffic figures returned by `get_summary`.
#[derive(Debug, Serialize)]
pub struct SummaryMetrics {
    /// RFC 3339 UTC time at which the summary was assembled (`Utc::now().to_rfc3339()`).
    pub timestamp: String,
    /// Result of `sum(rate(mockforge_requests_total[5m]))`.
    pub request_rate: f64,
    /// 95th-percentile request duration from the duration histogram, scaled by 1000 to
    /// milliseconds.
    pub p95_latency_ms: f64,
    /// Error rate divided by request rate over the same 5-minute window, times 100.
    pub error_rate_percent: f64,
    /// Result of `sum(mockforge_requests_in_flight)`.
    pub active_connections: f64,
}
52
/// Time-series payload returned by `get_requests`.
#[derive(Debug, Serialize)]
pub struct RequestMetrics {
    /// Sample timestamps taken from `PrometheusClient::extract_time_series`
    /// (presumably Unix seconds, matching the bounds built by `parse_time_range` — confirm).
    pub timestamps: Vec<i64>,
    /// One entry per series returned by the range query (grouped by `protocol` in
    /// `get_requests`).
    pub series: Vec<SeriesData>,
}
59
/// A single named series inside `RequestMetrics`.
#[derive(Debug, Serialize)]
pub struct SeriesData {
    /// Series name as produced by `PrometheusClient::extract_time_series`.
    pub name: String,
    /// Sample values of this series, in the order the samples were returned.
    pub values: Vec<f64>,
}
65
/// Per-endpoint figures returned by `get_endpoints`, one per (path, method) pair.
#[derive(Debug, Serialize)]
pub struct EndpointMetrics {
    /// Value of the `path` label (empty string if the label is missing).
    pub path: String,
    /// Value of the `method` label (empty string if the label is missing).
    pub method: String,
    /// 5-minute request rate for this pair, as ranked by the `topk` query.
    pub request_rate: f64,
    /// `mockforge_average_latency_by_path_seconds` for this pair, scaled to milliseconds.
    pub avg_latency_ms: f64,
    /// 95th-percentile latency from the per-path duration histogram, in milliseconds.
    pub p95_latency_ms: f64,
    /// 5-minute *rate* of responses whose `status` matches `4..|5..` — a rate, not a count.
    pub errors: f64,
    /// `errors` divided by the pair's total request rate, times 100.
    pub error_rate_percent: f64,
}
77
/// WebSocket figures returned by `get_websocket`.
#[derive(Debug, Serialize)]
pub struct WebSocketMetrics {
    /// Value of `mockforge_ws_connections_active`.
    pub active_connections: f64,
    /// Value of `mockforge_ws_connections_total`.
    pub total_connections: f64,
    /// `rate(mockforge_ws_messages_sent_total[5m])`.
    pub message_rate_sent: f64,
    /// `rate(mockforge_ws_messages_received_total[5m])`.
    pub message_rate_received: f64,
    /// `rate(mockforge_ws_errors_total[5m])`.
    pub error_rate: f64,
    /// 5-minute rate of the connection-duration sum divided by the rate of its count.
    pub avg_connection_duration_seconds: f64,
}
88
/// Process-level figures returned by `get_system`.
#[derive(Debug, Serialize)]
pub struct SystemMetrics {
    /// `mockforge_memory_usage_bytes` divided by 1024 twice.
    pub memory_usage_mb: f64,
    /// Value of `mockforge_cpu_usage_percent`.
    pub cpu_usage_percent: f64,
    /// Value of `mockforge_thread_count`.
    pub thread_count: f64,
    /// Value of `mockforge_uptime_seconds`.
    pub uptime_seconds: f64,
}
97
/// SMTP figures returned by `get_smtp`.
#[derive(Debug, Serialize)]
pub struct SmtpMetrics {
    /// Value of `mockforge_smtp_connections_active`.
    pub active_connections: f64,
    /// Value of `mockforge_smtp_connections_total`.
    pub total_connections: f64,
    /// `rate(mockforge_smtp_messages_received_total[5m])`.
    pub message_rate_received: f64,
    /// `rate(mockforge_smtp_messages_stored_total[5m])`.
    pub message_rate_stored: f64,
    /// `sum(rate(mockforge_smtp_errors_total[5m]))`.
    pub error_rate: f64,
}
107
108pub async fn get_summary(
110 State(state): State<AnalyticsState>,
111 Query(params): Query<TimeRangeQuery>,
112) -> Result<Json<ApiResponse<SummaryMetrics>>, StatusCode> {
113 debug!("Fetching analytics summary for range: {}", params.range);
114
115 let request_rate_query = "sum(rate(mockforge_requests_total[5m]))";
117 let request_rate = match state.prometheus_client.query(request_rate_query).await {
118 Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
119 Err(e) => {
120 error!("Failed to query request rate: {}", e);
121 0.0
122 }
123 };
124
125 let p95_query = "histogram_quantile(0.95, sum(rate(mockforge_request_duration_seconds_bucket[5m])) by (le)) * 1000";
127 let p95_latency = match state.prometheus_client.query(p95_query).await {
128 Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
129 Err(e) => {
130 error!("Failed to query P95 latency: {}", e);
131 0.0
132 }
133 };
134
135 let error_rate_query =
137 "(sum(rate(mockforge_errors_total[5m])) / sum(rate(mockforge_requests_total[5m]))) * 100";
138 let error_rate = match state.prometheus_client.query(error_rate_query).await {
139 Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
140 Err(e) => {
141 error!("Failed to query error rate: {}", e);
142 0.0
143 }
144 };
145
146 let active_conn_query = "sum(mockforge_requests_in_flight)";
148 let active_connections = match state.prometheus_client.query(active_conn_query).await {
149 Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
150 Err(e) => {
151 error!("Failed to query active connections: {}", e);
152 0.0
153 }
154 };
155
156 let summary = SummaryMetrics {
157 timestamp: Utc::now().to_rfc3339(),
158 request_rate,
159 p95_latency_ms: p95_latency,
160 error_rate_percent: error_rate,
161 active_connections,
162 };
163
164 Ok(Json(ApiResponse::success(summary)))
165}
166
167pub async fn get_requests(
169 State(state): State<AnalyticsState>,
170 Query(params): Query<TimeRangeQuery>,
171) -> Result<Json<ApiResponse<RequestMetrics>>, StatusCode> {
172 debug!("Fetching request metrics for range: {}", params.range);
173
174 let (start, end, step) = parse_time_range(¶ms.range);
175
176 let query = "sum by (protocol) (rate(mockforge_requests_total[5m]))";
177
178 match state.prometheus_client.query_range(query, start, end, &step).await {
179 Ok(response) => {
180 let time_series = PrometheusClient::extract_time_series(&response);
181
182 let mut timestamps: Vec<i64> = Vec::new();
184 if let Some((_, values)) = time_series.first() {
185 timestamps = values.iter().map(|(ts, _)| *ts).collect();
186 }
187
188 let series: Vec<SeriesData> = time_series
190 .into_iter()
191 .map(|(name, values)| SeriesData {
192 name,
193 values: values.into_iter().map(|(_, v)| v).collect(),
194 })
195 .collect();
196
197 let metrics = RequestMetrics { timestamps, series };
198
199 Ok(Json(ApiResponse::success(metrics)))
200 }
201 Err(e) => {
202 error!("Failed to query request metrics: {}", e);
203 Err(StatusCode::INTERNAL_SERVER_ERROR)
204 }
205 }
206}
207
208pub async fn get_endpoints(
210 State(state): State<AnalyticsState>,
211 Query(params): Query<HashMap<String, String>>,
212) -> Result<Json<ApiResponse<Vec<EndpointMetrics>>>, StatusCode> {
213 let limit = params.get("limit").and_then(|s| s.parse::<usize>().ok()).unwrap_or(10);
214
215 debug!("Fetching top {} endpoints", limit);
216
217 let query = format!(
219 "topk({}, sum by (path, method) (rate(mockforge_requests_by_path_total[5m])))",
220 limit
221 );
222
223 match state.prometheus_client.query(&query).await {
224 Ok(response) => {
225 let mut endpoints = Vec::new();
226
227 for result in &response.data.result {
228 if let Some(metric) = result.metric.as_object() {
229 let path =
230 metric.get("path").and_then(|v| v.as_str()).unwrap_or("").to_string();
231 let method =
232 metric.get("method").and_then(|v| v.as_str()).unwrap_or("").to_string();
233 let request_rate: f64 =
234 result.value.as_ref().and_then(|(_, v)| v.parse().ok()).unwrap_or(0.0);
235
236 let avg_latency_query = format!(
238 "mockforge_average_latency_by_path_seconds{{path=\"{}\",method=\"{}\"}} * 1000",
239 path, method
240 );
241 let avg_latency = state
242 .prometheus_client
243 .query(&avg_latency_query)
244 .await
245 .ok()
246 .and_then(|r| PrometheusClient::extract_single_value(&r))
247 .unwrap_or(0.0);
248
249 let p95_query = format!(
251 "histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{path=\"{}\",method=\"{}\"}}[5m])) by (le)) * 1000",
252 path, method
253 );
254 let p95_latency = state
255 .prometheus_client
256 .query(&p95_query)
257 .await
258 .ok()
259 .and_then(|r| PrometheusClient::extract_single_value(&r))
260 .unwrap_or(0.0);
261
262 let error_count_query = format!(
264 "sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\",status=~\"4..|5..\"}}[5m]))",
265 path, method
266 );
267 let error_count = state
268 .prometheus_client
269 .query(&error_count_query)
270 .await
271 .ok()
272 .and_then(|r| PrometheusClient::extract_single_value(&r))
273 .unwrap_or(0.0);
274
275 let error_rate_query = format!(
277 "(sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\",status=~\"4..|5..\"}}[5m])) / sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\"}}[5m]))) * 100",
278 path, method, path, method
279 );
280 let error_rate_percent = state
281 .prometheus_client
282 .query(&error_rate_query)
283 .await
284 .ok()
285 .and_then(|r| PrometheusClient::extract_single_value(&r))
286 .unwrap_or(0.0);
287
288 endpoints.push(EndpointMetrics {
289 path,
290 method,
291 request_rate,
292 avg_latency_ms: avg_latency,
293 p95_latency_ms: p95_latency,
294 errors: error_count,
295 error_rate_percent,
296 });
297 }
298 }
299
300 Ok(Json(ApiResponse::success(endpoints)))
301 }
302 Err(e) => {
303 error!("Failed to query endpoint metrics: {}", e);
304 Err(StatusCode::INTERNAL_SERVER_ERROR)
305 }
306 }
307}
308
309pub async fn get_websocket(
311 State(state): State<AnalyticsState>,
312) -> Result<Json<ApiResponse<WebSocketMetrics>>, StatusCode> {
313 debug!("Fetching WebSocket metrics");
314
315 let active_query = "mockforge_ws_connections_active";
317 let active_connections = state
318 .prometheus_client
319 .query(active_query)
320 .await
321 .ok()
322 .and_then(|r| PrometheusClient::extract_single_value(&r))
323 .unwrap_or(0.0);
324
325 let total_query = "mockforge_ws_connections_total";
327 let total_connections = state
328 .prometheus_client
329 .query(total_query)
330 .await
331 .ok()
332 .and_then(|r| PrometheusClient::extract_single_value(&r))
333 .unwrap_or(0.0);
334
335 let sent_query = "rate(mockforge_ws_messages_sent_total[5m])";
337 let message_rate_sent = state
338 .prometheus_client
339 .query(sent_query)
340 .await
341 .ok()
342 .and_then(|r| PrometheusClient::extract_single_value(&r))
343 .unwrap_or(0.0);
344
345 let received_query = "rate(mockforge_ws_messages_received_total[5m])";
347 let message_rate_received = state
348 .prometheus_client
349 .query(received_query)
350 .await
351 .ok()
352 .and_then(|r| PrometheusClient::extract_single_value(&r))
353 .unwrap_or(0.0);
354
355 let error_query = "rate(mockforge_ws_errors_total[5m])";
357 let error_rate = state
358 .prometheus_client
359 .query(error_query)
360 .await
361 .ok()
362 .and_then(|r| PrometheusClient::extract_single_value(&r))
363 .unwrap_or(0.0);
364
365 let duration_query =
367 "rate(mockforge_ws_connection_duration_seconds_sum[5m]) / rate(mockforge_ws_connection_duration_seconds_count[5m])";
368 let avg_duration = state
369 .prometheus_client
370 .query(duration_query)
371 .await
372 .ok()
373 .and_then(|r| PrometheusClient::extract_single_value(&r))
374 .unwrap_or(0.0);
375
376 let metrics = WebSocketMetrics {
377 active_connections,
378 total_connections,
379 message_rate_sent,
380 message_rate_received,
381 error_rate,
382 avg_connection_duration_seconds: avg_duration,
383 };
384
385 Ok(Json(ApiResponse::success(metrics)))
386}
387
388pub async fn get_smtp(
390 State(state): State<AnalyticsState>,
391) -> Result<Json<ApiResponse<SmtpMetrics>>, StatusCode> {
392 debug!("Fetching SMTP metrics");
393
394 let active_query = "mockforge_smtp_connections_active";
395 let active_connections = state
396 .prometheus_client
397 .query(active_query)
398 .await
399 .ok()
400 .and_then(|r| PrometheusClient::extract_single_value(&r))
401 .unwrap_or(0.0);
402
403 let total_query = "mockforge_smtp_connections_total";
404 let total_connections = state
405 .prometheus_client
406 .query(total_query)
407 .await
408 .ok()
409 .and_then(|r| PrometheusClient::extract_single_value(&r))
410 .unwrap_or(0.0);
411
412 let received_query = "rate(mockforge_smtp_messages_received_total[5m])";
413 let message_rate_received = state
414 .prometheus_client
415 .query(received_query)
416 .await
417 .ok()
418 .and_then(|r| PrometheusClient::extract_single_value(&r))
419 .unwrap_or(0.0);
420
421 let stored_query = "rate(mockforge_smtp_messages_stored_total[5m])";
422 let message_rate_stored = state
423 .prometheus_client
424 .query(stored_query)
425 .await
426 .ok()
427 .and_then(|r| PrometheusClient::extract_single_value(&r))
428 .unwrap_or(0.0);
429
430 let error_query = "sum(rate(mockforge_smtp_errors_total[5m]))";
431 let error_rate = state
432 .prometheus_client
433 .query(error_query)
434 .await
435 .ok()
436 .and_then(|r| PrometheusClient::extract_single_value(&r))
437 .unwrap_or(0.0);
438
439 let metrics = SmtpMetrics {
440 active_connections,
441 total_connections,
442 message_rate_received,
443 message_rate_stored,
444 error_rate,
445 };
446
447 Ok(Json(ApiResponse::success(metrics)))
448}
449
450pub async fn get_system(
452 State(state): State<AnalyticsState>,
453) -> Result<Json<ApiResponse<SystemMetrics>>, StatusCode> {
454 debug!("Fetching system metrics");
455
456 let memory_query = "mockforge_memory_usage_bytes / 1024 / 1024";
457 let memory_usage_mb = state
458 .prometheus_client
459 .query(memory_query)
460 .await
461 .ok()
462 .and_then(|r| PrometheusClient::extract_single_value(&r))
463 .unwrap_or(0.0);
464
465 let cpu_query = "mockforge_cpu_usage_percent";
466 let cpu_usage_percent = state
467 .prometheus_client
468 .query(cpu_query)
469 .await
470 .ok()
471 .and_then(|r| PrometheusClient::extract_single_value(&r))
472 .unwrap_or(0.0);
473
474 let thread_query = "mockforge_thread_count";
475 let thread_count = state
476 .prometheus_client
477 .query(thread_query)
478 .await
479 .ok()
480 .and_then(|r| PrometheusClient::extract_single_value(&r))
481 .unwrap_or(0.0);
482
483 let uptime_query = "mockforge_uptime_seconds";
484 let uptime_seconds = state
485 .prometheus_client
486 .query(uptime_query)
487 .await
488 .ok()
489 .and_then(|r| PrometheusClient::extract_single_value(&r))
490 .unwrap_or(0.0);
491
492 let metrics = SystemMetrics {
493 memory_usage_mb,
494 cpu_usage_percent,
495 thread_count,
496 uptime_seconds,
497 };
498
499 Ok(Json(ApiResponse::success(metrics)))
500}
501
502fn parse_time_range(range: &str) -> (i64, i64, String) {
504 let now = Utc::now().timestamp();
505 let duration_secs = match range {
506 "5m" => 5 * 60,
507 "15m" => 15 * 60,
508 "1h" => 60 * 60,
509 "6h" => 6 * 60 * 60,
510 "24h" => 24 * 60 * 60,
511 _ => 60 * 60, };
513
514 let start = now - duration_secs;
515 let step = match range {
516 "5m" => "15s",
517 "15m" => "30s",
518 "1h" => "1m",
519 "6h" => "5m",
520 "24h" => "15m",
521 _ => "1m",
522 }
523 .to_string();
524
525 (start, now, step)
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// "1h" spans 3600 seconds sampled at one-minute steps.
    #[test]
    fn test_parse_time_range() {
        let (from, to, resolution) = parse_time_range("1h");
        assert_eq!(to - from, 3600);
        assert_eq!(resolution, "1m");
    }

    /// "5m" spans 300 seconds sampled at fifteen-second steps.
    #[test]
    fn test_parse_time_range_5m() {
        let (from, to, resolution) = parse_time_range("5m");
        assert_eq!(to - from, 300);
        assert_eq!(resolution, "15s");
    }
}