mockforge_ui/handlers/
analytics_stream.rs1use axum::{
6 extract::{
7 ws::{Message, WebSocket},
8 State, WebSocketUpgrade,
9 },
10 response::Response,
11};
12use futures_util::{SinkExt, StreamExt};
13use mockforge_analytics::AnalyticsDatabase;
14use serde::{Deserialize, Serialize};
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::time::interval;
18use tracing::{debug, error, info, warn};
19
20#[derive(Clone)]
22pub struct AnalyticsStreamState {
23 pub db: Arc<AnalyticsDatabase>,
24}
25
26impl AnalyticsStreamState {
27 pub fn new(db: AnalyticsDatabase) -> Self {
28 Self { db: Arc::new(db) }
29 }
30}
31
32#[derive(Debug, Clone, Deserialize)]
34pub struct StreamConfig {
35 #[serde(default = "default_interval")]
37 pub interval_seconds: u64,
38 #[serde(default = "default_duration")]
40 pub duration_seconds: i64,
41 pub protocol: Option<String>,
43 pub endpoint: Option<String>,
45 pub workspace_id: Option<String>,
47}
48
49fn default_interval() -> u64 {
50 5
51}
52
53fn default_duration() -> i64 {
54 3600
55}
56
57impl Default for StreamConfig {
58 fn default() -> Self {
59 Self {
60 interval_seconds: default_interval(),
61 duration_seconds: default_duration(),
62 protocol: None,
63 endpoint: None,
64 workspace_id: None,
65 }
66 }
67}
68
69#[derive(Debug, Serialize)]
71pub struct MetricsUpdate {
72 pub timestamp: i64,
73 pub total_requests: i64,
74 pub total_errors: i64,
75 pub error_rate: f64,
76 pub avg_latency_ms: f64,
77 pub p95_latency_ms: f64,
78 pub p99_latency_ms: f64,
79 pub active_connections: i64,
80 pub requests_per_second: f64,
81}
82
83pub async fn analytics_websocket_handler(
85 ws: WebSocketUpgrade,
86 State(state): State<AnalyticsStreamState>,
87) -> Response {
88 ws.on_upgrade(move |socket| handle_analytics_socket(socket, state))
89}
90
91async fn handle_analytics_socket(socket: WebSocket, state: AnalyticsStreamState) {
92 let (mut sender, mut receiver) = socket.split();
93
94 info!("Analytics WebSocket client connected");
95
96 let mut config = StreamConfig::default();
98
99 let config_clone = Arc::new(tokio::sync::Mutex::new(config.clone()));
101 let config_update_handle = {
102 let config_clone = Arc::clone(&config_clone);
103 tokio::spawn(async move {
104 while let Some(msg) = receiver.next().await {
105 match msg {
106 Ok(Message::Text(text)) => {
107 if let Ok(new_config) = serde_json::from_str::<StreamConfig>(&text) {
109 debug!("Received config update: {:?}", new_config);
110 let mut cfg = config_clone.lock().await;
111 *cfg = new_config;
112 }
113 }
114 Ok(Message::Ping(data)) => {
115 debug!("Received ping");
116 }
118 Ok(Message::Close(_)) => {
119 info!("Client requested close");
120 break;
121 }
122 Err(e) => {
123 warn!("WebSocket error: {}", e);
124 break;
125 }
126 _ => {}
127 }
128 }
129 })
130 };
131
132 let update_task = tokio::spawn(async move {
134 tokio::time::sleep(Duration::from_millis(100)).await;
136
137 loop {
138 let current_config = {
140 let cfg = config_clone.lock().await;
141 cfg.clone()
142 };
143
144 let mut tick_interval = interval(Duration::from_secs(current_config.interval_seconds));
146 tick_interval.tick().await; match state.db.get_overview_metrics(current_config.duration_seconds).await {
150 Ok(overview) => {
151 let update = MetricsUpdate {
152 timestamp: chrono::Utc::now().timestamp(),
153 total_requests: overview.total_requests,
154 total_errors: overview.total_errors,
155 error_rate: overview.error_rate,
156 avg_latency_ms: overview.avg_latency_ms,
157 p95_latency_ms: overview.p95_latency_ms,
158 p99_latency_ms: overview.p99_latency_ms,
159 active_connections: overview.active_connections,
160 requests_per_second: overview.requests_per_second,
161 };
162
163 if let Ok(json) = serde_json::to_string(&update) {
164 if sender.send(Message::Text(json.into())).await.is_err() {
165 error!("Failed to send update to client");
166 break;
167 }
168 }
169 }
170 Err(e) => {
171 error!("Failed to get overview metrics: {}", e);
172 }
174 }
175
176 tick_interval.tick().await;
178 }
179 });
180
181 tokio::select! {
183 _ = config_update_handle => {
184 debug!("Config update handler completed");
185 }
186 _ = update_task => {
187 debug!("Update task completed");
188 }
189 }
190
191 info!("Analytics WebSocket client disconnected");
192}
193
194#[cfg(test)]
195mod tests {
196 use super::*;
197
198 #[test]
199 fn test_default_stream_config() {
200 let config = StreamConfig::default();
201 assert_eq!(config.interval_seconds, 5);
202 assert_eq!(config.duration_seconds, 3600);
203 }
204
205 #[test]
206 fn test_stream_config_parsing() {
207 let json = r#"{
208 "interval_seconds": 10,
209 "duration_seconds": 7200,
210 "protocol": "HTTP"
211 }"#;
212
213 let config: StreamConfig = serde_json::from_str(json).unwrap();
214 assert_eq!(config.interval_seconds, 10);
215 assert_eq!(config.duration_seconds, 7200);
216 assert_eq!(config.protocol, Some("HTTP".to_string()));
217 }
218}