use std::time::Duration;
use futures_util::{SinkExt, StreamExt};
use serde::Serialize;
use tokio::time::timeout;
use tokio_tungstenite::tungstenite::Message;
use crate::core::CliError;
#[derive(Serialize)]
struct CdpRequest<'a> {
id: u64,
method: &'a str,
params: serde_json::Value,
}
type CdpStream =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
pub(super) struct CdpSession {
ws: CdpStream,
next_id: u64,
}
impl CdpSession {
pub(super) async fn connect(ws_url: &str) -> Result<Self, CliError> {
let (ws, _) = tokio_tungstenite::connect_async(ws_url)
.await
.map_err(|e| CliError::Config(format!("CDP ws connect: {e}")))?;
Ok(Self { ws, next_id: 0 })
}
pub(super) async fn call(
&mut self,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, CliError> {
self.next_id += 1;
let id = self.next_id;
let req = CdpRequest { id, method, params };
let payload = serde_json::to_string(&req).unwrap();
self.ws
.send(Message::Text(payload))
.await
.map_err(|e| CliError::Config(format!("CDP ws send {method}: {e}")))?;
loop {
let msg = timeout(Duration::from_secs(60), self.ws.next())
.await
.map_err(|_| CliError::Config(format!("CDP {method} timeout")))?
.ok_or_else(|| CliError::Config(format!("CDP {method} ws closed")))?
.map_err(|e| CliError::Config(format!("CDP {method} ws err: {e}")))?;
let text = match msg {
Message::Text(text) => text.to_string(),
Message::Binary(_) | Message::Ping(_) | Message::Pong(_) | Message::Frame(_) => {
continue;
}
Message::Close(_) => {
return Err(CliError::Config(format!("CDP {method} ws closed mid-call")));
}
};
let value: serde_json::Value = serde_json::from_str(&text)
.map_err(|e| CliError::Config(format!("CDP {method} json: {e}")))?;
if value.get("id").and_then(|id| id.as_u64()) == Some(id) {
if let Some(err) = value.get("error") {
return Err(CliError::Config(format!("CDP {method} error: {err}")));
}
return Ok(value
.get("result")
.cloned()
.unwrap_or(serde_json::Value::Null));
}
}
}
}