Skip to main content

ferrous_browser/
connection.rs

1use futures_util::StreamExt;
2use serde_json::Value;
3use std::sync::Arc;
4use tokio::sync::RwLock;
5use tokio_tungstenite::tungstenite::Message;
6
7use crate::cdp::{CDPClient, CDPMessage};
8use crate::error::Result;
9
10use futures_util::stream::SplitStream;
11
12/// Type alias for the underlying WebSocket stream to reduce type complexity.
13type WsStream =
14    tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
15
16/// Manages the WebSocket connection and message routing.
17pub struct Connection {
18    cdp: Arc<CDPClient>,
19    stream: Arc<RwLock<Option<SplitStream<WsStream>>>>,
20}
21
22impl Connection {
23    /// Create a new connection
24    pub fn new(cdp: Arc<CDPClient>, stream: SplitStream<WsStream>) -> Self {
25        Connection {
26            cdp,
27            stream: Arc::new(RwLock::new(Some(stream))),
28        }
29    }
30
31    /// Start the connection loop.
32    ///
33    /// Runs forever, dispatching incoming CDP messages to the `CDPClient`.
34    /// When the WebSocket terminates (cleanly, with an error, or with EOF),
35    /// every still-pending CDP request is failed immediately so callers don't
36    /// hang waiting for a 30 second per-command timeout.
37    pub async fn run(self) -> Result<()> {
38        let mut stream_guard = self.stream.write().await;
39        let Some(mut stream) = stream_guard.take() else {
40            return Err(crate::error::BrowserError::websocket(
41                "Connection::run",
42                "WebSocket stream not available",
43            ));
44        };
45        drop(stream_guard);
46
47        let termination_reason: String = loop {
48            match stream.next().await {
49                Some(Ok(Message::Text(text))) => match serde_json::from_str::<Value>(&text) {
50                    Ok(value) => match CDPMessage::from_json(value) {
51                        Ok(msg) => {
52                            if let Err(e) = self.cdp.handle_message(msg) {
53                                tracing::warn!(error = %e, "handle_message failed");
54                            }
55                        }
56                        Err(e) => {
57                            tracing::warn!(error = %e, "malformed CDP message");
58                        }
59                    },
60                    Err(e) => {
61                        tracing::warn!(error = %e, "invalid JSON on CDP socket");
62                    }
63                },
64                Some(Ok(Message::Close(frame))) => {
65                    break format!("WebSocket closed by peer: {frame:?}");
66                }
67                None => {
68                    break "WebSocket stream ended (no more frames)".to_string();
69                }
70                Some(Err(e)) => {
71                    tracing::error!(error = %e, "WebSocket read error; tearing down");
72                    break format!("WebSocket error: {e}");
73                }
74                Some(Ok(_)) => {
75                    // Binary, Ping, Pong, and Frame messages aren't part of
76                    // the CDP text protocol; ignore.
77                }
78            }
79        };
80
81        // Wake every in-flight `send_command` with a clean failure rather
82        // than letting each one time out individually.
83        self.cdp.fail_all_pending(&termination_reason);
84        Ok(())
85    }
86}