viewpoint_cdp/connection/
mod.rs1mod discovery;
4
5pub use discovery::{BrowserVersion, CdpConnectionOptions, discover_websocket_url};
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::time::Duration;
11
12use futures_util::{SinkExt, StreamExt};
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use serde_json::Value;
16use tokio::sync::{Mutex, broadcast, mpsc, oneshot};
17use tokio::time::timeout;
18use tokio_tungstenite::MaybeTlsStream;
19use tokio_tungstenite::tungstenite::Message;
20use tokio_tungstenite::tungstenite::client::IntoClientRequest;
21use tracing::{debug, error, info, instrument, trace, warn};
22
23use crate::error::CdpError;
24use crate::transport::{CdpEvent, CdpMessage, CdpRequest, CdpResponse};
25
26const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
28
29const EVENT_CHANNEL_SIZE: usize = 256;
31
32#[derive(Debug)]
34pub struct CdpConnection {
35 tx: mpsc::Sender<CdpRequest>,
37 event_rx: broadcast::Sender<CdpEvent>,
39 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>>,
41 message_id: AtomicU64,
43 _read_handle: tokio::task::JoinHandle<()>,
45 _write_handle: tokio::task::JoinHandle<()>,
47}
48
49impl CdpConnection {
50 #[instrument(level = "info", skip(ws_url), fields(ws_url = %ws_url))]
56 pub async fn connect(ws_url: &str) -> Result<Self, CdpError> {
57 Self::connect_with_options(ws_url, &CdpConnectionOptions::default()).await
58 }
59
60 #[instrument(level = "info", skip(ws_url, options), fields(ws_url = %ws_url))]
66 pub async fn connect_with_options(
67 ws_url: &str,
68 options: &CdpConnectionOptions,
69 ) -> Result<Self, CdpError> {
70 info!("Connecting to CDP WebSocket endpoint");
71
72 let mut request =
74 ws_url
75 .into_client_request()
76 .map_err(|e: tokio_tungstenite::tungstenite::Error| {
77 CdpError::InvalidUrl(format!("{ws_url}: {e}"))
78 })?;
79
80 for (name, value) in &options.headers {
82 let header_name = name
83 .parse::<tokio_tungstenite::tungstenite::http::HeaderName>()
84 .map_err(|e| CdpError::ConnectionFailed(format!("invalid header name: {e}")))?;
85 let header_value = value
86 .parse::<tokio_tungstenite::tungstenite::http::HeaderValue>()
87 .map_err(|e| CdpError::ConnectionFailed(format!("invalid header value: {e}")))?;
88 request.headers_mut().insert(header_name, header_value);
89 }
90
91 type WsStream = tokio_tungstenite::WebSocketStream<MaybeTlsStream<tokio::net::TcpStream>>;
93 let connect_future = tokio_tungstenite::connect_async(request);
94 let (ws_stream, response): (WsStream, _) = if let Some(timeout_duration) = options.timeout {
95 timeout(timeout_duration, connect_future)
96 .await
97 .map_err(|_| CdpError::ConnectionTimeout(timeout_duration))?
98 .map_err(CdpError::from)?
99 } else {
100 connect_future.await?
101 };
102
103 info!(status = %response.status(), "WebSocket connection established");
104
105 let (write, read) = ws_stream.split();
106
107 let (tx, rx) = mpsc::channel::<CdpRequest>(64);
109 let (event_tx, _) = broadcast::channel::<CdpEvent>(EVENT_CHANNEL_SIZE);
110 let pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>> =
111 Arc::new(Mutex::new(HashMap::new()));
112
113 let write_handle = tokio::spawn(Self::write_loop(rx, write));
115 debug!("Spawned CDP write loop");
116
117 let read_pending = pending.clone();
119 let read_event_tx = event_tx.clone();
120 let read_handle = tokio::spawn(Self::read_loop(read, read_pending, read_event_tx));
121 debug!("Spawned CDP read loop");
122
123 info!("CDP connection ready");
124 Ok(Self {
125 tx,
126 event_rx: event_tx,
127 pending,
128 message_id: AtomicU64::new(1),
129 _read_handle: read_handle,
130 _write_handle: write_handle,
131 })
132 }
133
134 pub async fn connect_via_http(endpoint_url: &str) -> Result<Self, CdpError> {
168 Self::connect_via_http_with_options(endpoint_url, CdpConnectionOptions::default()).await
169 }
170
171 #[instrument(level = "info", skip(options), fields(endpoint_url = %endpoint_url))]
177 pub async fn connect_via_http_with_options(
178 endpoint_url: &str,
179 options: CdpConnectionOptions,
180 ) -> Result<Self, CdpError> {
181 let ws_url = discover_websocket_url(endpoint_url, &options).await?;
183
184 Self::connect_with_options(&ws_url, &options).await
186 }
187
188 async fn write_loop<S>(mut rx: mpsc::Receiver<CdpRequest>, mut sink: S)
190 where
191 S: futures_util::Sink<Message, Error = tokio_tungstenite::tungstenite::Error> + Unpin,
192 {
193 debug!("CDP write loop started");
194 while let Some(request) = rx.recv().await {
195 let method = request.method.clone();
196 let id = request.id;
197
198 let json = match serde_json::to_string(&request) {
199 Ok(j) => j,
200 Err(e) => {
201 error!(error = %e, method = %method, "Failed to serialize CDP request");
202 continue;
203 }
204 };
205
206 trace!(id = id, method = %method, json_len = json.len(), "Sending CDP request");
207
208 if sink.send(Message::Text(json.into())).await.is_err() {
209 warn!("WebSocket sink closed, ending write loop");
210 break;
211 }
212
213 debug!(id = id, method = %method, "CDP request sent");
214 }
215 debug!("CDP write loop ended");
216 }
217
218 async fn read_loop<S>(
220 mut stream: S,
221 pending: Arc<Mutex<HashMap<u64, oneshot::Sender<CdpResponse>>>>,
222 event_tx: broadcast::Sender<CdpEvent>,
223 ) where
224 S: futures_util::Stream<Item = Result<Message, tokio_tungstenite::tungstenite::Error>>
225 + Unpin,
226 {
227 debug!("CDP read loop started");
228 while let Some(msg) = stream.next().await {
229 let msg = match msg {
230 Ok(Message::Text(text)) => text,
231 Ok(Message::Close(frame)) => {
232 info!(?frame, "WebSocket closed by remote");
233 break;
234 }
235 Err(e) => {
236 warn!(error = %e, "WebSocket error, ending read loop");
237 break;
238 }
239 Ok(_) => continue,
240 };
241
242 trace!(json_len = msg.len(), "Received CDP message");
243
244 let cdp_msg: CdpMessage = match serde_json::from_str(&msg) {
246 Ok(m) => m,
247 Err(e) => {
248 error!(error = %e, "Failed to parse CDP message");
249 continue;
250 }
251 };
252
253 match cdp_msg {
254 CdpMessage::Response(resp) => {
255 let id = resp.id;
256 let has_error = resp.error.is_some();
257 debug!(id = id, has_error = has_error, "Received CDP response");
258
259 let mut pending = pending.lock().await;
260 if let Some(sender) = pending.remove(&id) {
261 let _ = sender.send(resp);
262 } else {
263 warn!(id = id, "Received response for unknown request ID");
264 }
265 }
266 CdpMessage::Event(ref event) => {
267 trace!(method = %event.method, session_id = ?event.session_id, "Received CDP event");
268 let _ = event_tx.send(event.clone());
270 }
271 }
272 }
273 debug!("CDP read loop ended");
274 }
275
276 pub async fn send_command<P, R>(
285 &self,
286 method: &str,
287 params: Option<P>,
288 session_id: Option<&str>,
289 ) -> Result<R, CdpError>
290 where
291 P: Serialize,
292 R: DeserializeOwned,
293 {
294 self.send_command_with_timeout(method, params, session_id, DEFAULT_TIMEOUT)
295 .await
296 }
297
298 #[instrument(level = "debug", skip(self, params), fields(method = %method, session_id = ?session_id))]
307 pub async fn send_command_with_timeout<P, R>(
308 &self,
309 method: &str,
310 params: Option<P>,
311 session_id: Option<&str>,
312 timeout_duration: Duration,
313 ) -> Result<R, CdpError>
314 where
315 P: Serialize,
316 R: DeserializeOwned,
317 {
318 let id = self.message_id.fetch_add(1, Ordering::Relaxed);
319 debug!(
320 id = id,
321 timeout_ms = timeout_duration.as_millis(),
322 "Preparing CDP command"
323 );
324
325 let params_value = params.map(|p| serde_json::to_value(p)).transpose()?;
326
327 let request = CdpRequest {
328 id,
329 method: method.to_string(),
330 params: params_value,
331 session_id: session_id.map(ToString::to_string),
332 };
333
334 let (resp_tx, resp_rx) = oneshot::channel();
336
337 {
339 let mut pending = self.pending.lock().await;
340 pending.insert(id, resp_tx);
341 trace!(
342 id = id,
343 pending_count = pending.len(),
344 "Registered pending response"
345 );
346 }
347
348 self.tx
350 .send(request)
351 .await
352 .map_err(|_| CdpError::ConnectionLost)?;
353
354 trace!(id = id, "Request queued for sending");
355
356 let response = timeout(timeout_duration, resp_rx)
358 .await
359 .map_err(|_| {
360 warn!(id = id, method = %method, "CDP command timed out");
361 CdpError::Timeout(timeout_duration)
362 })?
363 .map_err(|_| CdpError::ConnectionLost)?;
364
365 if let Some(ref error) = response.error {
367 warn!(id = id, method = %method, code = error.code, error_msg = %error.message, "CDP protocol error");
368 return Err(CdpError::Protocol {
369 code: error.code,
370 message: error.message.clone(),
371 });
372 }
373
374 debug!(id = id, "CDP command completed successfully");
375
376 let result = response.result.unwrap_or(Value::Null);
378 serde_json::from_value(result).map_err(CdpError::from)
379 }
380
381 pub fn subscribe_events(&self) -> broadcast::Receiver<CdpEvent> {
385 debug!("New CDP event subscription created");
386 self.event_rx.subscribe()
387 }
388}
389
390#[cfg(test)]
391mod tests;