use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Mutex;
use tungstenite::client::connect_with_config;
use tungstenite::Message;
/// Errors returned by the CDP client defined in this file.
///
/// The `#[error(...)]` strings are the `Display` output of each variant and are
/// therefore runtime text; they are intentionally left in their original language.
#[derive(Debug, thiserror::Error)]
pub enum CdpError {
/// An HTTP request failed; the payload is a description of the failure.
/// NOTE(review): the code reviewed here only constructs this variant in the
/// unit test — presumably it is produced by callers elsewhere; confirm.
#[error("HTTP 请求失败: {0}")]
Http(String),
/// The WebSocket URL could not be parsed or the connection could not be
/// established (see `CdpClient::connect`).
#[error("WebSocket 连接失败: {0}")]
Connect(String),
/// Acquiring the stream lock for writing, or writing the command, failed.
#[error("发送失败: {0}")]
Send(String),
/// Acquiring the stream lock for reading, or reading a message, failed, or
/// the peer sent a close frame while a response was awaited.
#[error("接收失败: {0}")]
Recv(String),
/// The peer answered a command with an error body, or the response carried
/// neither an error nor a `result` (reported with `code = -1`).
#[error("CDP 错误: id={id:?}, code={code}, message={message}")]
Protocol { id: Option<i64>, code: i64, message: String },
/// JSON serialization or deserialization failed; converted automatically
/// from `serde_json::Error` via `#[from]`.
#[error("JSON 错误: {0}")]
Json(#[from] serde_json::Error),
}
impl CdpError {
    /// Attaches `ctx` to a [`CdpError::Protocol`] error.
    ///
    /// For the `Protocol` variant the message is rewritten as
    /// `"<message> (表达式: <ctx>)"`, keeping `id` and `code` untouched.
    /// Every other variant is handed back exactly as it was received.
    pub fn with_context(self, ctx: &str) -> Self {
        match self {
            CdpError::Protocol { id, code, message } => {
                // Build the annotated message first, then reassemble the variant.
                let message = format!("{} (表达式: {})", message, ctx);
                CdpError::Protocol { id, code, message }
            }
            // Non-protocol errors carry no message to annotate.
            unchanged => unchanged,
        }
    }
}
/// An outgoing CDP command, serialized to JSON before being written to the
/// WebSocket.
#[derive(Debug, Serialize)]
struct CdpCommand {
/// Identifier used to match the response to this command.
id: i64,
/// Name of the CDP method to invoke.
method: String,
/// Optional method parameters; the key is omitted from the JSON when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
params: Option<Value>,
/// Optional target session; serialized as `sessionId` and omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none", rename = "sessionId")]
session_id: Option<String>,
}
/// An incoming message read from the WebSocket.
///
/// Every field is optional, so any JSON object deserializes into this type;
/// the reader decides what to do with a message by inspecting `id`.
#[derive(Debug, Deserialize)]
struct CdpResponse {
/// Identifier of the command this message answers, if any.
id: Option<i64>,
/// Payload of a successful response.
#[serde(default)]
result: Option<Value>,
/// Error body of a failed response.
#[serde(default)]
error: Option<CdpErrorBody>,
/// Deserialized but never read by the code reviewed here, hence the
/// `dead_code` allowance. NOTE(review): presumably the event name on
/// messages that are not command responses — confirm.
#[serde(default)]
#[allow(dead_code)]
method: Option<String>,
/// Deserialized but never read by the code reviewed here, hence the
/// `dead_code` allowance. NOTE(review): presumably the event payload — confirm.
#[serde(default)]
#[allow(dead_code)]
params: Option<Value>,
}
/// The `error` object of a failed response; its fields are copied into
/// `CdpError::Protocol` by the response reader.
#[derive(Debug, Deserialize)]
struct CdpErrorBody {
/// Numeric error code reported by the peer.
code: i64,
/// Human-readable error description reported by the peer.
message: String,
}
/// A blocking CDP client over a single WebSocket connection.
///
/// All methods take `&self`; the stream sits behind a `Mutex` and command ids
/// come from an atomic counter.
pub struct CdpClient {
/// The WebSocket connection (plain TCP or TLS), guarded by a mutex.
stream: Mutex<tungstenite::protocol::WebSocket<tungstenite::stream::MaybeTlsStream<std::net::TcpStream>>>,
/// Id assigned to the next command; initialized to 1 by `connect` and
/// incremented once per command sent.
next_id: AtomicI64,
}
impl CdpClient {
pub fn connect(ws_url: &str) -> Result<Self, CdpError> {
let url = ws_url
.parse::<url::Url>()
.map_err(|e| CdpError::Connect(e.to_string()))?;
let config = tungstenite::protocol::WebSocketConfig {
max_message_size: None,
max_frame_size: None,
..Default::default()
};
let (stream, _) = connect_with_config(url, Some(config), 3)
.map_err(|e| CdpError::Connect(e.to_string()))?;
Ok(Self {
stream: Mutex::new(stream),
next_id: AtomicI64::new(1),
})
}
pub fn send(&self, method: &str, params: Option<Value>) -> Result<Value, CdpError> {
self.send_with_session(method, params, None)
}
pub fn send_with_session(
&self,
method: &str,
params: Option<Value>,
session_id: Option<&str>,
) -> Result<Value, CdpError> {
let id = self.next_id.fetch_add(1, Ordering::SeqCst);
let cmd = CdpCommand {
id,
method: method.to_string(),
params,
session_id: session_id.map(String::from),
};
let msg = serde_json::to_string(&cmd).map_err(CdpError::Json)?;
self.write_message(&msg)?;
self.read_response_until_id(id)
}
fn write_message(&self, text: &str) -> Result<(), CdpError> {
let mut guard = self
.stream
.lock()
.map_err(|e| CdpError::Send(e.to_string()))?;
guard
.send(Message::Text(text.into()))
.map_err(|e| CdpError::Send(e.to_string()))?;
Ok(())
}
fn read_response_until_id(&self, expect_id: i64) -> Result<Value, CdpError> {
let mut guard = self
.stream
.lock()
.map_err(|e| CdpError::Recv(e.to_string()))?;
loop {
let msg = guard
.read()
.map_err(|e| CdpError::Recv(e.to_string()))?;
let text = match msg {
Message::Text(t) => t,
Message::Close(_) => return Err(CdpError::Recv("连接已关闭".into())),
_ => continue,
};
let resp: CdpResponse = serde_json::from_str(&text).map_err(CdpError::Json)?;
if resp.id == Some(expect_id) {
if let Some(e) = resp.error {
return Err(CdpError::Protocol {
id: Some(expect_id),
code: e.code,
message: e.message,
});
}
return resp
.result
.ok_or_else(|| CdpError::Protocol {
id: Some(expect_id),
code: -1,
message: "响应无 result".into(),
});
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Display` output of an error must include its payload: the
    /// description for `Http`, and both message and code for `Protocol`.
    #[test]
    fn cdp_error_display() {
        let http_text = CdpError::Http("timeout".into()).to_string();
        assert!(http_text.contains("timeout"));

        let protocol_text = CdpError::Protocol {
            id: Some(1),
            code: -32600,
            message: "Invalid".into(),
        }
        .to_string();
        for needle in ["Invalid", "-32600"] {
            assert!(protocol_text.contains(needle));
        }
    }
}