Skip to main content

oxibrowser_cdp/
server.rs

//! CDP server — TCP listener with HTTP and WebSocket endpoints.
//!
//! Provides:
//! - GET /health — liveness probe reporting the crate version
//! - GET /json/version — browser metadata
//! - GET /json (alias: /json/list) — list of debuggable targets
//! - /ws — WebSocket upgrade for CDP message dispatch
//!
//! The server holds a shared reference to the `Browser` instance and passes
//! it to each `CdpSession` created on WebSocket upgrade.
10
11use crate::protocol::{JsonTarget, JsonVersion};
12use crate::session::CdpSession;
13use base64::Engine;
14use http_body_util::Full;
15use hyper::body::Bytes;
16use hyper::server::conn::http1;
17use hyper::service::service_fn;
18use hyper::{Request, Response, StatusCode};
19use hyper_util::rt::TokioIo;
20use oxibrowser_core::Browser;
21use sha1::{Digest, Sha1};
22use std::net::SocketAddr;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use tokio::net::TcpListener;
26use tokio::sync::broadcast;
27use tokio_tungstenite::WebSocketStream;
28use tokio_tungstenite::tungstenite::protocol::Role;
29use tracing::{error, info, warn};
30
31/// Type alias for HTTP response body.
32type HttpBody = Full<Bytes>;
33
/// Fixed GUID that RFC 6455 has the server append to the client's
/// `Sec-WebSocket-Key` before hashing it into `Sec-WebSocket-Accept`.
const WS_MAGIC_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Upper bound on the number of CDP connections served at the same time.
const MAX_CONCURRENT_CONNECTIONS: usize = 16;

/// Largest CDP message accepted, in bytes (1 MiB).
///
/// Exported crate-wide; it is not referenced by the code in this file.
pub(crate) const MAX_CDP_MESSAGE_SIZE: usize = 1 << 20;
42
43/// Derive the `Sec-WebSocket-Accept` value from the client's key.
44fn derive_accept_key(client_key: &[u8]) -> String {
45    let mut hasher = Sha1::new();
46    hasher.update(client_key);
47    hasher.update(WS_MAGIC_GUID);
48    let hash = hasher.finalize();
49    base64::engine::general_purpose::STANDARD.encode(hash)
50}
51
52/// CDP server that listens for HTTP/WebSocket connections.
53pub struct CdpServer {
54    /// Address to bind to.
55    addr: SocketAddr,
56    /// Shared browser instance.
57    browser: Arc<Browser>,
58    /// Shutdown signal sender.
59    shutdown_tx: broadcast::Sender<()>,
60    /// Active connection count.
61    connection_count: Arc<AtomicUsize>,
62}
63
64impl CdpServer {
65    /// Create a new CDP server bound to the given address with a shared Browser.
66    pub fn new(addr: SocketAddr, browser: Arc<Browser>) -> Self {
67        let (shutdown_tx, _) = broadcast::channel(1);
68        Self {
69            addr,
70            browser,
71            shutdown_tx,
72            connection_count: Arc::new(AtomicUsize::new(0)),
73        }
74    }
75
76    /// Start the CDP server.
77    ///
78    /// Returns the actual bound address (useful when port 0 is used).
79    #[tracing::instrument(skip(self), fields(addr = %self.addr), err)]
80    pub async fn start(self: &Arc<Self>) -> anyhow::Result<SocketAddr> {
81        let listener = TcpListener::bind(self.addr).await?;
82        let actual_addr = listener.local_addr()?;
83
84        info!(addr = %actual_addr, "CDP server listening");
85
86        let mut shutdown_rx = self.shutdown_tx.subscribe();
87
88        loop {
89            tokio::select! {
90                accept_result = listener.accept() => {
91                    match accept_result {
92                        Ok((stream, peer_addr)) => {
93                            let active = self.connection_count.load(Ordering::Relaxed);
94                            if active >= MAX_CONCURRENT_CONNECTIONS {
95                                warn!(
96                                    peer = %peer_addr,
97                                    active,
98                                    max = MAX_CONCURRENT_CONNECTIONS,
99                                    "rejecting connection: too many concurrent connections"
100                                );
101                                continue;
102                            }
103
104                            info!(peer = %peer_addr, "new connection");
105
106                            let server = self.clone();
107                            let connection_count = self.connection_count.clone();
108                            connection_count.fetch_add(1, Ordering::Relaxed);
109                            tokio::spawn(async move {
110                                if let Err(e) = server.handle_connection(stream, peer_addr).await {
111                                    warn!(peer = %peer_addr, error = %e, "connection error");
112                                }
113                                connection_count.fetch_sub(1, Ordering::Relaxed);
114                            });
115                        }
116                        Err(e) => {
117                            error!(error = %e, "accept failed");
118                        }
119                    }
120                }
121                _ = shutdown_rx.recv() => {
122                    info!("CDP server shutting down");
123                    break;
124                }
125            }
126        }
127
128        Ok(actual_addr)
129    }
130
131    /// Signal the server to shut down.
132    pub fn shutdown(&self) {
133        let _ = self.shutdown_tx.send(());
134    }
135
136    /// Handle a single TCP connection (HTTP upgrade or WebSocket).
137    async fn handle_connection(
138        &self,
139        stream: tokio::net::TcpStream,
140        _peer_addr: SocketAddr,
141    ) -> anyhow::Result<()> {
142        let ws_url = format!("ws://{}/ws", self.addr);
143        let browser = self.browser.clone();
144
145        let io = TokioIo::new(stream);
146
147        let service = service_fn(move |req: Request<hyper::body::Incoming>| {
148            let ws_url = ws_url.clone();
149            let browser = browser.clone();
150            async move { Self::handle_http_request(req, &ws_url, browser).await }
151        });
152
153        http1::Builder::new()
154            .serve_connection(io, service)
155            .with_upgrades()
156            .await?;
157
158        Ok(())
159    }
160
161    /// Handle an HTTP request, possibly upgrading to WebSocket.
162    async fn handle_http_request(
163        req: Request<hyper::body::Incoming>,
164        ws_url: &str,
165        browser: Arc<Browser>,
166    ) -> anyhow::Result<Response<HttpBody>> {
167        match req.uri().path() {
168            "/health" => {
169                let body = serde_json::json!({
170                    "status": "ok",
171                    "version": env!("CARGO_PKG_VERSION"),
172                });
173                Ok(Response::builder()
174                    .header("Content-Type", "application/json")
175                    .body(Full::new(Bytes::from(body.to_string())))?)
176            }
177            "/json/version" => {
178                let version = JsonVersion::new(ws_url.to_string());
179                let body = serde_json::to_string(&version)?;
180                Ok(Response::builder()
181                    .header("Content-Type", "application/json")
182                    .body(Full::new(Bytes::from(body)))?)
183            }
184            "/json" | "/json/list" => {
185                let targets = vec![JsonTarget {
186                    id: "default".to_string(),
187                    title: "OxiBrowser".to_string(),
188                    target_type: "page".to_string(),
189                    url: "about:blank".to_string(),
190                    web_socket_debugger_url: ws_url.to_string(),
191                }];
192                let body = serde_json::to_string(&targets)?;
193                Ok(Response::builder()
194                    .header("Content-Type", "application/json")
195                    .body(Full::new(Bytes::from(body)))?)
196            }
197            "/ws" => {
198                // Check for WebSocket upgrade
199                let is_ws_upgrade = req
200                    .headers()
201                    .get("upgrade")
202                    .and_then(|v| v.to_str().ok())
203                    .map(|v| v.eq_ignore_ascii_case("websocket"))
204                    .unwrap_or(false);
205
206                if !is_ws_upgrade {
207                    return Ok(Response::builder().status(StatusCode::BAD_REQUEST).body(
208                        Full::new(Bytes::from(
209                            "WebSocket upgrade required. Use a WebSocket client.",
210                        )),
211                    )?);
212                }
213
214                // Extract the client's Sec-WebSocket-Key
215                let client_key = req
216                    .headers()
217                    .get("sec-websocket-key")
218                    .and_then(|v| v.to_str().ok())
219                    .map(|s| s.to_string())
220                    .unwrap_or_default();
221
222                let accept_key = derive_accept_key(client_key.as_bytes());
223
224                // Spawn a task to handle the upgraded WebSocket connection.
225                // After we return 101, hyper will hand over the raw IO.
226                tokio::spawn(async move {
227                    match hyper::upgrade::on(req).await {
228                        Ok(upgraded) => {
229                            // Wrap hyper's Upgraded IO with TokioIo to get
230                            // tokio AsyncRead/AsyncWrite traits.
231                            let io = TokioIo::new(upgraded);
232                            let ws = WebSocketStream::from_raw_socket(io, Role::Server, None).await;
233
234                            match CdpSession::new(ws, browser).await {
235                                Ok(session) => {
236                                    if let Err(e) = session.run().await {
237                                        warn!(error = %e, "CDP session error");
238                                    }
239                                }
240                                Err(e) => {
241                                    warn!(error = %e, "failed to create CDP session");
242                                }
243                            }
244                        }
245                        Err(e) => {
246                            warn!(error = %e, "WebSocket upgrade failed");
247                        }
248                    }
249                });
250
251                // Return 101 Switching Protocols with the accept key
252                Ok(Response::builder()
253                    .status(StatusCode::SWITCHING_PROTOCOLS)
254                    .header("Upgrade", "websocket")
255                    .header("Connection", "Upgrade")
256                    .header("Sec-WebSocket-Accept", accept_key)
257                    .body(Full::new(Bytes::new()))?)
258            }
259            _ => Ok(Response::builder()
260                .status(StatusCode::NOT_FOUND)
261                .body(Full::new(Bytes::from("Not Found")))?),
262        }
263    }
264}