Skip to main content

ferrous_browser/
connection.rs

1use serde_json::Value;
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use tokio_tungstenite::tungstenite::Message;
5use futures_util::StreamExt;
6
7use crate::cdp::{CDPClient, CDPMessage};
8use crate::error::Result;
9
10use futures_util::stream::SplitStream;
11
12/// Manages the WebSocket connection and message routing.
13pub struct Connection {
14    cdp: Arc<CDPClient>,
15    stream: Arc<RwLock<Option<SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>>,
16}
17
18impl Connection {
19    /// Create a new connection
20    pub fn new(
21        cdp: Arc<CDPClient>,
22        stream: SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>,
23    ) -> Self {
24        Connection {
25            cdp,
26            stream: Arc::new(RwLock::new(Some(stream))),
27        }
28    }
29
30    /// Start the connection loop.
31    pub async fn run(self) -> Result<()> {
32        let mut stream_guard = self.stream.write().await;
33        if let Some(mut stream) = stream_guard.take() {
34            drop(stream_guard);
35            loop {
36                match stream.next().await {
37                    Some(Ok(Message::Text(text))) => {
38                        if let Ok(value) = serde_json::from_str::<Value>(&text) {
39                            if let Ok(msg) = CDPMessage::from_json(value) {
40                                let _ = self.cdp.handle_message(msg).await;
41                            }
42                        }
43                    }
44                    Some(Ok(Message::Close(_))) | None => return Ok(()),
45                    Some(Err(_)) => return Ok(()),
46                    Some(Ok(_)) => {}
47                }
48            }
49        } else {
50            Err(crate::error::BrowserError::websocket(
51                "Connection::run",
52                "WebSocket stream not available",
53            ))
54        }
55    }
56}