ferrous_browser/
connection.rs1use serde_json::Value;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use tokio_tungstenite::tungstenite::Message;
5use futures_util::StreamExt;
6
7use crate::cdp::{CDPClient, CDPMessage};
8use crate::error::Result;
9
10use futures_util::stream::SplitStream;
11
12pub struct Connection {
14 cdp: Arc<CDPClient>,
15 stream: Arc<RwLock<Option<SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>>,
16}
17
18impl Connection {
19 pub fn new(
21 cdp: Arc<CDPClient>,
22 stream: SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
23 ) -> Self {
24 Connection {
25 cdp,
26 stream: Arc::new(RwLock::new(Some(stream))),
27 }
28 }
29
30 pub async fn run(self) -> Result<()> {
32 let mut stream_guard = self.stream.write().await;
33 if let Some(mut stream) = stream_guard.take() {
34 drop(stream_guard);
35 loop {
36 match stream.next().await {
37 Some(Ok(Message::Text(text))) => {
38 if let Ok(value) = serde_json::from_str::<Value>(&text) {
39 if let Ok(msg) = CDPMessage::from_json(value) {
40 let _ = self.cdp.handle_message(msg).await;
41 }
42 }
43 }
44 Some(Ok(Message::Close(_))) | None => return Ok(()),
45 Some(Err(_)) => return Ok(()),
46 Some(Ok(_)) => {}
47 }
48 }
49 } else {
50 Err(crate::error::BrowserError::websocket(
51 "Connection::run",
52 "WebSocket stream not available",
53 ))
54 }
55 }
56}