mockforge_ui/handlers/
analytics_v2.rs

1//! Enhanced Analytics API handlers with persistent storage
2//!
3//! This module provides comprehensive analytics endpoints that combine:
4//! - Real-time metrics from Prometheus
5//! - Historical data from the analytics database
6//! - Advanced queries (time-series, trends, patterns)
7
8use axum::{
9    extract::{Query, State},
10    http::StatusCode,
11    Json,
12};
13use chrono::Utc;
14use mockforge_analytics::{AnalyticsDatabase, AnalyticsFilter, Granularity, OverviewMetrics};
15use serde::{Deserialize, Serialize};
16use std::sync::Arc;
17use tracing::{debug, error};
18
19use crate::models::ApiResponse;
20
21/// Enhanced analytics state with both Prometheus and database access
22#[derive(Clone)]
23pub struct AnalyticsV2State {
24    pub db: Arc<AnalyticsDatabase>,
25}
26
27impl AnalyticsV2State {
28    pub fn new(db: AnalyticsDatabase) -> Self {
29        Self { db: Arc::new(db) }
30    }
31}
32
33/// Query parameters for analytics endpoints
34#[derive(Debug, Deserialize)]
35pub struct AnalyticsQuery {
36    /// Start time (Unix timestamp)
37    pub start_time: Option<i64>,
38    /// End time (Unix timestamp)
39    pub end_time: Option<i64>,
40    /// Duration in seconds (alternative to start/end)
41    #[serde(default = "default_duration")]
42    pub duration: i64,
43    /// Protocol filter (HTTP, gRPC, WebSocket, etc.)
44    pub protocol: Option<String>,
45    /// Endpoint filter
46    pub endpoint: Option<String>,
47    /// Method filter (GET, POST, etc.)
48    pub method: Option<String>,
49    /// Status code filter
50    pub status_code: Option<i32>,
51    /// Workspace ID filter
52    pub workspace_id: Option<String>,
53    /// Environment filter (dev, staging, prod)
54    pub environment: Option<String>,
55    /// Limit results
56    #[serde(default = "default_limit")]
57    pub limit: i64,
58    /// Granularity for time-series data
59    #[serde(default = "default_granularity")]
60    pub granularity: String,
61}
62
/// Serde default for `AnalyticsQuery::duration`: one hour, in seconds.
fn default_duration() -> i64 {
    const SECONDS_PER_HOUR: i64 = 60 * 60;
    SECONDS_PER_HOUR
}
66
/// Serde default for `AnalyticsQuery::limit`.
fn default_limit() -> i64 {
    const DEFAULT_RESULT_LIMIT: i64 = 100;
    DEFAULT_RESULT_LIMIT
}
70
/// Serde default for `AnalyticsQuery::granularity`: per-minute buckets.
fn default_granularity() -> String {
    String::from("minute")
}
74
75impl AnalyticsQuery {
76    /// Convert to AnalyticsFilter
77    fn to_filter(&self) -> AnalyticsFilter {
78        let (start_time, end_time) =
79            if let (Some(start), Some(end)) = (self.start_time, self.end_time) {
80                (Some(start), Some(end))
81            } else {
82                let end = Utc::now().timestamp();
83                let start = end - self.duration;
84                (Some(start), Some(end))
85            };
86
87        AnalyticsFilter {
88            start_time,
89            end_time,
90            protocol: self.protocol.clone(),
91            endpoint: self.endpoint.clone(),
92            method: self.method.clone(),
93            status_code: self.status_code,
94            workspace_id: self.workspace_id.clone(),
95            environment: self.environment.clone(),
96            limit: Some(self.limit),
97        }
98    }
99
100    /// Parse granularity string
101    fn get_granularity(&self) -> Granularity {
102        match self.granularity.as_str() {
103            "minute" => Granularity::Minute,
104            "hour" => Granularity::Hour,
105            "day" => Granularity::Day,
106            _ => Granularity::Minute,
107        }
108    }
109}
110
111// ============================================================================
112// REST API Endpoints
113// ============================================================================
114
115/// GET /api/v2/analytics/overview
116///
117/// Get dashboard overview metrics including:
118/// - Total requests, errors, error rate
119/// - Latency percentiles (avg, p50, p95, p99)
120/// - Active connections, throughput
121/// - Top protocols and endpoints
122pub async fn get_overview(
123    State(state): State<AnalyticsV2State>,
124    Query(query): Query<AnalyticsQuery>,
125) -> Result<Json<ApiResponse<OverviewMetrics>>, StatusCode> {
126    debug!("Fetching analytics overview for duration: {}s", query.duration);
127
128    match state.db.get_overview_metrics(query.duration).await {
129        Ok(overview) => Ok(Json(ApiResponse::success(overview))),
130        Err(e) => {
131            error!("Failed to get overview metrics: {}", e);
132            Err(StatusCode::INTERNAL_SERVER_ERROR)
133        }
134    }
135}
136
137/// GET /api/v2/analytics/requests
138///
139/// Get request count time-series data
140#[derive(Debug, Serialize)]
141pub struct TimeSeriesResponse {
142    pub series: Vec<SeriesData>,
143}
144
145#[derive(Debug, Serialize)]
146pub struct SeriesData {
147    pub label: String,
148    pub data: Vec<DataPoint>,
149}
150
151#[derive(Debug, Serialize)]
152pub struct DataPoint {
153    pub timestamp: i64,
154    pub value: f64,
155}
156
157pub async fn get_requests_timeseries(
158    State(state): State<AnalyticsV2State>,
159    Query(query): Query<AnalyticsQuery>,
160) -> Result<Json<ApiResponse<TimeSeriesResponse>>, StatusCode> {
161    debug!("Fetching request time-series");
162
163    let filter = query.to_filter();
164    let granularity = query.get_granularity();
165
166    match state.db.get_request_time_series(&filter, granularity).await {
167        Ok(time_series) => {
168            let series = time_series
169                .into_iter()
170                .map(|ts| SeriesData {
171                    label: ts.label,
172                    data: ts
173                        .data
174                        .into_iter()
175                        .map(|point| DataPoint {
176                            timestamp: point.timestamp,
177                            value: point.value,
178                        })
179                        .collect(),
180                })
181                .collect();
182
183            Ok(Json(ApiResponse::success(TimeSeriesResponse { series })))
184        }
185        Err(e) => {
186            error!("Failed to get request time-series: {}", e);
187            Err(StatusCode::INTERNAL_SERVER_ERROR)
188        }
189    }
190}
191
192/// GET /api/v2/analytics/latency
193///
194/// Get latency trends (percentiles over time)
195#[derive(Debug, Serialize)]
196pub struct LatencyResponse {
197    pub trends: Vec<LatencyTrendData>,
198}
199
200#[derive(Debug, Serialize)]
201pub struct LatencyTrendData {
202    pub timestamp: i64,
203    pub p50: f64,
204    pub p95: f64,
205    pub p99: f64,
206    pub avg: f64,
207    pub min: f64,
208    pub max: f64,
209}
210
211pub async fn get_latency_trends(
212    State(state): State<AnalyticsV2State>,
213    Query(query): Query<AnalyticsQuery>,
214) -> Result<Json<ApiResponse<LatencyResponse>>, StatusCode> {
215    debug!("Fetching latency trends");
216
217    let filter = query.to_filter();
218
219    match state.db.get_latency_trends(&filter).await {
220        Ok(trends) => {
221            let trend_data = trends
222                .into_iter()
223                .map(|t| LatencyTrendData {
224                    timestamp: t.timestamp,
225                    p50: t.p50,
226                    p95: t.p95,
227                    p99: t.p99,
228                    avg: t.avg,
229                    min: t.min,
230                    max: t.max,
231                })
232                .collect();
233
234            Ok(Json(ApiResponse::success(LatencyResponse { trends: trend_data })))
235        }
236        Err(e) => {
237            error!("Failed to get latency trends: {}", e);
238            Err(StatusCode::INTERNAL_SERVER_ERROR)
239        }
240    }
241}
242
243/// GET /api/v2/analytics/errors
244///
245/// Get error summary (grouped by type and category)
246#[derive(Debug, Serialize)]
247pub struct ErrorResponse {
248    pub errors: Vec<ErrorSummaryData>,
249}
250
251#[derive(Debug, Serialize)]
252pub struct ErrorSummaryData {
253    pub error_type: String,
254    pub error_category: String,
255    pub count: i64,
256    pub endpoints: Vec<String>,
257    pub last_occurrence: String,
258}
259
260pub async fn get_error_summary(
261    State(state): State<AnalyticsV2State>,
262    Query(query): Query<AnalyticsQuery>,
263) -> Result<Json<ApiResponse<ErrorResponse>>, StatusCode> {
264    debug!("Fetching error summary");
265
266    let filter = query.to_filter();
267
268    match state.db.get_error_summary(&filter, query.limit).await {
269        Ok(errors) => {
270            let error_data = errors
271                .into_iter()
272                .map(|e| ErrorSummaryData {
273                    error_type: e.error_type,
274                    error_category: e.error_category,
275                    count: e.count,
276                    endpoints: e.endpoints,
277                    last_occurrence: e.last_occurrence.to_rfc3339(),
278                })
279                .collect();
280
281            Ok(Json(ApiResponse::success(ErrorResponse { errors: error_data })))
282        }
283        Err(e) => {
284            error!("Failed to get error summary: {}", e);
285            Err(StatusCode::INTERNAL_SERVER_ERROR)
286        }
287    }
288}
289
290/// GET /api/v2/analytics/endpoints
291///
292/// Get top endpoints by traffic
293#[derive(Debug, Serialize)]
294pub struct EndpointsResponse {
295    pub endpoints: Vec<EndpointData>,
296}
297
298#[derive(Debug, Serialize)]
299pub struct EndpointData {
300    pub endpoint: String,
301    pub protocol: String,
302    pub method: Option<String>,
303    pub total_requests: i64,
304    pub total_errors: i64,
305    pub error_rate: f64,
306    pub avg_latency_ms: f64,
307    pub p95_latency_ms: f64,
308    pub bytes_sent: i64,
309    pub bytes_received: i64,
310}
311
312pub async fn get_top_endpoints(
313    State(state): State<AnalyticsV2State>,
314    Query(query): Query<AnalyticsQuery>,
315) -> Result<Json<ApiResponse<EndpointsResponse>>, StatusCode> {
316    debug!("Fetching top {} endpoints", query.limit);
317
318    match state.db.get_top_endpoints(query.limit, query.workspace_id.as_deref()).await {
319        Ok(endpoints) => {
320            let endpoint_data = endpoints
321                .into_iter()
322                .map(|e| {
323                    let error_rate = if e.total_requests > 0 {
324                        (e.total_errors as f64 / e.total_requests as f64) * 100.0
325                    } else {
326                        0.0
327                    };
328
329                    EndpointData {
330                        endpoint: e.endpoint,
331                        protocol: e.protocol,
332                        method: e.method,
333                        total_requests: e.total_requests,
334                        total_errors: e.total_errors,
335                        error_rate,
336                        avg_latency_ms: e.avg_latency_ms.unwrap_or(0.0),
337                        p95_latency_ms: e.p95_latency_ms.unwrap_or(0.0),
338                        bytes_sent: e.total_bytes_sent,
339                        bytes_received: e.total_bytes_received,
340                    }
341                })
342                .collect();
343
344            Ok(Json(ApiResponse::success(EndpointsResponse {
345                endpoints: endpoint_data,
346            })))
347        }
348        Err(e) => {
349            error!("Failed to get top endpoints: {}", e);
350            Err(StatusCode::INTERNAL_SERVER_ERROR)
351        }
352    }
353}
354
355/// GET /api/v2/analytics/protocols
356///
357/// Get traffic breakdown by protocol
358#[derive(Debug, Serialize)]
359pub struct ProtocolsResponse {
360    pub protocols: Vec<ProtocolData>,
361}
362
363#[derive(Debug, Serialize)]
364pub struct ProtocolData {
365    pub protocol: String,
366    pub request_count: i64,
367    pub error_count: i64,
368    pub avg_latency_ms: f64,
369}
370
371pub async fn get_protocol_breakdown(
372    State(state): State<AnalyticsV2State>,
373    Query(query): Query<AnalyticsQuery>,
374) -> Result<Json<ApiResponse<ProtocolsResponse>>, StatusCode> {
375    debug!("Fetching protocol breakdown");
376
377    match state.db.get_top_protocols(10, query.workspace_id.as_deref()).await {
378        Ok(protocols) => {
379            let protocol_data = protocols
380                .into_iter()
381                .map(|p| ProtocolData {
382                    protocol: p.protocol,
383                    request_count: p.request_count,
384                    error_count: p.error_count,
385                    avg_latency_ms: p.avg_latency_ms,
386                })
387                .collect();
388
389            Ok(Json(ApiResponse::success(ProtocolsResponse {
390                protocols: protocol_data,
391            })))
392        }
393        Err(e) => {
394            error!("Failed to get protocol breakdown: {}", e);
395            Err(StatusCode::INTERNAL_SERVER_ERROR)
396        }
397    }
398}
399
400/// GET /api/v2/analytics/traffic-patterns
401///
402/// Get traffic patterns for heatmap visualization
403#[derive(Debug, Serialize)]
404pub struct TrafficPatternsResponse {
405    pub patterns: Vec<TrafficPatternData>,
406}
407
408#[derive(Debug, Serialize)]
409pub struct TrafficPatternData {
410    pub date: String,
411    pub hour: i32,
412    pub day_of_week: i32,
413    pub request_count: i64,
414    pub error_count: i64,
415    pub avg_latency_ms: f64,
416}
417
418#[derive(Debug, Deserialize)]
419pub struct TrafficPatternsQuery {
420    #[serde(default = "default_pattern_days")]
421    pub days: i64,
422    pub workspace_id: Option<String>,
423}
424
/// Serde default for `TrafficPatternsQuery::days`.
fn default_pattern_days() -> i64 {
    const DEFAULT_PATTERN_DAYS: i64 = 30;
    DEFAULT_PATTERN_DAYS
}
428
429pub async fn get_traffic_patterns(
430    State(state): State<AnalyticsV2State>,
431    Query(query): Query<TrafficPatternsQuery>,
432) -> Result<Json<ApiResponse<TrafficPatternsResponse>>, StatusCode> {
433    debug!("Fetching traffic patterns for {} days", query.days);
434
435    match state.db.get_traffic_patterns(query.days, query.workspace_id.as_deref()).await {
436        Ok(patterns) => {
437            let pattern_data = patterns
438                .into_iter()
439                .map(|p| TrafficPatternData {
440                    date: p.date,
441                    hour: p.hour,
442                    day_of_week: p.day_of_week,
443                    request_count: p.request_count,
444                    error_count: p.error_count,
445                    avg_latency_ms: p.avg_latency_ms.unwrap_or(0.0),
446                })
447                .collect();
448
449            Ok(Json(ApiResponse::success(TrafficPatternsResponse {
450                patterns: pattern_data,
451            })))
452        }
453        Err(e) => {
454            error!("Failed to get traffic patterns: {}", e);
455            Err(StatusCode::INTERNAL_SERVER_ERROR)
456        }
457    }
458}
459
460/// GET /api/v2/analytics/export/csv
461///
462/// Export analytics data to CSV format
463pub async fn export_csv(
464    State(state): State<AnalyticsV2State>,
465    Query(query): Query<AnalyticsQuery>,
466) -> Result<(StatusCode, String), StatusCode> {
467    debug!("Exporting analytics to CSV");
468
469    let filter = query.to_filter();
470    let mut buffer = Vec::new();
471
472    match state.db.export_to_csv(&mut buffer, &filter).await {
473        Ok(_) => {
474            let csv_data = String::from_utf8(buffer).unwrap_or_default();
475            Ok((StatusCode::OK, csv_data))
476        }
477        Err(e) => {
478            error!("Failed to export to CSV: {}", e);
479            Err(StatusCode::INTERNAL_SERVER_ERROR)
480        }
481    }
482}
483
484/// GET /api/v2/analytics/export/json
485///
486/// Export analytics data to JSON format
487pub async fn export_json(
488    State(state): State<AnalyticsV2State>,
489    Query(query): Query<AnalyticsQuery>,
490) -> Result<(StatusCode, String), StatusCode> {
491    debug!("Exporting analytics to JSON");
492
493    let filter = query.to_filter();
494
495    match state.db.export_to_json(&filter).await {
496        Ok(json) => Ok((StatusCode::OK, json)),
497        Err(e) => {
498            error!("Failed to export to JSON: {}", e);
499            Err(StatusCode::INTERNAL_SERVER_ERROR)
500        }
501    }
502}
503
#[cfg(test)]
mod tests {
    use super::*;

    /// Query with every optional filter unset and the same duration, limit
    /// and granularity values the serde defaults would produce.
    fn unfiltered_query() -> AnalyticsQuery {
        AnalyticsQuery {
            start_time: None,
            end_time: None,
            duration: 3600,
            protocol: None,
            endpoint: None,
            method: None,
            status_code: None,
            workspace_id: None,
            environment: None,
            limit: 100,
            granularity: "minute".to_string(),
        }
    }

    /// An explicit start/end pair and the filter fields must be carried over
    /// into the resulting `AnalyticsFilter`.
    #[test]
    fn test_analytics_query_to_filter() {
        let query = AnalyticsQuery {
            start_time: Some(100),
            end_time: Some(200),
            protocol: Some("HTTP".to_string()),
            endpoint: Some("/api/test".to_string()),
            method: Some("GET".to_string()),
            status_code: Some(200),
            environment: Some("prod".to_string()),
            limit: 50,
            ..unfiltered_query()
        };

        let filter = query.to_filter();
        assert_eq!(filter.start_time, Some(100));
        assert_eq!(filter.end_time, Some(200));
        assert_eq!(filter.protocol, Some("HTTP".to_string()));
        assert_eq!(filter.limit, Some(50));
    }

    /// The string "hour" must map to `Granularity::Hour`.
    #[test]
    fn test_granularity_parsing() {
        let query = AnalyticsQuery {
            granularity: "hour".to_string(),
            ..unfiltered_query()
        };

        assert_eq!(query.get_granularity(), Granularity::Hour);
    }
}