mockforge_http/
management_ws.rs1use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20 MockCreated {
22 mock: super::management::MockConfig,
23 timestamp: String,
24 },
25 MockUpdated {
27 mock: super::management::MockConfig,
28 timestamp: String,
29 },
30 MockDeleted { id: String, timestamp: String },
32 StatsUpdated {
34 stats: super::management::ServerStats,
35 timestamp: String,
36 },
37 Connected { message: String, timestamp: String },
39}
40
41impl MockEvent {
42 pub fn mock_created(mock: super::management::MockConfig) -> Self {
43 Self::MockCreated {
44 mock,
45 timestamp: chrono::Utc::now().to_rfc3339(),
46 }
47 }
48
49 pub fn mock_updated(mock: super::management::MockConfig) -> Self {
50 Self::MockUpdated {
51 mock,
52 timestamp: chrono::Utc::now().to_rfc3339(),
53 }
54 }
55
56 pub fn mock_deleted(id: String) -> Self {
57 Self::MockDeleted {
58 id,
59 timestamp: chrono::Utc::now().to_rfc3339(),
60 }
61 }
62
63 pub fn stats_updated(stats: super::management::ServerStats) -> Self {
64 Self::StatsUpdated {
65 stats,
66 timestamp: chrono::Utc::now().to_rfc3339(),
67 }
68 }
69
70 pub fn connected(message: String) -> Self {
71 Self::Connected {
72 message,
73 timestamp: chrono::Utc::now().to_rfc3339(),
74 }
75 }
76}
77
78#[derive(Clone)]
80pub struct WsManagementState {
81 pub tx: broadcast::Sender<MockEvent>,
83}
84
85impl WsManagementState {
86 pub fn new() -> Self {
87 let (tx, _) = broadcast::channel(100);
88 Self { tx }
89 }
90
91 pub fn broadcast(
93 &self,
94 event: MockEvent,
95 ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
96 self.tx.send(event).map_err(Box::new)
97 }
98}
99
100impl Default for WsManagementState {
101 fn default() -> Self {
102 Self::new()
103 }
104}
105
106async fn ws_handler(
108 ws: WebSocketUpgrade,
109 State(state): State<WsManagementState>,
110) -> impl IntoResponse {
111 ws.on_upgrade(move |socket| handle_socket(socket, state))
112}
113
114async fn handle_socket(socket: WebSocket, state: WsManagementState) {
116 let (mut sender, mut receiver) = socket.split();
117
118 let mut rx = state.tx.subscribe();
120
121 let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
123 if let Ok(json) = serde_json::to_string(&connected_event) {
124 if sender.send(Message::Text(json.into())).await.is_err() {
125 return;
126 }
127 }
128
129 let mut send_task = tokio::spawn(async move {
131 while let Ok(event) = rx.recv().await {
132 if let Ok(json) = serde_json::to_string(&event) {
133 if sender.send(Message::Text(json.into())).await.is_err() {
134 break;
135 }
136 }
137 }
138 });
139
140 let mut recv_task = tokio::spawn(async move {
142 while let Some(Ok(msg)) = receiver.next().await {
143 match msg {
144 Message::Text(text) => {
145 debug!("Received WebSocket message: {}", text);
146 }
148 Message::Close(_) => {
149 info!("WebSocket client disconnected");
150 break;
151 }
152 _ => {}
153 }
154 }
155 });
156
157 tokio::select! {
159 _ = &mut send_task => {
160 debug!("Send task completed");
161 recv_task.abort();
162 }
163 _ = &mut recv_task => {
164 debug!("Receive task completed");
165 send_task.abort();
166 }
167 }
168}
169
170pub fn ws_management_router(state: WsManagementState) -> Router {
172 Router::new().route("/", get(ws_handler)).with_state(state)
173}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Constructing the state must not panic.
    #[test]
    fn test_ws_management_state_creation() {
        let _state = WsManagementState::new();
    }

    /// A `MockCreated` event serializes with its snake_case tag.
    #[test]
    fn test_mock_event_creation() {
        use super::super::management::{MockConfig, MockResponse};

        let mock = MockConfig {
            id: "test-1".to_string(),
            name: "Test Mock".to_string(),
            method: "GET".to_string(),
            path: "/test".to_string(),
            response: MockResponse {
                body: serde_json::json!({"message": "test"}),
                headers: None,
            },
            enabled: true,
            latency_ms: None,
            status_code: Some(200),
        };

        let event = MockEvent::mock_created(mock);

        let json = serde_json::to_string(&event).unwrap();
        assert!(json.contains("mock_created"));
    }

    /// Broadcasting fails without subscribers and reaches a subscriber once
    /// one exists.
    #[test]
    fn test_broadcast_event() {
        let state = WsManagementState::new();

        // `new()` drops the initial receiver, so nobody is listening yet and
        // the send must be reported as an error.
        let without_subscribers = state.broadcast(MockEvent::connected("Test connection".to_string()));
        assert!(without_subscribers.is_err());

        // With exactly one subscriber the send succeeds and reports it.
        let mut rx = state.tx.subscribe();
        let with_subscriber = state.broadcast(MockEvent::connected("Test connection".to_string()));
        assert!(matches!(with_subscriber, Ok(1)));

        // The subscriber receives the very event that was broadcast.
        assert!(matches!(
            rx.try_recv(),
            Ok(MockEvent::Connected { message, .. }) if message == "Test connection"
        ));
    }

    /// Building the router must not panic.
    #[tokio::test]
    async fn test_ws_management_router_creation() {
        let state = WsManagementState::new();
        let _router = ws_management_router(state);
    }
}