viewpoint_core/network/websocket/
mod.rs

1//! WebSocket monitoring and testing.
2//!
3//! This module provides functionality for monitoring WebSocket connections,
4//! including frame events for sent and received messages. Use this to test
5//! real-time features without polling.
6//!
7//! # Strategy for Testing Dynamic WebSocket Content
8//!
9//! Testing WebSocket-driven dynamic content requires:
10//! 1. Set up WebSocket listeners **before** navigation
11//! 2. Navigate to the page that establishes WebSocket connections
12//! 3. Use Viewpoint's auto-waiting locator assertions to verify DOM updates
13//! 4. Optionally capture WebSocket messages for detailed verification
14//!
15//! ## Verifying WebSocket Data Updates in the DOM (Without Polling)
16//!
17//! The key insight is to use Viewpoint's built-in auto-waiting assertions
18//! (`expect(locator).to_have_text()`, `to_be_visible()`, etc.) which automatically
19//! wait for conditions without manual polling:
20//!
21//! ```ignore
22//! use viewpoint_core::Browser;
23//! use viewpoint_test::expect;  // from viewpoint-test crate
24//!
25//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26//! let browser = Browser::launch().headless(true).launch().await?;
27//! let context = browser.new_context().await?;
28//! let page = context.new_page().await?;
29//!
30//! // Navigate to page with WebSocket-driven live data
31//! page.goto("https://example.com/live-dashboard").goto().await?;
32//!
33//! // Auto-waiting assertions verify DOM updates without polling!
34//! // These wait up to 30 seconds for the condition to be true
35//!
36//! // Verify that live data container becomes visible
37//! expect(page.locator(".live-data-container")).to_be_visible().await?;
38//!
39//! // Verify that WebSocket data rendered specific text content
40//! expect(page.locator(".stock-price")).to_contain_text("$").await?;
41//!
42//! // Verify multiple data points updated via WebSocket
43//! expect(page.locator(".connection-status")).to_have_text("Connected").await?;
44//! expect(page.locator(".last-update")).not().to_be_empty().await?;
45//!
46//! // Verify a list populated by WebSocket messages
47//! expect(page.locator(".message-list li")).to_have_count_greater_than(0).await?;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! ## Capturing and Verifying Specific WebSocket Messages
53//!
54//! For more detailed verification, capture WebSocket frames and correlate
55//! with DOM state:
56//!
57//! ```ignore
58//! use viewpoint_core::Browser;
59//! use viewpoint_test::expect;
60//! use std::sync::Arc;
61//! use tokio::sync::Mutex;
62//!
63//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
64//! let browser = Browser::launch().headless(true).launch().await?;
65//! let context = browser.new_context().await?;
66//! let page = context.new_page().await?;
67//!
68//! // Capture WebSocket messages for verification
69//! let received_messages: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
70//! let messages_clone = received_messages.clone();
71//!
72//! // Set up WebSocket monitoring BEFORE navigation
73//! page.on_websocket(move |ws| {
74//!     let messages = messages_clone.clone();
75//!     async move {
76//!         println!("WebSocket connected: {}", ws.url());
77//!         
78//!         // Capture all received frames
79//!         ws.on_framereceived(move |frame| {
80//!             let messages = messages.clone();
81//!             async move {
82//!                 if frame.is_text() {
83//!                     messages.lock().await.push(frame.payload().to_string());
84//!                 }
85//!             }
86//!         }).await;
87//!     }
88//! }).await;
89//!
90//! // Navigate to the page
91//! page.goto("https://example.com/realtime-chat").goto().await?;
92//!
93//! // Wait for WebSocket data to be reflected in the DOM
94//! // The auto-waiting assertion handles timing without polling
95//! expect(page.locator(".chat-messages")).to_be_visible().await?;
96//! expect(page.locator(".chat-message")).to_have_count_greater_than(0).await?;
97//!
98//! // Verify the DOM content matches what was received via WebSocket
99//! let messages = received_messages.lock().await;
100//! if !messages.is_empty() {
101//!     // Parse the WebSocket message (assuming JSON)
102//!     let first_msg = &messages[0];
103//!     if first_msg.contains("\"text\":") {
104//!         // Verify the message text appears in the DOM
105//!         let msg_text = page.locator(".chat-message").first().text_content().await?;
106//!         assert!(msg_text.is_some(), "Message should be rendered in DOM");
107//!     }
108//! }
109//! # Ok(())
110//! # }
111//! ```
112//!
113//! ## Waiting for Specific WebSocket Events Before DOM Verification
114//!
115//! Use synchronization primitives to coordinate between WebSocket events
116//! and DOM assertions:
117//!
118//! ```ignore
119//! use viewpoint_core::Browser;
120//! use viewpoint_test::expect;
//! use std::sync::Arc;
123//! use tokio::sync::Notify;
124//!
125//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
126//! let browser = Browser::launch().headless(true).launch().await?;
127//! let context = browser.new_context().await?;
128//! let page = context.new_page().await?;
129//!
130//! // Use Notify to signal when specific data arrives
131//! let data_ready = Arc::new(Notify::new());
132//! let data_ready_clone = data_ready.clone();
133//!
134//! page.on_websocket(move |ws| {
135//!     let notify = data_ready_clone.clone();
136//!     async move {
137//!         ws.on_framereceived(move |frame| {
138//!             let notify = notify.clone();
139//!             async move {
140//!                 // Signal when we receive the expected data
141//!                 if frame.payload().contains("\"status\":\"ready\"") {
142//!                     notify.notify_one();
143//!                 }
144//!             }
145//!         }).await;
146//!     }
147//! }).await;
148//!
149//! page.goto("https://example.com/app").goto().await?;
150//!
151//! // Wait for the specific WebSocket message (with timeout)
152//! tokio::select! {
153//!     _ = data_ready.notified() => {
154//!         // Data arrived, now verify DOM reflects it
155//!         expect(page.locator(".status-indicator")).to_have_text("Ready").await?;
156//!         expect(page.locator(".data-panel")).to_be_visible().await?;
157//!     }
158//!     _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
159//!         panic!("Timeout waiting for WebSocket data");
160//!     }
161//! }
162//! # Ok(())
163//! # }
164//! ```
165//!
166//! ## Complete Example: Testing a Real-Time Stock Ticker
167//!
168//! ```ignore
169//! use viewpoint_core::Browser;
170//! use viewpoint_test::{TestHarness, expect};
171//! use std::sync::Arc;
172//! use tokio::sync::Mutex;
173//!
174//! #[tokio::test]
175//! async fn test_stock_ticker_updates() -> Result<(), Box<dyn std::error::Error>> {
176//!     let harness = TestHarness::new().await?;
177//!     let page = harness.page();
178//!
179//!     // Track stock updates received via WebSocket
180//!     let stock_updates: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
181//!     let updates_clone = stock_updates.clone();
182//!
183//!     // Monitor WebSocket for stock price updates
184//!     page.on_websocket(move |ws| {
185//!         let updates = updates_clone.clone();
186//!         async move {
187//!             if ws.url().contains("stock-feed") {
188//!                 ws.on_framereceived(move |frame| {
189//!                     let updates = updates.clone();
190//!                     async move {
191//!                         updates.lock().await.push(frame.payload().to_string());
192//!                     }
193//!                 }).await;
194//!             }
195//!         }
196//!     }).await;
197//!
198//!     // Navigate to the stock ticker page
199//!     page.goto("https://example.com/stocks").goto().await?;
200//!
201//!     // Verify the ticker display updates (auto-waits for DOM changes)
202//!     expect(page.locator(".stock-ticker")).to_be_visible().await?;
203//!     expect(page.locator(".stock-price")).to_contain_text("$").await?;
204//!     
205//!     // Verify connection indicator shows live status
206//!     expect(page.locator(".connection-status"))
207//!         .to_have_text("Live")
208//!         .await?;
209//!
210//!     // Verify at least one price update was rendered
211//!     expect(page.locator(".price-change")).not().to_be_empty().await?;
212//!
213//!     // Confirm WebSocket messages were received
214//!     let updates = stock_updates.lock().await;
215//!     assert!(!updates.is_empty(), "Should have received stock updates via WebSocket");
216//!
217//!     Ok(())
218//! }
219//! ```
220
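// NOTE(review): this comment announced a dead-code allowance for the websocket
// monitoring scaffolding (spec: network-events), but no `#[allow(dead_code)]`
// attribute follows it — confirm whether the attribute should be restored or
// this note removed.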
222
223mod event_listener;
224mod frame;
225
226pub use frame::WebSocketFrame;
227
228use std::collections::HashMap;
229use std::future::Future;
230use std::pin::Pin;
231use std::sync::Arc;
232use std::sync::atomic::{AtomicBool, Ordering};
233
234use tokio::sync::{RwLock, broadcast};
235use viewpoint_cdp::CdpConnection;
236
237/// A WebSocket connection being monitored.
238///
239/// This struct represents an active WebSocket connection and provides
240/// methods to register handlers for frame events.
241#[derive(Clone)]
242pub struct WebSocket {
243    /// The request ID identifying this WebSocket.
244    request_id: String,
245    /// The WebSocket URL.
246    url: String,
247    /// Whether the WebSocket is closed.
248    is_closed: Arc<AtomicBool>,
249    /// Frame sent event broadcaster.
250    frame_sent_tx: broadcast::Sender<WebSocketFrame>,
251    /// Frame received event broadcaster.
252    frame_received_tx: broadcast::Sender<WebSocketFrame>,
253    /// Close event broadcaster.
254    close_tx: broadcast::Sender<()>,
255}
256
257impl std::fmt::Debug for WebSocket {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        f.debug_struct("WebSocket")
260            .field("request_id", &self.request_id)
261            .field("url", &self.url)
262            .field("is_closed", &self.is_closed.load(Ordering::SeqCst))
263            .finish()
264    }
265}
266
267impl WebSocket {
268    /// Create a new WebSocket instance.
269    pub(crate) fn new(request_id: String, url: String) -> Self {
270        let (frame_sent_tx, _) = broadcast::channel(256);
271        let (frame_received_tx, _) = broadcast::channel(256);
272        let (close_tx, _) = broadcast::channel(16);
273
274        Self {
275            request_id,
276            url,
277            is_closed: Arc::new(AtomicBool::new(false)),
278            frame_sent_tx,
279            frame_received_tx,
280            close_tx,
281        }
282    }
283
284    /// Get the WebSocket URL.
285    pub fn url(&self) -> &str {
286        &self.url
287    }
288
289    /// Check if the WebSocket is closed.
290    pub fn is_closed(&self) -> bool {
291        self.is_closed.load(Ordering::SeqCst)
292    }
293
294    /// Get the request ID for this WebSocket.
295    pub fn request_id(&self) -> &str {
296        &self.request_id
297    }
298
299    /// Register a handler for frame sent events.
300    ///
301    /// The handler will be called whenever a frame is sent over this WebSocket.
302    ///
303    /// # Example
304    ///
305    /// ```no_run
306    /// use viewpoint_core::WebSocket;
307    ///
308    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
309    /// websocket.on_framesent(|frame| async move {
310    ///     println!("Sent: {:?}", frame.payload());
311    /// }).await;
312    /// # Ok(())
313    /// # }
314    pub async fn on_framesent<F, Fut>(&self, handler: F)
315    where
316        F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
317        Fut: Future<Output = ()> + Send + 'static,
318    {
319        let mut rx = self.frame_sent_tx.subscribe();
320        tokio::spawn(async move {
321            while let Ok(frame) = rx.recv().await {
322                handler(frame).await;
323            }
324        });
325    }
326
327    /// Register a handler for frame received events.
328    ///
329    /// The handler will be called whenever a frame is received on this WebSocket.
330    ///
331    /// # Example
332    ///
333    /// ```no_run
334    /// use viewpoint_core::WebSocket;
335    ///
336    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
337    /// websocket.on_framereceived(|frame| async move {
338    ///     println!("Received: {:?}", frame.payload());
339    /// }).await;
340    /// # Ok(())
341    /// # }
342    pub async fn on_framereceived<F, Fut>(&self, handler: F)
343    where
344        F: Fn(WebSocketFrame) -> Fut + Send + Sync + 'static,
345        Fut: Future<Output = ()> + Send + 'static,
346    {
347        let mut rx = self.frame_received_tx.subscribe();
348        tokio::spawn(async move {
349            while let Ok(frame) = rx.recv().await {
350                handler(frame).await;
351            }
352        });
353    }
354
355    /// Register a handler for WebSocket close events.
356    ///
357    /// The handler will be called when this WebSocket connection is closed.
358    ///
359    /// # Example
360    ///
361    /// ```no_run
362    /// use viewpoint_core::WebSocket;
363    ///
364    /// # async fn example(websocket: WebSocket) -> Result<(), viewpoint_core::CoreError> {
365    /// websocket.on_close(|| async {
366    ///     println!("WebSocket closed");
367    /// }).await;
368    /// # Ok(())
369    /// # }
370    pub async fn on_close<F, Fut>(&self, handler: F)
371    where
372        F: Fn() -> Fut + Send + Sync + 'static,
373        Fut: Future<Output = ()> + Send + 'static,
374    {
375        let mut rx = self.close_tx.subscribe();
376        tokio::spawn(async move {
377            if rx.recv().await.is_ok() {
378                handler().await;
379            }
380        });
381    }
382
383    /// Emit a frame sent event (internal use).
384    pub(crate) fn emit_frame_sent(&self, frame: WebSocketFrame) {
385        let _ = self.frame_sent_tx.send(frame);
386    }
387
388    /// Emit a frame received event (internal use).
389    pub(crate) fn emit_frame_received(&self, frame: WebSocketFrame) {
390        let _ = self.frame_received_tx.send(frame);
391    }
392
393    /// Mark the WebSocket as closed and emit close event (internal use).
394    pub(crate) fn mark_closed(&self) {
395        self.is_closed.store(true, Ordering::SeqCst);
396        let _ = self.close_tx.send(());
397    }
398}
399
400/// Type alias for the WebSocket event handler function.
401pub type WebSocketEventHandler =
402    Box<dyn Fn(WebSocket) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
403
404/// Manager for WebSocket events on a page.
405pub struct WebSocketManager {
406    /// CDP connection.
407    connection: Arc<CdpConnection>,
408    /// Session ID.
409    session_id: String,
410    /// Active WebSocket connections indexed by request ID.
411    websockets: Arc<RwLock<HashMap<String, WebSocket>>>,
412    /// WebSocket created event handler.
413    handler: Arc<RwLock<Option<WebSocketEventHandler>>>,
414    /// Whether the manager is listening for events.
415    is_listening: AtomicBool,
416}
417
418impl WebSocketManager {
419    /// Create a new WebSocket manager for a page.
420    pub fn new(connection: Arc<CdpConnection>, session_id: String) -> Self {
421        Self {
422            connection,
423            session_id,
424            websockets: Arc::new(RwLock::new(HashMap::new())),
425            handler: Arc::new(RwLock::new(None)),
426            is_listening: AtomicBool::new(false),
427        }
428    }
429
430    /// Set a handler for WebSocket created events.
431    pub async fn set_handler<F, Fut>(&self, handler: F)
432    where
433        F: Fn(WebSocket) -> Fut + Send + Sync + 'static,
434        Fut: Future<Output = ()> + Send + 'static,
435    {
436        let boxed_handler: WebSocketEventHandler = Box::new(move |ws| Box::pin(handler(ws)));
437        let mut h = self.handler.write().await;
438        *h = Some(boxed_handler);
439
440        // Start listening for events if not already
441        self.start_listening().await;
442    }
443
444    /// Remove the WebSocket handler.
445    pub async fn remove_handler(&self) {
446        let mut h = self.handler.write().await;
447        *h = None;
448    }
449
450    /// Start listening for WebSocket CDP events.
451    async fn start_listening(&self) {
452        if self.is_listening.swap(true, Ordering::SeqCst) {
453            // Already listening
454            return;
455        }
456
457        event_listener::spawn_event_listener(
458            self.connection.clone(),
459            self.session_id.clone(),
460            self.websockets.clone(),
461            self.handler.clone(),
462        );
463    }
464
465    /// Get a WebSocket by request ID.
466    pub async fn get(&self, request_id: &str) -> Option<WebSocket> {
467        let sockets = self.websockets.read().await;
468        sockets.get(request_id).cloned()
469    }
470
471    /// Get all active `WebSockets`.
472    pub async fn all(&self) -> Vec<WebSocket> {
473        let sockets = self.websockets.read().await;
474        sockets.values().cloned().collect()
475    }
476}
477
478#[cfg(test)]
479mod tests;