viewpoint_cdp/connection/
mod.rs

1//! CDP WebSocket connection management.
2
3mod discovery;
4
5pub use discovery::{BrowserVersion, CdpConnectionOptions, discover_websocket_url};
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use futures_util::{SinkExt, StreamExt};
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use serde_json::Value;
16use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
17use tokio::time::timeout;
18use tokio_tungstenite::MaybeTlsStream;
19use tokio_tungstenite::tungstenite::Message;
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tracing::{debug, error, info, instrument, trace, warn};
22
23use crate::error::CdpError;
24use crate::transport::{CdpEvent, CdpMessage, CdpRequest, CdpResponse};
25
26/// Default timeout for CDP commands.
27const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
28
29/// Buffer size for the event broadcast channel.
30const EVENT_CHANNEL_SIZE: usize = 256;
31
32/// A CDP connection to a browser.
33#[derive(Debug)]
34pub struct CdpConnection {
35    /// Sender for outgoing messages.
36    tx: mpsc::Sender<CdpRequest>,
37    /// Receiver for incoming events.
38    event_rx: broadcast::Sender<CdpEvent>,
39    /// Pending responses waiting for completion.
40    pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>>,
41    /// Atomic counter for message IDs.
42    message_id: AtomicU64,
43    /// Handle to the background read task.
44    _read_handle: tokio::task::JoinHandle<()>,
45    /// Handle to the background write task.
46    _write_handle: tokio::task::JoinHandle<()>,
47}
48
49impl CdpConnection {
50    /// Connect to a CDP WebSocket endpoint.
51    ///
52    /// # Errors
53    ///
54    /// Returns an error if the WebSocket connection fails.
55    #[instrument(level = "info", skip(ws_url), fields(ws_url = %ws_url))]
56    pub async fn connect(ws_url: &str) -> Result<Self, CdpError> {
57        Self::connect_with_options(ws_url, &CdpConnectionOptions::default()).await
58    }
59
60    /// Connect to a CDP WebSocket endpoint with custom options.
61    ///
62    /// # Errors
63    ///
64    /// Returns an error if the WebSocket connection fails.
65    #[instrument(level = "info", skip(ws_url, options), fields(ws_url = %ws_url))]
66    pub async fn connect_with_options(
67        ws_url: &str,
68        options: &CdpConnectionOptions,
69    ) -> Result<Self, CdpError> {
70        info!("Connecting to CDP WebSocket endpoint");
71
72        // Build the WebSocket request with custom headers
73        let mut request =
74            ws_url
75                .into_client_request()
76                .map_err(|e: tokio_tungstenite::tungstenite::Error| {
77                    CdpError::InvalidUrl(format!("{ws_url}: {e}"))
78                })?;
79
80        // Add custom headers
81        for (name, value) in &options.headers {
82            let header_name = name
83                .parse::<tokio_tungstenite::tungstenite::http::HeaderName>()
84                .map_err(|e| CdpError::ConnectionFailed(format!("invalid header name: {e}")))?;
85            let header_value = value
86                .parse::<tokio_tungstenite::tungstenite::http::HeaderValue>()
87                .map_err(|e| CdpError::ConnectionFailed(format!("invalid header value: {e}")))?;
88            request.headers_mut().insert(header_name, header_value);
89        }
90
91        // Connect with optional timeout
92        type WsStream = tokio_tungstenite::WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
93        let connect_future = tokio_tungstenite::connect_async(request);
94        let (ws_stream, response): (WsStream, _) = if let Some(timeout_duration) = options.timeout {
95            timeout(timeout_duration, connect_future)
96                .await
97                .map_err(|_| CdpError::ConnectionTimeout(timeout_duration))?
98                .map_err(CdpError::from)?
99        } else {
100            connect_future.await?
101        };
102
103        info!(status = %response.status(), "WebSocket connection established");
104
105        let (write, read) = ws_stream.split();
106
107        // Channels for internal communication
108        let (tx, rx) = mpsc::channel::<CdpRequest>(64);
109        let (event_tx, _) = broadcast::channel::<CdpEvent>(EVENT_CHANNEL_SIZE);
110        let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>> =
111            Arc::new(Mutex::new(HashMap::new()));
112
113        // Spawn the write task
114        let write_handle = tokio::spawn(Self::write_loop(rx, write));
115        debug!("Spawned CDP write loop");
116
117        // Spawn the read task
118        let read_pending = pending.clone();
119        let read_event_tx = event_tx.clone();
120        let read_handle = tokio::spawn(Self::read_loop(read, read_pending, read_event_tx));
121        debug!("Spawned CDP read loop");
122
123        info!("CDP connection ready");
124        Ok(Self {
125            tx,
126            event_rx: event_tx,
127            pending,
128            message_id: AtomicU64::new(1),
129            _read_handle: read_handle,
130            _write_handle: write_handle,
131        })
132    }
133
134    /// Connect to a browser via HTTP endpoint URL.
135    ///
136    /// This method discovers the WebSocket URL from an HTTP endpoint like
137    /// `http://localhost:9222` by fetching `/json/version`.
138    ///
139    /// # Example
140    ///
141    /// ```no_run
142    /// use viewpoint_cdp::{CdpConnection, connection::CdpConnectionOptions};
143    /// use std::time::Duration;
144    ///
145    /// # async fn example() -> Result<(), viewpoint_cdp::CdpError> {
146    /// // Simple connection
147    /// let conn = CdpConnection::connect_via_http("http://localhost:9222").await?;
148    ///
149    /// // With custom options
150    /// let options = CdpConnectionOptions::new()
151    ///     .timeout(Duration::from_secs(10))
152    ///     .header("Authorization", "Bearer token");
153    /// let conn = CdpConnection::connect_via_http_with_options(
154    ///     "http://localhost:9222",
155    ///     options,
156    /// ).await?;
157    /// # Ok(())
158    /// # }
159    /// ```
160    ///
161    /// # Errors
162    ///
163    /// Returns an error if:
164    /// - The HTTP endpoint is unreachable
165    /// - The endpoint doesn't expose CDP
166    /// - The WebSocket connection fails
167    pub async fn connect_via_http(endpoint_url: &str) -> Result<Self, CdpError> {
168        Self::connect_via_http_with_options(endpoint_url, CdpConnectionOptions::default()).await
169    }
170
171    /// Connect to a browser via HTTP endpoint URL with custom options.
172    ///
173    /// # Errors
174    ///
175    /// Returns an error if discovery or connection fails.
176    #[instrument(level = "info", skip(options), fields(endpoint_url = %endpoint_url))]
177    pub async fn connect_via_http_with_options(
178        endpoint_url: &str,
179        options: CdpConnectionOptions,
180    ) -> Result<Self, CdpError> {
181        // Discover the WebSocket URL
182        let ws_url = discover_websocket_url(endpoint_url, &options).await?;
183
184        // Connect to the WebSocket
185        Self::connect_with_options(&ws_url, &options).await
186    }
187
188    /// Background task that writes CDP requests to the WebSocket.
189    async fn write_loop<S>(mut rx: mpsc::Receiver<CdpRequest>, mut sink: S)
190    where
191        S: futures_util::Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
192    {
193        debug!("CDP write loop started");
194        while let Some(request) = rx.recv().await {
195            let method = request.method.clone();
196            let id = request.id;
197
198            let json = match serde_json::to_string(&request) {
199                Ok(j) => j,
200                Err(e) => {
201                    error!(error = %e, method = %method, "Failed to serialize CDP request");
202                    continue;
203                }
204            };
205
206            trace!(id = id, method = %method, json_len = json.len(), "Sending CDP request");
207
208            if sink.send(Message::Text(json.into())).await.is_err() {
209                warn!("WebSocket sink closed, ending write loop");
210                break;
211            }
212
213            debug!(id = id, method = %method, "CDP request sent");
214        }
215        debug!("CDP write loop ended");
216    }
217
218    /// Background task that reads CDP messages from the WebSocket.
219    async fn read_loop<S>(
220        mut stream: S,
221        pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>>,
222        event_tx: broadcast::Sender<CdpEvent>,
223    ) where
224        S: futures_util::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
225            + Unpin,
226    {
227        debug!("CDP read loop started");
228        while let Some(msg) = stream.next().await {
229            let msg = match msg {
230                Ok(Message::Text(text)) => text,
231                Ok(Message::Close(frame)) => {
232                    info!(?frame, "WebSocket closed by remote");
233                    break;
234                }
235                Err(e) => {
236                    warn!(error = %e, "WebSocket error, ending read loop");
237                    break;
238                }
239                Ok(_) => continue,
240            };
241
242            trace!(json_len = msg.len(), "Received CDP message");
243
244            // Parse the incoming message
245            let cdp_msg: CdpMessage = match serde_json::from_str(&msg) {
246                Ok(m) => m,
247                Err(e) => {
248                    error!(error = %e, "Failed to parse CDP message");
249                    continue;
250                }
251            };
252
253            match cdp_msg {
254                CdpMessage::Response(resp) => {
255                    let id = resp.id;
256                    let has_error = resp.error.is_some();
257                    debug!(id = id, has_error = has_error, "Received CDP response");
258
259                    let mut pending = pending.lock().await;
260                    if let Some(sender) = pending.remove(&id) {
261                        let _ = sender.send(resp);
262                    } else {
263                        warn!(id = id, "Received response for unknown request ID");
264                    }
265                }
266                CdpMessage::Event(ref event) => {
267                    trace!(method = %event.method, session_id = ?event.session_id, "Received CDP event");
268                    // Broadcast to all subscribers; ignore if no receivers.
269                    let _ = event_tx.send(event.clone());
270                }
271            }
272        }
273        debug!("CDP read loop ended");
274    }
275
276    /// Send a CDP command and wait for the response.
277    ///
278    /// # Errors
279    ///
280    /// Returns an error if:
281    /// - The command cannot be sent
282    /// - The response times out
283    /// - The browser returns a protocol error
284    pub async fn send_command<P, R>(
285        &self,
286        method: &str,
287        params: Option<P>,
288        session_id: Option<&str>,
289    ) -> Result<R, CdpError>
290    where
291        P: Serialize,
292        R: DeserializeOwned,
293    {
294        self.send_command_with_timeout(method, params, session_id, DEFAULT_TIMEOUT)
295            .await
296    }
297
298    /// Send a CDP command with a custom timeout.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if:
303    /// - The command cannot be sent
304    /// - The response times out
305    /// - The browser returns a protocol error
306    #[instrument(level = "debug", skip(self, params), fields(method = %method, session_id = ?session_id))]
307    pub async fn send_command_with_timeout<P, R>(
308        &self,
309        method: &str,
310        params: Option<P>,
311        session_id: Option<&str>,
312        timeout_duration: Duration,
313    ) -> Result<R, CdpError>
314    where
315        P: Serialize,
316        R: DeserializeOwned,
317    {
318        let id = self.message_id.fetch_add(1, Ordering::Relaxed);
319        debug!(
320            id = id,
321            timeout_ms = timeout_duration.as_millis(),
322            "Preparing CDP command"
323        );
324
325        let params_value = params.map(|p| serde_json::to_value(p)).transpose()?;
326
327        let request = CdpRequest {
328            id,
329            method: method.to_string(),
330            params: params_value,
331            session_id: session_id.map(ToString::to_string),
332        };
333
334        // Create a oneshot channel for the response
335        let (resp_tx, resp_rx) = oneshot::channel();
336
337        // Register the pending response
338        {
339            let mut pending = self.pending.lock().await;
340            pending.insert(id, resp_tx);
341            trace!(
342                id = id,
343                pending_count = pending.len(),
344                "Registered pending response"
345            );
346        }
347
348        // Send the request
349        self.tx
350            .send(request)
351            .await
352            .map_err(|_| CdpError::ConnectionLost)?;
353
354        trace!(id = id, "Request queued for sending");
355
356        // Wait for the response with timeout
357        let response = timeout(timeout_duration, resp_rx)
358            .await
359            .map_err(|_| {
360                warn!(id = id, method = %method, "CDP command timed out");
361                CdpError::Timeout(timeout_duration)
362            })?
363            .map_err(|_| CdpError::ConnectionLost)?;
364
365        // Check for protocol errors
366        if let Some(ref error) = response.error {
367            warn!(id = id, method = %method, code = error.code, error_msg = %error.message, "CDP protocol error");
368            return Err(CdpError::Protocol {
369                code: error.code,
370                message: error.message.clone(),
371            });
372        }
373
374        debug!(id = id, "CDP command completed successfully");
375
376        // Parse the result
377        let result = response.result.unwrap_or(Value::Null);
378        serde_json::from_value(result).map_err(CdpError::from)
379    }
380
381    /// Subscribe to CDP events.
382    ///
383    /// Returns a receiver that will receive all CDP events from the browser.
384    pub fn subscribe_events(&self) -> broadcast::Receiver<CdpEvent> {
385        debug!("New CDP event subscription created");
386        self.event_rx.subscribe()
387    }
388}
389
390#[cfg(test)]
391mod tests;