ferrous_browser/
connection.rs1use futures_util::StreamExt;
2use serde_json::Value;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tokio_tungstenite::tungstenite::Message;
6
7use crate::cdp::{CDPClient, CDPMessage};
8use crate::error::Result;
9
10use futures_util::stream::SplitStream;
11
12type WsStream =
14 tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
15
16pub struct Connection {
18 cdp: Arc<CDPClient>,
19 stream: Arc<RwLock<Option<SplitStream<WsStream>>>>,
20}
21
22impl Connection {
23 pub fn new(cdp: Arc<CDPClient>, stream: SplitStream<WsStream>) -> Self {
25 Connection {
26 cdp,
27 stream: Arc::new(RwLock::new(Some(stream))),
28 }
29 }
30
31 pub async fn run(self) -> Result<()> {
38 let mut stream_guard = self.stream.write().await;
39 let Some(mut stream) = stream_guard.take() else {
40 return Err(crate::error::BrowserError::websocket(
41 "Connection::run",
42 "WebSocket stream not available",
43 ));
44 };
45 drop(stream_guard);
46
47 let termination_reason: String = loop {
48 match stream.next().await {
49 Some(Ok(Message::Text(text))) => match serde_json::from_str::<Value>(&text) {
50 Ok(value) => match CDPMessage::from_json(value) {
51 Ok(msg) => {
52 if let Err(e) = self.cdp.handle_message(msg) {
53 tracing::warn!(error = %e, "handle_message failed");
54 }
55 }
56 Err(e) => {
57 tracing::warn!(error = %e, "malformed CDP message");
58 }
59 },
60 Err(e) => {
61 tracing::warn!(error = %e, "invalid JSON on CDP socket");
62 }
63 },
64 Some(Ok(Message::Close(frame))) => {
65 break format!("WebSocket closed by peer: {frame:?}");
66 }
67 None => {
68 break "WebSocket stream ended (no more frames)".to_string();
69 }
70 Some(Err(e)) => {
71 tracing::error!(error = %e, "WebSocket read error; tearing down");
72 break format!("WebSocket error: {e}");
73 }
74 Some(Ok(_)) => {
75 }
78 }
79 };
80
81 self.cdp.fail_all_pending(&termination_reason);
84 Ok(())
85 }
86}