1pub mod ai_event_generator;
152pub mod ws_tracing;
153
154use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
155use axum::extract::{Path, State};
156use axum::{response::IntoResponse, routing::get, Router};
157use mockforge_core::{latency::LatencyInjector, LatencyProfile, WsProxyHandler};
158#[cfg(feature = "data-faker")]
159use mockforge_data::provider::register_core_faker_provider;
160use mockforge_observability::get_global_registry;
161use serde_json::Value;
162use tokio::fs;
163use tokio::time::{sleep, Duration};
164use tracing::*;
165
166pub use ai_event_generator::{AiEventGenerator, WebSocketAiConfig};
168
169pub use ws_tracing::{
171 create_ws_connection_span, create_ws_message_span, record_ws_connection_success,
172 record_ws_error, record_ws_message_success,
173};
174
175pub fn router() -> Router {
177 #[cfg(feature = "data-faker")]
178 register_core_faker_provider();
179
180 Router::new().route("/ws", get(ws_handler_no_state))
181}
182
183pub fn router_with_latency(latency_injector: LatencyInjector) -> Router {
185 #[cfg(feature = "data-faker")]
186 register_core_faker_provider();
187
188 Router::new()
189 .route("/ws", get(ws_handler_with_state))
190 .with_state(latency_injector)
191}
192
193pub fn router_with_proxy(proxy_handler: WsProxyHandler) -> Router {
195 #[cfg(feature = "data-faker")]
196 register_core_faker_provider();
197
198 Router::new()
199 .route("/ws", get(ws_handler_with_proxy))
200 .route("/ws/{*path}", get(ws_handler_with_proxy_path))
201 .with_state(proxy_handler)
202}
203
204pub async fn start_with_latency(
206 port: u16,
207 latency: Option<LatencyProfile>,
208) -> Result<(), Box<dyn std::error::Error>> {
209 let latency_injector = latency.map(|profile| LatencyInjector::new(profile, Default::default()));
210 let router = if let Some(injector) = latency_injector {
211 router_with_latency(injector)
212 } else {
213 router()
214 };
215
216 let addr: std::net::SocketAddr = format!("127.0.0.1:{}", port).parse()?;
217 info!("WebSocket server listening on {}", addr);
218
219 let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
220 format!(
221 "Failed to bind WebSocket server to port {}: {}\n\
222 Hint: The port may already be in use. Try using a different port with --ws-port or check if another process is using this port with: lsof -i :{} or netstat -tulpn | grep {}",
223 port, e, port, port
224 )
225 })?;
226
227 axum::serve(listener, router).await?;
228 Ok(())
229}
230
231async fn ws_handler_no_state(ws: WebSocketUpgrade) -> impl IntoResponse {
233 ws.on_upgrade(handle_socket)
234}
235
236async fn ws_handler_with_state(
237 ws: WebSocketUpgrade,
238 axum::extract::State(_latency): axum::extract::State<LatencyInjector>,
239) -> impl IntoResponse {
240 ws.on_upgrade(handle_socket)
241}
242
243async fn ws_handler_with_proxy(
244 ws: WebSocketUpgrade,
245 State(proxy): State<WsProxyHandler>,
246) -> impl IntoResponse {
247 ws.on_upgrade(move |socket| handle_socket_with_proxy(socket, proxy, "/ws".to_string()))
248}
249
250async fn ws_handler_with_proxy_path(
251 Path(path): Path<String>,
252 ws: WebSocketUpgrade,
253 State(proxy): State<WsProxyHandler>,
254) -> impl IntoResponse {
255 let full_path = format!("/ws/{}", path);
256 ws.on_upgrade(move |socket| handle_socket_with_proxy(socket, proxy, full_path))
257}
258
259async fn handle_socket(mut socket: WebSocket) {
260 use std::time::Instant;
261
262 let registry = get_global_registry();
264 let connection_start = Instant::now();
265 registry.record_ws_connection_established();
266 debug!("WebSocket connection established, tracking metrics");
267
268 let mut status = "normal";
270
271 if let Ok(replay_file) = std::env::var("MOCKFORGE_WS_REPLAY_FILE") {
273 info!("WebSocket replay mode enabled with file: {}", replay_file);
274 handle_socket_with_replay(socket, &replay_file).await;
275 } else {
276 while let Some(msg) = socket.recv().await {
278 match msg {
279 Ok(Message::Text(text)) => {
280 registry.record_ws_message_received();
281
282 let response = format!("echo: {}", text);
284 if socket.send(Message::Text(response.into())).await.is_err() {
285 status = "send_error";
286 break;
287 }
288 registry.record_ws_message_sent();
289 }
290 Ok(Message::Close(_)) => {
291 status = "client_close";
292 break;
293 }
294 Err(e) => {
295 error!("WebSocket error: {}", e);
296 registry.record_ws_error();
297 status = "error";
298 break;
299 }
300 _ => {}
301 }
302 }
303 }
304
305 let duration = connection_start.elapsed().as_secs_f64();
307 registry.record_ws_connection_closed(duration, status);
308 debug!("WebSocket connection closed (status: {}, duration: {:.2}s)", status, duration);
309}
310
311async fn handle_socket_with_replay(mut socket: WebSocket, replay_file: &str) {
312 let _registry = get_global_registry(); let content = match fs::read_to_string(replay_file).await {
316 Ok(content) => content,
317 Err(e) => {
318 error!("Failed to read replay file {}: {}", replay_file, e);
319 return;
320 }
321 };
322
323 let mut replay_entries = Vec::new();
325 for line in content.lines() {
326 if let Ok(entry) = serde_json::from_str::<Value>(line) {
327 replay_entries.push(entry);
328 }
329 }
330
331 info!("Loaded {} replay entries", replay_entries.len());
332
333 for entry in replay_entries {
335 if let Some(wait_for) = entry.get("waitFor") {
337 if let Some(wait_pattern) = wait_for.as_str() {
338 info!("Waiting for pattern: {}", wait_pattern);
339 let mut found = false;
341 while let Some(msg) = socket.recv().await {
342 if let Ok(Message::Text(text)) = msg {
343 if text.contains(wait_pattern) || wait_pattern == "^CLIENT_READY$" {
344 found = true;
345 break;
346 }
347 }
348 }
349 if !found {
350 break;
351 }
352 }
353 }
354
355 if let Some(text) = entry.get("text").and_then(|v| v.as_str()) {
357 let expanded_text = if std::env::var("MOCKFORGE_RESPONSE_TEMPLATE_EXPAND")
359 .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
360 .unwrap_or(false)
361 {
362 expand_tokens(text)
363 } else {
364 text.to_string()
365 };
366
367 info!("Sending replay message: {}", expanded_text);
368 if socket.send(Message::Text(expanded_text.into())).await.is_err() {
369 break;
370 }
371 }
372
373 if let Some(ts) = entry.get("ts").and_then(|v| v.as_u64()) {
375 sleep(Duration::from_millis(ts * 10)).await; }
377 }
378}
379
380fn expand_tokens(text: &str) -> String {
381 let mut result = text.to_string();
382
383 result = result.replace("{{uuid}}", &uuid::Uuid::new_v4().to_string());
385
386 result = result.replace("{{now}}", &chrono::Utc::now().to_rfc3339());
388
389 if result.contains("{{now+1m}}") {
391 let now_plus_1m = chrono::Utc::now() + chrono::Duration::minutes(1);
392 result = result.replace("{{now+1m}}", &now_plus_1m.to_rfc3339());
393 }
394
395 if result.contains("{{now+1h}}") {
397 let now_plus_1h = chrono::Utc::now() + chrono::Duration::hours(1);
398 result = result.replace("{{now+1h}}", &now_plus_1h.to_rfc3339());
399 }
400
401 while result.contains("{{randInt") {
403 if let Some(start) = result.find("{{randInt") {
404 if let Some(end) = result[start..].find("}}") {
405 let full_match = &result[start..start + end + 2];
406 let content = &result[start + 9..start + end]; if let Some(space_pos) = content.find(' ') {
409 let min_str = &content[..space_pos];
410 let max_str = &content[space_pos + 1..];
411
412 if let (Ok(min), Ok(max)) = (min_str.parse::<i32>(), max_str.parse::<i32>()) {
413 let random_value = fastrand::i32(min..=max);
414 result = result.replace(full_match, &random_value.to_string());
415 } else {
416 result = result.replace(full_match, "0");
417 }
418 } else {
419 result = result.replace(full_match, "0");
420 }
421 } else {
422 break;
423 }
424 } else {
425 break;
426 }
427 }
428
429 result
430}
431
432async fn handle_socket_with_proxy(socket: WebSocket, proxy: WsProxyHandler, path: String) {
433 use std::time::Instant;
434
435 let registry = get_global_registry();
436 let connection_start = Instant::now();
437 registry.record_ws_connection_established();
438
439 let mut status = "normal";
440
441 if proxy.config.should_proxy(&path) {
443 info!("Proxying WebSocket connection for path: {}", path);
444 if let Err(e) = proxy.proxy_connection(&path, socket).await {
445 error!("Failed to proxy WebSocket connection: {}", e);
446 registry.record_ws_error();
447 status = "proxy_error";
448 }
449 } else {
450 info!("Handling WebSocket connection locally for path: {}", path);
451 registry.record_ws_connection_closed(0.0, ""); handle_socket(socket).await;
456 return; }
458
459 let duration = connection_start.elapsed().as_secs_f64();
460 registry.record_ws_connection_closed(duration, status);
461 debug!(
462 "Proxied WebSocket connection closed (status: {}, duration: {:.2}s)",
463 status, duration
464 );
465}
466
467#[cfg(test)]
468mod tests {
469 use super::*;
470
471 #[test]
472 fn test_router_creation() {
473 let _router = router();
474 }
476
477 #[test]
478 fn test_router_with_latency_creation() {
479 let latency_profile = LatencyProfile::default();
480 let latency_injector = LatencyInjector::new(latency_profile, Default::default());
481 let _router = router_with_latency(latency_injector);
482 }
484
485 #[test]
486 fn test_router_with_proxy_creation() {
487 let config = mockforge_core::WsProxyConfig {
488 upstream_url: "ws://localhost:8080".to_string(),
489 ..Default::default()
490 };
491 let proxy_handler = WsProxyHandler::new(config);
492 let _router = router_with_proxy(proxy_handler);
493 }
495
496 #[tokio::test]
497 async fn test_start_with_latency_config_none() {
498 let result = std::panic::catch_unwind(|| {
500 let _router = router();
501 });
502 assert!(result.is_ok());
503 }
504
505 #[tokio::test]
506 async fn test_start_with_latency_config_some() {
507 let latency_profile = LatencyProfile::default();
509 let latency_injector = LatencyInjector::new(latency_profile, Default::default());
510
511 let result = std::panic::catch_unwind(|| {
512 let _router = router_with_latency(latency_injector);
513 });
514 assert!(result.is_ok());
515 }
516}