Skip to main content

chrome_cli/cdp/
client.rs

1use tokio::sync::{mpsc, oneshot};
2use tokio::time::{Duration, Instant};
3
4use super::error::CdpError;
5use super::transport::{ReconnectConfig, TransportCommand, TransportHandle, spawn_transport};
6use super::types::CdpEvent;
7
8/// Configuration for a CDP client connection.
9#[derive(Debug, Clone)]
10pub struct CdpConfig {
11    /// Timeout for the initial WebSocket connection (default: 10s).
12    pub connect_timeout: Duration,
13    /// Timeout for individual CDP commands (default: 30s).
14    pub command_timeout: Duration,
15    /// Capacity of the internal command channel (default: 256).
16    pub channel_capacity: usize,
17    /// Reconnection settings.
18    pub reconnect: ReconnectConfig,
19}
20
21impl Default for CdpConfig {
22    fn default() -> Self {
23        Self {
24            connect_timeout: Duration::from_secs(10),
25            command_timeout: Duration::from_secs(30),
26            channel_capacity: 256,
27            reconnect: ReconnectConfig::default(),
28        }
29    }
30}
31
32/// A CDP client connected to Chrome over WebSocket.
33///
34/// This is the main entry point for sending CDP commands and subscribing
35/// to events. It communicates with a background transport task that owns
36/// the WebSocket connection.
37#[derive(Debug)]
38pub struct CdpClient {
39    handle: TransportHandle,
40    config: CdpConfig,
41    url: String,
42}
43
44impl CdpClient {
45    /// Connect to a Chrome CDP WebSocket endpoint.
46    ///
47    /// # Errors
48    ///
49    /// Returns `CdpError::Connection` if the WebSocket handshake fails,
50    /// or `CdpError::ConnectionTimeout` if the connection attempt exceeds
51    /// the configured timeout.
52    pub async fn connect(url: &str, config: CdpConfig) -> Result<Self, CdpError> {
53        let handle = spawn_transport(
54            url,
55            config.channel_capacity,
56            config.reconnect.clone(),
57            config.connect_timeout,
58        )
59        .await?;
60
61        Ok(Self {
62            handle,
63            config,
64            url: url.to_owned(),
65        })
66    }
67
68    /// Send a CDP command (browser-level, no session).
69    ///
70    /// # Errors
71    ///
72    /// Returns `CdpError::CommandTimeout` if Chrome does not respond within
73    /// the configured timeout, `CdpError::Protocol` if Chrome returns an
74    /// error, or `CdpError::Internal` if the transport task has exited.
75    pub async fn send_command(
76        &self,
77        method: &str,
78        params: Option<serde_json::Value>,
79    ) -> Result<serde_json::Value, CdpError> {
80        send_command_impl(
81            &self.handle,
82            self.config.command_timeout,
83            method,
84            params,
85            None,
86        )
87        .await
88    }
89
90    /// Subscribe to CDP events matching a method name.
91    ///
92    /// Returns a receiver that yields `CdpEvent` values. Events stop
93    /// being delivered when the receiver is dropped.
94    ///
95    /// # Errors
96    ///
97    /// Returns `CdpError::Internal` if the transport task has exited.
98    pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
99        subscribe_impl(&self.handle, self.config.channel_capacity, method, None).await
100    }
101
102    /// Create a CDP session attached to a specific target.
103    ///
104    /// Sends `Target.attachToTarget` and returns a `CdpSession` bound
105    /// to the returned session ID.
106    ///
107    /// # Errors
108    ///
109    /// Returns `CdpError::Protocol` if the target cannot be attached,
110    /// or any transport error.
111    pub async fn create_session(&self, target_id: &str) -> Result<CdpSession, CdpError> {
112        let params = serde_json::json!({
113            "targetId": target_id,
114            "flatten": true,
115        });
116        let result = self
117            .send_command("Target.attachToTarget", Some(params))
118            .await?;
119        let session_id = result["sessionId"]
120            .as_str()
121            .ok_or_else(|| {
122                CdpError::InvalidResponse("Target.attachToTarget response missing sessionId".into())
123            })?
124            .to_owned();
125
126        Ok(CdpSession {
127            session_id,
128            handle: self.handle.clone(),
129            config: self.config.clone(),
130        })
131    }
132
133    /// Gracefully close the WebSocket connection.
134    ///
135    /// # Errors
136    ///
137    /// Returns `CdpError::Internal` if the transport task has already exited.
138    pub async fn close(self) -> Result<(), CdpError> {
139        self.handle.send(TransportCommand::Shutdown).await
140    }
141
142    /// Check if the client is currently connected.
143    #[must_use]
144    pub fn is_connected(&self) -> bool {
145        self.handle.is_connected()
146    }
147
148    /// Get the WebSocket URL this client is connected to.
149    #[must_use]
150    pub fn url(&self) -> &str {
151        &self.url
152    }
153}
154
155/// A CDP session bound to a specific target (tab).
156///
157/// Sessions share the parent client's WebSocket connection but route
158/// commands and events through a `sessionId`.
159#[derive(Debug, Clone)]
160pub struct CdpSession {
161    session_id: String,
162    handle: TransportHandle,
163    config: CdpConfig,
164}
165
166impl CdpSession {
167    /// Send a command within this session's context.
168    ///
169    /// # Errors
170    ///
171    /// Returns the same errors as [`CdpClient::send_command`].
172    pub async fn send_command(
173        &self,
174        method: &str,
175        params: Option<serde_json::Value>,
176    ) -> Result<serde_json::Value, CdpError> {
177        send_command_impl(
178            &self.handle,
179            self.config.command_timeout,
180            method,
181            params,
182            Some(self.session_id.clone()),
183        )
184        .await
185    }
186
187    /// Subscribe to events within this session.
188    ///
189    /// # Errors
190    ///
191    /// Returns `CdpError::Internal` if the transport task has exited.
192    pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
193        subscribe_impl(
194            &self.handle,
195            self.config.channel_capacity,
196            method,
197            Some(self.session_id.clone()),
198        )
199        .await
200    }
201
202    /// Get the session ID.
203    #[must_use]
204    pub fn session_id(&self) -> &str {
205        &self.session_id
206    }
207}
208
209// =============================================================================
210// Shared helpers
211// =============================================================================
212
213/// Send a CDP command via the transport handle and await the response.
214async fn send_command_impl(
215    handle: &TransportHandle,
216    command_timeout: Duration,
217    method: &str,
218    params: Option<serde_json::Value>,
219    session_id: Option<String>,
220) -> Result<serde_json::Value, CdpError> {
221    let id = handle.next_message_id();
222    let command = super::types::CdpCommand {
223        id,
224        method: method.to_owned(),
225        params,
226        session_id,
227    };
228
229    let (response_tx, response_rx) = oneshot::channel();
230    let deadline = Instant::now() + command_timeout;
231
232    handle
233        .send(TransportCommand::SendCommand {
234            command,
235            response_tx,
236            deadline,
237        })
238        .await?;
239
240    response_rx
241        .await
242        .map_err(|_| CdpError::Internal("transport task exited before responding".into()))?
243}
244
245/// Register an event subscription via the transport handle.
246async fn subscribe_impl(
247    handle: &TransportHandle,
248    channel_capacity: usize,
249    method: &str,
250    session_id: Option<String>,
251) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
252    let (event_tx, event_rx) = mpsc::channel(channel_capacity);
253    handle
254        .send(TransportCommand::Subscribe {
255            method: method.to_owned(),
256            session_id,
257            event_tx,
258        })
259        .await?;
260    Ok(event_rx)
261}