mockforge_ui/handlers/
analytics_stream.rs

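//! Real-time analytics streaming over WebSocket.
//!
//! Pushes periodic snapshots of analytics metrics to each connected client.
//! A client may send a JSON-encoded `StreamConfig` text message at any time
//! to change the update interval and the metrics window.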
4
5use axum::{
6    extract::{
7        ws::{Message, WebSocket},
8        State, WebSocketUpgrade,
9    },
10    response::Response,
11};
12use futures_util::{SinkExt, StreamExt};
13use mockforge_analytics::AnalyticsDatabase;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::time::interval;
18use tracing::{debug, error, info, warn};
19
20/// WebSocket analytics state
21#[derive(Clone)]
22pub struct AnalyticsStreamState {
23    pub db: Arc<AnalyticsDatabase>,
24}
25
26impl AnalyticsStreamState {
27    pub fn new(db: AnalyticsDatabase) -> Self {
28        Self { db: Arc::new(db) }
29    }
30}
31
32/// Client subscription configuration
33#[derive(Debug, Clone, Deserialize)]
34pub struct StreamConfig {
35    /// Update interval in seconds (default: 5)
36    #[serde(default = "default_interval")]
37    pub interval_seconds: u64,
38    /// Metrics duration window (default: 3600)
39    #[serde(default = "default_duration")]
40    pub duration_seconds: i64,
41    /// Protocol filter
42    pub protocol: Option<String>,
43    /// Endpoint filter
44    pub endpoint: Option<String>,
45    /// Workspace ID filter
46    pub workspace_id: Option<String>,
47}
48
/// Serde fallback for `StreamConfig::interval_seconds`: five seconds.
fn default_interval() -> u64 {
    const DEFAULT_INTERVAL_SECS: u64 = 5;
    DEFAULT_INTERVAL_SECS
}
52
/// Serde fallback for `StreamConfig::duration_seconds`: 3600 (one hour).
fn default_duration() -> i64 {
    const DEFAULT_DURATION_SECS: i64 = 3600;
    DEFAULT_DURATION_SECS
}
56
57impl Default for StreamConfig {
58    fn default() -> Self {
59        Self {
60            interval_seconds: default_interval(),
61            duration_seconds: default_duration(),
62            protocol: None,
63            endpoint: None,
64            workspace_id: None,
65        }
66    }
67}
68
69/// Streamed metrics update
70#[derive(Debug, Serialize)]
71pub struct MetricsUpdate {
72    pub timestamp: i64,
73    pub total_requests: i64,
74    pub total_errors: i64,
75    pub error_rate: f64,
76    pub avg_latency_ms: f64,
77    pub p95_latency_ms: f64,
78    pub p99_latency_ms: f64,
79    pub active_connections: i64,
80    pub requests_per_second: f64,
81}
82
83/// WebSocket handler for analytics streaming
84pub async fn analytics_websocket_handler(
85    ws: WebSocketUpgrade,
86    State(state): State<AnalyticsStreamState>,
87) -> Response {
88    ws.on_upgrade(move |socket| handle_analytics_socket(socket, state))
89}
90
91async fn handle_analytics_socket(socket: WebSocket, state: AnalyticsStreamState) {
92    let (mut sender, mut receiver) = socket.split();
93
94    info!("Analytics WebSocket client connected");
95
96    // Default configuration
97    let mut config = StreamConfig::default();
98
99    // Spawn a task to handle incoming messages (config updates, ping/pong)
100    let config_clone = Arc::new(tokio::sync::Mutex::new(config.clone()));
101    let config_update_handle = {
102        let config_clone = Arc::clone(&config_clone);
103        tokio::spawn(async move {
104            while let Some(msg) = receiver.next().await {
105                match msg {
106                    Ok(Message::Text(text)) => {
107                        // Try to parse as config update
108                        if let Ok(new_config) = serde_json::from_str::<StreamConfig>(&text) {
109                            debug!("Received config update: {:?}", new_config);
110                            let mut cfg = config_clone.lock().await;
111                            *cfg = new_config;
112                        }
113                    }
114                    Ok(Message::Ping(data)) => {
115                        debug!("Received ping");
116                        // Pong is handled automatically by axum
117                    }
118                    Ok(Message::Close(_)) => {
119                        info!("Client requested close");
120                        break;
121                    }
122                    Err(e) => {
123                        warn!("WebSocket error: {}", e);
124                        break;
125                    }
126                    _ => {}
127                }
128            }
129        })
130    };
131
132    // Spawn a task to send periodic updates
133    let update_task = tokio::spawn(async move {
134        // Wait for initial config
135        tokio::time::sleep(Duration::from_millis(100)).await;
136
137        loop {
138            // Get current config
139            let current_config = {
140                let cfg = config_clone.lock().await;
141                cfg.clone()
142            };
143
144            // Create interval based on config
145            let mut tick_interval = interval(Duration::from_secs(current_config.interval_seconds));
146            tick_interval.tick().await; // First tick completes immediately
147
148            // Fetch and send metrics
149            match state.db.get_overview_metrics(current_config.duration_seconds).await {
150                Ok(overview) => {
151                    let update = MetricsUpdate {
152                        timestamp: chrono::Utc::now().timestamp(),
153                        total_requests: overview.total_requests,
154                        total_errors: overview.total_errors,
155                        error_rate: overview.error_rate,
156                        avg_latency_ms: overview.avg_latency_ms,
157                        p95_latency_ms: overview.p95_latency_ms,
158                        p99_latency_ms: overview.p99_latency_ms,
159                        active_connections: overview.active_connections,
160                        requests_per_second: overview.requests_per_second,
161                    };
162
163                    if let Ok(json) = serde_json::to_string(&update) {
164                        if sender.send(Message::Text(json.into())).await.is_err() {
165                            error!("Failed to send update to client");
166                            break;
167                        }
168                    }
169                }
170                Err(e) => {
171                    error!("Failed to get overview metrics: {}", e);
172                    // Continue trying
173                }
174            }
175
176            // Wait for next interval
177            tick_interval.tick().await;
178        }
179    });
180
181    // Wait for either task to complete
182    tokio::select! {
183        _ = config_update_handle => {
184            debug!("Config update handler completed");
185        }
186        _ = update_task => {
187            debug!("Update task completed");
188        }
189    }
190
191    info!("Analytics WebSocket client disconnected");
192}
193
#[cfg(test)]
mod tests {
    use super::*;

    /// `StreamConfig::default()` yields the documented interval and window.
    #[test]
    fn test_default_stream_config() {
        let StreamConfig {
            interval_seconds,
            duration_seconds,
            ..
        } = StreamConfig::default();

        assert_eq!(interval_seconds, 5);
        assert_eq!(duration_seconds, 3600);
    }

    /// A JSON payload with explicit values overrides the defaults.
    #[test]
    fn test_stream_config_parsing() {
        let payload = r#"{
            "interval_seconds": 10,
            "duration_seconds": 7200,
            "protocol": "HTTP"
        }"#;

        let parsed: StreamConfig = serde_json::from_str(payload).unwrap();

        assert_eq!(parsed.interval_seconds, 10);
        assert_eq!(parsed.duration_seconds, 7200);
        assert_eq!(parsed.protocol, Some("HTTP".to_string()));
    }
}