1use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20 MockCreated {
22 mock: super::management::MockConfig,
24 timestamp: String,
26 },
27 MockUpdated {
29 mock: super::management::MockConfig,
31 timestamp: String,
33 },
34 MockDeleted {
36 id: String,
38 timestamp: String,
40 },
41 StatsUpdated {
43 stats: super::management::ServerStats,
45 timestamp: String,
47 },
48 Connected {
50 message: String,
52 timestamp: String,
54 },
55 StateMachineUpdated {
57 resource_type: String,
59 state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
61 timestamp: String,
63 },
64 StateMachineDeleted {
66 resource_type: String,
68 timestamp: String,
70 },
71 StateInstanceCreated {
73 resource_id: String,
75 resource_type: String,
77 initial_state: String,
79 timestamp: String,
81 },
82 StateTransitioned {
84 resource_id: String,
86 resource_type: String,
88 from_state: String,
90 to_state: String,
92 state_data: std::collections::HashMap<String, serde_json::Value>,
94 timestamp: String,
96 },
97 StateInstanceDeleted {
99 resource_id: String,
101 resource_type: String,
103 timestamp: String,
105 },
106}
107
108impl MockEvent {
109 pub fn mock_created(mock: super::management::MockConfig) -> Self {
111 Self::MockCreated {
112 mock,
113 timestamp: chrono::Utc::now().to_rfc3339(),
114 }
115 }
116
117 pub fn mock_updated(mock: super::management::MockConfig) -> Self {
119 Self::MockUpdated {
120 mock,
121 timestamp: chrono::Utc::now().to_rfc3339(),
122 }
123 }
124
125 pub fn mock_deleted(id: String) -> Self {
127 Self::MockDeleted {
128 id,
129 timestamp: chrono::Utc::now().to_rfc3339(),
130 }
131 }
132
133 pub fn stats_updated(stats: super::management::ServerStats) -> Self {
135 Self::StatsUpdated {
136 stats,
137 timestamp: chrono::Utc::now().to_rfc3339(),
138 }
139 }
140
141 pub fn connected(message: String) -> Self {
143 Self::Connected {
144 message,
145 timestamp: chrono::Utc::now().to_rfc3339(),
146 }
147 }
148
149 pub fn state_machine_updated(
151 resource_type: String,
152 state_machine: mockforge_core::intelligent_behavior::rules::StateMachine,
153 ) -> Self {
154 Self::StateMachineUpdated {
155 resource_type,
156 state_machine,
157 timestamp: chrono::Utc::now().to_rfc3339(),
158 }
159 }
160
161 pub fn state_machine_deleted(resource_type: String) -> Self {
163 Self::StateMachineDeleted {
164 resource_type,
165 timestamp: chrono::Utc::now().to_rfc3339(),
166 }
167 }
168
169 pub fn state_instance_created(
171 resource_id: String,
172 resource_type: String,
173 initial_state: String,
174 ) -> Self {
175 Self::StateInstanceCreated {
176 resource_id,
177 resource_type,
178 initial_state,
179 timestamp: chrono::Utc::now().to_rfc3339(),
180 }
181 }
182
183 pub fn state_transitioned(
185 resource_id: String,
186 resource_type: String,
187 from_state: String,
188 to_state: String,
189 state_data: std::collections::HashMap<String, serde_json::Value>,
190 ) -> Self {
191 Self::StateTransitioned {
192 resource_id,
193 resource_type,
194 from_state,
195 to_state,
196 state_data,
197 timestamp: chrono::Utc::now().to_rfc3339(),
198 }
199 }
200
201 pub fn state_instance_deleted(resource_id: String, resource_type: String) -> Self {
203 Self::StateInstanceDeleted {
204 resource_id,
205 resource_type,
206 timestamp: chrono::Utc::now().to_rfc3339(),
207 }
208 }
209}
210
211#[derive(Clone)]
213pub struct WsManagementState {
214 pub tx: broadcast::Sender<MockEvent>,
216}
217
218impl WsManagementState {
219 pub fn new() -> Self {
221 let (tx, _) = broadcast::channel(100);
222 Self { tx }
223 }
224
225 pub fn broadcast(
227 &self,
228 event: MockEvent,
229 ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
230 self.tx.send(event).map_err(Box::new)
231 }
232}
233
234impl Default for WsManagementState {
235 fn default() -> Self {
236 Self::new()
237 }
238}
239
240async fn ws_handler(
242 ws: WebSocketUpgrade,
243 State(state): State<WsManagementState>,
244) -> impl IntoResponse {
245 ws.on_upgrade(move |socket| handle_socket(socket, state))
246}
247
248async fn handle_socket(socket: WebSocket, state: WsManagementState) {
250 let (mut sender, mut receiver) = socket.split();
251
252 let mut rx = state.tx.subscribe();
254
255 let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
257 if let Ok(json) = serde_json::to_string(&connected_event) {
258 if sender.send(Message::Text(json.into())).await.is_err() {
259 return;
260 }
261 }
262
263 let mut send_task = tokio::spawn(async move {
265 while let Ok(event) = rx.recv().await {
266 if let Ok(json) = serde_json::to_string(&event) {
267 if sender.send(Message::Text(json.into())).await.is_err() {
268 break;
269 }
270 }
271 }
272 });
273
274 let mut recv_task = tokio::spawn(async move {
276 while let Some(Ok(msg)) = receiver.next().await {
277 match msg {
278 Message::Text(text) => {
279 debug!("Received WebSocket message: {}", text);
280 }
282 Message::Close(_) => {
283 info!("WebSocket client disconnected");
284 break;
285 }
286 _ => {}
287 }
288 }
289 });
290
291 tokio::select! {
293 _ = &mut send_task => {
294 debug!("Send task completed");
295 recv_task.abort();
296 }
297 _ = &mut recv_task => {
298 debug!("Receive task completed");
299 send_task.abort();
300 }
301 }
302}
303
304pub fn ws_management_router(state: WsManagementState) -> Router {
306 Router::new().route("/", get(ws_handler)).with_state(state)
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the state must not panic.
    #[test]
    fn test_ws_management_state_creation() {
        let _state = WsManagementState::new();
    }

    /// A `MockCreated` event serializes with the snake_case `type` tag and a timestamp.
    #[test]
    fn test_mock_event_creation() {
        use super::super::management::{MockConfig, MockResponse};

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
            request_match: None,
            priority: None,
            scenario: None,
            required_scenario_state: None,
            new_scenario_state: None,
        };

        let event = MockEvent::mock_created(mock);

        let json = serde_json::to_string(&event).unwrap();
        let value: serde_json::Value = serde_json::from_str(&json).unwrap();
        // Check the tag itself rather than a substring that could match anywhere.
        assert_eq!(value["type"], "mock_created");
        assert!(value["timestamp"].is_string());
    }

    /// Broadcasting fails without subscribers and reaches each subscriber otherwise.
    #[test]
    fn test_broadcast_event() {
        let state = WsManagementState::new();

        // `new` drops the initial receiver, so nobody is listening yet.
        let result = state.broadcast(MockEvent::connected("Test connection".to_string()));
        assert!(result.is_err());

        // With one subscriber the event is delivered to exactly that receiver.
        let mut rx = state.tx.subscribe();
        let result = state.broadcast(MockEvent::connected("Test connection".to_string()));
        assert_eq!(result.ok(), Some(1));
        assert!(matches!(rx.try_recv(), Ok(MockEvent::Connected { .. })));
    }

    /// Building the router with a fresh state must not panic.
    #[tokio::test]
    async fn test_ws_management_router_creation() {
        let state = WsManagementState::new();
        let _router = ws_management_router(state);
    }
}