viewpoint_core/network/websocket/mod.rs
1//! WebSocket monitoring and testing.
2//!
3//! This module provides functionality for monitoring WebSocket connections,
4//! including frame events for sent and received messages. Use this to test
5//! real-time features without polling.
6//!
7//! # Strategy for Testing Dynamic WebSocket Content
8//!
9//! Testing WebSocket-driven dynamic content requires:
10//! 1. Set up WebSocket listeners **before** navigation
11//! 2. Navigate to the page that establishes WebSocket connections
12//! 3. Use Viewpoint's auto-waiting locator assertions to verify DOM updates
13//! 4. Optionally capture WebSocket messages for detailed verification
14//!
15//! ## Verifying WebSocket Data Updates in the DOM (Without Polling)
16//!
17//! The key insight is to use Viewpoint's built-in auto-waiting assertions
18//! (`expect(locator).to_have_text()`, `to_be_visible()`, etc.) which automatically
19//! wait for conditions without manual polling:
20//!
21//! ```ignore
22//! use viewpoint_core::Browser;
23//! use viewpoint_test::expect; // from viewpoint-test crate
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! let browser = Browser::launch().headless(true).launch().await?;
27//! let context = browser.new_context().await?;
28//! let page = context.new_page().await?;
29//!
30//! // Navigate to page with WebSocket-driven live data
31//! page.goto("https://example.com/live-dashboard").goto().await?;
32//!
33//! // Auto-waiting assertions verify DOM updates without polling!
34//! // These wait up to 30 seconds for the condition to be true
35//!
36//! // Verify that live data container becomes visible
37//! expect(page.locator(".live-data-container")).to_be_visible().await?;
38//!
39//! // Verify that WebSocket data rendered specific text content
40//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
41//!
42//! // Verify multiple data points updated via WebSocket
43//! expect(page.locator(".connection-status")).to_have_text("Connected").await?;
44//! expect(page.locator(".last-update")).not().to_be_empty().await?;
45//!
46//! // Verify a list populated by WebSocket messages
47//! expect(page.locator(".message-list li")).to_have_count_greater_than(0).await?;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Capturing and Verifying Specific WebSocket Messages
53//!
54//! For more detailed verification, capture WebSocket frames and correlate
55//! with DOM state:
56//!
57//! ```ignore
58//! use viewpoint_core::Browser;
59//! use viewpoint_test::expect;
60//! use std::sync::Arc;
61//! use tokio::sync::Mutex;
62//!
63//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64//! let browser = Browser::launch().headless(true).launch().await?;
65//! let context = browser.new_context().await?;
66//! let page = context.new_page().await?;
67//!
68//! // Capture WebSocket messages for verification
69//! let received_messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
70//! let messages_clone = received_messages.clone();
71//!
72//! // Set up WebSocket monitoring BEFORE navigation
73//! page.on_websocket(move |ws| {
74//! let messages = messages_clone.clone();
75//! async move {
76//! println!("WebSocket connected: {}", ws.url());
77//!
78//! // Capture all received frames
79//! ws.on_framereceived(move |frame| {
80//! let messages = messages.clone();
81//! async move {
82//! if frame.is_text() {
83//! messages.lock().await.push(frame.payload().to_string());
84//! }
85//! }
86//! }).await;
87//! }
88//! }).await;
89//!
90//! // Navigate to the page
91//! page.goto("https://example.com/realtime-chat").goto().await?;
92//!
93//! // Wait for WebSocket data to be reflected in the DOM
94//! // The auto-waiting assertion handles timing without polling
95//! expect(page.locator(".chat-messages")).to_be_visible().await?;
96//! expect(page.locator(".chat-message")).to_have_count_greater_than(0).await?;
97//!
98//! // Verify the DOM content matches what was received via WebSocket
99//! let messages = received_messages.lock().await;
100//! if !messages.is_empty() {
101//! // Parse the WebSocket message (assuming JSON)
102//! let first_msg = &messages[0];
103//! if first_msg.contains("\"text\":") {
104//! // Verify the message text appears in the DOM
105//! let msg_text = page.locator(".chat-message").first().text_content().await?;
106//! assert!(msg_text.is_some(), "Message should be rendered in DOM");
107//! }
108//! }
109//! # Ok(())
110//! # }
111//! ```
112//!
113//! ## Waiting for Specific WebSocket Events Before DOM Verification
114//!
115//! Use synchronization primitives to coordinate between WebSocket events
116//! and DOM assertions:
117//!
118//! ```ignore
119//! use viewpoint_core::Browser;
120//! use viewpoint_test::expect;
121//! use std::sync::Arc;
122//! use std::sync::atomic::{AtomicBool, Ordering};
123//! use tokio::sync::Notify;
124//!
125//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
126//! let browser = Browser::launch().headless(true).launch().await?;
127//! let context = browser.new_context().await?;
128//! let page = context.new_page().await?;
129//!
130//! // Use Notify to signal when specific data arrives
131//! let data_ready = Arc::new(Notify::new());
132//! let data_ready_clone = data_ready.clone();
133//!
134//! page.on_websocket(move |ws| {
135//! let notify = data_ready_clone.clone();
136//! async move {
137//! ws.on_framereceived(move |frame| {
138//! let notify = notify.clone();
139//! async move {
140//! // Signal when we receive the expected data
141//! if frame.payload().contains("\"status\":\"ready\"") {
142//! notify.notify_one();
143//! }
144//! }
145//! }).await;
146//! }
147//! }).await;
148//!
149//! page.goto("https://example.com/app").goto().await?;
150//!
151//! // Wait for the specific WebSocket message (with timeout)
152//! tokio::select! {
153//! _ = data_ready.notified() => {
154//! // Data arrived, now verify DOM reflects it
155//! expect(page.locator(".status-indicator")).to_have_text("Ready").await?;
156//! expect(page.locator(".data-panel")).to_be_visible().await?;
157//! }
158//! _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
159//! panic!("Timeout waiting for WebSocket data");
160//! }
161//! }
162//! # Ok(())
163//! # }
164//! ```
165//!
166//! ## Complete Example: Testing a Real-Time Stock Ticker
167//!
168//! ```ignore
169//! use viewpoint_core::Browser;
170//! use viewpoint_test::{TestHarness, expect};
171//! use std::sync::Arc;
172//! use tokio::sync::Mutex;
173//!
174//! #[tokio::test]
175//! async fn test_stock_ticker_updates() -> Result<(), Box<dyn std::error::Error>> {
176//! let harness = TestHarness::new().await?;
177//! let page = harness.page();
178//!
179//! // Track stock updates received via WebSocket
180//! let stock_updates: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
181//! let updates_clone = stock_updates.clone();
182//!
183//! // Monitor WebSocket for stock price updates
184//! page.on_websocket(move |ws| {
185//! let updates = updates_clone.clone();
186//! async move {
187//! if ws.url().contains("stock-feed") {
188//! ws.on_framereceived(move |frame| {
189//! let updates = updates.clone();
190//! async move {
191//! updates.lock().await.push(frame.payload().to_string());
192//! }
193//! }).await;
194//! }
195//! }
196//! }).await;
197//!
198//! // Navigate to the stock ticker page
199//! page.goto("https://example.com/stocks").goto().await?;
200//!
201//! // Verify the ticker display updates (auto-waits for DOM changes)
202//! expect(page.locator(".stock-ticker")).to_be_visible().await?;
203//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
204//!
205//! // Verify connection indicator shows live status
206//! expect(page.locator(".connection-status"))
207//! .to_have_text("Live")
208//! .await?;
209//!
210//! // Verify at least one price update was rendered
211//! expect(page.locator(".price-change")).not().to_be_empty().await?;
212//!
213//! // Confirm WebSocket messages were received
214//! let updates = stock_updates.lock().await;
215//! assert!(!updates.is_empty(), "Should have received stock updates via WebSocket");
216//!
217//! Ok(())
218//! }
219//! ```
220
221// Allow dead code for websocket monitoring scaffolding (spec: network-events)
222
223mod event_listener;
224mod frame;
225
226pub use frame::WebSocketFrame;
227
228use std::collections::HashMap;
229use std::future::Future;
230use std::pin::Pin;
231use std::sync::Arc;
232use std::sync::atomic::{AtomicBool, Ordering};
233
234use tokio::sync::{RwLock, broadcast};
235use viewpoint_cdp::CdpConnection;
236
237/// A WebSocket connection being monitored.
238///
239/// This struct represents an active WebSocket connection and provides
240/// methods to register handlers for frame events.
241#[derive(Clone)]
242pub struct WebSocket {
243 /// The request ID identifying this WebSocket.
244 request_id: String,
245 /// The WebSocket URL.
246 url: String,
247 /// Whether the WebSocket is closed.
248 is_closed: Arc<AtomicBool>,
249 /// Frame sent event broadcaster.
250 frame_sent_tx: broadcast::Sender<WebSocketFrame>,
251 /// Frame received event broadcaster.
252 frame_received_tx: broadcast::Sender<WebSocketFrame>,
253 /// Close event broadcaster.
254 close_tx: broadcast::Sender<()>,
255}
256
257impl std::fmt::Debug for WebSocket {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct("WebSocket")
260 .field("request_id", &self.request_id)
261 .field("url", &self.url)
262 .field("is_closed", &self.is_closed.load(Ordering::SeqCst))
263 .finish()
264 }
265}
266
267impl WebSocket {
268 /// Create a new WebSocket instance.
269 pub(crate) fn new(request_id: String, url: String) -> Self {
270 let (frame_sent_tx, _) = broadcast::channel(256);
271 let (frame_received_tx, _) = broadcast::channel(256);
272 let (close_tx, _) = broadcast::channel(16);
273
274 Self {
275 request_id,
276 url,
277 is_closed: Arc::new(AtomicBool::new(false)),
278 frame_sent_tx,
279 frame_received_tx,
280 close_tx,
281 }
282 }
283
284 /// Get the WebSocket URL.
285 pub fn url(&self) -> &str {
286 &self.url
287 }
288
289 /// Check if the WebSocket is closed.
290 pub fn is_closed(&self) -> bool {
291 self.is_closed.load(Ordering::SeqCst)
292 }
293
294 /// Get the request ID for this WebSocket.
295 pub fn request_id(&self) -> &str {
296 &self.request_id
297 }
298
299 /// Register a handler for frame sent events.
300 ///
301 /// The handler will be called whenever a frame is sent over this WebSocket.
302 ///
303 /// # Example
304 ///
305 /// ```no_run
306 /// use viewpoint_core::WebSocket;
307 ///
308 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
309 /// websocket.on_framesent(|frame| async move {
310 /// println!("Sent: {:?}", frame.payload());
311 /// }).await;
312 /// # Ok(())
313 /// # }
314 pub async fn on_framesent<F, Fut>(&self, handler: F)
315 where
316 F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
317 Fut: Future<Output = ()> + Send + 'static,
318 {
319 let mut rx = self.frame_sent_tx.subscribe();
320 tokio::spawn(async move {
321 while let Ok(frame) = rx.recv().await {
322 handler(frame).await;
323 }
324 });
325 }
326
327 /// Register a handler for frame received events.
328 ///
329 /// The handler will be called whenever a frame is received on this WebSocket.
330 ///
331 /// # Example
332 ///
333 /// ```no_run
334 /// use viewpoint_core::WebSocket;
335 ///
336 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
337 /// websocket.on_framereceived(|frame| async move {
338 /// println!("Received: {:?}", frame.payload());
339 /// }).await;
340 /// # Ok(())
341 /// # }
342 pub async fn on_framereceived<F, Fut>(&self, handler: F)
343 where
344 F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
345 Fut: Future<Output = ()> + Send + 'static,
346 {
347 let mut rx = self.frame_received_tx.subscribe();
348 tokio::spawn(async move {
349 while let Ok(frame) = rx.recv().await {
350 handler(frame).await;
351 }
352 });
353 }
354
355 /// Register a handler for WebSocket close events.
356 ///
357 /// The handler will be called when this WebSocket connection is closed.
358 ///
359 /// # Example
360 ///
361 /// ```no_run
362 /// use viewpoint_core::WebSocket;
363 ///
364 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
365 /// websocket.on_close(|| async {
366 /// println!("WebSocket closed");
367 /// }).await;
368 /// # Ok(())
369 /// # }
370 pub async fn on_close<F, Fut>(&self, handler: F)
371 where
372 F: Fn() -> Fut + Send + Sync + 'static,
373 Fut: Future<Output = ()> + Send + 'static,
374 {
375 let mut rx = self.close_tx.subscribe();
376 tokio::spawn(async move {
377 if rx.recv().await.is_ok() {
378 handler().await;
379 }
380 });
381 }
382
383 /// Emit a frame sent event (internal use).
384 pub(crate) fn emit_frame_sent(&self, frame: WebSocketFrame) {
385 let _ = self.frame_sent_tx.send(frame);
386 }
387
388 /// Emit a frame received event (internal use).
389 pub(crate) fn emit_frame_received(&self, frame: WebSocketFrame) {
390 let _ = self.frame_received_tx.send(frame);
391 }
392
393 /// Mark the WebSocket as closed and emit close event (internal use).
394 pub(crate) fn mark_closed(&self) {
395 self.is_closed.store(true, Ordering::SeqCst);
396 let _ = self.close_tx.send(());
397 }
398}
399
400/// Type alias for the WebSocket event handler function.
401pub type WebSocketEventHandler =
402 Box<dyn Fn(WebSocket) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
403
404/// Manager for WebSocket events on a page.
405pub struct WebSocketManager {
406 /// CDP connection.
407 connection: Arc<CdpConnection>,
408 /// Session ID.
409 session_id: String,
410 /// Active WebSocket connections indexed by request ID.
411 websockets: Arc<RwLock<HashMap<String, WebSocket>>>,
412 /// WebSocket created event handler.
413 handler: Arc<RwLock<Option<WebSocketEventHandler>>>,
414 /// Whether the manager is listening for events.
415 is_listening: AtomicBool,
416}
417
418impl WebSocketManager {
419 /// Create a new WebSocket manager for a page.
420 pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
421 Self {
422 connection,
423 session_id,
424 websockets: Arc::new(RwLock::new(HashMap::new())),
425 handler: Arc::new(RwLock::new(None)),
426 is_listening: AtomicBool::new(false),
427 }
428 }
429
430 /// Set a handler for WebSocket created events.
431 pub async fn set_handler<F, Fut>(&self, handler: F)
432 where
433 F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
434 Fut: Future<Output = ()> + Send + 'static,
435 {
436 let boxed_handler: WebSocketEventHandler = Box::new(move |ws| Box::pin(handler(ws)));
437 let mut h = self.handler.write().await;
438 *h = Some(boxed_handler);
439
440 // Start listening for events if not already
441 self.start_listening().await;
442 }
443
444 /// Remove the WebSocket handler.
445 pub async fn remove_handler(&self) {
446 let mut h = self.handler.write().await;
447 *h = None;
448 }
449
450 /// Start listening for WebSocket CDP events.
451 async fn start_listening(&self) {
452 if self.is_listening.swap(true, Ordering::SeqCst) {
453 // Already listening
454 return;
455 }
456
457 event_listener::spawn_event_listener(
458 self.connection.clone(),
459 self.session_id.clone(),
460 self.websockets.clone(),
461 self.handler.clone(),
462 );
463 }
464
465 /// Get a WebSocket by request ID.
466 pub async fn get(&self, request_id: &str) -> Option<WebSocket> {
467 let sockets = self.websockets.read().await;
468 sockets.get(request_id).cloned()
469 }
470
471 /// Get all active `WebSockets`.
472 pub async fn all(&self) -> Vec<WebSocket> {
473 let sockets = self.websockets.read().await;
474 sockets.values().cloned().collect()
475 }
476}
477
478#[cfg(test)]
479mod tests;