mockforge_http/
management_ws.rs1use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
6use axum::extract::State;
7use axum::response::IntoResponse;
8use axum::routing::get;
9use axum::Router;
10use futures::stream::StreamExt;
11use futures::SinkExt;
12use serde::{Deserialize, Serialize};
13use tokio::sync::broadcast;
14use tracing::*;
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "type", rename_all = "snake_case")]
19pub enum MockEvent {
20 MockCreated {
22 mock: super::management::MockConfig,
24 timestamp: String,
26 },
27 MockUpdated {
29 mock: super::management::MockConfig,
31 timestamp: String,
33 },
34 MockDeleted {
36 id: String,
38 timestamp: String,
40 },
41 StatsUpdated {
43 stats: super::management::ServerStats,
45 timestamp: String,
47 },
48 Connected {
50 message: String,
52 timestamp: String,
54 },
55}
56
57impl MockEvent {
58 pub fn mock_created(mock: super::management::MockConfig) -> Self {
60 Self::MockCreated {
61 mock,
62 timestamp: chrono::Utc::now().to_rfc3339(),
63 }
64 }
65
66 pub fn mock_updated(mock: super::management::MockConfig) -> Self {
68 Self::MockUpdated {
69 mock,
70 timestamp: chrono::Utc::now().to_rfc3339(),
71 }
72 }
73
74 pub fn mock_deleted(id: String) -> Self {
76 Self::MockDeleted {
77 id,
78 timestamp: chrono::Utc::now().to_rfc3339(),
79 }
80 }
81
82 pub fn stats_updated(stats: super::management::ServerStats) -> Self {
84 Self::StatsUpdated {
85 stats,
86 timestamp: chrono::Utc::now().to_rfc3339(),
87 }
88 }
89
90 pub fn connected(message: String) -> Self {
92 Self::Connected {
93 message,
94 timestamp: chrono::Utc::now().to_rfc3339(),
95 }
96 }
97}
98
99#[derive(Clone)]
101pub struct WsManagementState {
102 pub tx: broadcast::Sender<MockEvent>,
104}
105
106impl WsManagementState {
107 pub fn new() -> Self {
109 let (tx, _) = broadcast::channel(100);
110 Self { tx }
111 }
112
113 pub fn broadcast(
115 &self,
116 event: MockEvent,
117 ) -> Result<usize, Box<broadcast::error::SendError<MockEvent>>> {
118 self.tx.send(event).map_err(Box::new)
119 }
120}
121
122impl Default for WsManagementState {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128async fn ws_handler(
130 ws: WebSocketUpgrade,
131 State(state): State<WsManagementState>,
132) -> impl IntoResponse {
133 ws.on_upgrade(move |socket| handle_socket(socket, state))
134}
135
136async fn handle_socket(socket: WebSocket, state: WsManagementState) {
138 let (mut sender, mut receiver) = socket.split();
139
140 let mut rx = state.tx.subscribe();
142
143 let connected_event = MockEvent::connected("Connected to MockForge management API".to_string());
145 if let Ok(json) = serde_json::to_string(&connected_event) {
146 if sender.send(Message::Text(json.into())).await.is_err() {
147 return;
148 }
149 }
150
151 let mut send_task = tokio::spawn(async move {
153 while let Ok(event) = rx.recv().await {
154 if let Ok(json) = serde_json::to_string(&event) {
155 if sender.send(Message::Text(json.into())).await.is_err() {
156 break;
157 }
158 }
159 }
160 });
161
162 let mut recv_task = tokio::spawn(async move {
164 while let Some(Ok(msg)) = receiver.next().await {
165 match msg {
166 Message::Text(text) => {
167 debug!("Received WebSocket message: {}", text);
168 }
170 Message::Close(_) => {
171 info!("WebSocket client disconnected");
172 break;
173 }
174 _ => {}
175 }
176 }
177 });
178
179 tokio::select! {
181 _ = &mut send_task => {
182 debug!("Send task completed");
183 recv_task.abort();
184 }
185 _ = &mut recv_task => {
186 debug!("Receive task completed");
187 send_task.abort();
188 }
189 }
190}
191
192pub fn ws_management_router(state: WsManagementState) -> Router {
194 Router::new().route("/", get(ws_handler)).with_state(state)
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200
201 #[test]
202 fn test_ws_management_state_creation() {
203 let _state = WsManagementState::new();
204 }
206
207 #[test]
208 fn test_mock_event_creation() {
209 use super::super::management::{MockConfig, MockResponse};
210
211 let mock = MockConfig {
212 id: "test-1".to_string(),
213 name: "Test Mock".to_string(),
214 method: "GET".to_string(),
215 path: "/test".to_string(),
216 response: MockResponse {
217 body: serde_json::json!({"message": "test"}),
218 headers: None,
219 },
220 enabled: true,
221 latency_ms: None,
222 status_code: Some(200),
223 };
224
225 let event = MockEvent::mock_created(mock);
226
227 let json = serde_json::to_string(&event).unwrap();
229 assert!(json.contains("mock_created"));
230 }
231
232 #[test]
233 fn test_broadcast_event() {
234 let state = WsManagementState::new();
235
236 let event = MockEvent::connected("Test connection".to_string());
237
238 let result = state.broadcast(event);
240 assert!(result.is_err() || result.is_ok());
242 }
243
244 #[tokio::test]
245 async fn test_ws_management_router_creation() {
246 let state = WsManagementState::new();
247 let _router = ws_management_router(state);
248 }
250}