use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use async_trait::async_trait;
use secrecy::{ExposeSecret, SecretString};
use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex;
use url::Url;
use super::cdp::{CdpClient, CdpError, CdpEvent};
use super::{BrowserBackend, RenderedPage};
use crate::{Error, Result};
/// Base URL of the Browserbase REST API; `create_session` posts to
/// `{API_BASE}/sessions`.
const API_BASE: &str = "https://api.browserbase.com/v1";
/// Timeout passed to every individual CDP command issued by `fetch`, and to
/// its wait for `Page.frameStoppedLoading`.
const CDP_CALL_TIMEOUT: Duration = Duration::from_secs(45);
/// Credentials used to open a Browserbase session.
#[derive(Debug, Clone)]
pub struct BrowserbaseConfig {
/// API key, sent as the `x-bb-api-key` request header when the session is
/// created.
pub api_key: SecretString,
/// Project identifier, sent as `projectId` in the session-creation body.
pub project_id: String,
}
/// Browser backend that drives a Browserbase session over a CDP connection.
pub struct BrowserbaseBackend {
// CDP client used for every command and event subscription in `fetch`.
cdp: CdpClient,
// Held for the whole of each `fetch`, so fetches on one backend run one at a
// time.
fetch_lock: Mutex<()>,
// Session id reported when the session was created (or supplied directly via
// `from_parts` in tests).
session_id: String,
}
/// Fields read from the JSON body returned by the session-creation request.
///
/// Wire names are camelCase (`id`, `connectUrl`).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateSessionResponse {
    /// Session identifier; kept on the backend and logged on connect.
    id: String,
    /// URL handed to `CdpClient::connect` to open the CDP connection.
    connect_url: String,
}
/// Result of `Target.createTarget`; the wire field is `targetId`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CreateTargetResult {
    /// Identifier later passed to `Target.attachToTarget` and
    /// `Target.closeTarget`.
    target_id: String,
}
/// Result of `Target.attachToTarget`; the wire field is `sessionId`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AttachToTargetResult {
    /// CDP session id used to address page-level commands and to filter
    /// incoming events.
    session_id: String,
}
/// Result of `Page.navigate`; wire fields are `frameId` and `errorText`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct NavigateResult {
    /// Frame whose `Page.frameStoppedLoading` event `fetch` waits for.
    frame_id: String,
    /// Navigation error, if any. Absent in the response means `None`; `fetch`
    /// treats a non-empty value as a failure.
    #[serde(default)]
    error_text: Option<String>,
}
/// Result of `Runtime.evaluate`; wire fields are `result` and
/// `exceptionDetails`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct EvaluateResult {
    /// The evaluated value wrapper.
    result: RemoteObject,
    /// Present when the evaluated expression threw; `fetch` turns it into an
    /// error. Absent in the response means `None`.
    #[serde(default)]
    exception_details: Option<serde_json::Value>,
}
/// The `result` object of a `Runtime.evaluate` response; only `value` is read.
#[derive(Debug, Deserialize)]
struct RemoteObject {
/// Evaluated value; `None` when the field is missing. `fetch` uses it only
/// when it is a JSON string and otherwise falls back to an empty body.
#[serde(default)]
value: Option<serde_json::Value>,
}
impl BrowserbaseBackend {
pub async fn connect(cfg: BrowserbaseConfig) -> Result<Self> {
let session = create_session(&cfg).await?;
let cdp = CdpClient::connect(&session.connect_url)
.await
.map_err(|e| Error::BrowserSetup {
message: format!("connect CDP: {e}"),
})?;
tracing::info!(session_id = %session.id, "browserbase session opened");
Ok(Self {
cdp,
fetch_lock: Mutex::new(()),
session_id: session.id,
})
}
#[must_use]
pub fn session_id(&self) -> &str {
&self.session_id
}
#[cfg(test)]
pub(crate) fn from_parts(cdp: CdpClient, session_id: String) -> Self {
Self {
cdp,
fetch_lock: Mutex::new(()),
session_id,
}
}
}
async fn create_session(cfg: &BrowserbaseConfig) -> Result<CreateSessionResponse> {
let http = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.build()
.map_err(|e| Error::BrowserSetup {
message: format!("http client: {e}"),
})?;
let resp = http
.post(format!("{API_BASE}/sessions"))
.header("x-bb-api-key", cfg.api_key.expose_secret())
.header("content-type", "application/json")
.body(json!({ "projectId": cfg.project_id }).to_string())
.send()
.await
.map_err(|e| Error::BrowserSetup {
message: format!("create session: {e}"),
})?;
let status = resp.status();
if !status.is_success() {
let detail = resp.text().await.unwrap_or_default();
return Err(Error::BrowserSetup {
message: format!("create session: HTTP {status}: {detail}"),
});
}
resp.json::<CreateSessionResponse>()
.await
.map_err(|e| Error::BrowserSetup {
message: format!("decode session response: {e}"),
})
}
#[async_trait]
impl BrowserBackend for BrowserbaseBackend {
#[allow(clippy::too_many_lines)]
async fn fetch(
&self,
url: &Url,
headers: &BTreeMap<String, String>,
timeout: Duration,
) -> Result<RenderedPage> {
let start = Instant::now();
let cdp = &self.cdp;
let _guard = self.fetch_lock.lock().await;
let work = async {
let CreateTargetResult { target_id } = cdp
.execute(
"Target.createTarget",
json!({ "url": "about:blank" }),
None,
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
let AttachToTargetResult { session_id: sid } = cdp
.execute(
"Target.attachToTarget",
json!({ "targetId": target_id, "flatten": true }),
None,
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
let _: serde_json::Value = cdp
.execute("Page.enable", json!({}), Some(&sid), CDP_CALL_TIMEOUT)
.await
.map_err(|e| browser_err(&e))?;
let _: serde_json::Value = cdp
.execute("Network.enable", json!({}), Some(&sid), CDP_CALL_TIMEOUT)
.await
.map_err(|e| browser_err(&e))?;
if !headers.is_empty() {
let mut ua: Option<&str> = None;
let mut extras = serde_json::Map::new();
for (k, v) in headers {
if k.eq_ignore_ascii_case("user-agent") {
ua = Some(v.as_str());
} else {
extras.insert(k.clone(), serde_json::Value::String(v.clone()));
}
}
if let Some(ua) = ua {
let _: serde_json::Value = cdp
.execute(
"Network.setUserAgentOverride",
json!({ "userAgent": ua }),
Some(&sid),
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
}
if !extras.is_empty() {
let _: serde_json::Value = cdp
.execute(
"Network.setExtraHTTPHeaders",
json!({ "headers": extras }),
Some(&sid),
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
}
}
let captured = Arc::new(Mutex::new(None::<(u16, String)>));
let captured_clone = Arc::clone(&captured);
let sid_for_collector = sid.clone();
let stop = Arc::new(AtomicBool::new(false));
let stop_clone = Arc::clone(&stop);
let mut collector_rx = cdp.subscribe_events();
let mut wait_rx = cdp.subscribe_events();
let collector = tokio::spawn(async move {
while !stop_clone.load(Ordering::Acquire) {
let Ok(evt) = collector_rx.recv().await else {
return;
};
if evt.session_id.as_deref() == Some(&sid_for_collector)
&& evt.method == "Network.responseReceived"
{
if let Some((status, url)) = extract_document_response(&evt) {
let mut g = captured_clone.lock().await;
if g.is_none() {
*g = Some((status, url));
}
}
}
}
});
let nav: NavigateResult = cdp
.execute(
"Page.navigate",
json!({ "url": url.as_str() }),
Some(&sid),
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
if let Some(err) = nav.error_text.as_deref().filter(|s| !s.is_empty()) {
return Err(Error::BrowserSetup {
message: format!("Page.navigate {url}: {err}"),
});
}
let target_frame = nav.frame_id.clone();
let sid_for_wait = sid.clone();
let _ = CdpClient::wait_for_event_on(
&mut wait_rx,
move |e| {
e.session_id.as_deref() == Some(&sid_for_wait)
&& e.method == "Page.frameStoppedLoading"
&& e.params.get("frameId").and_then(|v| v.as_str()) == Some(&target_frame)
},
CDP_CALL_TIMEOUT,
"Page.frameStoppedLoading",
)
.await
.map_err(|e| browser_err(&e))?;
let eval: EvaluateResult = cdp
.execute(
"Runtime.evaluate",
json!({
"expression": "document.documentElement.outerHTML",
"returnByValue": true,
}),
Some(&sid),
CDP_CALL_TIMEOUT,
)
.await
.map_err(|e| browser_err(&e))?;
if let Some(exc) = eval.exception_details {
return Err(Error::BrowserSetup {
message: format!("Runtime.evaluate threw: {exc}"),
});
}
let body = eval
.result
.value
.and_then(|v| v.as_str().map(str::to_owned))
.unwrap_or_default();
stop.store(true, Ordering::Release);
collector.abort();
let (status, final_url) = {
let g = captured.lock().await;
g.clone().map_or_else(
|| (0_u16, url.clone()),
|(s, u)| (s, Url::parse(&u).unwrap_or_else(|_| url.clone())),
)
};
let _: std::result::Result<serde_json::Value, _> = cdp
.execute(
"Target.closeTarget",
json!({ "targetId": target_id }),
None,
CDP_CALL_TIMEOUT,
)
.await;
Ok::<_, Error>(RenderedPage {
status,
final_url,
body,
elapsed_ms: u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX),
})
};
tokio::time::timeout(timeout, work)
.await
.map_err(|_| Error::BrowserSetup {
message: format!("browser fetch timeout after {}s", timeout.as_secs()),
})?
}
}
fn browser_err(e: &CdpError) -> Error {
Error::BrowserSetup {
message: e.to_string(),
}
}
fn extract_document_response(evt: &CdpEvent) -> Option<(u16, String)> {
let kind = evt.params.get("type")?.as_str()?;
if kind != "Document" {
return None;
}
let response = evt.params.get("response")?;
let status = response.get("status")?.as_u64()?;
let url = response.get("url")?.as_str()?.to_owned();
Some((u16::try_from(status).unwrap_or(0), url))
}
// Unit tests: pure checks of `extract_document_response`, plus `fetch` driven
// end to end against a mock CDP server.
#[cfg(test)]
mod tests {
use super::*;
use crate::browser::mock_cdp::{FrameOut, MockCdpServer};
// A `responseReceived` event whose type is not "Document" is ignored.
#[test]
fn extract_document_response_filters_non_documents() {
let xhr = CdpEvent {
method: "Network.responseReceived".into(),
params: json!({
"type": "XHR",
"response": { "status": 200, "url": "https://example.com/api" },
}),
session_id: Some("S".into()),
};
assert!(extract_document_response(&xhr).is_none());
}
// A "Document" response yields its status and URL unchanged.
#[test]
fn extract_document_response_picks_main_document() {
let doc = CdpEvent {
method: "Network.responseReceived".into(),
params: json!({
"type": "Document",
"response": { "status": 404, "url": "https://example.com/missing" },
}),
session_id: Some("S".into()),
};
assert_eq!(
extract_document_response(&doc),
Some((404_u16, "https://example.com/missing".into()))
);
}
// Builds a mock-server handler that scripts a successful fetch:
// - `Target.createTarget` answers with target "T1";
// - `Target.attachToTarget` answers with session "S1";
// - `Page.navigate` answers with frame "F1", then emits a Document
//   `Network.responseReceived` (the given `status`, echoing the requested
//   URL) followed by `Page.frameStoppedLoading` for "F1", both on "S1";
// - `Runtime.evaluate` answers with `body` as a string value;
// - every other method answers with an empty object.
fn happy_path_handler(
body: &'static str,
status: u16,
) -> impl Fn(&str, &serde_json::Value, Option<&str>) -> Vec<FrameOut> + Send + Sync + 'static
{
move |method, params, _sid| match method {
"Target.createTarget" => vec![FrameOut::Response(json!({ "targetId": "T1" }))],
"Target.attachToTarget" => vec![FrameOut::Response(json!({ "sessionId": "S1" }))],
"Page.navigate" => {
let url = params
.get("url")
.and_then(serde_json::Value::as_str)
.unwrap_or("about:blank")
.to_owned();
vec![
FrameOut::Response(json!({ "frameId": "F1" })),
FrameOut::Event {
method: "Network.responseReceived".into(),
params: json!({
"type": "Document",
"response": { "status": status, "url": url },
}),
session_id: Some("S1".into()),
},
FrameOut::Event {
method: "Page.frameStoppedLoading".into(),
params: json!({ "frameId": "F1" }),
session_id: Some("S1".into()),
},
]
}
"Runtime.evaluate" => vec![FrameOut::Response(json!({
"result": { "type": "string", "value": body },
}))],
_ => vec![FrameOut::Response(json!({}))],
}
}
// The scripted happy path surfaces the status, final URL and body.
#[tokio::test]
async fn fetch_returns_status_url_and_body_on_happy_path() {
let server =
MockCdpServer::start(happy_path_handler("<html><body>hello</body></html>", 200)).await;
let cdp = CdpClient::connect(&server.ws_url())
.await
.expect("cdp connect to mock");
let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());
assert_eq!(backend.session_id(), "test-session");
let url = Url::parse("https://example.com/u/torvalds").unwrap();
let headers = BTreeMap::new();
let page = backend
.fetch(&url, &headers, Duration::from_secs(5))
.await
.expect("fetch ok");
assert_eq!(page.status, 200);
assert_eq!(page.final_url.as_str(), "https://example.com/u/torvalds");
assert!(page.body.contains("hello"), "body: {}", page.body);
}
// A non-2xx document status is returned as data, not as an error.
#[tokio::test]
async fn fetch_propagates_404_status_from_navigation_response() {
let server =
MockCdpServer::start(happy_path_handler("<html><body>404</body></html>", 404)).await;
let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());
let url = Url::parse("https://example.com/u/nobody").unwrap();
let page = backend
.fetch(&url, &BTreeMap::new(), Duration::from_secs(5))
.await
.expect("fetch ok");
assert_eq!(page.status, 404);
}
// `User-Agent` goes through `Network.setUserAgentOverride`, other headers
// through `Network.setExtraHTTPHeaders`, and both are sent before
// `Page.navigate`.
#[tokio::test]
async fn fetch_sends_per_site_headers_via_extra_headers_and_ua_override() {
let server = MockCdpServer::start(happy_path_handler("<html></html>", 200)).await;
let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());
let mut headers = BTreeMap::new();
headers.insert("X-IG-App-ID".into(), "936619743392459".into());
headers.insert("User-Agent".into(), "Mozilla/5.0 (test)".into());
backend
.fetch(
&Url::parse("https://example.com/u/torvalds").unwrap(),
&headers,
Duration::from_secs(5),
)
.await
.expect("fetch ok");
// Inspect the commands the mock server recorded.
let log = server.received().await;
let ua = log
.iter()
.find(|r| r.method == "Network.setUserAgentOverride")
.expect("setUserAgentOverride was sent");
assert_eq!(
ua.params
.get("userAgent")
.and_then(serde_json::Value::as_str),
Some("Mozilla/5.0 (test)"),
"UA override params: {:?}",
ua.params
);
let extras = log
.iter()
.find(|r| r.method == "Network.setExtraHTTPHeaders")
.expect("setExtraHTTPHeaders was sent");
let map = extras
.params
.get("headers")
.and_then(serde_json::Value::as_object)
.expect("headers object");
assert_eq!(
map.get("X-IG-App-ID").and_then(serde_json::Value::as_str),
Some("936619743392459")
);
// The user agent must not also be sent as an extra header.
assert!(
!map.contains_key("User-Agent"),
"User-Agent leaked into setExtraHTTPHeaders: {map:?}"
);
// Ordering check: both header commands precede the navigation.
let nav_idx = log
.iter()
.position(|r| r.method == "Page.navigate")
.unwrap();
let ua_idx = log
.iter()
.position(|r| r.method == "Network.setUserAgentOverride")
.unwrap();
let extras_idx = log
.iter()
.position(|r| r.method == "Network.setExtraHTTPHeaders")
.unwrap();
assert!(
ua_idx < nav_idx && extras_idx < nav_idx,
"headers must be set before navigate; got order: \
ua={ua_idx} extras={extras_idx} nav={nav_idx}"
);
}
// With an empty header map neither header command is issued.
#[tokio::test]
async fn fetch_skips_header_commands_when_no_headers_given() {
let server = MockCdpServer::start(happy_path_handler("<html></html>", 200)).await;
let cdp = CdpClient::connect(&server.ws_url()).await.unwrap();
let backend = BrowserbaseBackend::from_parts(cdp, "test-session".into());
backend
.fetch(
&Url::parse("https://example.com/u/x").unwrap(),
&BTreeMap::new(),
Duration::from_secs(5),
)
.await
.expect("fetch ok");
let methods: Vec<String> = server
.received()
.await
.into_iter()
.map(|r| r.method)
.collect();
assert!(
!methods.iter().any(|m| m == "Network.setExtraHTTPHeaders"),
"setExtraHTTPHeaders should not fire on empty headers; saw {methods:?}"
);
assert!(
!methods.iter().any(|m| m == "Network.setUserAgentOverride"),
"setUserAgentOverride should not fire on empty headers; saw {methods:?}"
);
}
}