// xcelerate_core/connection/handler.rs
use crate::error::XcelerateResult;
2use futures::{sink::SinkExt, stream::StreamExt};
3use serde_json::Value;
4use std::collections::HashMap;
5use tokio::sync::{mpsc, oneshot};
6use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};
7use tokio::net::TcpStream;
8
9pub struct CdpHandler {
10 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
11 cmd_rx: mpsc::UnboundedReceiver<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
12 pending: HashMap<u32, oneshot::Sender<XcelerateResult<Value>>>,
13 pub(crate) event_tx: tokio::sync::broadcast::Sender<Value>,
14}
15
16impl CdpHandler {
17 pub fn new(
18 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
19 cmd_rx: mpsc::UnboundedReceiver<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
20 ) -> (Self, tokio::sync::broadcast::Receiver<Value>) {
21 let (event_tx, event_rx) = tokio::sync::broadcast::channel(100);
22 let handler = Self {
23 ws,
24 cmd_rx,
25 pending: HashMap::new(),
26 event_tx,
27 };
28 (handler, event_rx)
29 }
30
31 pub async fn run(mut self) {
32 loop {
33 tokio::select! {
34 Some((id, msg, tx)) = self.cmd_rx.recv() => {
35 if let Ok(text) = serde_json::to_string(&msg) {
36 if self.ws.send(Message::Text(text.into())).await.is_err() {
37 break;
38 }
39 self.pending.insert(id, tx);
40 }
41 }
42 msg = self.ws.next() => {
43 let Some(msg) = msg else { break };
44 let Ok(Message::Text(text)) = msg else { continue };
45
46 let Ok(resp): Result<Value, _> = serde_json::from_str(&text) else { continue };
47
48 if let Some(id) = resp["id"].as_u64() {
49 if let Some(tx) = self.pending.remove(&(id as u32)) {
50 let result = if resp["error"].is_null() {
51 Ok(resp["result"].clone())
52 } else {
53 Err(crate::error::XcelerateError::CdpResponseError {
54 code: resp["error"]["code"].as_i64().unwrap_or(0) as i32,
55 message: resp["error"]["message"].as_str().unwrap_or("Unknown").into(),
56 })
57 };
58 let _ = tx.send(result);
59 }
60 } else if resp["method"].is_string() {
61 let _ = self.event_tx.send(resp);
63 }
64 }
65 }
66 }
67 }
68}