1use std::collections::BTreeMap;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::time::{Duration, Instant};
29
30use async_trait::async_trait;
31use secrecy::{ExposeSecret, SecretString};
32use serde::Deserialize;
33use serde_json::json;
34use tokio::sync::Mutex;
35use url::Url;
36
37use super::cdp::{CdpClient, CdpError, CdpEvent};
38use super::{BrowserBackend, RenderedPage};
39use crate::{Error, Result};
40
/// Base URL of the Browserbase REST API; session creation posts to `{API_BASE}/sessions`.
const API_BASE: &str = "https://api.browserbase.com/v1";
/// Per-call timeout for the CDP commands issued during a fetch; also used as
/// the wait limit for the `Page.frameStoppedLoading` event after navigation.
const CDP_CALL_TIMEOUT: Duration = Duration::from_secs(45);
46
/// Credentials and project selection used to open a Browserbase session.
#[derive(Debug, Clone)]
pub struct BrowserbaseConfig {
    /// API key; sent as the `x-bb-api-key` header of the session-creation request.
    pub api_key: SecretString,
    /// Project identifier; sent as `projectId` in the session-creation request body.
    pub project_id: String,
}
56
/// [`BrowserBackend`] implementation that renders pages through a CDP connection.
pub struct BrowserbaseBackend {
    /// CDP client every browser command is sent through.
    cdp: CdpClient,
    /// Held for the whole of each `fetch`, so fetches on one backend run one at a time.
    fetch_lock: Mutex<()>,
    /// Identifier of the session behind this backend (returned by `session_id()`).
    session_id: String,
}
65
66#[derive(Debug, Deserialize)]
67struct CreateSessionResponse {
68 id: String,
69 #[serde(rename = "connectUrl")]
70 connect_url: String,
71}
72
73#[derive(Debug, Deserialize)]
74struct CreateTargetResult {
75 #[serde(rename = "targetId")]
76 target_id: String,
77}
78
79#[derive(Debug, Deserialize)]
80struct AttachToTargetResult {
81 #[serde(rename = "sessionId")]
82 session_id: String,
83}
84
85#[derive(Debug, Deserialize)]
86struct NavigateResult {
87 #[serde(rename = "frameId")]
88 frame_id: String,
89 #[serde(rename = "errorText", default)]
90 error_text: Option<String>,
91}
92
93#[derive(Debug, Deserialize)]
94struct EvaluateResult {
95 result: RemoteObject,
96 #[serde(rename = "exceptionDetails", default)]
97 exception_details: Option<serde_json::Value>,
98}
99
/// The `result` member of a `Runtime.evaluate` reply; only `value` is read.
#[derive(Debug, Deserialize)]
struct RemoteObject {
    /// The evaluated value as JSON; `None` when the reply carries no `value`.
    #[serde(default)]
    value: Option<serde_json::Value>,
}
105
106impl BrowserbaseBackend {
107 pub async fn connect(cfg: BrowserbaseConfig) -> Result<Self> {
114 let session = create_session(&cfg).await?;
115 let cdp = CdpClient::connect(&session.connect_url)
116 .await
117 .map_err(|e| Error::BrowserSetup {
118 message: format!("connect CDP: {e}"),
119 })?;
120 tracing::info!(session_id = %session.id, "browserbase session opened");
121 Ok(Self {
122 cdp,
123 fetch_lock: Mutex::new(()),
124 session_id: session.id,
125 })
126 }
127
128 #[must_use]
130 pub fn session_id(&self) -> &str {
131 &self.session_id
132 }
133
134 #[cfg(test)]
138 pub(crate) fn from_parts(cdp: CdpClient, session_id: String) -> Self {
139 Self {
140 cdp,
141 fetch_lock: Mutex::new(()),
142 session_id,
143 }
144 }
145}
146
147async fn create_session(cfg: &BrowserbaseConfig) -> Result<CreateSessionResponse> {
148 let http = reqwest::Client::builder()
149 .timeout(Duration::from_secs(30))
150 .build()
151 .map_err(|e| Error::BrowserSetup {
152 message: format!("http client: {e}"),
153 })?;
154 let resp = http
155 .post(format!("{API_BASE}/sessions"))
156 .header("x-bb-api-key", cfg.api_key.expose_secret())
157 .header("content-type", "application/json")
158 .body(json!({ "projectId": cfg.project_id }).to_string())
159 .send()
160 .await
161 .map_err(|e| Error::BrowserSetup {
162 message: format!("create session: {e}"),
163 })?;
164 let status = resp.status();
165 if !status.is_success() {
166 let detail = resp.text().await.unwrap_or_default();
167 return Err(Error::BrowserSetup {
168 message: format!("create session: HTTP {status}: {detail}"),
169 });
170 }
171 resp.json::<CreateSessionResponse>()
172 .await
173 .map_err(|e| Error::BrowserSetup {
174 message: format!("decode session response: {e}"),
175 })
176}
177
178#[async_trait]
179impl BrowserBackend for BrowserbaseBackend {
180 #[allow(clippy::too_many_lines)]
184 async fn fetch(
185 &self,
186 url: &Url,
187 headers: &BTreeMap<String, String>,
188 timeout: Duration,
189 ) -> Result<RenderedPage> {
190 let start = Instant::now();
191 let cdp = &self.cdp;
192
193 let _guard = self.fetch_lock.lock().await;
197
198 let work = async {
199 let CreateTargetResult { target_id } = cdp
202 .execute(
203 "Target.createTarget",
204 json!({ "url": "about:blank" }),
205 None,
206 CDP_CALL_TIMEOUT,
207 )
208 .await
209 .map_err(|e| browser_err(&e))?;
210
211 let AttachToTargetResult { session_id: sid } = cdp
215 .execute(
216 "Target.attachToTarget",
217 json!({ "targetId": target_id, "flatten": true }),
218 None,
219 CDP_CALL_TIMEOUT,
220 )
221 .await
222 .map_err(|e| browser_err(&e))?;
223
224 let _: serde_json::Value = cdp
227 .execute("Page.enable", json!({}), Some(&sid), CDP_CALL_TIMEOUT)
228 .await
229 .map_err(|e| browser_err(&e))?;
230 let _: serde_json::Value = cdp
231 .execute("Network.enable", json!({}), Some(&sid), CDP_CALL_TIMEOUT)
232 .await
233 .map_err(|e| browser_err(&e))?;
234
235 if !headers.is_empty() {
245 let mut ua: Option<&str> = None;
246 let mut extras = serde_json::Map::new();
247 for (k, v) in headers {
248 if k.eq_ignore_ascii_case("user-agent") {
249 ua = Some(v.as_str());
250 } else {
251 extras.insert(k.clone(), serde_json::Value::String(v.clone()));
252 }
253 }
254 if let Some(ua) = ua {
255 let _: serde_json::Value = cdp
256 .execute(
257 "Network.setUserAgentOverride",
258 json!({ "userAgent": ua }),
259 Some(&sid),
260 CDP_CALL_TIMEOUT,
261 )
262 .await
263 .map_err(|e| browser_err(&e))?;
264 }
265 if !extras.is_empty() {
266 let _: serde_json::Value = cdp
267 .execute(
268 "Network.setExtraHTTPHeaders",
269 json!({ "headers": extras }),
270 Some(&sid),
271 CDP_CALL_TIMEOUT,
272 )
273 .await
274 .map_err(|e| browser_err(&e))?;
275 }
276 }
277
278 let captured = Arc::new(Mutex::new(None::<(u16, String)>));
285 let captured_clone = Arc::clone(&captured);
286 let sid_for_collector = sid.clone();
287 let stop = Arc::new(AtomicBool::new(false));
288 let stop_clone = Arc::clone(&stop);
289 let mut collector_rx = cdp.subscribe_events();
290 let mut wait_rx = cdp.subscribe_events();
291 let collector = tokio::spawn(async move {
292 while !stop_clone.load(Ordering::Acquire) {
293 let Ok(evt) = collector_rx.recv().await else {
294 return;
295 };
296 if evt.session_id.as_deref() == Some(&sid_for_collector)
297 && evt.method == "Network.responseReceived"
298 {
299 if let Some((status, url)) = extract_document_response(&evt) {
300 let mut g = captured_clone.lock().await;
301 if g.is_none() {
302 *g = Some((status, url));
303 }
304 }
305 }
306 }
307 });
308
309 let nav: NavigateResult = cdp
314 .execute(
315 "Page.navigate",
316 json!({ "url": url.as_str() }),
317 Some(&sid),
318 CDP_CALL_TIMEOUT,
319 )
320 .await
321 .map_err(|e| browser_err(&e))?;
322 if let Some(err) = nav.error_text.as_deref().filter(|s| !s.is_empty()) {
323 return Err(Error::BrowserSetup {
324 message: format!("Page.navigate {url}: {err}"),
325 });
326 }
327
328 let target_frame = nav.frame_id.clone();
333 let sid_for_wait = sid.clone();
334 let _ = CdpClient::wait_for_event_on(
335 &mut wait_rx,
336 move |e| {
337 e.session_id.as_deref() == Some(&sid_for_wait)
338 && e.method == "Page.frameStoppedLoading"
339 && e.params.get("frameId").and_then(|v| v.as_str()) == Some(&target_frame)
340 },
341 CDP_CALL_TIMEOUT,
342 "Page.frameStoppedLoading",
343 )
344 .await
345 .map_err(|e| browser_err(&e))?;
346
347 let eval: EvaluateResult = cdp
349 .execute(
350 "Runtime.evaluate",
351 json!({
352 "expression": "document.documentElement.outerHTML",
353 "returnByValue": true,
354 }),
355 Some(&sid),
356 CDP_CALL_TIMEOUT,
357 )
358 .await
359 .map_err(|e| browser_err(&e))?;
360 if let Some(exc) = eval.exception_details {
361 return Err(Error::BrowserSetup {
362 message: format!("Runtime.evaluate threw: {exc}"),
363 });
364 }
365 let body = eval
366 .result
367 .value
368 .and_then(|v| v.as_str().map(str::to_owned))
369 .unwrap_or_default();
370
371 stop.store(true, Ordering::Release);
374 collector.abort();
375
376 let (status, final_url) = {
377 let g = captured.lock().await;
378 g.clone().map_or_else(
379 || (0_u16, url.clone()),
380 |(s, u)| (s, Url::parse(&u).unwrap_or_else(|_| url.clone())),
381 )
382 };
383
384 let _: std::result::Result<serde_json::Value, _> = cdp
388 .execute(
389 "Target.closeTarget",
390 json!({ "targetId": target_id }),
391 None,
392 CDP_CALL_TIMEOUT,
393 )
394 .await;
395
396 Ok::<_, Error>(RenderedPage {
397 status,
398 final_url,
399 body,
400 elapsed_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
401 })
402 };
403
404 tokio::time::timeout(timeout, work)
405 .await
406 .map_err(|_| Error::BrowserSetup {
407 message: format!("browser fetch timeout after {}s", timeout.as_secs()),
408 })?
409 }
410}
411
412fn browser_err(e: &CdpError) -> Error {
413 Error::BrowserSetup {
414 message: e.to_string(),
415 }
416}
417
418fn extract_document_response(evt: &CdpEvent) -> Option<(u16, String)> {
422 let kind = evt.params.get("type")?.as_str()?;
423 if kind != "Document" {
424 return None;
425 }
426 let response = evt.params.get("response")?;
427 let status = response.get("status")?.as_u64()?;
428 let url = response.get("url")?.as_str()?.to_owned();
429 Some((u16::try_from(status).unwrap_or(0), url))
430}
431
// Unit tests: `extract_document_response` is tested directly, `fetch` is
// driven end to end against the in-process `MockCdpServer`.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::browser::mock_cdp::{FrameOut, MockCdpServer};

    // A response whose `type` is not `Document` (here XHR) must be ignored.
    #[test]
    fn extract_document_response_filters_non_documents() {
        let xhr = CdpEvent {
            method: "Network.responseReceived".into(),
            params: json!({
                "type": "XHR",
                "response": { "status": 200, "url": "https://example.com/api" },
            }),
            session_id: Some("S".into()),
        };
        assert!(extract_document_response(&xhr).is_none());
    }

    // A `Document` response yields its status and URL unchanged.
    #[test]
    fn extract_document_response_picks_main_document() {
        let doc = CdpEvent {
            method: "Network.responseReceived".into(),
            params: json!({
                "type": "Document",
                "response": { "status": 404, "url": "https://example.com/missing" },
            }),
            session_id: Some("S".into()),
        };
        assert_eq!(
            extract_document_response(&doc),
            Some((404_u16, "https://example.com/missing".into()))
        );
    }

    // Builds a mock handler that scripts one successful fetch:
    // - `Target.createTarget` answers with target `T1`.
    // - `Target.attachToTarget` answers with session `S1`.
    // - `Page.navigate` answers with frame `F1`, then emits on session `S1` a
    //   `Document` `Network.responseReceived` (carrying `status` and echoing
    //   the requested URL) followed by `Page.frameStoppedLoading` for `F1`.
    // - `Runtime.evaluate` answers with `body` as a string value.
    // - Every other command gets an empty object.
    fn happy_path_handler(
        body: &'static str,
        status: u16,
    ) -> impl Fn(&str, &serde_json::Value, Option<&str>) -> Vec<FrameOut> + Send + Sync + 'static
    {
        move |method, params, _sid| match method {
            "Target.createTarget" => vec![FrameOut::Response(json!({ "targetId": "T1" }))],
            "Target.attachToTarget" => vec![FrameOut::Response(json!({ "sessionId": "S1" }))],
            "Page.navigate" => {
                let url = params
                    .get("url")
                    .and_then(serde_json::Value::as_str)
                    .unwrap_or("about:blank")
                    .to_owned();
                vec![
                    FrameOut::Response(json!({ "frameId": "F1" })),
                    FrameOut::Event {
                        method: "Network.responseReceived".into(),
                        params: json!({
                            "type": "Document",
                            "response": { "status": status, "url": url },
                        }),
                        session_id: Some("S1".into()),
                    },
                    FrameOut::Event {
                        method: "Page.frameStoppedLoading".into(),
                        params: json!({ "frameId": "F1" }),
                        session_id: Some("S1".into()),
                    },
                ]
            }
            "Runtime.evaluate" => vec![FrameOut::Response(json!({
                "result": { "type": "string", "value": body },
            }))],
            // Enable, header and close commands only need an empty success reply.
            _ => vec![FrameOut::Response(json!({}))],
        }
    }

    // End-to-end happy path: status, final URL and body all come back from the mock.
    #[tokio::test]
    async fn fetch_returns_status_url_and_body_on_happy_path() {
        let server =
            MockCdpServer::start(happy_path_handler("<html><body>hello</body></html>", 200)).await;
        let cdp = CdpClient::connect(&server.ws_url())
            .await
            .expect("cdp connect to mock");
        let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());
        assert_eq!(backend.session_id(), "test-session");

        let url = Url::parse("https://example.com/u/torvalds").unwrap();
        let headers = BTreeMap::new();
        let page = backend
            .fetch(&url, &headers, Duration::from_secs(5))
            .await
            .expect("fetch ok");

        assert_eq!(page.status, 200);
        assert_eq!(page.final_url.as_str(), "https://example.com/u/torvalds");
        assert!(page.body.contains("hello"), "body: {}", page.body);
    }

    // A non-2xx document status is reported in the page, not turned into an error.
    #[tokio::test]
    async fn fetch_propagates_404_status_from_navigation_response() {
        let server =
            MockCdpServer::start(happy_path_handler("<html><body>404</body></html>", 404)).await;
        let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
        let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());

        let url = Url::parse("https://example.com/u/nobody").unwrap();
        let page = backend
            .fetch(&url, &BTreeMap::new(), Duration::from_secs(5))
            .await
            .expect("fetch ok");

        assert_eq!(page.status, 404);
    }

    // `User-Agent` must go through `Network.setUserAgentOverride`, the other
    // headers through `Network.setExtraHTTPHeaders`, and both before `Page.navigate`.
    #[tokio::test]
    async fn fetch_sends_per_site_headers_via_extra_headers_and_ua_override() {
        let server = MockCdpServer::start(happy_path_handler("<html></html>", 200)).await;
        let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
        let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());

        let mut headers = BTreeMap::new();
        headers.insert("X-IG-App-ID".into(), "936619743392459".into());
        headers.insert("User-Agent".into(), "Mozilla/5.0 (test)".into());

        backend
            .fetch(
                &Url::parse("https://example.com/u/torvalds").unwrap(),
                &headers,
                Duration::from_secs(5),
            )
            .await
            .expect("fetch ok");

        let log = server.received().await;
        let ua = log
            .iter()
            .find(|r| r.method == "Network.setUserAgentOverride")
            .expect("setUserAgentOverride was sent");
        assert_eq!(
            ua.params
                .get("userAgent")
                .and_then(serde_json::Value::as_str),
            Some("Mozilla/5.0 (test)"),
            "UA override params: {:?}",
            ua.params
        );

        let extras = log
            .iter()
            .find(|r| r.method == "Network.setExtraHTTPHeaders")
            .expect("setExtraHTTPHeaders was sent");
        let map = extras
            .params
            .get("headers")
            .and_then(serde_json::Value::as_object)
            .expect("headers object");
        assert_eq!(
            map.get("X-IG-App-ID").and_then(serde_json::Value::as_str),
            Some("936619743392459")
        );
        assert!(
            // The UA is delivered by the override command only, never as an extra header.
            !map.contains_key("User-Agent"),
            "User-Agent leaked into setExtraHTTPHeaders: {map:?}"
        );

        // Ordering check: both header commands must appear in the log before navigation.
        let nav_idx = log
            .iter()
            .position(|r| r.method == "Page.navigate")
            .unwrap();
        let ua_idx = log
            .iter()
            .position(|r| r.method == "Network.setUserAgentOverride")
            .unwrap();
        let extras_idx = log
            .iter()
            .position(|r| r.method == "Network.setExtraHTTPHeaders")
            .unwrap();
        assert!(
            ua_idx < nav_idx && extras_idx < nav_idx,
            "headers must be set before navigate; got order: \
             ua={ua_idx} extras={extras_idx} nav={nav_idx}"
        );
    }

    // With an empty header map neither header command may be sent.
    #[tokio::test]
    async fn fetch_skips_header_commands_when_no_headers_given() {
        let server = MockCdpServer::start(happy_path_handler("<html></html>", 200)).await;
        let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
        let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());

        backend
            .fetch(
                &Url::parse("https://example.com/u/x").unwrap(),
                &BTreeMap::new(),
                Duration::from_secs(5),
            )
            .await
            .expect("fetch ok");

        let methods: Vec<String> = server
            .received()
            .await
            .into_iter()
            .map(|r| r.method)
            .collect();
        assert!(
            !methods.iter().any(|m| m == "Network.setExtraHTTPHeaders"),
            "setExtraHTTPHeaders should not fire on empty headers; saw {methods:?}"
        );
        assert!(
            !methods.iter().any(|m| m == "Network.setUserAgentOverride"),
            "setUserAgentOverride should not fire on empty headers; saw {methods:?}"
        );
    }
}