mockforge_ws/
lib.rs

1//! # MockForge WebSocket
2//!
3//! WebSocket mocking library for MockForge with replay, proxy, and AI-powered event generation.
4//!
5//! This crate provides comprehensive WebSocket mocking capabilities, including:
6//!
7//! - **Replay Mode**: Script and replay WebSocket message sequences
8//! - **Interactive Mode**: Dynamic responses based on client messages
9//! - **AI Event Streams**: Generate narrative-driven event sequences
10//! - **Proxy Mode**: Forward messages to real WebSocket backends
11//! - **JSONPath Matching**: Sophisticated message matching with JSONPath queries
12//!
13//! ## Overview
14//!
15//! MockForge WebSocket supports multiple operational modes:
16//!
17//! ### 1. Replay Mode
18//! Play back pre-recorded WebSocket interactions from JSONL files with template expansion.
19//!
20//! ### 2. Proxy Mode
21//! Forward WebSocket messages to upstream servers with optional message transformation.
22//!
23//! ### 3. AI Event Generation
24//! Generate realistic event streams using LLMs based on narrative descriptions.
25//!
26//! ## Quick Start
27//!
28//! ### Basic WebSocket Server
29//!
30//! ```rust,no_run
31//! use mockforge_ws::router;
32//! use std::net::SocketAddr;
33//!
34//! #[tokio::main]
35//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
36//!     // Create WebSocket router
37//!     let app = router();
38//!
39//!     // Start server
40//!     let addr: SocketAddr = "0.0.0.0:3001".parse()?;
41//!     let listener = tokio::net::TcpListener::bind(addr).await?;
42//!     axum::serve(listener, app).await?;
43//!
44//!     Ok(())
45//! }
46//! ```
47//!
48//! ### With Latency Simulation
49//!
50//! ```rust,no_run
51//! use mockforge_ws::router_with_latency;
52//! use mockforge_core::latency::{FaultConfig, LatencyInjector};
53//! use mockforge_core::LatencyProfile;
54//!
55//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
56//! let latency = LatencyProfile::with_normal_distribution(250, 75.0)
57//!     .with_min_ms(100)
58//!     .with_max_ms(500);
59//! let injector = LatencyInjector::new(latency, FaultConfig::default());
60//! let app = router_with_latency(injector);
61//! # Ok(())
62//! # }
63//! ```
64//!
65//! ### With Proxy Support
66//!
67//! ```rust,no_run
68//! use mockforge_ws::router_with_proxy;
69//! use mockforge_core::{WsProxyHandler, WsProxyConfig};
70//!
71//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
72//! let proxy_config = WsProxyConfig {
73//!     upstream_url: "wss://api.example.com/ws".to_string(),
74//!     ..Default::default()
75//! };
76//! let proxy = WsProxyHandler::new(proxy_config);
77//! let app = router_with_proxy(proxy);
78//! # Ok(())
79//! # }
80//! ```
81//!
82//! ### AI Event Generation
83//!
84//! Generate realistic event streams from narrative descriptions:
85//!
86//! ```rust,no_run
87//! use mockforge_ws::{AiEventGenerator, WebSocketAiConfig};
88//! use mockforge_data::replay_augmentation::{scenarios, ReplayMode};
89//!
90//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
91//! let ai_config = WebSocketAiConfig {
92//!     enabled: true,
93//!     replay: Some(scenarios::stock_market_scenario()),
94//!     max_events: Some(30),
95//!     event_rate: Some(1.5),
96//! };
97//!
98//! let generator = AiEventGenerator::new(ai_config.replay.clone().unwrap())?;
99//! let _events = generator; // use the generator with `stream_events` in your handler
100//! # Ok(())
101//! # }
102//! ```
103//!
104//! ## Replay File Format
105//!
106//! WebSocket replay files use JSON Lines (JSONL) format:
107//!
108//! ```json
109//! {"ts":0,"dir":"out","text":"HELLO {{uuid}}","waitFor":"^CLIENT_READY$"}
110//! {"ts":10,"dir":"out","text":"{\"type\":\"welcome\",\"sessionId\":\"{{uuid}}\"}"}
111//! {"ts":20,"dir":"out","text":"{\"data\":{{randInt 1 100}}}","waitFor":"^ACK$"}
112//! ```
113//!
114//! Fields:
115//! - `ts`: Timestamp in milliseconds
116//! - `dir`: Direction ("in" = received, "out" = sent)
117//! - `text`: Message content (supports template expansion)
118//! - `waitFor`: Optional regex/JSONPath pattern to wait for
119//!
120//! ## JSONPath Message Matching
121//!
122//! Match messages using JSONPath queries:
123//!
124//! ```json
125//! {"waitFor": "$.type", "text": "Type received"}
126//! {"waitFor": "$.user.id", "text": "User authenticated"}
127//! {"waitFor": "$.order.status", "text": "Order updated"}
128//! ```
129//!
130//! ## Key Modules
131//!
132//! - [`ai_event_generator`]: AI-powered event stream generation
133//! - [`ws_tracing`]: Distributed tracing integration
134//!
135//! ## Examples
136//!
137//! See the [examples directory](https://github.com/SaaSy-Solutions/mockforge/tree/main/examples)
138//! for complete working examples.
139//!
140//! ## Related Crates
141//!
142//! - [`mockforge-core`](https://docs.rs/mockforge-core): Core mocking functionality
143//! - [`mockforge-data`](https://docs.rs/mockforge-data): Synthetic data generation
144//!
145//! ## Documentation
146//!
147//! - [MockForge Book](https://docs.mockforge.dev/)
148//! - [WebSocket Mocking Guide](https://docs.mockforge.dev/user-guide/websocket-mocking.html)
149//! - [API Reference](https://docs.rs/mockforge-ws)
150
151pub mod ai_event_generator;
152pub mod ws_tracing;
153
154use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
155use axum::extract::{Path, State};
156use axum::{response::IntoResponse, routing::get, Router};
157use mockforge_core::{latency::LatencyInjector, LatencyProfile, WsProxyHandler};
158#[cfg(feature = "data-faker")]
159use mockforge_data::provider::register_core_faker_provider;
160use mockforge_observability::get_global_registry;
161use serde_json::Value;
162use tokio::fs;
163use tokio::time::{sleep, Duration};
164use tracing::*;
165
166// Re-export AI event generator utilities
167pub use ai_event_generator::{AiEventGenerator, WebSocketAiConfig};
168
169// Re-export tracing utilities
170pub use ws_tracing::{
171    create_ws_connection_span, create_ws_message_span, record_ws_connection_success,
172    record_ws_error, record_ws_message_success,
173};
174
175/// Build the WebSocket router (exposed for tests and embedding)
176pub fn router() -> Router {
177    #[cfg(feature = "data-faker")]
178    register_core_faker_provider();
179
180    Router::new().route("/ws", get(ws_handler_no_state))
181}
182
183/// Build the WebSocket router with latency injector state
184pub fn router_with_latency(latency_injector: LatencyInjector) -> Router {
185    #[cfg(feature = "data-faker")]
186    register_core_faker_provider();
187
188    Router::new()
189        .route("/ws", get(ws_handler_with_state))
190        .with_state(latency_injector)
191}
192
193/// Build the WebSocket router with proxy handler
194pub fn router_with_proxy(proxy_handler: WsProxyHandler) -> Router {
195    #[cfg(feature = "data-faker")]
196    register_core_faker_provider();
197
198    Router::new()
199        .route("/ws", get(ws_handler_with_proxy))
200        .route("/ws/{*path}", get(ws_handler_with_proxy_path))
201        .with_state(proxy_handler)
202}
203
204/// Start WebSocket server with latency simulation
205pub async fn start_with_latency(
206    port: u16,
207    latency: Option<LatencyProfile>,
208) -> Result<(), Box<dyn std::error::Error>> {
209    let latency_injector = latency.map(|profile| LatencyInjector::new(profile, Default::default()));
210    let router = if let Some(injector) = latency_injector {
211        router_with_latency(injector)
212    } else {
213        router()
214    };
215
216    let addr: std::net::SocketAddr = format!("127.0.0.1:{}", port).parse()?;
217    info!("WebSocket server listening on {}", addr);
218
219    let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
220        format!(
221            "Failed to bind WebSocket server to port {}: {}\n\
222             Hint: The port may already be in use. Try using a different port with --ws-port or check if another process is using this port with: lsof -i :{} or netstat -tulpn | grep {}",
223            port, e, port, port
224        )
225    })?;
226
227    axum::serve(listener, router).await?;
228    Ok(())
229}
230
231// WebSocket handlers
232async fn ws_handler_no_state(ws: WebSocketUpgrade) -> impl IntoResponse {
233    ws.on_upgrade(handle_socket)
234}
235
236async fn ws_handler_with_state(
237    ws: WebSocketUpgrade,
238    axum::extract::State(_latency): axum::extract::State<LatencyInjector>,
239) -> impl IntoResponse {
240    ws.on_upgrade(handle_socket)
241}
242
243async fn ws_handler_with_proxy(
244    ws: WebSocketUpgrade,
245    State(proxy): State<WsProxyHandler>,
246) -> impl IntoResponse {
247    ws.on_upgrade(move |socket| handle_socket_with_proxy(socket, proxy, "/ws".to_string()))
248}
249
250async fn ws_handler_with_proxy_path(
251    Path(path): Path<String>,
252    ws: WebSocketUpgrade,
253    State(proxy): State<WsProxyHandler>,
254) -> impl IntoResponse {
255    let full_path = format!("/ws/{}", path);
256    ws.on_upgrade(move |socket| handle_socket_with_proxy(socket, proxy, full_path))
257}
258
259async fn handle_socket(mut socket: WebSocket) {
260    use std::time::Instant;
261
262    // Track WebSocket connection
263    let registry = get_global_registry();
264    let connection_start = Instant::now();
265    registry.record_ws_connection_established();
266    debug!("WebSocket connection established, tracking metrics");
267
268    // Track connection status (for metrics reporting)
269    let mut status = "normal";
270
271    // Check if replay mode is enabled
272    if let Ok(replay_file) = std::env::var("MOCKFORGE_WS_REPLAY_FILE") {
273        info!("WebSocket replay mode enabled with file: {}", replay_file);
274        handle_socket_with_replay(socket, &replay_file).await;
275    } else {
276        // Normal echo mode
277        while let Some(msg) = socket.recv().await {
278            match msg {
279                Ok(Message::Text(text)) => {
280                    registry.record_ws_message_received();
281
282                    // Echo the message back with "echo: " prefix
283                    let response = format!("echo: {}", text);
284                    if socket.send(Message::Text(response.into())).await.is_err() {
285                        status = "send_error";
286                        break;
287                    }
288                    registry.record_ws_message_sent();
289                }
290                Ok(Message::Close(_)) => {
291                    status = "client_close";
292                    break;
293                }
294                Err(e) => {
295                    error!("WebSocket error: {}", e);
296                    registry.record_ws_error();
297                    status = "error";
298                    break;
299                }
300                _ => {}
301            }
302        }
303    }
304
305    // Connection closed - record duration
306    let duration = connection_start.elapsed().as_secs_f64();
307    registry.record_ws_connection_closed(duration, status);
308    debug!("WebSocket connection closed (status: {}, duration: {:.2}s)", status, duration);
309}
310
311async fn handle_socket_with_replay(mut socket: WebSocket, replay_file: &str) {
312    let _registry = get_global_registry(); // Available for future message tracking
313
314    // Read the replay file
315    let content = match fs::read_to_string(replay_file).await {
316        Ok(content) => content,
317        Err(e) => {
318            error!("Failed to read replay file {}: {}", replay_file, e);
319            return;
320        }
321    };
322
323    // Parse JSONL file
324    let mut replay_entries = Vec::new();
325    for line in content.lines() {
326        if let Ok(entry) = serde_json::from_str::<Value>(line) {
327            replay_entries.push(entry);
328        }
329    }
330
331    info!("Loaded {} replay entries", replay_entries.len());
332
333    // Process replay entries
334    for entry in replay_entries {
335        // Check if we need to wait for a specific message
336        if let Some(wait_for) = entry.get("waitFor") {
337            if let Some(wait_pattern) = wait_for.as_str() {
338                info!("Waiting for pattern: {}", wait_pattern);
339                // Wait for matching message from client
340                let mut found = false;
341                while let Some(msg) = socket.recv().await {
342                    if let Ok(Message::Text(text)) = msg {
343                        if text.contains(wait_pattern) || wait_pattern == "^CLIENT_READY$" {
344                            found = true;
345                            break;
346                        }
347                    }
348                }
349                if !found {
350                    break;
351                }
352            }
353        }
354
355        // Get the message text
356        if let Some(text) = entry.get("text").and_then(|v| v.as_str()) {
357            // Expand tokens if enabled
358            let expanded_text = if std::env::var("MOCKFORGE_RESPONSE_TEMPLATE_EXPAND")
359                .map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
360                .unwrap_or(false)
361            {
362                expand_tokens(text)
363            } else {
364                text.to_string()
365            };
366
367            info!("Sending replay message: {}", expanded_text);
368            if socket.send(Message::Text(expanded_text.into())).await.is_err() {
369                break;
370            }
371        }
372
373        // Wait for the specified time
374        if let Some(ts) = entry.get("ts").and_then(|v| v.as_u64()) {
375            sleep(Duration::from_millis(ts * 10)).await; // Convert to milliseconds
376        }
377    }
378}
379
380fn expand_tokens(text: &str) -> String {
381    let mut result = text.to_string();
382
383    // Expand {{uuid}}
384    result = result.replace("{{uuid}}", &uuid::Uuid::new_v4().to_string());
385
386    // Expand {{now}}
387    result = result.replace("{{now}}", &chrono::Utc::now().to_rfc3339());
388
389    // Expand {{now+1m}} (add 1 minute)
390    if result.contains("{{now+1m}}") {
391        let now_plus_1m = chrono::Utc::now() + chrono::Duration::minutes(1);
392        result = result.replace("{{now+1m}}", &now_plus_1m.to_rfc3339());
393    }
394
395    // Expand {{now+1h}} (add 1 hour)
396    if result.contains("{{now+1h}}") {
397        let now_plus_1h = chrono::Utc::now() + chrono::Duration::hours(1);
398        result = result.replace("{{now+1h}}", &now_plus_1h.to_rfc3339());
399    }
400
401    // Expand {{randInt min max}}
402    while result.contains("{{randInt") {
403        if let Some(start) = result.find("{{randInt") {
404            if let Some(end) = result[start..].find("}}") {
405                let full_match = &result[start..start + end + 2];
406                let content = &result[start + 9..start + end]; // Skip "{{randInt"
407
408                if let Some(space_pos) = content.find(' ') {
409                    let min_str = &content[..space_pos];
410                    let max_str = &content[space_pos + 1..];
411
412                    if let (Ok(min), Ok(max)) = (min_str.parse::<i32>(), max_str.parse::<i32>()) {
413                        let random_value = fastrand::i32(min..=max);
414                        result = result.replace(full_match, &random_value.to_string());
415                    } else {
416                        result = result.replace(full_match, "0");
417                    }
418                } else {
419                    result = result.replace(full_match, "0");
420                }
421            } else {
422                break;
423            }
424        } else {
425            break;
426        }
427    }
428
429    result
430}
431
432async fn handle_socket_with_proxy(socket: WebSocket, proxy: WsProxyHandler, path: String) {
433    use std::time::Instant;
434
435    let registry = get_global_registry();
436    let connection_start = Instant::now();
437    registry.record_ws_connection_established();
438
439    let mut status = "normal";
440
441    // Check if this connection should be proxied
442    if proxy.config.should_proxy(&path) {
443        info!("Proxying WebSocket connection for path: {}", path);
444        if let Err(e) = proxy.proxy_connection(&path, socket).await {
445            error!("Failed to proxy WebSocket connection: {}", e);
446            registry.record_ws_error();
447            status = "proxy_error";
448        }
449    } else {
450        info!("Handling WebSocket connection locally for path: {}", path);
451        // Handle locally by echoing messages
452        // Note: handle_socket already tracks its own connection metrics,
453        // so we need to avoid double-counting
454        registry.record_ws_connection_closed(0.0, ""); // Decrement the one we just added
455        handle_socket(socket).await;
456        return; // Early return to avoid double-tracking
457    }
458
459    let duration = connection_start.elapsed().as_secs_f64();
460    registry.record_ws_connection_closed(duration, status);
461    debug!(
462        "Proxied WebSocket connection closed (status: {}, duration: {:.2}s)",
463        status, duration
464    );
465}
466
467#[cfg(test)]
468mod tests {
469    use super::*;
470
471    #[test]
472    fn test_router_creation() {
473        let _router = router();
474        // Router should be created successfully
475    }
476
477    #[test]
478    fn test_router_with_latency_creation() {
479        let latency_profile = LatencyProfile::default();
480        let latency_injector = LatencyInjector::new(latency_profile, Default::default());
481        let _router = router_with_latency(latency_injector);
482        // Router should be created successfully
483    }
484
485    #[test]
486    fn test_router_with_proxy_creation() {
487        let config = mockforge_core::WsProxyConfig {
488            upstream_url: "ws://localhost:8080".to_string(),
489            ..Default::default()
490        };
491        let proxy_handler = WsProxyHandler::new(config);
492        let _router = router_with_proxy(proxy_handler);
493        // Router should be created successfully
494    }
495
496    #[tokio::test]
497    async fn test_start_with_latency_config_none() {
498        // Test that we can create the router without latency
499        let result = std::panic::catch_unwind(|| {
500            let _router = router();
501        });
502        assert!(result.is_ok());
503    }
504
505    #[tokio::test]
506    async fn test_start_with_latency_config_some() {
507        // Test that we can create the router with latency
508        let latency_profile = LatencyProfile::default();
509        let latency_injector = LatencyInjector::new(latency_profile, Default::default());
510
511        let result = std::panic::catch_unwind(|| {
512            let _router = router_with_latency(latency_injector);
513        });
514        assert!(result.is_ok());
515    }
516}