mockforge_ui/handlers/
analytics_v2.rs

1//! Enhanced Analytics API handlers with persistent storage
2//!
3//! This module provides comprehensive analytics endpoints that combine:
4//! - Real-time metrics from Prometheus
5//! - Historical data from the analytics database
6//! - Advanced queries (time-series, trends, patterns)
7
8use axum::{
9    extract::{Query, State},
10    http::StatusCode,
11    Json,
12};
13use chrono::Utc;
14use mockforge_analytics::{
15    AnalyticsDatabase, AnalyticsFilter, ErrorCategory, Granularity, OverviewMetrics,
16};
17use serde::{Deserialize, Serialize};
18use std::sync::Arc;
19use tracing::{debug, error};
20
21use crate::models::ApiResponse;
22
23/// Enhanced analytics state with both Prometheus and database access
24#[derive(Clone)]
25pub struct AnalyticsV2State {
26    pub db: Arc<AnalyticsDatabase>,
27}
28
29impl AnalyticsV2State {
30    pub fn new(db: AnalyticsDatabase) -> Self {
31        Self { db: Arc::new(db) }
32    }
33}
34
35/// Query parameters for analytics endpoints
36#[derive(Debug, Deserialize)]
37pub struct AnalyticsQuery {
38    /// Start time (Unix timestamp)
39    pub start_time: Option<i64>,
40    /// End time (Unix timestamp)
41    pub end_time: Option<i64>,
42    /// Duration in seconds (alternative to start/end)
43    #[serde(default = "default_duration")]
44    pub duration: i64,
45    /// Protocol filter (HTTP, gRPC, WebSocket, etc.)
46    pub protocol: Option<String>,
47    /// Endpoint filter
48    pub endpoint: Option<String>,
49    /// Method filter (GET, POST, etc.)
50    pub method: Option<String>,
51    /// Status code filter
52    pub status_code: Option<i32>,
53    /// Workspace ID filter
54    pub workspace_id: Option<String>,
55    /// Environment filter (dev, staging, prod)
56    pub environment: Option<String>,
57    /// Limit results
58    #[serde(default = "default_limit")]
59    pub limit: i64,
60    /// Granularity for time-series data
61    #[serde(default = "default_granularity")]
62    pub granularity: String,
63}
64
65fn default_duration() -> i64 {
66    3600 // 1 hour
67}
68
69fn default_limit() -> i64 {
70    100
71}
72
73fn default_granularity() -> String {
74    "minute".to_string()
75}
76
77impl AnalyticsQuery {
78    /// Convert to AnalyticsFilter
79    fn to_filter(&self) -> AnalyticsFilter {
80        let (start_time, end_time) =
81            if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
82                (Some(start), Some(end))
83            } else {
84                let end = Utc::now().timestamp();
85                let start = end - self.duration;
86                (Some(start), Some(end))
87            };
88
89        AnalyticsFilter {
90            start_time,
91            end_time,
92            protocol: self.protocol.clone(),
93            endpoint: self.endpoint.clone(),
94            method: self.method.clone(),
95            status_code: self.status_code,
96            workspace_id: self.workspace_id.clone(),
97            environment: self.environment.clone(),
98            limit: Some(self.limit),
99        }
100    }
101
102    /// Parse granularity string
103    fn get_granularity(&self) -> Granularity {
104        match self.granularity.as_str() {
105            "minute" => Granularity::Minute,
106            "hour" => Granularity::Hour,
107            "day" => Granularity::Day,
108            _ => Granularity::Minute,
109        }
110    }
111}
112
113// ============================================================================
114// REST API Endpoints
115// ============================================================================
116
117/// GET /api/v2/analytics/overview
118///
119/// Get dashboard overview metrics including:
120/// - Total requests, errors, error rate
121/// - Latency percentiles (avg, p50, p95, p99)
122/// - Active connections, throughput
123/// - Top protocols and endpoints
124pub async fn get_overview(
125    State(state): State<AnalyticsV2State>,
126    Query(query): Query<AnalyticsQuery>,
127) -> Result<Json<ApiResponse<OverviewMetrics>>, StatusCode> {
128    debug!("Fetching analytics overview for duration: {}s", query.duration);
129
130    match state.db.get_overview_metrics(query.duration).await {
131        Ok(overview) => Ok(Json(ApiResponse::success(overview))),
132        Err(e) => {
133            error!("Failed to get overview metrics: {}", e);
134            Err(StatusCode::INTERNAL_SERVER_ERROR)
135        }
136    }
137}
138
139/// GET /api/v2/analytics/requests
140///
141/// Get request count time-series data
142#[derive(Debug, Serialize)]
143pub struct TimeSeriesResponse {
144    pub series: Vec<SeriesData>,
145}
146
147#[derive(Debug, Serialize)]
148pub struct SeriesData {
149    pub label: String,
150    pub data: Vec<DataPoint>,
151}
152
153#[derive(Debug, Serialize)]
154pub struct DataPoint {
155    pub timestamp: i64,
156    pub value: f64,
157}
158
159pub async fn get_requests_timeseries(
160    State(state): State<AnalyticsV2State>,
161    Query(query): Query<AnalyticsQuery>,
162) -> Result<Json<ApiResponse<TimeSeriesResponse>>, StatusCode> {
163    debug!("Fetching request time-series");
164
165    let filter = query.to_filter();
166    let granularity = query.get_granularity();
167
168    match state.db.get_request_time_series(&filter, granularity).await {
169        Ok(time_series) => {
170            let series = time_series
171                .into_iter()
172                .map(|ts| SeriesData {
173                    label: ts.label,
174                    data: ts
175                        .data
176                        .into_iter()
177                        .map(|point| DataPoint {
178                            timestamp: point.timestamp,
179                            value: point.value,
180                        })
181                        .collect(),
182                })
183                .collect();
184
185            Ok(Json(ApiResponse::success(TimeSeriesResponse { series })))
186        }
187        Err(e) => {
188            error!("Failed to get request time-series: {}", e);
189            Err(StatusCode::INTERNAL_SERVER_ERROR)
190        }
191    }
192}
193
194/// GET /api/v2/analytics/latency
195///
196/// Get latency trends (percentiles over time)
197#[derive(Debug, Serialize)]
198pub struct LatencyResponse {
199    pub trends: Vec<LatencyTrendData>,
200}
201
202#[derive(Debug, Serialize)]
203pub struct LatencyTrendData {
204    pub timestamp: i64,
205    pub p50: f64,
206    pub p95: f64,
207    pub p99: f64,
208    pub avg: f64,
209    pub min: f64,
210    pub max: f64,
211}
212
213pub async fn get_latency_trends(
214    State(state): State<AnalyticsV2State>,
215    Query(query): Query<AnalyticsQuery>,
216) -> Result<Json<ApiResponse<LatencyResponse>>, StatusCode> {
217    debug!("Fetching latency trends");
218
219    let filter = query.to_filter();
220
221    match state.db.get_latency_trends(&filter).await {
222        Ok(trends) => {
223            let trend_data = trends
224                .into_iter()
225                .map(|t| LatencyTrendData {
226                    timestamp: t.timestamp,
227                    p50: t.p50,
228                    p95: t.p95,
229                    p99: t.p99,
230                    avg: t.avg,
231                    min: t.min,
232                    max: t.max,
233                })
234                .collect();
235
236            Ok(Json(ApiResponse::success(LatencyResponse { trends: trend_data })))
237        }
238        Err(e) => {
239            error!("Failed to get latency trends: {}", e);
240            Err(StatusCode::INTERNAL_SERVER_ERROR)
241        }
242    }
243}
244
245/// GET /api/v2/analytics/errors
246///
247/// Get error summary (grouped by type and category)
248#[derive(Debug, Serialize)]
249pub struct ErrorResponse {
250    pub errors: Vec<ErrorSummaryData>,
251}
252
253#[derive(Debug, Serialize)]
254pub struct ErrorSummaryData {
255    pub error_type: String,
256    pub error_category: String,
257    pub count: i64,
258    pub endpoints: Vec<String>,
259    pub last_occurrence: String,
260}
261
262pub async fn get_error_summary(
263    State(state): State<AnalyticsV2State>,
264    Query(query): Query<AnalyticsQuery>,
265) -> Result<Json<ApiResponse<ErrorResponse>>, StatusCode> {
266    debug!("Fetching error summary");
267
268    let filter = query.to_filter();
269
270    match state.db.get_error_summary(&filter, query.limit).await {
271        Ok(errors) => {
272            let error_data = errors
273                .into_iter()
274                .map(|e| ErrorSummaryData {
275                    error_type: e.error_type,
276                    error_category: e.error_category,
277                    count: e.count,
278                    endpoints: e.endpoints,
279                    last_occurrence: e.last_occurrence.to_rfc3339(),
280                })
281                .collect();
282
283            Ok(Json(ApiResponse::success(ErrorResponse { errors: error_data })))
284        }
285        Err(e) => {
286            error!("Failed to get error summary: {}", e);
287            Err(StatusCode::INTERNAL_SERVER_ERROR)
288        }
289    }
290}
291
292/// GET /api/v2/analytics/endpoints
293///
294/// Get top endpoints by traffic
295#[derive(Debug, Serialize)]
296pub struct EndpointsResponse {
297    pub endpoints: Vec<EndpointData>,
298}
299
300#[derive(Debug, Serialize)]
301pub struct EndpointData {
302    pub endpoint: String,
303    pub protocol: String,
304    pub method: Option<String>,
305    pub total_requests: i64,
306    pub total_errors: i64,
307    pub error_rate: f64,
308    pub avg_latency_ms: f64,
309    pub p95_latency_ms: f64,
310    pub bytes_sent: i64,
311    pub bytes_received: i64,
312}
313
314pub async fn get_top_endpoints(
315    State(state): State<AnalyticsV2State>,
316    Query(query): Query<AnalyticsQuery>,
317) -> Result<Json<ApiResponse<EndpointsResponse>>, StatusCode> {
318    debug!("Fetching top {} endpoints", query.limit);
319
320    match state.db.get_top_endpoints(query.limit, query.workspace_id.as_deref()).await {
321        Ok(endpoints) => {
322            let endpoint_data = endpoints
323                .into_iter()
324                .map(|e| {
325                    let error_rate = if e.total_requests > 0 {
326                        (e.total_errors as f64 / e.total_requests as f64) * 100.0
327                    } else {
328                        0.0
329                    };
330
331                    EndpointData {
332                        endpoint: e.endpoint,
333                        protocol: e.protocol,
334                        method: e.method,
335                        total_requests: e.total_requests,
336                        total_errors: e.total_errors,
337                        error_rate,
338                        avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
339                        p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
340                        bytes_sent: e.total_bytes_sent,
341                        bytes_received: e.total_bytes_received,
342                    }
343                })
344                .collect();
345
346            Ok(Json(ApiResponse::success(EndpointsResponse {
347                endpoints: endpoint_data,
348            })))
349        }
350        Err(e) => {
351            error!("Failed to get top endpoints: {}", e);
352            Err(StatusCode::INTERNAL_SERVER_ERROR)
353        }
354    }
355}
356
357/// GET /api/v2/analytics/protocols
358///
359/// Get traffic breakdown by protocol
360#[derive(Debug, Serialize)]
361pub struct ProtocolsResponse {
362    pub protocols: Vec<ProtocolData>,
363}
364
365#[derive(Debug, Serialize)]
366pub struct ProtocolData {
367    pub protocol: String,
368    pub request_count: i64,
369    pub error_count: i64,
370    pub avg_latency_ms: f64,
371}
372
373pub async fn get_protocol_breakdown(
374    State(state): State<AnalyticsV2State>,
375    Query(query): Query<AnalyticsQuery>,
376) -> Result<Json<ApiResponse<ProtocolsResponse>>, StatusCode> {
377    debug!("Fetching protocol breakdown");
378
379    match state.db.get_top_protocols(10, query.workspace_id.as_deref()).await {
380        Ok(protocols) => {
381            let protocol_data = protocols
382                .into_iter()
383                .map(|p| ProtocolData {
384                    protocol: p.protocol,
385                    request_count: p.request_count,
386                    error_count: p.error_count,
387                    avg_latency_ms: p.avg_latency_ms,
388                })
389                .collect();
390
391            Ok(Json(ApiResponse::success(ProtocolsResponse {
392                protocols: protocol_data,
393            })))
394        }
395        Err(e) => {
396            error!("Failed to get protocol breakdown: {}", e);
397            Err(StatusCode::INTERNAL_SERVER_ERROR)
398        }
399    }
400}
401
402/// GET /api/v2/analytics/traffic-patterns
403///
404/// Get traffic patterns for heatmap visualization
405#[derive(Debug, Serialize)]
406pub struct TrafficPatternsResponse {
407    pub patterns: Vec<TrafficPatternData>,
408}
409
410#[derive(Debug, Serialize)]
411pub struct TrafficPatternData {
412    pub date: String,
413    pub hour: i32,
414    pub day_of_week: i32,
415    pub request_count: i64,
416    pub error_count: i64,
417    pub avg_latency_ms: f64,
418}
419
420#[derive(Debug, Deserialize)]
421pub struct TrafficPatternsQuery {
422    #[serde(default = "default_pattern_days")]
423    pub days: i64,
424    pub workspace_id: Option<String>,
425}
426
427fn default_pattern_days() -> i64 {
428    30
429}
430
431pub async fn get_traffic_patterns(
432    State(state): State<AnalyticsV2State>,
433    Query(query): Query<TrafficPatternsQuery>,
434) -> Result<Json<ApiResponse<TrafficPatternsResponse>>, StatusCode> {
435    debug!("Fetching traffic patterns for {} days", query.days);
436
437    match state.db.get_traffic_patterns(query.days, query.workspace_id.as_deref()).await {
438        Ok(patterns) => {
439            let pattern_data = patterns
440                .into_iter()
441                .map(|p| TrafficPatternData {
442                    date: p.date,
443                    hour: p.hour,
444                    day_of_week: p.day_of_week,
445                    request_count: p.request_count,
446                    error_count: p.error_count,
447                    avg_latency_ms: p.avg_latency_ms.unwrap_or(0.0),
448                })
449                .collect();
450
451            Ok(Json(ApiResponse::success(TrafficPatternsResponse {
452                patterns: pattern_data,
453            })))
454        }
455        Err(e) => {
456            error!("Failed to get traffic patterns: {}", e);
457            Err(StatusCode::INTERNAL_SERVER_ERROR)
458        }
459    }
460}
461
462/// GET /api/v2/analytics/export/csv
463///
464/// Export analytics data to CSV format
465pub async fn export_csv(
466    State(state): State<AnalyticsV2State>,
467    Query(query): Query<AnalyticsQuery>,
468) -> Result<(StatusCode, String), StatusCode> {
469    debug!("Exporting analytics to CSV");
470
471    let filter = query.to_filter();
472    let mut buffer = Vec::new();
473
474    match state.db.export_to_csv(&mut buffer, &filter).await {
475        Ok(_) => {
476            let csv_data = String::from_utf8(buffer).unwrap_or_default();
477            Ok((StatusCode::OK, csv_data))
478        }
479        Err(e) => {
480            error!("Failed to export to CSV: {}", e);
481            Err(StatusCode::INTERNAL_SERVER_ERROR)
482        }
483    }
484}
485
486/// GET /api/v2/analytics/export/json
487///
488/// Export analytics data to JSON format
489pub async fn export_json(
490    State(state): State<AnalyticsV2State>,
491    Query(query): Query<AnalyticsQuery>,
492) -> Result<(StatusCode, String), StatusCode> {
493    debug!("Exporting analytics to JSON");
494
495    let filter = query.to_filter();
496
497    match state.db.export_to_json(&filter).await {
498        Ok(json) => Ok((StatusCode::OK, json)),
499        Err(e) => {
500            error!("Failed to export to JSON: {}", e);
501            Err(StatusCode::INTERNAL_SERVER_ERROR)
502        }
503    }
504}
505
506#[cfg(test)]
507mod tests {
508    use super::*;
509
510    #[test]
511    fn test_analytics_query_to_filter() {
512        let query = AnalyticsQuery {
513            start_time: Some(100),
514            end_time: Some(200),
515            duration: 3600,
516            protocol: Some("HTTP".to_string()),
517            endpoint: Some("/api/test".to_string()),
518            method: Some("GET".to_string()),
519            status_code: Some(200),
520            workspace_id: None,
521            environment: Some("prod".to_string()),
522            limit: 50,
523            granularity: "minute".to_string(),
524        };
525
526        let filter = query.to_filter();
527        assert_eq!(filter.start_time, Some(100));
528        assert_eq!(filter.end_time, Some(200));
529        assert_eq!(filter.protocol, Some("HTTP".to_string()));
530        assert_eq!(filter.limit, Some(50));
531    }
532
533    #[test]
534    fn test_granularity_parsing() {
535        let query = AnalyticsQuery {
536            start_time: None,
537            end_time: None,
538            duration: 3600,
539            protocol: None,
540            endpoint: None,
541            method: None,
542            status_code: None,
543            workspace_id: None,
544            environment: None,
545            limit: 100,
546            granularity: "hour".to_string(),
547        };
548
549        assert_eq!(query.get_granularity(), Granularity::Hour);
550    }
551}