Skip to main content

mockforge_http/
management_ws.rs

//! WebSocket interface for live mock updates
//!
//! Provides real-time notifications when mocks are created, updated, or deleted.
//! Used by developer tools such as the VS Code extension for live synchronization.
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
/// Default broadcast channel capacity for WebSocket mock events
const DEFAULT_WS_BROADCAST_CAPACITY: usize = 1024;

/// Get the WebSocket broadcast channel capacity from environment or use default
///
/// Reads `MOCKFORGE_WS_BROADCAST_CAPACITY`, ignoring surrounding whitespace.
/// The default (`DEFAULT_WS_BROADCAST_CAPACITY`) is returned when the variable
/// is unset, is not a valid `usize`, or holds a value the broadcast channel
/// cannot be created with.
fn get_ws_broadcast_capacity() -> usize {
    // `tokio::sync::broadcast::channel` panics for a capacity of 0 or anything
    // above `usize::MAX / 2`; a bad environment value must not crash the server,
    // so such values are treated the same as an unparsable one.
    const MAX_CAPACITY: usize = usize::MAX / 2;

    std::env::var("MOCKFORGE_WS_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| raw.trim().parse::<usize>().ok())
        .filter(|capacity| (1..=MAX_CAPACITY).contains(capacity))
        .unwrap_or(DEFAULT_WS_BROADCAST_CAPACITY)
}
26
27/// Events that can be broadcasted to WebSocket clients
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(tag = "type", rename_all = "snake_case")]
30pub enum MockEvent {
31    /// Mock was created
32    MockCreated {
33        /// The created mock configuration
34        mock: super::management::MockConfig,
35        /// ISO 8601 timestamp of the event
36        timestamp: String,
37    },
38    /// Mock was updated
39    MockUpdated {
40        /// The updated mock configuration
41        mock: super::management::MockConfig,
42        /// ISO 8601 timestamp of the event
43        timestamp: String,
44    },
45    /// Mock was deleted
46    MockDeleted {
47        /// ID of the deleted mock
48        id: String,
49        /// ISO 8601 timestamp of the event
50        timestamp: String,
51    },
52    /// Server statistics changed
53    StatsUpdated {
54        /// Updated server statistics
55        stats: super::management::ServerStats,
56        /// ISO 8601 timestamp of the event
57        timestamp: String,
58    },
59    /// Connection established confirmation
60    Connected {
61        /// Connection confirmation message
62        message: String,
63        /// ISO 8601 timestamp of the event
64        timestamp: String,
65    },
66    /// State machine was created or updated
67    StateMachineUpdated {
68        /// Resource type of the state machine
69        resource_type: String,
70        /// The state machine definition
71        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
72        /// ISO 8601 timestamp of the event
73        timestamp: String,
74    },
75    /// State machine was deleted
76    StateMachineDeleted {
77        /// Resource type of the deleted state machine
78        resource_type: String,
79        /// ISO 8601 timestamp of the event
80        timestamp: String,
81    },
82    /// State instance was created
83    StateInstanceCreated {
84        /// Resource ID
85        resource_id: String,
86        /// Resource type
87        resource_type: String,
88        /// Initial state
89        initial_state: String,
90        /// ISO 8601 timestamp of the event
91        timestamp: String,
92    },
93    /// State transition occurred
94    StateTransitioned {
95        /// Resource ID
96        resource_id: String,
97        /// Resource type
98        resource_type: String,
99        /// Previous state
100        from_state: String,
101        /// New state
102        to_state: String,
103        /// Current state data
104        state_data: std::collections::HashMap<String, serde_json::Value>,
105        /// ISO 8601 timestamp of the event
106        timestamp: String,
107    },
108    /// State instance was deleted
109    StateInstanceDeleted {
110        /// Resource ID
111        resource_id: String,
112        /// Resource type
113        resource_type: String,
114        /// ISO 8601 timestamp of the event
115        timestamp: String,
116    },
117}
118
119impl MockEvent {
120    /// Create a mock created event
121    pub fn mock_created(mock: super::management::MockConfig) -> Self {
122        Self::MockCreated {
123            mock,
124            timestamp: chrono::Utc::now().to_rfc3339(),
125        }
126    }
127
128    /// Create a mock updated event
129    pub fn mock_updated(mock: super::management::MockConfig) -> Self {
130        Self::MockUpdated {
131            mock,
132            timestamp: chrono::Utc::now().to_rfc3339(),
133        }
134    }
135
136    /// Create a mock deleted event
137    pub fn mock_deleted(id: String) -> Self {
138        Self::MockDeleted {
139            id,
140            timestamp: chrono::Utc::now().to_rfc3339(),
141        }
142    }
143
144    /// Create a stats updated event
145    pub fn stats_updated(stats: super::management::ServerStats) -> Self {
146        Self::StatsUpdated {
147            stats,
148            timestamp: chrono::Utc::now().to_rfc3339(),
149        }
150    }
151
152    /// Create a connection established event
153    pub fn connected(message: String) -> Self {
154        Self::Connected {
155            message,
156            timestamp: chrono::Utc::now().to_rfc3339(),
157        }
158    }
159
160    /// Create a state machine updated event
161    pub fn state_machine_updated(
162        resource_type: String,
163        state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
164    ) -> Self {
165        Self::StateMachineUpdated {
166            resource_type,
167            state_machine,
168            timestamp: chrono::Utc::now().to_rfc3339(),
169        }
170    }
171
172    /// Create a state machine deleted event
173    pub fn state_machine_deleted(resource_type: String) -> Self {
174        Self::StateMachineDeleted {
175            resource_type,
176            timestamp: chrono::Utc::now().to_rfc3339(),
177        }
178    }
179
180    /// Create a state instance created event
181    pub fn state_instance_created(
182        resource_id: String,
183        resource_type: String,
184        initial_state: String,
185    ) -> Self {
186        Self::StateInstanceCreated {
187            resource_id,
188            resource_type,
189            initial_state,
190            timestamp: chrono::Utc::now().to_rfc3339(),
191        }
192    }
193
194    /// Create a state transitioned event
195    pub fn state_transitioned(
196        resource_id: String,
197        resource_type: String,
198        from_state: String,
199        to_state: String,
200        state_data: std::collections::HashMap<String, serde_json::Value>,
201    ) -> Self {
202        Self::StateTransitioned {
203            resource_id,
204            resource_type,
205            from_state,
206            to_state,
207            state_data,
208            timestamp: chrono::Utc::now().to_rfc3339(),
209        }
210    }
211
212    /// Create a state instance deleted event
213    pub fn state_instance_deleted(resource_id: String, resource_type: String) -> Self {
214        Self::StateInstanceDeleted {
215            resource_id,
216            resource_type,
217            timestamp: chrono::Utc::now().to_rfc3339(),
218        }
219    }
220}
221
222/// Shared state for WebSocket management
223#[derive(Clone)]
224pub struct WsManagementState {
225    /// Broadcast channel for sending events to all connected clients
226    pub tx: broadcast::Sender<MockEvent>,
227}
228
229impl WsManagementState {
230    /// Create a new WebSocket management state with broadcast channel
231    ///
232    /// The broadcast channel capacity can be configured via the
233    /// `MOCKFORGE_WS_BROADCAST_CAPACITY` environment variable.
234    pub fn new() -> Self {
235        let capacity = get_ws_broadcast_capacity();
236        let (tx, _) = broadcast::channel(capacity);
237        Self { tx }
238    }
239
240    /// Broadcast an event to all connected clients
241    ///
242    /// Returns the number of receivers that received the event.
243    /// Logs a warning if no receivers are available (all events would be dropped).
244    pub fn broadcast(
245        &self,
246        event: MockEvent,
247    ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
248        match self.tx.send(event) {
249            Ok(n) => Ok(n),
250            Err(e) => {
251                warn!(
252                    "WebSocket broadcast failed: no active receivers. Event dropped: {:?}",
253                    std::mem::discriminant(&e.0)
254                );
255                Err(Box::new(e))
256            }
257        }
258    }
259}
260
261impl Default for WsManagementState {
262    fn default() -> Self {
263        Self::new()
264    }
265}
266
267/// WebSocket upgrade handler
268async fn ws_handler(
269    ws: WebSocketUpgrade,
270    State(state): State<WsManagementState>,
271) -> impl IntoResponse {
272    ws.on_upgrade(move |socket| handle_socket(socket, state))
273}
274
275/// Handle a WebSocket connection
276async fn handle_socket(socket: WebSocket, state: WsManagementState) {
277    let (mut sender, mut receiver) = socket.split();
278
279    // Subscribe to broadcast channel
280    let mut rx = state.tx.subscribe();
281
282    // Send initial connection confirmation
283    let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
284    if let Ok(json) = serde_json::to_string(&connected_event) {
285        if sender.send(Message::Text(json.into())).await.is_err() {
286            return;
287        }
288    }
289
290    // Spawn a task to forward broadcast messages to this client
291    let mut send_task = tokio::spawn(async move {
292        loop {
293            match rx.recv().await {
294                Ok(event) => {
295                    if let Ok(json) = serde_json::to_string(&event) {
296                        if sender.send(Message::Text(json.into())).await.is_err() {
297                            break;
298                        }
299                    }
300                }
301                Err(broadcast::error::RecvError::Lagged(count)) => {
302                    warn!("WebSocket client lagged behind, {} events dropped", count);
303                    // Continue receiving — the channel is still valid after a lag
304                }
305                Err(broadcast::error::RecvError::Closed) => {
306                    break;
307                }
308            }
309        }
310    });
311
312    // Handle incoming messages from client (for now, just keep connection alive)
313    let mut recv_task = tokio::spawn(async move {
314        while let Some(Ok(msg)) = receiver.next().await {
315            match msg {
316                Message::Text(text) => {
317                    debug!("Received WebSocket message: {}", text);
318                    // Could handle client commands here in the future
319                }
320                Message::Close(_) => {
321                    info!("WebSocket client disconnected");
322                    break;
323                }
324                _ => {}
325            }
326        }
327    });
328
329    // Wait for either task to finish
330    tokio::select! {
331        _ = &mut send_task => {
332            debug!("Send task completed");
333            recv_task.abort();
334        }
335        _ = &mut recv_task => {
336            debug!("Receive task completed");
337            send_task.abort();
338        }
339    }
340}
341
342/// Build the WebSocket management router
343pub fn ws_management_router(state: WsManagementState) -> Router {
344    Router::new().route("/", get(ws_handler)).with_state(state)
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the state must not panic.
    #[test]
    fn test_ws_management_state_creation() {
        let _state = WsManagementState::new();
        // Should be able to create state without errors
    }

    /// A `MockCreated` event serializes and carries its snake_case type tag.
    #[test]
    fn test_mock_event_creation() {
        use super::super::management::{MockConfig, MockResponse};

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
            version: 1,
        };

        let event = MockEvent::mock_created(mock);

        // Should serialize successfully
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("mock_created"));
    }

    /// Broadcasting with no subscribers reports an error instead of panicking.
    #[test]
    fn test_broadcast_event() {
        let state = WsManagementState::new();

        let event = MockEvent::connected("Test connection".to_string());

        // `new()` discards the initial receiver, so nobody is subscribed and
        // the channel hands the event back inside a `SendError`.
        let result = state.broadcast(event);
        assert!(result.is_err());
    }

    /// Broadcasting with one subscriber reports one receiver and delivers the event.
    #[test]
    fn test_broadcast_event_with_subscriber() {
        let state = WsManagementState::new();
        let mut rx = state.tx.subscribe();

        let receivers = state
            .broadcast(MockEvent::connected("Test connection".to_string()))
            .expect("broadcast should succeed while a receiver is subscribed");
        assert_eq!(receivers, 1);

        match rx.try_recv() {
            Ok(MockEvent::Connected { message, .. }) => assert_eq!(message, "Test connection"),
            other => panic!("unexpected receive result: {:?}", other),
        }
    }

    /// Building the router must not panic.
    #[tokio::test]
    async fn test_ws_management_router_creation() {
        let state = WsManagementState::new();
        let _router = ws_management_router(state);
        // Router should be created successfully
    }
}