xcelerate_core/connection/
client.rs1use crate::error::{XcelerateResult, XcelerateError};
2use std::sync::atomic::{AtomicU32, Ordering};
3use tokio::sync::{mpsc, oneshot};
4use serde_json::{Value, json};
5use serde::Serialize;
6
7pub trait CdpCommand: Serialize {
8 type Response: for<'de> serde::Deserialize<'de>;
9 const METHOD: &'static str;
10}
11
12pub struct CdpClient {
13 pub(crate) next_id: AtomicU32,
14 pub(crate) cmd_tx: mpsc::UnboundedSender<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
15 pub(crate) event_tx: tokio::sync::broadcast::Sender<Value>,
16}
17
18impl CdpClient {
19 pub fn new(
20 cmd_tx: mpsc::UnboundedSender<(u32, Value, oneshot::Sender<XcelerateResult<Value>>)>,
21 event_tx: tokio::sync::broadcast::Sender<Value>,
22 ) -> Self {
23 Self {
24 next_id: AtomicU32::new(1),
25 cmd_tx,
26 event_tx,
27 }
28 }
29
30 pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<Value> {
31 self.event_tx.subscribe()
32 }
33
34 pub async fn execute<T: CdpCommand>(&self, params: T) -> XcelerateResult<T::Response> {
35 self.execute_with_session(None, params).await
36 }
37
38 pub async fn execute_with_session<T: CdpCommand>(
39 &self,
40 session_id: Option<&str>,
41 params: T
42 ) -> XcelerateResult<T::Response> {
43 let id = self.next_id.fetch_add(1, Ordering::SeqCst);
44 let params_val = serde_json::to_value(params)?;
45
46 let mut envelope = json!({
47 "id": id,
48 "method": T::METHOD,
49 "params": params_val,
50 });
51
52 if let Some(sid) = session_id {
53 envelope.as_object_mut().unwrap().insert("sessionId".to_string(), json!(sid));
54 }
55
56 let (tx, rx) = oneshot::channel();
57 self.cmd_tx.send((id, envelope, tx)).map_err(|_| XcelerateError::InternalError)?;
58
59 let res = rx.await.map_err(|_| XcelerateError::InternalError)??;
60 let response: T::Response = serde_json::from_value(res)?;
61 Ok(response)
62 }
63}