viewpoint_core/network/websocket/
mod.rs

1//! WebSocket monitoring and testing.
2//!
3//! This module provides functionality for monitoring WebSocket connections,
4//! including frame events for sent and received messages. Use this to test
5//! real-time features without polling.
6//!
7//! # Strategy for Testing Dynamic WebSocket Content
8//!
9//! Testing WebSocket-driven dynamic content requires:
10//! 1. Set up WebSocket listeners **before** navigation
11//! 2. Navigate to the page that establishes WebSocket connections
12//! 3. Use Viewpoint's auto-waiting locator assertions to verify DOM updates
13//! 4. Optionally capture WebSocket messages for detailed verification
14//!
15//! ## Verifying WebSocket Data Updates in the DOM (Without Polling)
16//!
17//! The key insight is to use Viewpoint's built-in auto-waiting assertions
18//! (`expect(locator).to_have_text()`, `to_be_visible()`, etc.) which automatically
19//! wait for conditions without manual polling:
20//!
21//! ```ignore
22//! use viewpoint_core::Browser;
23//! use viewpoint_test::expect;  // from viewpoint-test crate
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! let browser = Browser::launch().headless(true).launch().await?;
27//! let context = browser.new_context().await?;
28//! let page = context.new_page().await?;
29//!
30//! // Navigate to page with WebSocket-driven live data
31//! page.goto("https://example.com/live-dashboard").goto().await?;
32//!
33//! // Auto-waiting assertions verify DOM updates without polling!
34//! // These wait up to 30 seconds for the condition to be true
35//!
36//! // Verify that live data container becomes visible
37//! expect(page.locator(".live-data-container")).to_be_visible().await?;
38//!
39//! // Verify that WebSocket data rendered specific text content
40//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
41//!
42//! // Verify multiple data points updated via WebSocket
43//! expect(page.locator(".connection-status")).to_have_text("Connected").await?;
44//! expect(page.locator(".last-update")).not().to_be_empty().await?;
45//!
46//! // Verify a list populated by WebSocket messages
47//! expect(page.locator(".message-list li")).to_have_count_greater_than(0).await?;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Capturing and Verifying Specific WebSocket Messages
53//!
54//! For more detailed verification, capture WebSocket frames and correlate
55//! with DOM state:
56//!
57//! ```ignore
58//! use viewpoint_core::Browser;
59//! use viewpoint_test::expect;
60//! use std::sync::Arc;
61//! use tokio::sync::Mutex;
62//!
63//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64//! let browser = Browser::launch().headless(true).launch().await?;
65//! let context = browser.new_context().await?;
66//! let page = context.new_page().await?;
67//!
68//! // Capture WebSocket messages for verification
69//! let received_messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
70//! let messages_clone = received_messages.clone();
71//!
72//! // Set up WebSocket monitoring BEFORE navigation
73//! page.on_websocket(move |ws| {
74//!     let messages = messages_clone.clone();
75//!     async move {
76//!         println!("WebSocket connected: {}", ws.url());
77//!         
78//!         // Capture all received frames
79//!         ws.on_framereceived(move |frame| {
80//!             let messages = messages.clone();
81//!             async move {
82//!                 if frame.is_text() {
83//!                     messages.lock().await.push(frame.payload().to_string());
84//!                 }
85//!             }
86//!         }).await;
87//!     }
88//! }).await;
89//!
90//! // Navigate to the page
91//! page.goto("https://example.com/realtime-chat").goto().await?;
92//!
93//! // Wait for WebSocket data to be reflected in the DOM
94//! // The auto-waiting assertion handles timing without polling
95//! expect(page.locator(".chat-messages")).to_be_visible().await?;
96//! expect(page.locator(".chat-message")).to_have_count_greater_than(0).await?;
97//!
98//! // Verify the DOM content matches what was received via WebSocket
99//! let messages = received_messages.lock().await;
100//! if !messages.is_empty() {
101//!     // Parse the WebSocket message (assuming JSON)
102//!     let first_msg = &messages[0];
103//!     if first_msg.contains("\"text\":") {
104//!         // Verify the message text appears in the DOM
105//!         let msg_text = page.locator(".chat-message").first().text_content().await?;
106//!         assert!(msg_text.is_some(), "Message should be rendered in DOM");
107//!     }
108//! }
109//! # Ok(())
110//! # }
111//! ```
112//!
113//! ## Waiting for Specific WebSocket Events Before DOM Verification
114//!
115//! Use synchronization primitives to coordinate between WebSocket events
116//! and DOM assertions:
117//!
118//! ```ignore
119//! use viewpoint_core::Browser;
120//! use viewpoint_test::expect;
//! use std::sync::Arc;
//! use tokio::sync::Notify;
124//!
125//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
126//! let browser = Browser::launch().headless(true).launch().await?;
127//! let context = browser.new_context().await?;
128//! let page = context.new_page().await?;
129//!
130//! // Use Notify to signal when specific data arrives
131//! let data_ready = Arc::new(Notify::new());
132//! let data_ready_clone = data_ready.clone();
133//!
134//! page.on_websocket(move |ws| {
135//!     let notify = data_ready_clone.clone();
136//!     async move {
137//!         ws.on_framereceived(move |frame| {
138//!             let notify = notify.clone();
139//!             async move {
140//!                 // Signal when we receive the expected data
141//!                 if frame.payload().contains("\"status\":\"ready\"") {
142//!                     notify.notify_one();
143//!                 }
144//!             }
145//!         }).await;
146//!     }
147//! }).await;
148//!
149//! page.goto("https://example.com/app").goto().await?;
150//!
151//! // Wait for the specific WebSocket message (with timeout)
152//! tokio::select! {
153//!     _ = data_ready.notified() => {
154//!         // Data arrived, now verify DOM reflects it
155//!         expect(page.locator(".status-indicator")).to_have_text("Ready").await?;
156//!         expect(page.locator(".data-panel")).to_be_visible().await?;
157//!     }
158//!     _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
159//!         panic!("Timeout waiting for WebSocket data");
160//!     }
161//! }
162//! # Ok(())
163//! # }
164//! ```
165//!
166//! ## Complete Example: Testing a Real-Time Stock Ticker
167//!
168//! ```ignore
169//! use viewpoint_core::Browser;
170//! use viewpoint_test::{TestHarness, expect};
171//! use std::sync::Arc;
172//! use tokio::sync::Mutex;
173//!
174//! #[tokio::test]
175//! async fn test_stock_ticker_updates() -> Result<(), Box<dyn std::error::Error>> {
176//!     let harness = TestHarness::new().await?;
177//!     let page = harness.page();
178//!
179//!     // Track stock updates received via WebSocket
180//!     let stock_updates: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
181//!     let updates_clone = stock_updates.clone();
182//!
183//!     // Monitor WebSocket for stock price updates
184//!     page.on_websocket(move |ws| {
185//!         let updates = updates_clone.clone();
186//!         async move {
187//!             if ws.url().contains("stock-feed") {
188//!                 ws.on_framereceived(move |frame| {
189//!                     let updates = updates.clone();
190//!                     async move {
191//!                         updates.lock().await.push(frame.payload().to_string());
192//!                     }
193//!                 }).await;
194//!             }
195//!         }
196//!     }).await;
197//!
198//!     // Navigate to the stock ticker page
199//!     page.goto("https://example.com/stocks").goto().await?;
200//!
201//!     // Verify the ticker display updates (auto-waits for DOM changes)
202//!     expect(page.locator(".stock-ticker")).to_be_visible().await?;
203//!     expect(page.locator(".stock-price")).to_contain_text("$").await?;
204//!     
205//!     // Verify connection indicator shows live status
206//!     expect(page.locator(".connection-status"))
207//!         .to_have_text("Live")
208//!         .await?;
209//!
210//!     // Verify at least one price update was rendered
211//!     expect(page.locator(".price-change")).not().to_be_empty().await?;
212//!
213//!     // Confirm WebSocket messages were received
214//!     let updates = stock_updates.lock().await;
215//!     assert!(!updates.is_empty(), "Should have received stock updates via WebSocket");
216//!
217//!     Ok(())
218//! }
219//! ```
220
221// Allow dead code for websocket monitoring scaffolding (spec: network-events)
222
223mod event_listener;
224
225use std::collections::HashMap;
226use std::future::Future;
227use std::pin::Pin;
228use std::sync::Arc;
229use std::sync::atomic::{AtomicBool, Ordering};
230
231use tokio::sync::{RwLock, broadcast};
232use viewpoint_cdp::CdpConnection;
233use viewpoint_cdp::protocol::WebSocketFrame as CdpWebSocketFrame;
234
235/// A WebSocket connection being monitored.
236///
237/// This struct represents an active WebSocket connection and provides
238/// methods to register handlers for frame events.
239#[derive(Clone)]
240pub struct WebSocket {
241    /// The request ID identifying this WebSocket.
242    request_id: String,
243    /// The WebSocket URL.
244    url: String,
245    /// Whether the WebSocket is closed.
246    is_closed: Arc<AtomicBool>,
247    /// Frame sent event broadcaster.
248    frame_sent_tx: broadcast::Sender<WebSocketFrame>,
249    /// Frame received event broadcaster.
250    frame_received_tx: broadcast::Sender<WebSocketFrame>,
251    /// Close event broadcaster.
252    close_tx: broadcast::Sender<()>,
253}
254
255impl std::fmt::Debug for WebSocket {
256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257        f.debug_struct("WebSocket")
258            .field("request_id", &self.request_id)
259            .field("url", &self.url)
260            .field("is_closed", &self.is_closed.load(Ordering::SeqCst))
261            .finish()
262    }
263}
264
265impl WebSocket {
266    /// Create a new WebSocket instance.
267    pub(crate) fn new(request_id: String, url: String) -> Self {
268        let (frame_sent_tx, _) = broadcast::channel(256);
269        let (frame_received_tx, _) = broadcast::channel(256);
270        let (close_tx, _) = broadcast::channel(16);
271
272        Self {
273            request_id,
274            url,
275            is_closed: Arc::new(AtomicBool::new(false)),
276            frame_sent_tx,
277            frame_received_tx,
278            close_tx,
279        }
280    }
281
282    /// Get the WebSocket URL.
283    pub fn url(&self) -> &str {
284        &self.url
285    }
286
287    /// Check if the WebSocket is closed.
288    pub fn is_closed(&self) -> bool {
289        self.is_closed.load(Ordering::SeqCst)
290    }
291
292    /// Get the request ID for this WebSocket.
293    pub fn request_id(&self) -> &str {
294        &self.request_id
295    }
296
297    /// Register a handler for frame sent events.
298    ///
299    /// The handler will be called whenever a frame is sent over this WebSocket.
300    ///
301    /// # Example
302    ///
303    /// ```no_run
304    /// use viewpoint_core::WebSocket;
305    ///
306    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
307    /// websocket.on_framesent(|frame| async move {
308    ///     println!("Sent: {:?}", frame.payload());
309    /// }).await;
310    /// # Ok(())
311    /// # }
312    pub async fn on_framesent<F, Fut>(&self, handler: F)
313    where
314        F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
315        Fut: Future<Output = ()> + Send + 'static,
316    {
317        let mut rx = self.frame_sent_tx.subscribe();
318        tokio::spawn(async move {
319            while let Ok(frame) = rx.recv().await {
320                handler(frame).await;
321            }
322        });
323    }
324
325    /// Register a handler for frame received events.
326    ///
327    /// The handler will be called whenever a frame is received on this WebSocket.
328    ///
329    /// # Example
330    ///
331    /// ```no_run
332    /// use viewpoint_core::WebSocket;
333    ///
334    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
335    /// websocket.on_framereceived(|frame| async move {
336    ///     println!("Received: {:?}", frame.payload());
337    /// }).await;
338    /// # Ok(())
339    /// # }
340    pub async fn on_framereceived<F, Fut>(&self, handler: F)
341    where
342        F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
343        Fut: Future<Output = ()> + Send + 'static,
344    {
345        let mut rx = self.frame_received_tx.subscribe();
346        tokio::spawn(async move {
347            while let Ok(frame) = rx.recv().await {
348                handler(frame).await;
349            }
350        });
351    }
352
353    /// Register a handler for WebSocket close events.
354    ///
355    /// The handler will be called when this WebSocket connection is closed.
356    ///
357    /// # Example
358    ///
359    /// ```no_run
360    /// use viewpoint_core::WebSocket;
361    ///
362    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
363    /// websocket.on_close(|| async {
364    ///     println!("WebSocket closed");
365    /// }).await;
366    /// # Ok(())
367    /// # }
368    pub async fn on_close<F, Fut>(&self, handler: F)
369    where
370        F: Fn() -> Fut + Send + Sync + 'static,
371        Fut: Future<Output = ()> + Send + 'static,
372    {
373        let mut rx = self.close_tx.subscribe();
374        tokio::spawn(async move {
375            if rx.recv().await.is_ok() {
376                handler().await;
377            }
378        });
379    }
380
381    /// Emit a frame sent event (internal use).
382    pub(crate) fn emit_frame_sent(&self, frame: WebSocketFrame) {
383        let _ = self.frame_sent_tx.send(frame);
384    }
385
386    /// Emit a frame received event (internal use).
387    pub(crate) fn emit_frame_received(&self, frame: WebSocketFrame) {
388        let _ = self.frame_received_tx.send(frame);
389    }
390
391    /// Mark the WebSocket as closed and emit close event (internal use).
392    pub(crate) fn mark_closed(&self) {
393        self.is_closed.store(true, Ordering::SeqCst);
394        let _ = self.close_tx.send(());
395    }
396}
397
398/// A WebSocket message frame.
399#[derive(Debug, Clone)]
400pub struct WebSocketFrame {
401    /// The frame opcode (1 for text, 2 for binary).
402    opcode: u8,
403    /// The frame payload data.
404    payload_data: String,
405}
406
407impl WebSocketFrame {
408    /// Create a new WebSocket frame.
409    pub(crate) fn new(opcode: u8, payload_data: String) -> Self {
410        Self {
411            opcode,
412            payload_data,
413        }
414    }
415
416    /// Create a WebSocket frame from CDP frame data.
417    pub(crate) fn from_cdp(cdp_frame: &CdpWebSocketFrame) -> Self {
418        Self {
419            opcode: cdp_frame.opcode as u8,
420            payload_data: cdp_frame.payload_data.clone(),
421        }
422    }
423
424    /// Get the frame opcode.
425    ///
426    /// Common opcodes:
427    /// - 1: Text frame
428    /// - 2: Binary frame
429    /// - 8: Close frame
430    /// - 9: Ping frame
431    /// - 10: Pong frame
432    pub fn opcode(&self) -> u8 {
433        self.opcode
434    }
435
436    /// Get the frame payload data.
437    pub fn payload(&self) -> &str {
438        &self.payload_data
439    }
440
441    /// Check if this is a text frame.
442    pub fn is_text(&self) -> bool {
443        self.opcode == 1
444    }
445
446    /// Check if this is a binary frame.
447    pub fn is_binary(&self) -> bool {
448        self.opcode == 2
449    }
450}
451
452/// Type alias for the WebSocket event handler function.
453pub type WebSocketEventHandler =
454    Box<dyn Fn(WebSocket) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
455
456/// Manager for WebSocket events on a page.
457pub struct WebSocketManager {
458    /// CDP connection.
459    connection: Arc<CdpConnection>,
460    /// Session ID.
461    session_id: String,
462    /// Active WebSocket connections indexed by request ID.
463    websockets: Arc<RwLock<HashMap<String, WebSocket>>>,
464    /// WebSocket created event handler.
465    handler: Arc<RwLock<Option<WebSocketEventHandler>>>,
466    /// Whether the manager is listening for events.
467    is_listening: AtomicBool,
468}
469
470impl WebSocketManager {
471    /// Create a new WebSocket manager for a page.
472    pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
473        Self {
474            connection,
475            session_id,
476            websockets: Arc::new(RwLock::new(HashMap::new())),
477            handler: Arc::new(RwLock::new(None)),
478            is_listening: AtomicBool::new(false),
479        }
480    }
481
482    /// Set a handler for WebSocket created events.
483    pub async fn set_handler<F, Fut>(&self, handler: F)
484    where
485        F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
486        Fut: Future<Output = ()> + Send + 'static,
487    {
488        let boxed_handler: WebSocketEventHandler = Box::new(move |ws| Box::pin(handler(ws)));
489        let mut h = self.handler.write().await;
490        *h = Some(boxed_handler);
491
492        // Start listening for events if not already
493        self.start_listening().await;
494    }
495
496    /// Remove the WebSocket handler.
497    pub async fn remove_handler(&self) {
498        let mut h = self.handler.write().await;
499        *h = None;
500    }
501
502    /// Start listening for WebSocket CDP events.
503    async fn start_listening(&self) {
504        if self.is_listening.swap(true, Ordering::SeqCst) {
505            // Already listening
506            return;
507        }
508
509        event_listener::spawn_event_listener(
510            self.connection.clone(),
511            self.session_id.clone(),
512            self.websockets.clone(),
513            self.handler.clone(),
514        );
515    }
516
517    /// Get a WebSocket by request ID.
518    pub async fn get(&self, request_id: &str) -> Option<WebSocket> {
519        let sockets = self.websockets.read().await;
520        sockets.get(request_id).cloned()
521    }
522
523    /// Get all active `WebSockets`.
524    pub async fn all(&self) -> Vec<WebSocket> {
525        let sockets = self.websockets.read().await;
526        sockets.values().cloned().collect()
527    }
528}
529
530#[cfg(test)]
531mod tests;