use crate::domains;
use crate::domains::DispatchContext;
use crate::event::{event_channel, EventReceiver, EventSender};
use crate::protocol::{CdpEvent, CdpRequest, CdpResponse};
use crate::server::MAX_CDP_MESSAGE_SIZE;
use futures::{SinkExt, StreamExt};
use oxibrowser_core::Browser;
use std::sync::Arc;
use tokio::sync::RwLock;
use tokio_tungstenite::tungstenite;
use tracing::{debug, error, info, warn};
/// State for one CDP (Chrome DevTools Protocol) connection served over a
/// WebSocket.
///
/// A value is built by `CdpSession::new` from an upgraded WebSocket stream and
/// is consumed by `CdpSession::run`, which reads client messages and forwards
/// queued events until the connection ends.
pub struct CdpSession {
/// Write half of the split WebSocket stream; responses, events and Pong
/// frames are sent through it.
sink: futures::stream::SplitSink<
tokio_tungstenite::WebSocketStream<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>>,
tungstenite::Message,
>,
/// Read half of the split WebSocket stream; polled by `run` for incoming
/// frames.
ws: futures::stream::SplitStream<
tokio_tungstenite::WebSocketStream<hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>>,
>,
/// Identifier of the form `session-<uuid v4>`, generated in `new` and
/// attached to log lines.
session_id: String,
/// Target identifier exposed through `target_id()`. It is initialised to
/// `None` in `new`, and nothing in this file assigns it afterwards.
// NOTE(review): the field is read by `target_id()`, so this `dead_code`
// allowance may no longer be needed; confirm before removing.
#[allow(dead_code)]
target_id: Option<String>,
/// Shared handle to the browser that created `session`. It is stored but not
/// read again by the code in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
browser: Arc<Browser>,
/// Browser session obtained from `Browser::new_session`; a clone of this
/// handle is placed in every `DispatchContext`.
session: Arc<RwLock<oxibrowser_core::session::Session>>,
/// Sending side of the event channel; cloned into every `DispatchContext`
/// so command handlers can emit CDP events.
event_sender: EventSender,
/// Receiving side of the event channel. Set to `Some` in `new` and taken
/// out by `run`, which drains it for the lifetime of the connection.
event_receiver: Option<EventReceiver>,
}
impl CdpSession {
pub async fn new(
ws_stream: tokio_tungstenite::WebSocketStream<
hyper_util::rt::TokioIo<hyper::upgrade::Upgraded>,
>,
browser: Arc<Browser>,
) -> anyhow::Result<Self> {
let (sink, ws) = ws_stream.split();
let session_id = format!("session-{}", uuid::Uuid::new_v4());
let session = browser.new_session().await?;
let (event_sender, event_receiver) = event_channel();
info!(session_id = %session_id, "CDP session created");
Ok(Self {
ws,
sink,
session_id,
target_id: None,
browser,
session,
event_sender,
event_receiver: Some(event_receiver),
})
}
pub async fn run(mut self) -> anyhow::Result<()> {
info!(session_id = %self.session_id, "CDP session started");
let mut event_rx = self
.event_receiver
.take()
.expect("event_receiver must be present at session start");
loop {
tokio::select! {
msg = self.ws.next() => {
match msg {
Some(Ok(tungstenite::Message::Text(text))) => {
debug!(text = %text, "received CDP message");
self.handle_text_message(&text).await?;
}
Some(Ok(tungstenite::Message::Close(_))) => {
info!(session_id = %self.session_id, "WebSocket closed by client");
break;
}
Some(Ok(tungstenite::Message::Ping(data))) => {
self.sink.send(tungstenite::Message::Pong(data)).await?;
}
Some(Ok(_)) => {
}
Some(Err(e)) => {
error!(error = %e, "WebSocket read error");
break;
}
None => {
info!(session_id = %self.session_id, "WebSocket stream ended");
break;
}
}
}
event = event_rx.recv() => {
match event {
Some(event) => {
if let Err(e) = self.send_event(event).await {
warn!(error = %e, "failed to send CDP event");
break;
}
}
None => {
break;
}
}
}
}
}
info!(session_id = %self.session_id, "CDP session ended");
Ok(())
}
async fn handle_text_message(&mut self, text: &str) -> anyhow::Result<()> {
if text.len() > MAX_CDP_MESSAGE_SIZE {
warn!(
size = text.len(),
max = MAX_CDP_MESSAGE_SIZE,
"CDP message too large, dropping"
);
let response = CdpResponse {
id: 0,
result: None,
error: Some(crate::protocol::CdpError {
code: -32600,
message: format!(
"Message too large: {} bytes (max {} bytes)",
text.len(),
MAX_CDP_MESSAGE_SIZE
),
}),
session_id: None,
};
self.send_response(response).await?;
return Ok(());
}
let request: CdpRequest = match serde_json::from_str(text) {
Ok(r) => r,
Err(e) => {
warn!(error = %e, "failed to parse CDP request");
let response = CdpResponse {
id: 0,
result: None,
error: Some(crate::protocol::CdpError {
code: -32700,
message: format!("Parse error: {e}"),
}),
session_id: None,
};
self.send_response(response).await?;
return Ok(());
}
};
let request_id = request.id.unwrap_or(0);
let session_id_for_response = request.session_id.clone();
debug!(
id = request_id,
method = %request.method,
"dispatching CDP command"
);
let ctx = DispatchContext {
session: self.session.clone(),
events: self.event_sender.clone(),
};
let response = match domains::dispatch(&request.method, request.params, &ctx).await {
Ok(result) => CdpResponse {
id: request_id,
result: Some(result.unwrap_or(serde_json::json!({}))),
error: None,
session_id: session_id_for_response,
},
Err(cdp_error) => CdpResponse {
id: request_id,
result: None,
error: Some(cdp_error),
session_id: session_id_for_response,
},
};
self.send_response(response).await
}
async fn send_response(&mut self, response: CdpResponse) -> anyhow::Result<()> {
let text = serde_json::to_string(&response)?;
debug!(text = %text, "sending CDP response");
self.sink
.send(tungstenite::Message::Text(text.into()))
.await?;
Ok(())
}
async fn send_event(&mut self, event: CdpEvent) -> anyhow::Result<()> {
let text = serde_json::to_string(&event)?;
debug!(text = %text, "sending CDP event");
self.sink
.send(tungstenite::Message::Text(text.into()))
.await?;
Ok(())
}
pub fn session_id(&self) -> &str {
&self.session_id
}
pub fn target_id(&self) -> Option<&str> {
self.target_id.as_deref()
}
}