use tokio::sync::{mpsc, oneshot};
use tokio::time::{Duration, Instant};
use super::error::CdpError;
use super::transport::{ReconnectConfig, TransportCommand, TransportHandle, spawn_transport};
use super::types::CdpEvent;
/// Settings that control how a [`CdpClient`] connects and issues commands.
///
/// The `Default` implementation in this file supplies a 10 s connect timeout,
/// a 30 s command timeout, a channel capacity of 256 and
/// `ReconnectConfig::default()`.
#[derive(Debug, Clone)]
pub struct CdpConfig {
/// Timeout handed to `spawn_transport` when the connection is established.
pub connect_timeout: Duration,
/// Added to `Instant::now()` to form the deadline attached to every command
/// sent through the transport.
pub command_timeout: Duration,
/// Capacity passed to `spawn_transport` and also used for the bounded
/// `mpsc` channel created for each event subscription.
pub channel_capacity: usize,
/// Reconnection settings forwarded (cloned) to `spawn_transport`.
pub reconnect: ReconnectConfig,
}
impl Default for CdpConfig {
    /// Builds the stock configuration: 10 s to connect, 30 s per command,
    /// a channel capacity of 256 and the default reconnect settings.
    fn default() -> Self {
        let connect_timeout = Duration::from_secs(10);
        let command_timeout = Duration::from_secs(30);
        let channel_capacity = 256;
        let reconnect = ReconnectConfig::default();

        Self {
            connect_timeout,
            command_timeout,
            channel_capacity,
            reconnect,
        }
    }
}
/// Top-level CDP client: owns a transport handle and the configuration it was
/// connected with.
///
/// Commands and subscriptions issued through the client carry no session id;
/// use [`CdpClient::create_session`] to obtain a session-scoped handle.
#[derive(Debug)]
pub struct CdpClient {
/// Handle returned by `spawn_transport`; all commands are sent through it.
handle: TransportHandle,
/// Configuration supplied to `connect`, consulted for timeouts and capacities.
config: CdpConfig,
/// Owned copy of the URL passed to `connect`.
url: String,
}
impl CdpClient {
    /// Spawns the transport for `url` and wraps the resulting handle in a client.
    ///
    /// The channel capacity, a clone of the reconnect settings and the connect
    /// timeout from `config` are forwarded to `spawn_transport`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `spawn_transport`.
    pub async fn connect(url: &str, config: CdpConfig) -> Result<Self, CdpError> {
        let capacity = config.channel_capacity;
        let reconnect = config.reconnect.clone();
        let connect_timeout = config.connect_timeout;

        let handle = spawn_transport(url, capacity, reconnect, connect_timeout).await?;

        let url = url.to_owned();
        Ok(Self {
            handle,
            config,
            url,
        })
    }

    /// Sends `method` with optional `params` without a session id and waits for
    /// the transport's reply.
    ///
    /// The command deadline is derived from `config.command_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error if the command cannot be handed to the transport, if
    /// the transport drops the response channel before replying, or if the
    /// transport itself replies with an error.
    pub async fn send_command(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value, CdpError> {
        let timeout = self.config.command_timeout;
        send_command_impl(&self.handle, timeout, method, params, None).await
    }

    /// Asks the transport to subscribe to `method` without a session id and
    /// returns the receiving half of a bounded event channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request cannot be sent to the transport.
    pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
        let capacity = self.config.channel_capacity;
        subscribe_impl(&self.handle, capacity, method, None).await
    }

    /// Attaches to `target_id` via `Target.attachToTarget` (with `flatten: true`)
    /// and returns a [`CdpSession`] bound to the session id in the response.
    ///
    /// The session shares this client's transport handle (cloned) and gets its
    /// own copy of the configuration.
    ///
    /// # Errors
    ///
    /// Propagates any error from sending the command, and returns
    /// `CdpError::InvalidResponse` when the response has no string `sessionId`.
    pub async fn create_session(&self, target_id: &str) -> Result<CdpSession, CdpError> {
        let attach_params = serde_json::json!({
            "targetId": target_id,
            "flatten": true,
        });
        let response = self
            .send_command("Target.attachToTarget", Some(attach_params))
            .await?;

        let session_id = match response
            .get("sessionId")
            .and_then(serde_json::Value::as_str)
        {
            Some(id) => id.to_owned(),
            None => {
                return Err(CdpError::InvalidResponse(
                    "Target.attachToTarget response missing sessionId".into(),
                ));
            }
        };

        Ok(CdpSession {
            session_id,
            handle: self.handle.clone(),
            config: self.config.clone(),
        })
    }

    /// Consumes the client and sends a `Shutdown` command to the transport.
    ///
    /// # Errors
    ///
    /// Returns an error if the shutdown command cannot be sent.
    pub async fn close(self) -> Result<(), CdpError> {
        let shutdown = TransportCommand::Shutdown;
        self.handle.send(shutdown).await
    }

    /// Returns whatever the transport handle reports as its connection state.
    #[must_use]
    pub fn is_connected(&self) -> bool {
        let handle = &self.handle;
        handle.is_connected()
    }

    /// Returns the URL this client was created with.
    #[must_use]
    pub fn url(&self) -> &str {
        self.url.as_str()
    }
}
/// A handle scoped to one attached CDP session.
///
/// Created by `CdpClient::create_session`; every command and subscription
/// issued through it is tagged with `session_id`.
#[derive(Debug, Clone)]
pub struct CdpSession {
/// Session id taken from the `Target.attachToTarget` response.
session_id: String,
/// Clone of the owning client's transport handle.
handle: TransportHandle,
/// Copy of the client's configuration (timeouts and channel capacity).
config: CdpConfig,
}
impl CdpSession {
    /// Sends `method` with optional `params`, tagged with this session's id,
    /// and waits for the transport's reply.
    ///
    /// The command deadline is derived from `config.command_timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error if the command cannot be handed to the transport, if
    /// the transport drops the response channel before replying, or if the
    /// transport itself replies with an error.
    pub async fn send_command(
        &self,
        method: &str,
        params: Option<serde_json::Value>,
    ) -> Result<serde_json::Value, CdpError> {
        let timeout = self.config.command_timeout;
        let session = Some(self.session_id.clone());
        send_command_impl(&self.handle, timeout, method, params, session).await
    }

    /// Asks the transport to subscribe to `method` for this session and returns
    /// the receiving half of a bounded event channel.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe request cannot be sent to the transport.
    pub async fn subscribe(&self, method: &str) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
        let capacity = self.config.channel_capacity;
        let session = Some(self.session_id.clone());
        subscribe_impl(&self.handle, capacity, method, session).await
    }

    /// Returns the id of the session this handle is bound to.
    #[must_use]
    pub fn session_id(&self) -> &str {
        self.session_id.as_str()
    }
}
/// Shared implementation behind `CdpClient::send_command` and
/// `CdpSession::send_command`.
///
/// Allocates a message id from `handle`, packages the command together with a
/// one-shot reply channel and a deadline of `now + command_timeout`, submits it
/// to the transport, and then waits on the reply channel.
///
/// # Errors
///
/// * Propagates the error from `handle.send` if the request cannot be submitted.
/// * Returns `CdpError::Internal` if the reply sender is dropped without a reply.
/// * Otherwise returns whatever `Result` the transport sent back.
async fn send_command_impl(
    handle: &TransportHandle,
    command_timeout: Duration,
    method: &str,
    params: Option<serde_json::Value>,
    session_id: Option<String>,
) -> Result<serde_json::Value, CdpError> {
    let (response_tx, response_rx) = oneshot::channel();

    let command = super::types::CdpCommand {
        id: handle.next_message_id(),
        method: method.to_owned(),
        params,
        session_id,
    };

    let deadline = Instant::now() + command_timeout;
    let request = TransportCommand::SendCommand {
        command,
        response_tx,
        deadline,
    };
    handle.send(request).await?;

    // A receive error means the sender half was dropped without a reply.
    match response_rx.await {
        Ok(outcome) => outcome,
        Err(_) => Err(CdpError::Internal(
            "transport task exited before responding".into(),
        )),
    }
}
/// Shared implementation behind `CdpClient::subscribe` and
/// `CdpSession::subscribe`.
///
/// Creates a bounded `mpsc` channel, hands the sending half to the transport in
/// a `TransportCommand::Subscribe` together with `method` and the optional
/// `session_id`, and returns the receiving half to the caller.
///
/// A `channel_capacity` of zero is raised to one, because
/// `tokio::sync::mpsc::channel` panics when asked for a zero-sized buffer and
/// `CdpConfig::channel_capacity` is a public field that callers may set freely.
///
/// # Errors
///
/// Propagates the error from `handle.send` if the subscribe request cannot be
/// submitted to the transport.
async fn subscribe_impl(
    handle: &TransportHandle,
    channel_capacity: usize,
    method: &str,
    session_id: Option<String>,
) -> Result<mpsc::Receiver<CdpEvent>, CdpError> {
    // `mpsc::channel(0)` panics; fall back to the smallest valid buffer instead.
    let capacity = channel_capacity.max(1);
    let (event_tx, event_rx) = mpsc::channel(capacity);

    handle
        .send(TransportCommand::Subscribe {
            method: method.to_owned(),
            session_id,
            event_tx,
        })
        .await?;

    Ok(event_rx)
}