1use axum::{
6    extract::{Query, State},
7    http::StatusCode,
8    Json,
9};
10use chrono::Utc;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use tracing::{debug, error};
14
15use crate::models::ApiResponse;
16use crate::prometheus_client::PrometheusClient;
17
18#[derive(Clone)]
20pub struct AnalyticsState {
21    pub prometheus_client: PrometheusClient,
22}
23
24impl AnalyticsState {
25    pub fn new(prometheus_url: String) -> Self {
26        Self {
27            prometheus_client: PrometheusClient::new(prometheus_url),
28        }
29    }
30}
31
32#[derive(Debug, Deserialize)]
34pub struct TimeRangeQuery {
35    #[serde(default = "default_range")]
36    pub range: String,
37}
38
/// Serde default for `TimeRangeQuery::range`: a one-hour window.
fn default_range() -> String {
    String::from("1h")
}
42
43#[derive(Debug, Serialize)]
45pub struct SummaryMetrics {
46    pub timestamp: String,
47    pub request_rate: f64,
48    pub p95_latency_ms: f64,
49    pub error_rate_percent: f64,
50    pub active_connections: f64,
51}
52
53#[derive(Debug, Serialize)]
55pub struct RequestMetrics {
56    pub timestamps: Vec<i64>,
57    pub series: Vec<SeriesData>,
58}
59
60#[derive(Debug, Serialize)]
61pub struct SeriesData {
62    pub name: String,
63    pub values: Vec<f64>,
64}
65
66#[derive(Debug, Serialize)]
68pub struct EndpointMetrics {
69    pub path: String,
70    pub method: String,
71    pub request_rate: f64,
72    pub avg_latency_ms: f64,
73    pub p95_latency_ms: f64,
74    pub errors: f64,
75    pub error_rate_percent: f64,
76}
77
78#[derive(Debug, Serialize)]
80pub struct WebSocketMetrics {
81    pub active_connections: f64,
82    pub total_connections: f64,
83    pub message_rate_sent: f64,
84    pub message_rate_received: f64,
85    pub error_rate: f64,
86    pub avg_connection_duration_seconds: f64,
87}
88
89#[derive(Debug, Serialize)]
91pub struct SystemMetrics {
92    pub memory_usage_mb: f64,
93    pub cpu_usage_percent: f64,
94    pub thread_count: f64,
95    pub uptime_seconds: f64,
96}
97
98#[derive(Debug, Serialize)]
100pub struct SmtpMetrics {
101    pub active_connections: f64,
102    pub total_connections: f64,
103    pub message_rate_received: f64,
104    pub message_rate_stored: f64,
105    pub error_rate: f64,
106}
107
108pub async fn get_summary(
110    State(state): State<AnalyticsState>,
111    Query(params): Query<TimeRangeQuery>,
112) -> Result<Json<ApiResponse<SummaryMetrics>>, StatusCode> {
113    debug!("Fetching analytics summary for range: {}", params.range);
114
115    let request_rate_query = "sum(rate(mockforge_requests_total[5m]))";
117    let request_rate = match state.prometheus_client.query(request_rate_query).await {
118        Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
119        Err(e) => {
120            error!("Failed to query request rate: {}", e);
121            0.0
122        }
123    };
124
125    let p95_query = "histogram_quantile(0.95, sum(rate(mockforge_request_duration_seconds_bucket[5m])) by (le)) * 1000";
127    let p95_latency = match state.prometheus_client.query(p95_query).await {
128        Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
129        Err(e) => {
130            error!("Failed to query P95 latency: {}", e);
131            0.0
132        }
133    };
134
135    let error_rate_query =
137        "(sum(rate(mockforge_errors_total[5m])) / sum(rate(mockforge_requests_total[5m]))) * 100";
138    let error_rate = match state.prometheus_client.query(error_rate_query).await {
139        Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
140        Err(e) => {
141            error!("Failed to query error rate: {}", e);
142            0.0
143        }
144    };
145
146    let active_conn_query = "sum(mockforge_requests_in_flight)";
148    let active_connections = match state.prometheus_client.query(active_conn_query).await {
149        Ok(response) => PrometheusClient::extract_single_value(&response).unwrap_or(0.0),
150        Err(e) => {
151            error!("Failed to query active connections: {}", e);
152            0.0
153        }
154    };
155
156    let summary = SummaryMetrics {
157        timestamp: Utc::now().to_rfc3339(),
158        request_rate,
159        p95_latency_ms: p95_latency,
160        error_rate_percent: error_rate,
161        active_connections,
162    };
163
164    Ok(Json(ApiResponse::success(summary)))
165}
166
167pub async fn get_requests(
169    State(state): State<AnalyticsState>,
170    Query(params): Query<TimeRangeQuery>,
171) -> Result<Json<ApiResponse<RequestMetrics>>, StatusCode> {
172    debug!("Fetching request metrics for range: {}", params.range);
173
174    let (start, end, step) = parse_time_range(¶ms.range);
175
176    let query = "sum by (protocol) (rate(mockforge_requests_total[5m]))";
177
178    match state.prometheus_client.query_range(query, start, end, &step).await {
179        Ok(response) => {
180            let time_series = PrometheusClient::extract_time_series(&response);
181
182            let mut timestamps: Vec<i64> = Vec::new();
184            if let Some((_, values)) = time_series.first() {
185                timestamps = values.iter().map(|(ts, _)| *ts).collect();
186            }
187
188            let series: Vec<SeriesData> = time_series
190                .into_iter()
191                .map(|(name, values)| SeriesData {
192                    name,
193                    values: values.into_iter().map(|(_, v)| v).collect(),
194                })
195                .collect();
196
197            let metrics = RequestMetrics { timestamps, series };
198
199            Ok(Json(ApiResponse::success(metrics)))
200        }
201        Err(e) => {
202            error!("Failed to query request metrics: {}", e);
203            Err(StatusCode::INTERNAL_SERVER_ERROR)
204        }
205    }
206}
207
208pub async fn get_endpoints(
210    State(state): State<AnalyticsState>,
211    Query(params): Query<HashMap<String, String>>,
212) -> Result<Json<ApiResponse<Vec<EndpointMetrics>>>, StatusCode> {
213    let limit = params.get("limit").and_then(|s| s.parse::<usize>().ok()).unwrap_or(10);
214
215    debug!("Fetching top {} endpoints", limit);
216
217    let query = format!(
219        "topk({}, sum by (path, method) (rate(mockforge_requests_by_path_total[5m])))",
220        limit
221    );
222
223    match state.prometheus_client.query(&query).await {
224        Ok(response) => {
225            let mut endpoints = Vec::new();
226
227            for result in &response.data.result {
228                if let Some(metric) = result.metric.as_object() {
229                    let path =
230                        metric.get("path").and_then(|v| v.as_str()).unwrap_or("").to_string();
231                    let method =
232                        metric.get("method").and_then(|v| v.as_str()).unwrap_or("").to_string();
233                    let request_rate: f64 =
234                        result.value.as_ref().and_then(|(_, v)| v.parse().ok()).unwrap_or(0.0);
235
236                    let avg_latency_query = format!(
238                        "mockforge_average_latency_by_path_seconds{{path=\"{}\",method=\"{}\"}} * 1000",
239                        path, method
240                    );
241                    let avg_latency = state
242                        .prometheus_client
243                        .query(&avg_latency_query)
244                        .await
245                        .ok()
246                        .and_then(|r| PrometheusClient::extract_single_value(&r))
247                        .unwrap_or(0.0);
248
249                    let p95_query = format!(
251                        "histogram_quantile(0.95, sum(rate(mockforge_request_duration_by_path_seconds_bucket{{path=\"{}\",method=\"{}\"}}[5m])) by (le)) * 1000",
252                        path, method
253                    );
254                    let p95_latency = state
255                        .prometheus_client
256                        .query(&p95_query)
257                        .await
258                        .ok()
259                        .and_then(|r| PrometheusClient::extract_single_value(&r))
260                        .unwrap_or(0.0);
261
262                    let error_count_query = format!(
264                        "sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\",status=~\"4..|5..\"}}[5m]))",
265                        path, method
266                    );
267                    let error_count = state
268                        .prometheus_client
269                        .query(&error_count_query)
270                        .await
271                        .ok()
272                        .and_then(|r| PrometheusClient::extract_single_value(&r))
273                        .unwrap_or(0.0);
274
275                    let error_rate_query = format!(
277                        "(sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\",status=~\"4..|5..\"}}[5m])) / sum(rate(mockforge_requests_by_path_total{{path=\"{}\",method=\"{}\"}}[5m]))) * 100",
278                        path, method, path, method
279                    );
280                    let error_rate_percent = state
281                        .prometheus_client
282                        .query(&error_rate_query)
283                        .await
284                        .ok()
285                        .and_then(|r| PrometheusClient::extract_single_value(&r))
286                        .unwrap_or(0.0);
287
288                    endpoints.push(EndpointMetrics {
289                        path,
290                        method,
291                        request_rate,
292                        avg_latency_ms: avg_latency,
293                        p95_latency_ms: p95_latency,
294                        errors: error_count,
295                        error_rate_percent,
296                    });
297                }
298            }
299
300            Ok(Json(ApiResponse::success(endpoints)))
301        }
302        Err(e) => {
303            error!("Failed to query endpoint metrics: {}", e);
304            Err(StatusCode::INTERNAL_SERVER_ERROR)
305        }
306    }
307}
308
309pub async fn get_websocket(
311    State(state): State<AnalyticsState>,
312) -> Result<Json<ApiResponse<WebSocketMetrics>>, StatusCode> {
313    debug!("Fetching WebSocket metrics");
314
315    let active_query = "mockforge_ws_connections_active";
317    let active_connections = state
318        .prometheus_client
319        .query(active_query)
320        .await
321        .ok()
322        .and_then(|r| PrometheusClient::extract_single_value(&r))
323        .unwrap_or(0.0);
324
325    let total_query = "mockforge_ws_connections_total";
327    let total_connections = state
328        .prometheus_client
329        .query(total_query)
330        .await
331        .ok()
332        .and_then(|r| PrometheusClient::extract_single_value(&r))
333        .unwrap_or(0.0);
334
335    let sent_query = "rate(mockforge_ws_messages_sent_total[5m])";
337    let message_rate_sent = state
338        .prometheus_client
339        .query(sent_query)
340        .await
341        .ok()
342        .and_then(|r| PrometheusClient::extract_single_value(&r))
343        .unwrap_or(0.0);
344
345    let received_query = "rate(mockforge_ws_messages_received_total[5m])";
347    let message_rate_received = state
348        .prometheus_client
349        .query(received_query)
350        .await
351        .ok()
352        .and_then(|r| PrometheusClient::extract_single_value(&r))
353        .unwrap_or(0.0);
354
355    let error_query = "rate(mockforge_ws_errors_total[5m])";
357    let error_rate = state
358        .prometheus_client
359        .query(error_query)
360        .await
361        .ok()
362        .and_then(|r| PrometheusClient::extract_single_value(&r))
363        .unwrap_or(0.0);
364
365    let duration_query =
367        "rate(mockforge_ws_connection_duration_seconds_sum[5m]) / rate(mockforge_ws_connection_duration_seconds_count[5m])";
368    let avg_duration = state
369        .prometheus_client
370        .query(duration_query)
371        .await
372        .ok()
373        .and_then(|r| PrometheusClient::extract_single_value(&r))
374        .unwrap_or(0.0);
375
376    let metrics = WebSocketMetrics {
377        active_connections,
378        total_connections,
379        message_rate_sent,
380        message_rate_received,
381        error_rate,
382        avg_connection_duration_seconds: avg_duration,
383    };
384
385    Ok(Json(ApiResponse::success(metrics)))
386}
387
388pub async fn get_smtp(
390    State(state): State<AnalyticsState>,
391) -> Result<Json<ApiResponse<SmtpMetrics>>, StatusCode> {
392    debug!("Fetching SMTP metrics");
393
394    let active_query = "mockforge_smtp_connections_active";
395    let active_connections = state
396        .prometheus_client
397        .query(active_query)
398        .await
399        .ok()
400        .and_then(|r| PrometheusClient::extract_single_value(&r))
401        .unwrap_or(0.0);
402
403    let total_query = "mockforge_smtp_connections_total";
404    let total_connections = state
405        .prometheus_client
406        .query(total_query)
407        .await
408        .ok()
409        .and_then(|r| PrometheusClient::extract_single_value(&r))
410        .unwrap_or(0.0);
411
412    let received_query = "rate(mockforge_smtp_messages_received_total[5m])";
413    let message_rate_received = state
414        .prometheus_client
415        .query(received_query)
416        .await
417        .ok()
418        .and_then(|r| PrometheusClient::extract_single_value(&r))
419        .unwrap_or(0.0);
420
421    let stored_query = "rate(mockforge_smtp_messages_stored_total[5m])";
422    let message_rate_stored = state
423        .prometheus_client
424        .query(stored_query)
425        .await
426        .ok()
427        .and_then(|r| PrometheusClient::extract_single_value(&r))
428        .unwrap_or(0.0);
429
430    let error_query = "sum(rate(mockforge_smtp_errors_total[5m]))";
431    let error_rate = state
432        .prometheus_client
433        .query(error_query)
434        .await
435        .ok()
436        .and_then(|r| PrometheusClient::extract_single_value(&r))
437        .unwrap_or(0.0);
438
439    let metrics = SmtpMetrics {
440        active_connections,
441        total_connections,
442        message_rate_received,
443        message_rate_stored,
444        error_rate,
445    };
446
447    Ok(Json(ApiResponse::success(metrics)))
448}
449
450pub async fn get_system(
452    State(state): State<AnalyticsState>,
453) -> Result<Json<ApiResponse<SystemMetrics>>, StatusCode> {
454    debug!("Fetching system metrics");
455
456    let memory_query = "mockforge_memory_usage_bytes / 1024 / 1024";
457    let memory_usage_mb = state
458        .prometheus_client
459        .query(memory_query)
460        .await
461        .ok()
462        .and_then(|r| PrometheusClient::extract_single_value(&r))
463        .unwrap_or(0.0);
464
465    let cpu_query = "mockforge_cpu_usage_percent";
466    let cpu_usage_percent = state
467        .prometheus_client
468        .query(cpu_query)
469        .await
470        .ok()
471        .and_then(|r| PrometheusClient::extract_single_value(&r))
472        .unwrap_or(0.0);
473
474    let thread_query = "mockforge_thread_count";
475    let thread_count = state
476        .prometheus_client
477        .query(thread_query)
478        .await
479        .ok()
480        .and_then(|r| PrometheusClient::extract_single_value(&r))
481        .unwrap_or(0.0);
482
483    let uptime_query = "mockforge_uptime_seconds";
484    let uptime_seconds = state
485        .prometheus_client
486        .query(uptime_query)
487        .await
488        .ok()
489        .and_then(|r| PrometheusClient::extract_single_value(&r))
490        .unwrap_or(0.0);
491
492    let metrics = SystemMetrics {
493        memory_usage_mb,
494        cpu_usage_percent,
495        thread_count,
496        uptime_seconds,
497    };
498
499    Ok(Json(ApiResponse::success(metrics)))
500}
501
502fn parse_time_range(range: &str) -> (i64, i64, String) {
504    let now = Utc::now().timestamp();
505    let duration_secs = match range {
506        "5m" => 5 * 60,
507        "15m" => 15 * 60,
508        "1h" => 60 * 60,
509        "6h" => 6 * 60 * 60,
510        "24h" => 24 * 60 * 60,
511        _ => 60 * 60, };
513
514    let start = now - duration_secs;
515    let step = match range {
516        "5m" => "15s",
517        "15m" => "30s",
518        "1h" => "1m",
519        "6h" => "5m",
520        "24h" => "15m",
521        _ => "1m",
522    }
523    .to_string();
524
525    (start, now, step)
526}
527
#[cfg(test)]
mod tests {
    use super::*;

    /// A `1h` range spans 3600 seconds and is sampled every minute.
    #[test]
    fn test_parse_time_range() {
        let (from, to, resolution) = parse_time_range("1h");
        assert_eq!(to - from, 3600);
        assert_eq!(resolution, "1m");
    }

    /// A `5m` range spans 300 seconds and is sampled every 15 seconds.
    #[test]
    fn test_parse_time_range_5m() {
        let (from, to, resolution) = parse_time_range("5m");
        assert_eq!(to - from, 300);
        assert_eq!(resolution, "15s");
    }
}