mockforge_http/
management_ws.rs

1/// WebSocket interface for live mock updates
2///
3/// Provides real-time notifications when mocks are created, updated, or deleted.
4/// Used by developer tools like VS Code extension for live synchronization.
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16/// Events that can be broadcasted to WebSocket clients
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20    /// Mock was created
21    MockCreated {
22        /// The created mock configuration
23        mock: super::management::MockConfig,
24        /// ISO 8601 timestamp of the event
25        timestamp: String,
26    },
27    /// Mock was updated
28    MockUpdated {
29        /// The updated mock configuration
30        mock: super::management::MockConfig,
31        /// ISO 8601 timestamp of the event
32        timestamp: String,
33    },
34    /// Mock was deleted
35    MockDeleted {
36        /// ID of the deleted mock
37        id: String,
38        /// ISO 8601 timestamp of the event
39        timestamp: String,
40    },
41    /// Server statistics changed
42    StatsUpdated {
43        /// Updated server statistics
44        stats: super::management::ServerStats,
45        /// ISO 8601 timestamp of the event
46        timestamp: String,
47    },
48    /// Connection established confirmation
49    Connected {
50        /// Connection confirmation message
51        message: String,
52        /// ISO 8601 timestamp of the event
53        timestamp: String,
54    },
55    /// State machine was created or updated
56    StateMachineUpdated {
57        /// Resource type of the state machine
58        resource_type: String,
59        /// The state machine definition
60        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
61        /// ISO 8601 timestamp of the event
62        timestamp: String,
63    },
64    /// State machine was deleted
65    StateMachineDeleted {
66        /// Resource type of the deleted state machine
67        resource_type: String,
68        /// ISO 8601 timestamp of the event
69        timestamp: String,
70    },
71    /// State instance was created
72    StateInstanceCreated {
73        /// Resource ID
74        resource_id: String,
75        /// Resource type
76        resource_type: String,
77        /// Initial state
78        initial_state: String,
79        /// ISO 8601 timestamp of the event
80        timestamp: String,
81    },
82    /// State transition occurred
83    StateTransitioned {
84        /// Resource ID
85        resource_id: String,
86        /// Resource type
87        resource_type: String,
88        /// Previous state
89        from_state: String,
90        /// New state
91        to_state: String,
92        /// Current state data
93        state_data: std::collections::HashMap<String, serde_json::Value>,
94        /// ISO 8601 timestamp of the event
95        timestamp: String,
96    },
97    /// State instance was deleted
98    StateInstanceDeleted {
99        /// Resource ID
100        resource_id: String,
101        /// Resource type
102        resource_type: String,
103        /// ISO 8601 timestamp of the event
104        timestamp: String,
105    },
106}
107
108impl MockEvent {
109    /// Create a mock created event
110    pub fn mock_created(mock: super::management::MockConfig) -> Self {
111        Self::MockCreated {
112            mock,
113            timestamp: chrono::Utc::now().to_rfc3339(),
114        }
115    }
116
117    /// Create a mock updated event
118    pub fn mock_updated(mock: super::management::MockConfig) -> Self {
119        Self::MockUpdated {
120            mock,
121            timestamp: chrono::Utc::now().to_rfc3339(),
122        }
123    }
124
125    /// Create a mock deleted event
126    pub fn mock_deleted(id: String) -> Self {
127        Self::MockDeleted {
128            id,
129            timestamp: chrono::Utc::now().to_rfc3339(),
130        }
131    }
132
133    /// Create a stats updated event
134    pub fn stats_updated(stats: super::management::ServerStats) -> Self {
135        Self::StatsUpdated {
136            stats,
137            timestamp: chrono::Utc::now().to_rfc3339(),
138        }
139    }
140
141    /// Create a connection established event
142    pub fn connected(message: String) -> Self {
143        Self::Connected {
144            message,
145            timestamp: chrono::Utc::now().to_rfc3339(),
146        }
147    }
148
149    /// Create a state machine updated event
150    pub fn state_machine_updated(
151        resource_type: String,
152        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
153    ) -> Self {
154        Self::StateMachineUpdated {
155            resource_type,
156            state_machine,
157            timestamp: chrono::Utc::now().to_rfc3339(),
158        }
159    }
160
161    /// Create a state machine deleted event
162    pub fn state_machine_deleted(resource_type: String) -> Self {
163        Self::StateMachineDeleted {
164            resource_type,
165            timestamp: chrono::Utc::now().to_rfc3339(),
166        }
167    }
168
169    /// Create a state instance created event
170    pub fn state_instance_created(
171        resource_id: String,
172        resource_type: String,
173        initial_state: String,
174    ) -> Self {
175        Self::StateInstanceCreated {
176            resource_id,
177            resource_type,
178            initial_state,
179            timestamp: chrono::Utc::now().to_rfc3339(),
180        }
181    }
182
183    /// Create a state transitioned event
184    pub fn state_transitioned(
185        resource_id: String,
186        resource_type: String,
187        from_state: String,
188        to_state: String,
189        state_data: std::collections::HashMap<String, serde_json::Value>,
190    ) -> Self {
191        Self::StateTransitioned {
192            resource_id,
193            resource_type,
194            from_state,
195            to_state,
196            state_data,
197            timestamp: chrono::Utc::now().to_rfc3339(),
198        }
199    }
200
201    /// Create a state instance deleted event
202    pub fn state_instance_deleted(resource_id: String, resource_type: String) -> Self {
203        Self::StateInstanceDeleted {
204            resource_id,
205            resource_type,
206            timestamp: chrono::Utc::now().to_rfc3339(),
207        }
208    }
209}
210
211/// Shared state for WebSocket management
212#[derive(Clone)]
213pub struct WsManagementState {
214    /// Broadcast channel for sending events to all connected clients
215    pub tx: broadcast::Sender<MockEvent>,
216}
217
218impl WsManagementState {
219    /// Create a new WebSocket management state with broadcast channel
220    pub fn new() -> Self {
221        let (tx, _) = broadcast::channel(100);
222        Self { tx }
223    }
224
225    /// Broadcast an event to all connected clients
226    pub fn broadcast(
227        &self,
228        event: MockEvent,
229    ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
230        self.tx.send(event).map_err(Box::new)
231    }
232}
233
234impl Default for WsManagementState {
235    fn default() -> Self {
236        Self::new()
237    }
238}
239
240/// WebSocket upgrade handler
241async fn ws_handler(
242    ws: WebSocketUpgrade,
243    State(state): State<WsManagementState>,
244) -> impl IntoResponse {
245    ws.on_upgrade(move |socket| handle_socket(socket, state))
246}
247
248/// Handle a WebSocket connection
249async fn handle_socket(socket: WebSocket, state: WsManagementState) {
250    let (mut sender, mut receiver) = socket.split();
251
252    // Subscribe to broadcast channel
253    let mut rx = state.tx.subscribe();
254
255    // Send initial connection confirmation
256    let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
257    if let Ok(json) = serde_json::to_string(&connected_event) {
258        if sender.send(Message::Text(json.into())).await.is_err() {
259            return;
260        }
261    }
262
263    // Spawn a task to forward broadcast messages to this client
264    let mut send_task = tokio::spawn(async move {
265        while let Ok(event) = rx.recv().await {
266            if let Ok(json) = serde_json::to_string(&event) {
267                if sender.send(Message::Text(json.into())).await.is_err() {
268                    break;
269                }
270            }
271        }
272    });
273
274    // Handle incoming messages from client (for now, just keep connection alive)
275    let mut recv_task = tokio::spawn(async move {
276        while let Some(Ok(msg)) = receiver.next().await {
277            match msg {
278                Message::Text(text) => {
279                    debug!("Received WebSocket message: {}", text);
280                    // Could handle client commands here in the future
281                }
282                Message::Close(_) => {
283                    info!("WebSocket client disconnected");
284                    break;
285                }
286                _ => {}
287            }
288        }
289    });
290
291    // Wait for either task to finish
292    tokio::select! {
293        _ = &mut send_task => {
294            debug!("Send task completed");
295            recv_task.abort();
296        }
297        _ = &mut recv_task => {
298            debug!("Receive task completed");
299            send_task.abort();
300        }
301    }
302}
303
304/// Build the WebSocket management router
305pub fn ws_management_router(state: WsManagementState) -> Router {
306    Router::new().route("/", get(ws_handler)).with_state(state)
307}
308
309#[cfg(test)]
310mod tests {
311    use super::*;
312
313    #[test]
314    fn test_ws_management_state_creation() {
315        let _state = WsManagementState::new();
316        // Should be able to create state without errors
317    }
318
319    #[test]
320    fn test_mock_event_creation() {
321        use super::super::management::{MockConfig, MockResponse};
322
323        let mock = MockConfig {
324            id: "test-1".to_string(),
325            name: "Test Mock".to_string(),
326            method: "GET".to_string(),
327            path: "/test".to_string(),
328            response: MockResponse {
329                body: serde_json::json!({"message": "test"}),
330                headers: None,
331            },
332            enabled: true,
333            latency_ms: None,
334            status_code: Some(200),
335            request_match: None,
336            priority: None,
337            scenario: None,
338            required_scenario_state: None,
339            new_scenario_state: None,
340        };
341
342        let event = MockEvent::mock_created(mock);
343
344        // Should serialize successfully
345        let json = serde_json::to_string(&event).unwrap();
346        assert!(json.contains("mock_created"));
347    }
348
349    #[test]
350    fn test_broadcast_event() {
351        let state = WsManagementState::new();
352
353        let event = MockEvent::connected("Test connection".to_string());
354
355        // Should be able to send even with no subscribers
356        let result = state.broadcast(event);
357        // With no subscribers, this returns Err with the number of subscribers (0)
358        assert!(result.is_err() || result.is_ok());
359    }
360
361    #[tokio::test]
362    async fn test_ws_management_router_creation() {
363        let state = WsManagementState::new();
364        let _router = ws_management_router(state);
365        // Router should be created successfully
366    }
367}