mockforge_http/
management_ws.rs

1/// WebSocket interface for live mock updates
2///
3/// Provides real-time notifications when mocks are created, updated, or deleted.
4/// Used by developer tools like VS Code extension for live synchronization.
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16/// Events that can be broadcasted to WebSocket clients
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20    /// Mock was created
21    MockCreated {
22        /// The created mock configuration
23        mock: super::management::MockConfig,
24        /// ISO 8601 timestamp of the event
25        timestamp: String,
26    },
27    /// Mock was updated
28    MockUpdated {
29        /// The updated mock configuration
30        mock: super::management::MockConfig,
31        /// ISO 8601 timestamp of the event
32        timestamp: String,
33    },
34    /// Mock was deleted
35    MockDeleted {
36        /// ID of the deleted mock
37        id: String,
38        /// ISO 8601 timestamp of the event
39        timestamp: String,
40    },
41    /// Server statistics changed
42    StatsUpdated {
43        /// Updated server statistics
44        stats: super::management::ServerStats,
45        /// ISO 8601 timestamp of the event
46        timestamp: String,
47    },
48    /// Connection established confirmation
49    Connected {
50        /// Connection confirmation message
51        message: String,
52        /// ISO 8601 timestamp of the event
53        timestamp: String,
54    },
55}
56
57impl MockEvent {
58    /// Create a mock created event
59    pub fn mock_created(mock: super::management::MockConfig) -> Self {
60        Self::MockCreated {
61            mock,
62            timestamp: chrono::Utc::now().to_rfc3339(),
63        }
64    }
65
66    /// Create a mock updated event
67    pub fn mock_updated(mock: super::management::MockConfig) -> Self {
68        Self::MockUpdated {
69            mock,
70            timestamp: chrono::Utc::now().to_rfc3339(),
71        }
72    }
73
74    /// Create a mock deleted event
75    pub fn mock_deleted(id: String) -> Self {
76        Self::MockDeleted {
77            id,
78            timestamp: chrono::Utc::now().to_rfc3339(),
79        }
80    }
81
82    /// Create a stats updated event
83    pub fn stats_updated(stats: super::management::ServerStats) -> Self {
84        Self::StatsUpdated {
85            stats,
86            timestamp: chrono::Utc::now().to_rfc3339(),
87        }
88    }
89
90    /// Create a connection established event
91    pub fn connected(message: String) -> Self {
92        Self::Connected {
93            message,
94            timestamp: chrono::Utc::now().to_rfc3339(),
95        }
96    }
97}
98
99/// Shared state for WebSocket management
100#[derive(Clone)]
101pub struct WsManagementState {
102    /// Broadcast channel for sending events to all connected clients
103    pub tx: broadcast::Sender<MockEvent>,
104}
105
106impl WsManagementState {
107    /// Create a new WebSocket management state with broadcast channel
108    pub fn new() -> Self {
109        let (tx, _) = broadcast::channel(100);
110        Self { tx }
111    }
112
113    /// Broadcast an event to all connected clients
114    pub fn broadcast(
115        &self,
116        event: MockEvent,
117    ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
118        self.tx.send(event).map_err(Box::new)
119    }
120}
121
122impl Default for WsManagementState {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128/// WebSocket upgrade handler
129async fn ws_handler(
130    ws: WebSocketUpgrade,
131    State(state): State<WsManagementState>,
132) -> impl IntoResponse {
133    ws.on_upgrade(move |socket| handle_socket(socket, state))
134}
135
136/// Handle a WebSocket connection
137async fn handle_socket(socket: WebSocket, state: WsManagementState) {
138    let (mut sender, mut receiver) = socket.split();
139
140    // Subscribe to broadcast channel
141    let mut rx = state.tx.subscribe();
142
143    // Send initial connection confirmation
144    let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
145    if let Ok(json) = serde_json::to_string(&connected_event) {
146        if sender.send(Message::Text(json.into())).await.is_err() {
147            return;
148        }
149    }
150
151    // Spawn a task to forward broadcast messages to this client
152    let mut send_task = tokio::spawn(async move {
153        while let Ok(event) = rx.recv().await {
154            if let Ok(json) = serde_json::to_string(&event) {
155                if sender.send(Message::Text(json.into())).await.is_err() {
156                    break;
157                }
158            }
159        }
160    });
161
162    // Handle incoming messages from client (for now, just keep connection alive)
163    let mut recv_task = tokio::spawn(async move {
164        while let Some(Ok(msg)) = receiver.next().await {
165            match msg {
166                Message::Text(text) => {
167                    debug!("Received WebSocket message: {}", text);
168                    // Could handle client commands here in the future
169                }
170                Message::Close(_) => {
171                    info!("WebSocket client disconnected");
172                    break;
173                }
174                _ => {}
175            }
176        }
177    });
178
179    // Wait for either task to finish
180    tokio::select! {
181        _ = &mut send_task => {
182            debug!("Send task completed");
183            recv_task.abort();
184        }
185        _ = &mut recv_task => {
186            debug!("Receive task completed");
187            send_task.abort();
188        }
189    }
190}
191
192/// Build the WebSocket management router
193pub fn ws_management_router(state: WsManagementState) -> Router {
194    Router::new().route("/", get(ws_handler)).with_state(state)
195}
196
197#[cfg(test)]
198mod tests {
199    use super::*;
200
201    #[test]
202    fn test_ws_management_state_creation() {
203        let _state = WsManagementState::new();
204        // Should be able to create state without errors
205    }
206
207    #[test]
208    fn test_mock_event_creation() {
209        use super::super::management::{MockConfig, MockResponse};
210
211        let mock = MockConfig {
212            id: "test-1".to_string(),
213            name: "Test Mock".to_string(),
214            method: "GET".to_string(),
215            path: "/test".to_string(),
216            response: MockResponse {
217                body: serde_json::json!({"message": "test"}),
218                headers: None,
219            },
220            enabled: true,
221            latency_ms: None,
222            status_code: Some(200),
223        };
224
225        let event = MockEvent::mock_created(mock);
226
227        // Should serialize successfully
228        let json = serde_json::to_string(&event).unwrap();
229        assert!(json.contains("mock_created"));
230    }
231
232    #[test]
233    fn test_broadcast_event() {
234        let state = WsManagementState::new();
235
236        let event = MockEvent::connected("Test connection".to_string());
237
238        // Should be able to send even with no subscribers
239        let result = state.broadcast(event);
240        // With no subscribers, this returns Err with the number of subscribers (0)
241        assert!(result.is_err() || result.is_ok());
242    }
243
244    #[tokio::test]
245    async fn test_ws_management_router_creation() {
246        let state = WsManagementState::new();
247        let _router = ws_management_router(state);
248        // Router should be created successfully
249    }
250}