Skip to main content

xcelerate_core/connection/
handler.rs

1use crate::error::XcelerateResult;
2use futures::{sink::SinkExt, stream::StreamExt};
3use serde_json::Value;
4use std::collections::HashMap;
5use tokio::sync::{mpsc, oneshot};
6use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream, MaybeTlsStream};
7use tokio::net::TcpStream;
8
9pub struct CdpHandler {
10    ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
11    cmd_rx: mpsc::UnboundedReceiver<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
12    pending: HashMap<u32, oneshot::Sender<XcelerateResult<Value>>>,
13    pub(crate) event_tx: tokio::sync::broadcast::Sender<Value>,
14}
15
16impl CdpHandler {
17    pub fn new(
18        ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
19        cmd_rx: mpsc::UnboundedReceiver<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
20    ) -> (Self, tokio::sync::broadcast::Receiver<Value>) {
21        let (event_tx, event_rx) = tokio::sync::broadcast::channel(100);
22        let handler = Self {
23            ws,
24            cmd_rx,
25            pending: HashMap::new(),
26            event_tx,
27        };
28        (handler, event_rx)
29    }
30
31    pub async fn run(mut self) {
32        loop {
33            tokio::select! {
34                Some((id, msg, tx)) = self.cmd_rx.recv() => {
35                    if let Ok(text) = serde_json::to_string(&msg) {
36                        if self.ws.send(Message::Text(text.into())).await.is_err() {
37                            break;
38                        }
39                        self.pending.insert(id, tx);
40                    }
41                }
42                msg = self.ws.next() => {
43                    let Some(msg) = msg else { break };
44                    let Ok(Message::Text(text)) = msg else { continue };
45                    
46                    let Ok(resp): Result<Value, _> = serde_json::from_str(&text) else { continue };
47                    
48                    if let Some(id) = resp["id"].as_u64() {
49                        if let Some(tx) = self.pending.remove(&(id as u32)) {
50                            let result = if resp["error"].is_null() {
51                                Ok(resp["result"].clone())
52                            } else {
53                                Err(crate::error::XcelerateError::CdpResponseError {
54                                    code: resp["error"]["code"].as_i64().unwrap_or(0) as i32,
55                                    message: resp["error"]["message"].as_str().unwrap_or("Unknown").into(),
56                                })
57                            };
58                            let _ = tx.send(result);
59                        }
60                    } else if resp["method"].is_string() {
61                        // This is an event, broadcast it
62                        let _ = self.event_tx.send(resp);
63                    }
64                }
65            }
66        }
67    }
68}