viewpoint_core/network/websocket/mod.rs
1//! WebSocket monitoring and testing.
2//!
3//! This module provides functionality for monitoring WebSocket connections,
4//! including frame events for sent and received messages. Use this to test
5//! real-time features without polling.
6//!
7//! # Strategy for Testing Dynamic WebSocket Content
8//!
9//! Testing WebSocket-driven dynamic content requires:
10//! 1. Set up WebSocket listeners **before** navigation
11//! 2. Navigate to the page that establishes WebSocket connections
12//! 3. Use Viewpoint's auto-waiting locator assertions to verify DOM updates
13//! 4. Optionally capture WebSocket messages for detailed verification
14//!
15//! ## Verifying WebSocket Data Updates in the DOM (Without Polling)
16//!
17//! The key insight is to use Viewpoint's built-in auto-waiting assertions
18//! (`expect(locator).to_have_text()`, `to_be_visible()`, etc.) which automatically
19//! wait for conditions without manual polling:
20//!
21//! ```ignore
22//! use viewpoint_core::Browser;
23//! use viewpoint_test::expect; // from viewpoint-test crate
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! let browser = Browser::launch().headless(true).launch().await?;
27//! let context = browser.new_context().await?;
28//! let page = context.new_page().await?;
29//!
30//! // Navigate to page with WebSocket-driven live data
31//! page.goto("https://example.com/live-dashboard").goto().await?;
32//!
33//! // Auto-waiting assertions verify DOM updates without polling!
34//! // These wait up to 30 seconds for the condition to be true
35//!
36//! // Verify that live data container becomes visible
37//! expect(page.locator(".live-data-container")).to_be_visible().await?;
38//!
39//! // Verify that WebSocket data rendered specific text content
40//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
41//!
42//! // Verify multiple data points updated via WebSocket
43//! expect(page.locator(".connection-status")).to_have_text("Connected").await?;
44//! expect(page.locator(".last-update")).not().to_be_empty().await?;
45//!
46//! // Verify a list populated by WebSocket messages
47//! expect(page.locator(".message-list li")).to_have_count_greater_than(0).await?;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Capturing and Verifying Specific WebSocket Messages
53//!
54//! For more detailed verification, capture WebSocket frames and correlate
55//! with DOM state:
56//!
57//! ```ignore
58//! use viewpoint_core::Browser;
59//! use viewpoint_test::expect;
60//! use std::sync::Arc;
61//! use tokio::sync::Mutex;
62//!
63//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64//! let browser = Browser::launch().headless(true).launch().await?;
65//! let context = browser.new_context().await?;
66//! let page = context.new_page().await?;
67//!
68//! // Capture WebSocket messages for verification
69//! let received_messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
70//! let messages_clone = received_messages.clone();
71//!
72//! // Set up WebSocket monitoring BEFORE navigation
73//! page.on_websocket(move |ws| {
74//! let messages = messages_clone.clone();
75//! async move {
76//! println!("WebSocket connected: {}", ws.url());
77//!
78//! // Capture all received frames
79//! ws.on_framereceived(move |frame| {
80//! let messages = messages.clone();
81//! async move {
82//! if frame.is_text() {
83//! messages.lock().await.push(frame.payload().to_string());
84//! }
85//! }
86//! }).await;
87//! }
88//! }).await;
89//!
90//! // Navigate to the page
91//! page.goto("https://example.com/realtime-chat").goto().await?;
92//!
93//! // Wait for WebSocket data to be reflected in the DOM
94//! // The auto-waiting assertion handles timing without polling
95//! expect(page.locator(".chat-messages")).to_be_visible().await?;
96//! expect(page.locator(".chat-message")).to_have_count_greater_than(0).await?;
97//!
98//! // Verify the DOM content matches what was received via WebSocket
99//! let messages = received_messages.lock().await;
100//! if !messages.is_empty() {
101//! // Parse the WebSocket message (assuming JSON)
102//! let first_msg = &messages[0];
103//! if first_msg.contains("\"text\":") {
104//! // Verify the message text appears in the DOM
105//! let msg_text = page.locator(".chat-message").first().text_content().await?;
106//! assert!(msg_text.is_some(), "Message should be rendered in DOM");
107//! }
108//! }
109//! # Ok(())
110//! # }
111//! ```
112//!
113//! ## Waiting for Specific WebSocket Events Before DOM Verification
114//!
115//! Use synchronization primitives to coordinate between WebSocket events
116//! and DOM assertions:
117//!
118//! ```ignore
119//! use viewpoint_core::Browser;
120//! use viewpoint_test::expect;
121//! use std::sync::Arc;
122//! use std::sync::atomic::{AtomicBool, Ordering};
123//! use tokio::sync::Notify;
124//!
125//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
126//! let browser = Browser::launch().headless(true).launch().await?;
127//! let context = browser.new_context().await?;
128//! let page = context.new_page().await?;
129//!
130//! // Use Notify to signal when specific data arrives
131//! let data_ready = Arc::new(Notify::new());
132//! let data_ready_clone = data_ready.clone();
133//!
134//! page.on_websocket(move |ws| {
135//! let notify = data_ready_clone.clone();
136//! async move {
137//! ws.on_framereceived(move |frame| {
138//! let notify = notify.clone();
139//! async move {
140//! // Signal when we receive the expected data
141//! if frame.payload().contains("\"status\":\"ready\"") {
142//! notify.notify_one();
143//! }
144//! }
145//! }).await;
146//! }
147//! }).await;
148//!
149//! page.goto("https://example.com/app").goto().await?;
150//!
151//! // Wait for the specific WebSocket message (with timeout)
152//! tokio::select! {
153//! _ = data_ready.notified() => {
154//! // Data arrived, now verify DOM reflects it
155//! expect(page.locator(".status-indicator")).to_have_text("Ready").await?;
156//! expect(page.locator(".data-panel")).to_be_visible().await?;
157//! }
158//! _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
159//! panic!("Timeout waiting for WebSocket data");
160//! }
161//! }
162//! # Ok(())
163//! # }
164//! ```
165//!
166//! ## Complete Example: Testing a Real-Time Stock Ticker
167//!
168//! ```ignore
169//! use viewpoint_core::Browser;
170//! use viewpoint_test::{TestHarness, expect};
171//! use std::sync::Arc;
172//! use tokio::sync::Mutex;
173//!
174//! #[tokio::test]
175//! async fn test_stock_ticker_updates() -> Result<(), Box<dyn std::error::Error>> {
176//! let harness = TestHarness::new().await?;
177//! let page = harness.page();
178//!
179//! // Track stock updates received via WebSocket
180//! let stock_updates: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
181//! let updates_clone = stock_updates.clone();
182//!
183//! // Monitor WebSocket for stock price updates
184//! page.on_websocket(move |ws| {
185//! let updates = updates_clone.clone();
186//! async move {
187//! if ws.url().contains("stock-feed") {
188//! ws.on_framereceived(move |frame| {
189//! let updates = updates.clone();
190//! async move {
191//! updates.lock().await.push(frame.payload().to_string());
192//! }
193//! }).await;
194//! }
195//! }
196//! }).await;
197//!
198//! // Navigate to the stock ticker page
199//! page.goto("https://example.com/stocks").goto().await?;
200//!
201//! // Verify the ticker display updates (auto-waits for DOM changes)
202//! expect(page.locator(".stock-ticker")).to_be_visible().await?;
203//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
204//!
205//! // Verify connection indicator shows live status
206//! expect(page.locator(".connection-status"))
207//! .to_have_text("Live")
208//! .await?;
209//!
210//! // Verify at least one price update was rendered
211//! expect(page.locator(".price-change")).not().to_be_empty().await?;
212//!
213//! // Confirm WebSocket messages were received
214//! let updates = stock_updates.lock().await;
215//! assert!(!updates.is_empty(), "Should have received stock updates via WebSocket");
216//!
217//! Ok(())
218//! }
219//! ```
220
221// Allow dead code for websocket monitoring scaffolding (spec: network-events)
222
223use std::collections::HashMap;
224use std::future::Future;
225use std::pin::Pin;
226use std::sync::Arc;
227use std::sync::atomic::{AtomicBool, Ordering};
228
229use tokio::sync::{RwLock, broadcast};
230use tracing::{debug, trace};
231use viewpoint_cdp::CdpConnection;
232use viewpoint_cdp::protocol::{
233 WebSocketClosedEvent, WebSocketCreatedEvent, WebSocketFrame as CdpWebSocketFrame,
234 WebSocketFrameReceivedEvent, WebSocketFrameSentEvent,
235};
236
237/// A WebSocket connection being monitored.
238///
239/// This struct represents an active WebSocket connection and provides
240/// methods to register handlers for frame events.
241#[derive(Clone)]
242pub struct WebSocket {
243 /// The request ID identifying this WebSocket.
244 request_id: String,
245 /// The WebSocket URL.
246 url: String,
247 /// Whether the WebSocket is closed.
248 is_closed: Arc<AtomicBool>,
249 /// Frame sent event broadcaster.
250 frame_sent_tx: broadcast::Sender<WebSocketFrame>,
251 /// Frame received event broadcaster.
252 frame_received_tx: broadcast::Sender<WebSocketFrame>,
253 /// Close event broadcaster.
254 close_tx: broadcast::Sender<()>,
255}
256
257impl std::fmt::Debug for WebSocket {
258 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct("WebSocket")
260 .field("request_id", &self.request_id)
261 .field("url", &self.url)
262 .field("is_closed", &self.is_closed.load(Ordering::SeqCst))
263 .finish()
264 }
265}
266
267impl WebSocket {
268 /// Create a new WebSocket instance.
269 pub(crate) fn new(request_id: String, url: String) -> Self {
270 let (frame_sent_tx, _) = broadcast::channel(256);
271 let (frame_received_tx, _) = broadcast::channel(256);
272 let (close_tx, _) = broadcast::channel(16);
273
274 Self {
275 request_id,
276 url,
277 is_closed: Arc::new(AtomicBool::new(false)),
278 frame_sent_tx,
279 frame_received_tx,
280 close_tx,
281 }
282 }
283
284 /// Get the WebSocket URL.
285 pub fn url(&self) -> &str {
286 &self.url
287 }
288
289 /// Check if the WebSocket is closed.
290 pub fn is_closed(&self) -> bool {
291 self.is_closed.load(Ordering::SeqCst)
292 }
293
294 /// Get the request ID for this WebSocket.
295 pub fn request_id(&self) -> &str {
296 &self.request_id
297 }
298
299 /// Register a handler for frame sent events.
300 ///
301 /// The handler will be called whenever a frame is sent over this WebSocket.
302 ///
303 /// # Example
304 ///
305 /// ```no_run
306 /// use viewpoint_core::WebSocket;
307 ///
308 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
309 /// websocket.on_framesent(|frame| async move {
310 /// println!("Sent: {:?}", frame.payload());
311 /// }).await;
312 /// # Ok(())
313 /// # }
314 pub async fn on_framesent<F, Fut>(&self, handler: F)
315 where
316 F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
317 Fut: Future<Output = ()> + Send + 'static,
318 {
319 let mut rx = self.frame_sent_tx.subscribe();
320 tokio::spawn(async move {
321 while let Ok(frame) = rx.recv().await {
322 handler(frame).await;
323 }
324 });
325 }
326
327 /// Register a handler for frame received events.
328 ///
329 /// The handler will be called whenever a frame is received on this WebSocket.
330 ///
331 /// # Example
332 ///
333 /// ```no_run
334 /// use viewpoint_core::WebSocket;
335 ///
336 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
337 /// websocket.on_framereceived(|frame| async move {
338 /// println!("Received: {:?}", frame.payload());
339 /// }).await;
340 /// # Ok(())
341 /// # }
342 pub async fn on_framereceived<F, Fut>(&self, handler: F)
343 where
344 F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
345 Fut: Future<Output = ()> + Send + 'static,
346 {
347 let mut rx = self.frame_received_tx.subscribe();
348 tokio::spawn(async move {
349 while let Ok(frame) = rx.recv().await {
350 handler(frame).await;
351 }
352 });
353 }
354
355 /// Register a handler for WebSocket close events.
356 ///
357 /// The handler will be called when this WebSocket connection is closed.
358 ///
359 /// # Example
360 ///
361 /// ```no_run
362 /// use viewpoint_core::WebSocket;
363 ///
364 /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
365 /// websocket.on_close(|| async {
366 /// println!("WebSocket closed");
367 /// }).await;
368 /// # Ok(())
369 /// # }
370 pub async fn on_close<F, Fut>(&self, handler: F)
371 where
372 F: Fn() -> Fut + Send + Sync + 'static,
373 Fut: Future<Output = ()> + Send + 'static,
374 {
375 let mut rx = self.close_tx.subscribe();
376 tokio::spawn(async move {
377 if rx.recv().await.is_ok() {
378 handler().await;
379 }
380 });
381 }
382
383 /// Emit a frame sent event (internal use).
384 pub(crate) fn emit_frame_sent(&self, frame: WebSocketFrame) {
385 let _ = self.frame_sent_tx.send(frame);
386 }
387
388 /// Emit a frame received event (internal use).
389 pub(crate) fn emit_frame_received(&self, frame: WebSocketFrame) {
390 let _ = self.frame_received_tx.send(frame);
391 }
392
393 /// Mark the WebSocket as closed and emit close event (internal use).
394 pub(crate) fn mark_closed(&self) {
395 self.is_closed.store(true, Ordering::SeqCst);
396 let _ = self.close_tx.send(());
397 }
398}
399
/// A WebSocket message frame.
///
/// Frames are plain values: they can be cloned and compared for equality,
/// which makes them convenient to collect and assert on in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebSocketFrame {
    /// The frame opcode (1 for text, 2 for binary).
    opcode: u8,
    /// The frame payload data.
    payload_data: String,
}
408
409impl WebSocketFrame {
410 /// Create a new WebSocket frame.
411 pub(crate) fn new(opcode: u8, payload_data: String) -> Self {
412 Self {
413 opcode,
414 payload_data,
415 }
416 }
417
418 /// Create a WebSocket frame from CDP frame data.
419 pub(crate) fn from_cdp(cdp_frame: &CdpWebSocketFrame) -> Self {
420 Self {
421 opcode: cdp_frame.opcode as u8,
422 payload_data: cdp_frame.payload_data.clone(),
423 }
424 }
425
426 /// Get the frame opcode.
427 ///
428 /// Common opcodes:
429 /// - 1: Text frame
430 /// - 2: Binary frame
431 /// - 8: Close frame
432 /// - 9: Ping frame
433 /// - 10: Pong frame
434 pub fn opcode(&self) -> u8 {
435 self.opcode
436 }
437
438 /// Get the frame payload data.
439 pub fn payload(&self) -> &str {
440 &self.payload_data
441 }
442
443 /// Check if this is a text frame.
444 pub fn is_text(&self) -> bool {
445 self.opcode == 1
446 }
447
448 /// Check if this is a binary frame.
449 pub fn is_binary(&self) -> bool {
450 self.opcode == 2
451 }
452}
453
454/// Type alias for the WebSocket event handler function.
455pub type WebSocketEventHandler =
456 Box<dyn Fn(WebSocket) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
457
458/// Manager for WebSocket events on a page.
459pub struct WebSocketManager {
460 /// CDP connection.
461 connection: Arc<CdpConnection>,
462 /// Session ID.
463 session_id: String,
464 /// Active WebSocket connections indexed by request ID.
465 websockets: Arc<RwLock<HashMap<String, WebSocket>>>,
466 /// WebSocket created event handler.
467 handler: Arc<RwLock<Option<WebSocketEventHandler>>>,
468 /// Whether the manager is listening for events.
469 is_listening: AtomicBool,
470}
471
472impl WebSocketManager {
473 /// Create a new WebSocket manager for a page.
474 pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
475 Self {
476 connection,
477 session_id,
478 websockets: Arc::new(RwLock::new(HashMap::new())),
479 handler: Arc::new(RwLock::new(None)),
480 is_listening: AtomicBool::new(false),
481 }
482 }
483
484 /// Set a handler for WebSocket created events.
485 pub async fn set_handler<F, Fut>(&self, handler: F)
486 where
487 F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
488 Fut: Future<Output = ()> + Send + 'static,
489 {
490 let boxed_handler: WebSocketEventHandler = Box::new(move |ws| Box::pin(handler(ws)));
491 let mut h = self.handler.write().await;
492 *h = Some(boxed_handler);
493
494 // Start listening for events if not already
495 self.start_listening().await;
496 }
497
498 /// Remove the WebSocket handler.
499 pub async fn remove_handler(&self) {
500 let mut h = self.handler.write().await;
501 *h = None;
502 }
503
504 /// Start listening for WebSocket CDP events.
505 async fn start_listening(&self) {
506 if self.is_listening.swap(true, Ordering::SeqCst) {
507 // Already listening
508 return;
509 }
510
511 let mut events = self.connection.subscribe_events();
512 let session_id = self.session_id.clone();
513 let websockets = self.websockets.clone();
514 let handler = self.handler.clone();
515
516 tokio::spawn(async move {
517 debug!("WebSocket manager started listening for events");
518
519 while let Ok(event) = events.recv().await {
520 // Filter events for this session
521 if event.session_id.as_deref() != Some(&session_id) {
522 continue;
523 }
524
525 match event.method.as_str() {
526 "Network.webSocketCreated" => {
527 if let Some(params) = &event.params {
528 if let Ok(created) =
529 serde_json::from_value::<WebSocketCreatedEvent>(params.clone())
530 {
531 trace!(
532 "WebSocket created: {} -> {}",
533 created.request_id, created.url
534 );
535
536 let ws = WebSocket::new(created.request_id.clone(), created.url);
537
538 // Store the WebSocket
539 {
540 let mut sockets = websockets.write().await;
541 sockets.insert(created.request_id, ws.clone());
542 }
543
544 // Call the handler
545 let h = handler.read().await;
546 if let Some(ref handler_fn) = *h {
547 handler_fn(ws).await;
548 }
549 }
550 }
551 }
552 "Network.webSocketClosed" => {
553 if let Some(params) = &event.params {
554 if let Ok(closed) =
555 serde_json::from_value::<WebSocketClosedEvent>(params.clone())
556 {
557 trace!("WebSocket closed: {}", closed.request_id);
558
559 let sockets = websockets.read().await;
560 if let Some(ws) = sockets.get(&closed.request_id) {
561 ws.mark_closed();
562 }
563 }
564 }
565 }
566 "Network.webSocketFrameSent" => {
567 if let Some(params) = &event.params {
568 if let Ok(frame_event) =
569 serde_json::from_value::<WebSocketFrameSentEvent>(params.clone())
570 {
571 trace!("WebSocket frame sent: {}", frame_event.request_id);
572
573 let sockets = websockets.read().await;
574 if let Some(ws) = sockets.get(&frame_event.request_id) {
575 let frame = WebSocketFrame::from_cdp(&frame_event.response);
576 ws.emit_frame_sent(frame);
577 }
578 }
579 }
580 }
581 "Network.webSocketFrameReceived" => {
582 if let Some(params) = &event.params {
583 if let Ok(frame_event) = serde_json::from_value::<
584 WebSocketFrameReceivedEvent,
585 >(params.clone())
586 {
587 trace!("WebSocket frame received: {}", frame_event.request_id);
588
589 let sockets = websockets.read().await;
590 if let Some(ws) = sockets.get(&frame_event.request_id) {
591 let frame = WebSocketFrame::from_cdp(&frame_event.response);
592 ws.emit_frame_received(frame);
593 }
594 }
595 }
596 }
597 _ => {}
598 }
599 }
600
601 debug!("WebSocket manager stopped listening");
602 });
603 }
604
605 /// Get a WebSocket by request ID.
606 pub async fn get(&self, request_id: &str) -> Option<WebSocket> {
607 let sockets = self.websockets.read().await;
608 sockets.get(request_id).cloned()
609 }
610
611 /// Get all active `WebSockets`.
612 pub async fn all(&self) -> Vec<WebSocket> {
613 let sockets = self.websockets.read().await;
614 sockets.values().cloned().collect()
615 }
616}
617
618#[cfg(test)]
619mod tests;