mockforge_http/
management_ws.rs

//! WebSocket interface for live mock updates.
//!
//! Provides real-time notifications when mocks are created, updated, or deleted.
//! Used by developer tools such as the VS Code extension for live synchronization.
5use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16/// Events that can be broadcasted to WebSocket clients
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20    /// Mock was created
21    MockCreated {
22        mock: super::management::MockConfig,
23        timestamp: String,
24    },
25    /// Mock was updated
26    MockUpdated {
27        mock: super::management::MockConfig,
28        timestamp: String,
29    },
30    /// Mock was deleted
31    MockDeleted { id: String, timestamp: String },
32    /// Server statistics changed
33    StatsUpdated {
34        stats: super::management::ServerStats,
35        timestamp: String,
36    },
37    /// Connection established confirmation
38    Connected { message: String, timestamp: String },
39}
40
41impl MockEvent {
42    pub fn mock_created(mock: super::management::MockConfig) -> Self {
43        Self::MockCreated {
44            mock,
45            timestamp: chrono::Utc::now().to_rfc3339(),
46        }
47    }
48
49    pub fn mock_updated(mock: super::management::MockConfig) -> Self {
50        Self::MockUpdated {
51            mock,
52            timestamp: chrono::Utc::now().to_rfc3339(),
53        }
54    }
55
56    pub fn mock_deleted(id: String) -> Self {
57        Self::MockDeleted {
58            id,
59            timestamp: chrono::Utc::now().to_rfc3339(),
60        }
61    }
62
63    pub fn stats_updated(stats: super::management::ServerStats) -> Self {
64        Self::StatsUpdated {
65            stats,
66            timestamp: chrono::Utc::now().to_rfc3339(),
67        }
68    }
69
70    pub fn connected(message: String) -> Self {
71        Self::Connected {
72            message,
73            timestamp: chrono::Utc::now().to_rfc3339(),
74        }
75    }
76}
77
78/// Shared state for WebSocket management
79#[derive(Clone)]
80pub struct WsManagementState {
81    /// Broadcast channel for sending events to all connected clients
82    pub tx: broadcast::Sender<MockEvent>,
83}
84
85impl WsManagementState {
86    pub fn new() -> Self {
87        let (tx, _) = broadcast::channel(100);
88        Self { tx }
89    }
90
91    /// Broadcast an event to all connected clients
92    pub fn broadcast(
93        &self,
94        event: MockEvent,
95    ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
96        self.tx.send(event).map_err(Box::new)
97    }
98}
99
100impl Default for WsManagementState {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106/// WebSocket upgrade handler
107async fn ws_handler(
108    ws: WebSocketUpgrade,
109    State(state): State<WsManagementState>,
110) -> impl IntoResponse {
111    ws.on_upgrade(move |socket| handle_socket(socket, state))
112}
113
114/// Handle a WebSocket connection
115async fn handle_socket(socket: WebSocket, state: WsManagementState) {
116    let (mut sender, mut receiver) = socket.split();
117
118    // Subscribe to broadcast channel
119    let mut rx = state.tx.subscribe();
120
121    // Send initial connection confirmation
122    let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
123    if let Ok(json) = serde_json::to_string(&connected_event) {
124        if sender.send(Message::Text(json.into())).await.is_err() {
125            return;
126        }
127    }
128
129    // Spawn a task to forward broadcast messages to this client
130    let mut send_task = tokio::spawn(async move {
131        while let Ok(event) = rx.recv().await {
132            if let Ok(json) = serde_json::to_string(&event) {
133                if sender.send(Message::Text(json.into())).await.is_err() {
134                    break;
135                }
136            }
137        }
138    });
139
140    // Handle incoming messages from client (for now, just keep connection alive)
141    let mut recv_task = tokio::spawn(async move {
142        while let Some(Ok(msg)) = receiver.next().await {
143            match msg {
144                Message::Text(text) => {
145                    debug!("Received WebSocket message: {}", text);
146                    // Could handle client commands here in the future
147                }
148                Message::Close(_) => {
149                    info!("WebSocket client disconnected");
150                    break;
151                }
152                _ => {}
153            }
154        }
155    });
156
157    // Wait for either task to finish
158    tokio::select! {
159        _ = &mut send_task => {
160            debug!("Send task completed");
161            recv_task.abort();
162        }
163        _ = &mut recv_task => {
164            debug!("Receive task completed");
165            send_task.abort();
166        }
167    }
168}
169
170/// Build the WebSocket management router
171pub fn ws_management_router(state: WsManagementState) -> Router {
172    Router::new().route("/", get(ws_handler)).with_state(state)
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the state must not panic.
    #[test]
    fn test_ws_management_state_creation() {
        let _state = WsManagementState::new();
        // Should be able to create state without errors
    }

    /// A `MockCreated` event serializes with its snake_case tag in the `type` field.
    #[test]
    fn test_mock_event_creation() {
        use super::super::management::{MockConfig, MockResponse};

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
        };

        let event = MockEvent::mock_created(mock);

        // Should serialize successfully
        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("mock_created"));
        assert!(json.contains("\"type\":\"mock_created\""));
    }

    /// Broadcasting with no subscribed receiver fails and hands the event back.
    #[test]
    fn test_broadcast_event() {
        let state = WsManagementState::new();

        let event = MockEvent::connected("Test connection".to_string());

        // `new()` drops the initial receiver, so there is nobody to deliver to and
        // the send is rejected with an error carrying the unsent event.
        let result = state.broadcast(event);
        assert!(result.is_err());
    }

    /// Broadcasting with one subscriber reports one receiver and delivers the event.
    #[test]
    fn test_broadcast_event_with_subscriber() {
        let state = WsManagementState::new();
        let mut rx = state.tx.subscribe();

        let delivered = state
            .broadcast(MockEvent::connected("Test connection".to_string()))
            .expect("broadcast should succeed while a receiver is subscribed");
        assert_eq!(delivered, 1);

        match rx.try_recv() {
            Ok(MockEvent::Connected { message, .. }) => assert_eq!(message, "Test connection"),
            other => panic!("unexpected receive result: {:?}", other),
        }
    }

    /// Building the router must not panic.
    #[tokio::test]
    async fn test_ws_management_router_creation() {
        let state = WsManagementState::new();
        let _router = ws_management_router(state);
        // Router should be created successfully
    }
}