1use tokio::sync::{mpsc, oneshot};
2use tokio::time::{Duration, Instant};
3
4use super::error::CdpError;
5use super::transport::{ReconnectConfig, TransportCommand, TransportHandle, spawn_transport};
6use super::types::CdpEvent;
7
8#[derive(Debug, Clone)]
10pub struct CdpConfig {
11 pub connect_timeout: Duration,
13 pub command_timeout: Duration,
15 pub channel_capacity: usize,
17 pub reconnect: ReconnectConfig,
19}
20
21impl Default for CdpConfig {
22 fn default() -> Self {
23 Self {
24 connect_timeout: Duration::from_secs(10),
25 command_timeout: Duration::from_secs(30),
26 channel_capacity: 256,
27 reconnect: ReconnectConfig::default(),
28 }
29 }
30}
31
32#[derive(Debug)]
38pub struct CdpClient {
39 handle: TransportHandle,
40 config: CdpConfig,
41 url: String,
42}
43
44impl CdpClient {
45 pub async fn connect(url: &str, config: CdpConfig) -> Result<Self, CdpError> {
53 let handle = spawn_transport(
54 url,
55 config.channel_capacity,
56 config.reconnect.clone(),
57 config.connect_timeout,
58 )
59 .await?;
60
61 Ok(Self {
62 handle,
63 config,
64 url: url.to_owned(),
65 })
66 }
67
68 pub async fn send_command(
76 &self,
77 method: &str,
78 params: Option<serde_json::Value>,
79 ) -> Result<serde_json::Value, CdpError> {
80 send_command_impl(
81 &self.handle,
82 self.config.command_timeout,
83 method,
84 params,
85 None,
86 )
87 .await
88 }
89
90 pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
99 subscribe_impl(&self.handle, self.config.channel_capacity, method, None).await
100 }
101
102 pub async fn create_session(&self, target_id: &str) -> Result<CdpSession, CdpError> {
112 let params = serde_json::json!({
113 "targetId": target_id,
114 "flatten": true,
115 });
116 let result = self
117 .send_command("Target.attachToTarget", Some(params))
118 .await?;
119 let session_id = result["sessionId"]
120 .as_str()
121 .ok_or_else(|| {
122 CdpError::InvalidResponse("Target.attachToTarget response missing sessionId".into())
123 })?
124 .to_owned();
125
126 Ok(CdpSession {
127 session_id,
128 handle: self.handle.clone(),
129 config: self.config.clone(),
130 })
131 }
132
133 pub async fn close(self) -> Result<(), CdpError> {
139 self.handle.send(TransportCommand::Shutdown).await
140 }
141
142 #[must_use]
144 pub fn is_connected(&self) -> bool {
145 self.handle.is_connected()
146 }
147
148 #[must_use]
150 pub fn url(&self) -> &str {
151 &self.url
152 }
153}
154
155#[derive(Debug, Clone)]
160pub struct CdpSession {
161 session_id: String,
162 handle: TransportHandle,
163 config: CdpConfig,
164}
165
166impl CdpSession {
167 pub async fn send_command(
173 &self,
174 method: &str,
175 params: Option<serde_json::Value>,
176 ) -> Result<serde_json::Value, CdpError> {
177 send_command_impl(
178 &self.handle,
179 self.config.command_timeout,
180 method,
181 params,
182 Some(self.session_id.clone()),
183 )
184 .await
185 }
186
187 pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
193 subscribe_impl(
194 &self.handle,
195 self.config.channel_capacity,
196 method,
197 Some(self.session_id.clone()),
198 )
199 .await
200 }
201
202 #[must_use]
204 pub fn session_id(&self) -> &str {
205 &self.session_id
206 }
207}
208
209async fn send_command_impl(
215 handle: &TransportHandle,
216 command_timeout: Duration,
217 method: &str,
218 params: Option<serde_json::Value>,
219 session_id: Option<String>,
220) -> Result<serde_json::Value, CdpError> {
221 let id = handle.next_message_id();
222 let command = super::types::CdpCommand {
223 id,
224 method: method.to_owned(),
225 params,
226 session_id,
227 };
228
229 let (response_tx, response_rx) = oneshot::channel();
230 let deadline = Instant::now() + command_timeout;
231
232 handle
233 .send(TransportCommand::SendCommand {
234 command,
235 response_tx,
236 deadline,
237 })
238 .await?;
239
240 response_rx
241 .await
242 .map_err(|_| CdpError::Internal("transport task exited before responding".into()))?
243}
244
245async fn subscribe_impl(
247 handle: &TransportHandle,
248 channel_capacity: usize,
249 method: &str,
250 session_id: Option<String>,
251) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
252 let (event_tx, event_rx) = mpsc::channel(channel_capacity);
253 handle
254 .send(TransportCommand::Subscribe {
255 method: method.to_owned(),
256 session_id,
257 event_tx,
258 })
259 .await?;
260 Ok(event_rx)
261}