1use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
/// Capacity used for the management broadcast channel when the environment
/// does not supply a usable override.
const DEFAULT_WS_BROADCAST_CAPACITY: usize = 100;

/// Resolves the capacity of the management broadcast channel.
///
/// Reads `MOCKFORGE_WS_BROADCAST_CAPACITY`; when the variable is unset, is not
/// valid Unicode, or does not hold a usable capacity, the default
/// [`DEFAULT_WS_BROADCAST_CAPACITY`] is returned instead.
fn get_ws_broadcast_capacity() -> usize {
    std::env::var("MOCKFORGE_WS_BROADCAST_CAPACITY")
        .ok()
        .and_then(|raw| parse_ws_broadcast_capacity(&raw))
        .unwrap_or(DEFAULT_WS_BROADCAST_CAPACITY)
}

/// Parses a user-supplied capacity, returning `None` when it is unusable.
///
/// Surrounding whitespace is ignored. Zero and values above `usize::MAX / 2`
/// are rejected because `tokio::sync::broadcast::channel` panics on them, and
/// a misconfigured environment variable must not crash the server.
fn parse_ws_broadcast_capacity(raw: &str) -> Option<usize> {
    raw.trim()
        .parse::<usize>()
        .ok()
        .filter(|capacity| (1..=usize::MAX / 2).contains(capacity))
}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(tag = "type", rename_all = "snake_case")]
30pub enum MockEvent {
31 MockCreated {
33 mock: super::management::MockConfig,
35 timestamp: String,
37 },
38 MockUpdated {
40 mock: super::management::MockConfig,
42 timestamp: String,
44 },
45 MockDeleted {
47 id: String,
49 timestamp: String,
51 },
52 StatsUpdated {
54 stats: super::management::ServerStats,
56 timestamp: String,
58 },
59 Connected {
61 message: String,
63 timestamp: String,
65 },
66 StateMachineUpdated {
68 resource_type: String,
70 state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
72 timestamp: String,
74 },
75 StateMachineDeleted {
77 resource_type: String,
79 timestamp: String,
81 },
82 StateInstanceCreated {
84 resource_id: String,
86 resource_type: String,
88 initial_state: String,
90 timestamp: String,
92 },
93 StateTransitioned {
95 resource_id: String,
97 resource_type: String,
99 from_state: String,
101 to_state: String,
103 state_data: std::collections::HashMap<String, serde_json::Value>,
105 timestamp: String,
107 },
108 StateInstanceDeleted {
110 resource_id: String,
112 resource_type: String,
114 timestamp: String,
116 },
117}
118
119impl MockEvent {
120 pub fn mock_created(mock: super::management::MockConfig) -> Self {
122 Self::MockCreated {
123 mock,
124 timestamp: chrono::Utc::now().to_rfc3339(),
125 }
126 }
127
128 pub fn mock_updated(mock: super::management::MockConfig) -> Self {
130 Self::MockUpdated {
131 mock,
132 timestamp: chrono::Utc::now().to_rfc3339(),
133 }
134 }
135
136 pub fn mock_deleted(id: String) -> Self {
138 Self::MockDeleted {
139 id,
140 timestamp: chrono::Utc::now().to_rfc3339(),
141 }
142 }
143
144 pub fn stats_updated(stats: super::management::ServerStats) -> Self {
146 Self::StatsUpdated {
147 stats,
148 timestamp: chrono::Utc::now().to_rfc3339(),
149 }
150 }
151
152 pub fn connected(message: String) -> Self {
154 Self::Connected {
155 message,
156 timestamp: chrono::Utc::now().to_rfc3339(),
157 }
158 }
159
160 pub fn state_machine_updated(
162 resource_type: String,
163 state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
164 ) -> Self {
165 Self::StateMachineUpdated {
166 resource_type,
167 state_machine,
168 timestamp: chrono::Utc::now().to_rfc3339(),
169 }
170 }
171
172 pub fn state_machine_deleted(resource_type: String) -> Self {
174 Self::StateMachineDeleted {
175 resource_type,
176 timestamp: chrono::Utc::now().to_rfc3339(),
177 }
178 }
179
180 pub fn state_instance_created(
182 resource_id: String,
183 resource_type: String,
184 initial_state: String,
185 ) -> Self {
186 Self::StateInstanceCreated {
187 resource_id,
188 resource_type,
189 initial_state,
190 timestamp: chrono::Utc::now().to_rfc3339(),
191 }
192 }
193
194 pub fn state_transitioned(
196 resource_id: String,
197 resource_type: String,
198 from_state: String,
199 to_state: String,
200 state_data: std::collections::HashMap<String, serde_json::Value>,
201 ) -> Self {
202 Self::StateTransitioned {
203 resource_id,
204 resource_type,
205 from_state,
206 to_state,
207 state_data,
208 timestamp: chrono::Utc::now().to_rfc3339(),
209 }
210 }
211
212 pub fn state_instance_deleted(resource_id: String, resource_type: String) -> Self {
214 Self::StateInstanceDeleted {
215 resource_id,
216 resource_type,
217 timestamp: chrono::Utc::now().to_rfc3339(),
218 }
219 }
220}
221
222#[derive(Clone)]
224pub struct WsManagementState {
225 pub tx: broadcast::Sender<MockEvent>,
227}
228
229impl WsManagementState {
230 pub fn new() -> Self {
235 let capacity = get_ws_broadcast_capacity();
236 let (tx, _) = broadcast::channel(capacity);
237 Self { tx }
238 }
239
240 pub fn broadcast(
242 &self,
243 event: MockEvent,
244 ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
245 self.tx.send(event).map_err(Box::new)
246 }
247}
248
249impl Default for WsManagementState {
250 fn default() -> Self {
251 Self::new()
252 }
253}
254
255async fn ws_handler(
257 ws: WebSocketUpgrade,
258 State(state): State<WsManagementState>,
259) -> impl IntoResponse {
260 ws.on_upgrade(move |socket| handle_socket(socket, state))
261}
262
263async fn handle_socket(socket: WebSocket, state: WsManagementState) {
265 let (mut sender, mut receiver) = socket.split();
266
267 let mut rx = state.tx.subscribe();
269
270 let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
272 if let Ok(json) = serde_json::to_string(&connected_event) {
273 if sender.send(Message::Text(json.into())).await.is_err() {
274 return;
275 }
276 }
277
278 let mut send_task = tokio::spawn(async move {
280 while let Ok(event) = rx.recv().await {
281 if let Ok(json) = serde_json::to_string(&event) {
282 if sender.send(Message::Text(json.into())).await.is_err() {
283 break;
284 }
285 }
286 }
287 });
288
289 let mut recv_task = tokio::spawn(async move {
291 while let Some(Ok(msg)) = receiver.next().await {
292 match msg {
293 Message::Text(text) => {
294 debug!("Received WebSocket message: {}", text);
295 }
297 Message::Close(_) => {
298 info!("WebSocket client disconnected");
299 break;
300 }
301 _ => {}
302 }
303 }
304 });
305
306 tokio::select! {
308 _ = &mut send_task => {
309 debug!("Send task completed");
310 recv_task.abort();
311 }
312 _ = &mut recv_task => {
313 debug!("Receive task completed");
314 send_task.abort();
315 }
316 }
317}
318
319pub fn ws_management_router(state: WsManagementState) -> Router {
321 Router::new().route("/", get(ws_handler)).with_state(state)
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created state has no subscribers, because `new` discards the
    /// receiver that comes with the channel.
    #[test]
    fn test_ws_management_state_creation() {
        let state = WsManagementState::new();
        assert_eq!(state.tx.receiver_count(), 0);
    }

    /// Events serialize with an internal `type` tag in snake_case.
    #[test]
    fn test_mock_event_creation() {
        use super::super::management::{MockConfig, MockResponse};

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let event = MockEvent::mock_created(mock);

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("mock_created"));
        assert!(json.contains("\"type\":\"mock_created\""));
    }

    /// Broadcasting fails without subscribers and reaches a subscriber once
    /// one exists.
    #[test]
    fn test_broadcast_event() {
        let state = WsManagementState::new();

        // Nobody is listening yet, so the channel rejects the event.
        let unheard = state.broadcast(MockEvent::connected("Test connection".to_string()));
        assert!(unheard.is_err());

        let mut rx = state.tx.subscribe();
        let delivered = state
            .broadcast(MockEvent::connected("Test connection".to_string()))
            .expect("broadcast should succeed with an active subscriber");
        assert_eq!(delivered, 1);

        match rx.try_recv() {
            Ok(MockEvent::Connected { message, .. }) => assert_eq!(message, "Test connection"),
            other => panic!("unexpected broadcast result: {:?}", other),
        }
    }

    /// Building the router from a fresh state must not panic.
    #[tokio::test]
    async fn test_ws_management_router_creation() {
        let state = WsManagementState::new();
        let _router = ws_management_router(state);
    }
}