1use crate::protocol::{JsonTarget, JsonVersion};
12use crate::session::CdpSession;
13use base64::Engine;
14use http_body_util::Full;
15use hyper::body::Bytes;
16use hyper::server::conn::http1;
17use hyper::service::service_fn;
18use hyper::{Request, Response, StatusCode};
19use hyper_util::rt::TokioIo;
20use oxibrowser_core::Browser;
21use sha1::{Digest, Sha1};
22use std::net::SocketAddr;
23use std::sync::Arc;
24use std::sync::atomic::{AtomicUsize, Ordering};
25use tokio::net::TcpListener;
26use tokio::sync::broadcast;
27use tokio_tungstenite::WebSocketStream;
28use tokio_tungstenite::tungstenite::protocol::Role;
29use tracing::{error, info, warn};
30
/// Body type of every HTTP response produced by this server.
type HttpBody = Full<Bytes>;

/// GUID hashed together with the client's `Sec-WebSocket-Key` to produce the
/// `Sec-WebSocket-Accept` value (see `derive_accept_key`).
const WS_MAGIC_GUID: &[u8] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

/// Upper bound on connection tasks in flight; the accept loop drops any
/// connection accepted while this many are already active.
const MAX_CONCURRENT_CONNECTIONS: usize = 16;

/// 1 MiB size limit for CDP messages.
// NOTE(review): not referenced in this part of the file — presumably enforced
// by the session code elsewhere in the crate; confirm.
pub(crate) const MAX_CDP_MESSAGE_SIZE: usize = 1024 * 1024;
42
43fn derive_accept_key(client_key: &[u8]) -> String {
45 let mut hasher = Sha1::new();
46 hasher.update(client_key);
47 hasher.update(WS_MAGIC_GUID);
48 let hash = hasher.finalize();
49 base64::engine::general_purpose::STANDARD.encode(hash)
50}
51
/// HTTP + WebSocket server exposing a browser over the Chrome DevTools
/// Protocol discovery endpoints (`/json`, `/json/version`) and a `/ws` socket.
pub struct CdpServer {
    /// Address the listener is asked to bind to when the server starts.
    addr: SocketAddr,
    /// Browser instance shared with every CDP session created by this server.
    browser: Arc<Browser>,
    /// Broadcast sender used to tell the accept loop to stop.
    shutdown_tx: broadcast::Sender<()>,
    /// Number of connection tasks currently in flight, compared against
    /// `MAX_CONCURRENT_CONNECTIONS` before a new connection is served.
    connection_count: Arc<AtomicUsize>,
}
63
64impl CdpServer {
65 pub fn new(addr: SocketAddr, browser: Arc<Browser>) -> Self {
67 let (shutdown_tx, _) = broadcast::channel(1);
68 Self {
69 addr,
70 browser,
71 shutdown_tx,
72 connection_count: Arc::new(AtomicUsize::new(0)),
73 }
74 }
75
76 #[tracing::instrument(skip(self), fields(addr = %self.addr), err)]
80 pub async fn start(self: &Arc<Self>) -> anyhow::Result<SocketAddr> {
81 let listener = TcpListener::bind(self.addr).await?;
82 let actual_addr = listener.local_addr()?;
83
84 info!(addr = %actual_addr, "CDP server listening");
85
86 let mut shutdown_rx = self.shutdown_tx.subscribe();
87
88 loop {
89 tokio::select! {
90 accept_result = listener.accept() => {
91 match accept_result {
92 Ok((stream, peer_addr)) => {
93 let active = self.connection_count.load(Ordering::Relaxed);
94 if active >= MAX_CONCURRENT_CONNECTIONS {
95 warn!(
96 peer = %peer_addr,
97 active,
98 max = MAX_CONCURRENT_CONNECTIONS,
99 "rejecting connection: too many concurrent connections"
100 );
101 continue;
102 }
103
104 info!(peer = %peer_addr, "new connection");
105
106 let server = self.clone();
107 let connection_count = self.connection_count.clone();
108 connection_count.fetch_add(1, Ordering::Relaxed);
109 tokio::spawn(async move {
110 if let Err(e) = server.handle_connection(stream, peer_addr).await {
111 warn!(peer = %peer_addr, error = %e, "connection error");
112 }
113 connection_count.fetch_sub(1, Ordering::Relaxed);
114 });
115 }
116 Err(e) => {
117 error!(error = %e, "accept failed");
118 }
119 }
120 }
121 _ = shutdown_rx.recv() => {
122 info!("CDP server shutting down");
123 break;
124 }
125 }
126 }
127
128 Ok(actual_addr)
129 }
130
131 pub fn shutdown(&self) {
133 let _ = self.shutdown_tx.send(());
134 }
135
136 async fn handle_connection(
138 &self,
139 stream: tokio::net::TcpStream,
140 _peer_addr: SocketAddr,
141 ) -> anyhow::Result<()> {
142 let ws_url = format!("ws://{}/ws", self.addr);
143 let browser = self.browser.clone();
144
145 let io = TokioIo::new(stream);
146
147 let service = service_fn(move |req: Request<hyper::body::Incoming>| {
148 let ws_url = ws_url.clone();
149 let browser = browser.clone();
150 async move { Self::handle_http_request(req, &ws_url, browser).await }
151 });
152
153 http1::Builder::new()
154 .serve_connection(io, service)
155 .with_upgrades()
156 .await?;
157
158 Ok(())
159 }
160
161 async fn handle_http_request(
163 req: Request<hyper::body::Incoming>,
164 ws_url: &str,
165 browser: Arc<Browser>,
166 ) -> anyhow::Result<Response<HttpBody>> {
167 match req.uri().path() {
168 "/health" => {
169 let body = serde_json::json!({
170 "status": "ok",
171 "version": env!("CARGO_PKG_VERSION"),
172 });
173 Ok(Response::builder()
174 .header("Content-Type", "application/json")
175 .body(Full::new(Bytes::from(body.to_string())))?)
176 }
177 "/json/version" => {
178 let version = JsonVersion::new(ws_url.to_string());
179 let body = serde_json::to_string(&version)?;
180 Ok(Response::builder()
181 .header("Content-Type", "application/json")
182 .body(Full::new(Bytes::from(body)))?)
183 }
184 "/json" | "/json/list" => {
185 let targets = vec![JsonTarget {
186 id: "default".to_string(),
187 title: "OxiBrowser".to_string(),
188 target_type: "page".to_string(),
189 url: "about:blank".to_string(),
190 web_socket_debugger_url: ws_url.to_string(),
191 }];
192 let body = serde_json::to_string(&targets)?;
193 Ok(Response::builder()
194 .header("Content-Type", "application/json")
195 .body(Full::new(Bytes::from(body)))?)
196 }
197 "/ws" => {
198 let is_ws_upgrade = req
200 .headers()
201 .get("upgrade")
202 .and_then(|v| v.to_str().ok())
203 .map(|v| v.eq_ignore_ascii_case("websocket"))
204 .unwrap_or(false);
205
206 if !is_ws_upgrade {
207 return Ok(Response::builder().status(StatusCode::BAD_REQUEST).body(
208 Full::new(Bytes::from(
209 "WebSocket upgrade required. Use a WebSocket client.",
210 )),
211 )?);
212 }
213
214 let client_key = req
216 .headers()
217 .get("sec-websocket-key")
218 .and_then(|v| v.to_str().ok())
219 .map(|s| s.to_string())
220 .unwrap_or_default();
221
222 let accept_key = derive_accept_key(client_key.as_bytes());
223
224 tokio::spawn(async move {
227 match hyper::upgrade::on(req).await {
228 Ok(upgraded) => {
229 let io = TokioIo::new(upgraded);
232 let ws = WebSocketStream::from_raw_socket(io, Role::Server, None).await;
233
234 match CdpSession::new(ws, browser).await {
235 Ok(session) => {
236 if let Err(e) = session.run().await {
237 warn!(error = %e, "CDP session error");
238 }
239 }
240 Err(e) => {
241 warn!(error = %e, "failed to create CDP session");
242 }
243 }
244 }
245 Err(e) => {
246 warn!(error = %e, "WebSocket upgrade failed");
247 }
248 }
249 });
250
251 Ok(Response::builder()
253 .status(StatusCode::SWITCHING_PROTOCOLS)
254 .header("Upgrade", "websocket")
255 .header("Connection", "Upgrade")
256 .header("Sec-WebSocket-Accept", accept_key)
257 .body(Full::new(Bytes::new()))?)
258 }
259 _ => Ok(Response::builder()
260 .status(StatusCode::NOT_FOUND)
261 .body(Full::new(Bytes::from("Not Found")))?),
262 }
263 }
264}