mockforge_http/
management_ws.rs

1/// WebSocket interface for live mock updates
2///
3/// Provides real-time notifications when mocks are created, updated, or deleted.
4/// Used by developer tools like VS Code extension for live synchronization.
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16/// Default broadcast channel capacity for WebSocket mock events
17const DEFAULT_WS_BROADCAST_CAPACITY: usize = 100;
18
19/// Get the WebSocket broadcast channel capacity from environment or use default
20fn get_ws_broadcast_capacity() -> usize {
21    std::env::var("MOCKFORGE_WS_BROADCAST_CAPACITY")
22        .ok()
23        .and_then(|s| s.parse().ok())
24        .unwrap_or(DEFAULT_WS_BROADCAST_CAPACITY)
25}
26
27/// Events that can be broadcasted to WebSocket clients
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(tag = "type", rename_all = "snake_case")]
30pub enum MockEvent {
31    /// Mock was created
32    MockCreated {
33        /// The created mock configuration
34        mock: super::management::MockConfig,
35        /// ISO 8601 timestamp of the event
36        timestamp: String,
37    },
38    /// Mock was updated
39    MockUpdated {
40        /// The updated mock configuration
41        mock: super::management::MockConfig,
42        /// ISO 8601 timestamp of the event
43        timestamp: String,
44    },
45    /// Mock was deleted
46    MockDeleted {
47        /// ID of the deleted mock
48        id: String,
49        /// ISO 8601 timestamp of the event
50        timestamp: String,
51    },
52    /// Server statistics changed
53    StatsUpdated {
54        /// Updated server statistics
55        stats: super::management::ServerStats,
56        /// ISO 8601 timestamp of the event
57        timestamp: String,
58    },
59    /// Connection established confirmation
60    Connected {
61        /// Connection confirmation message
62        message: String,
63        /// ISO 8601 timestamp of the event
64        timestamp: String,
65    },
66    /// State machine was created or updated
67    StateMachineUpdated {
68        /// Resource type of the state machine
69        resource_type: String,
70        /// The state machine definition
71        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
72        /// ISO 8601 timestamp of the event
73        timestamp: String,
74    },
75    /// State machine was deleted
76    StateMachineDeleted {
77        /// Resource type of the deleted state machine
78        resource_type: String,
79        /// ISO 8601 timestamp of the event
80        timestamp: String,
81    },
82    /// State instance was created
83    StateInstanceCreated {
84        /// Resource ID
85        resource_id: String,
86        /// Resource type
87        resource_type: String,
88        /// Initial state
89        initial_state: String,
90        /// ISO 8601 timestamp of the event
91        timestamp: String,
92    },
93    /// State transition occurred
94    StateTransitioned {
95        /// Resource ID
96        resource_id: String,
97        /// Resource type
98        resource_type: String,
99        /// Previous state
100        from_state: String,
101        /// New state
102        to_state: String,
103        /// Current state data
104        state_data: std::collections::HashMap<String, serde_json::Value>,
105        /// ISO 8601 timestamp of the event
106        timestamp: String,
107    },
108    /// State instance was deleted
109    StateInstanceDeleted {
110        /// Resource ID
111        resource_id: String,
112        /// Resource type
113        resource_type: String,
114        /// ISO 8601 timestamp of the event
115        timestamp: String,
116    },
117}
118
119impl MockEvent {
120    /// Create a mock created event
121    pub fn mock_created(mock: super::management::MockConfig) -> Self {
122        Self::MockCreated {
123            mock,
124            timestamp: chrono::Utc::now().to_rfc3339(),
125        }
126    }
127
128    /// Create a mock updated event
129    pub fn mock_updated(mock: super::management::MockConfig) -> Self {
130        Self::MockUpdated {
131            mock,
132            timestamp: chrono::Utc::now().to_rfc3339(),
133        }
134    }
135
136    /// Create a mock deleted event
137    pub fn mock_deleted(id: String) -> Self {
138        Self::MockDeleted {
139            id,
140            timestamp: chrono::Utc::now().to_rfc3339(),
141        }
142    }
143
144    /// Create a stats updated event
145    pub fn stats_updated(stats: super::management::ServerStats) -> Self {
146        Self::StatsUpdated {
147            stats,
148            timestamp: chrono::Utc::now().to_rfc3339(),
149        }
150    }
151
152    /// Create a connection established event
153    pub fn connected(message: String) -> Self {
154        Self::Connected {
155            message,
156            timestamp: chrono::Utc::now().to_rfc3339(),
157        }
158    }
159
160    /// Create a state machine updated event
161    pub fn state_machine_updated(
162        resource_type: String,
163        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
164    ) -> Self {
165        Self::StateMachineUpdated {
166            resource_type,
167            state_machine,
168            timestamp: chrono::Utc::now().to_rfc3339(),
169        }
170    }
171
172    /// Create a state machine deleted event
173    pub fn state_machine_deleted(resource_type: String) -> Self {
174        Self::StateMachineDeleted {
175            resource_type,
176            timestamp: chrono::Utc::now().to_rfc3339(),
177        }
178    }
179
180    /// Create a state instance created event
181    pub fn state_instance_created(
182        resource_id: String,
183        resource_type: String,
184        initial_state: String,
185    ) -> Self {
186        Self::StateInstanceCreated {
187            resource_id,
188            resource_type,
189            initial_state,
190            timestamp: chrono::Utc::now().to_rfc3339(),
191        }
192    }
193
194    /// Create a state transitioned event
195    pub fn state_transitioned(
196        resource_id: String,
197        resource_type: String,
198        from_state: String,
199        to_state: String,
200        state_data: std::collections::HashMap<String, serde_json::Value>,
201    ) -> Self {
202        Self::StateTransitioned {
203            resource_id,
204            resource_type,
205            from_state,
206            to_state,
207            state_data,
208            timestamp: chrono::Utc::now().to_rfc3339(),
209        }
210    }
211
212    /// Create a state instance deleted event
213    pub fn state_instance_deleted(resource_id: String, resource_type: String) -> Self {
214        Self::StateInstanceDeleted {
215            resource_id,
216            resource_type,
217            timestamp: chrono::Utc::now().to_rfc3339(),
218        }
219    }
220}
221
222/// Shared state for WebSocket management
223#[derive(Clone)]
224pub struct WsManagementState {
225    /// Broadcast channel for sending events to all connected clients
226    pub tx: broadcast::Sender<MockEvent>,
227}
228
229impl WsManagementState {
230    /// Create a new WebSocket management state with broadcast channel
231    ///
232    /// The broadcast channel capacity can be configured via the
233    /// `MOCKFORGE_WS_BROADCAST_CAPACITY` environment variable.
234    pub fn new() -> Self {
235        let capacity = get_ws_broadcast_capacity();
236        let (tx, _) = broadcast::channel(capacity);
237        Self { tx }
238    }
239
240    /// Broadcast an event to all connected clients
241    pub fn broadcast(
242        &self,
243        event: MockEvent,
244    ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
245        self.tx.send(event).map_err(Box::new)
246    }
247}
248
249impl Default for WsManagementState {
250    fn default() -> Self {
251        Self::new()
252    }
253}
254
255/// WebSocket upgrade handler
256async fn ws_handler(
257    ws: WebSocketUpgrade,
258    State(state): State<WsManagementState>,
259) -> impl IntoResponse {
260    ws.on_upgrade(move |socket| handle_socket(socket, state))
261}
262
263/// Handle a WebSocket connection
264async fn handle_socket(socket: WebSocket, state: WsManagementState) {
265    let (mut sender, mut receiver) = socket.split();
266
267    // Subscribe to broadcast channel
268    let mut rx = state.tx.subscribe();
269
270    // Send initial connection confirmation
271    let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
272    if let Ok(json) = serde_json::to_string(&connected_event) {
273        if sender.send(Message::Text(json.into())).await.is_err() {
274            return;
275        }
276    }
277
278    // Spawn a task to forward broadcast messages to this client
279    let mut send_task = tokio::spawn(async move {
280        while let Ok(event) = rx.recv().await {
281            if let Ok(json) = serde_json::to_string(&event) {
282                if sender.send(Message::Text(json.into())).await.is_err() {
283                    break;
284                }
285            }
286        }
287    });
288
289    // Handle incoming messages from client (for now, just keep connection alive)
290    let mut recv_task = tokio::spawn(async move {
291        while let Some(Ok(msg)) = receiver.next().await {
292            match msg {
293                Message::Text(text) => {
294                    debug!("Received WebSocket message: {}", text);
295                    // Could handle client commands here in the future
296                }
297                Message::Close(_) => {
298                    info!("WebSocket client disconnected");
299                    break;
300                }
301                _ => {}
302            }
303        }
304    });
305
306    // Wait for either task to finish
307    tokio::select! {
308        _ = &mut send_task => {
309            debug!("Send task completed");
310            recv_task.abort();
311        }
312        _ = &mut recv_task => {
313            debug!("Receive task completed");
314            send_task.abort();
315        }
316    }
317}
318
319/// Build the WebSocket management router
320pub fn ws_management_router(state: WsManagementState) -> Router {
321    Router::new().route("/", get(ws_handler)).with_state(state)
322}
323
324#[cfg(test)]
325mod tests {
326    use super::*;
327
328    #[test]
329    fn test_ws_management_state_creation() {
330        let _state = WsManagementState::new();
331        // Should be able to create state without errors
332    }
333
334    #[test]
335    fn test_mock_event_creation() {
336        use super::super::management::{MockConfig, MockResponse};
337
338        let mock = MockConfig {
339            id: "test-1".to_string(),
340            name: "Test Mock".to_string(),
341            method: "GET".to_string(),
342            path: "/test".to_string(),
343            response: MockResponse {
344                body: serde_json::json!({"message": "test"}),
345                headers: None,
346            },
347            enabled: true,
348            latency_ms: None,
349            status_code: Some(200),
350            request_match: None,
351            priority: None,
352            scenario: None,
353            required_scenario_state: None,
354            new_scenario_state: None,
355        };
356
357        let event = MockEvent::mock_created(mock);
358
359        // Should serialize successfully
360        let json = serde_json::to_string(&event).unwrap();
361        assert!(json.contains("mock_created"));
362    }
363
364    #[test]
365    fn test_broadcast_event() {
366        let state = WsManagementState::new();
367
368        let event = MockEvent::connected("Test connection".to_string());
369
370        // Should be able to send even with no subscribers
371        let result = state.broadcast(event);
372        // With no subscribers, this returns Err with the number of subscribers (0)
373        assert!(result.is_err() || result.is_ok());
374    }
375
376    #[tokio::test]
377    async fn test_ws_management_router_creation() {
378        let state = WsManagementState::new();
379        let _router = ws_management_router(state);
380        // Router should be created successfully
381    }
382}