use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use anyhow::{Result, anyhow};
use serde_json::{Value, json};
use tokio::sync::{Mutex, Semaphore};
use tracing::{debug, info, warn};
use super::{CdpClient, ChromeProcess, can_launch_chrome};
use rsclaw_platform::detect_chrome;
/// Maximum number of pooled tabs that may be open at the same time; enforced
/// by the pool's tab semaphore.
const MAX_TABS_PER_INSTANCE: usize = 8;
/// How long the pool may go without activity before `reap_if_idle` drops the
/// shared Chrome (ten minutes).
const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(10 * 60);
/// Pool built around a single Chrome instance that hands out CDP tabs.
///
/// At most `MAX_TABS_PER_INSTANCE` tabs are open through the pool at once
/// (gated by `tab_semaphore`). Once `POOL_IDLE_TIMEOUT` has passed without
/// activity, `reap_if_idle` drops the attached Chrome.
pub struct BrowserPool {
/// The Chrome the pool talks to (pool-launched or external); `None` until
/// first use, after the process exited, or after idle reaping.
chrome: Mutex<Option<PooledChrome>>,
/// One permit per open tab; each `TabSession` holds its permit until dropped.
tab_semaphore: Arc<Semaphore>,
/// Chrome executable path, detected lazily on first launch and cached.
chrome_path: Mutex<Option<String>>,
/// Browser profile read from the `rsclaw.json5` config on launch; cached once found.
profile: Mutex<Option<String>>,
/// Wall-clock milliseconds (see `now_ms`) of the last pool activity; drives idle reaping.
last_activity: AtomicU64,
/// Counter behind `next_engine_index`.
engine_counter: std::sync::atomic::AtomicU32,
}
/// The Chrome instance the pool is currently attached to.
struct PooledChrome {
/// `Some` when the pool launched this Chrome itself; `None` for an external
/// Chrome registered through `set_chrome_ws_url`.
process: Option<ChromeProcess>,
/// Remote-debugging port; every request goes to `127.0.0.1` on this port.
port: u16,
}
/// One pooled tab with its own CDP connection.
///
/// Dropping it asks Chrome to close the tab (best-effort, from a spawned task)
/// and gives the tab permit back to the pool. Field order matters: fields drop
/// in declaration order, so the permit is released last.
pub struct TabSession {
/// CDP connection to this tab's own WebSocket endpoint.
cdp: CdpClient,
/// Chrome target id of the tab, used to close it on drop.
target_id: String,
/// Chrome's debug port, needed on drop to reach the browser-level endpoint.
port: u16,
/// This tab's slot in `BrowserPool::tab_semaphore`.
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl BrowserPool {
    /// Creates an empty pool: no Chrome attached, `MAX_TABS_PER_INSTANCE` tab
    /// permits available, and the idle clock starting now.
    pub fn new() -> Self {
        Self {
            chrome: Mutex::new(None),
            tab_semaphore: Arc::new(Semaphore::new(MAX_TABS_PER_INSTANCE)),
            chrome_path: Mutex::new(None),
            profile: Mutex::new(None),
            last_activity: AtomicU64::new(now_ms()),
            engine_counter: std::sync::atomic::AtomicU32::new(0),
        }
    }

    /// Returns the process-wide pool, creating it on first call.
    pub fn global() -> &'static BrowserPool {
        static POOL: std::sync::OnceLock<BrowserPool> = std::sync::OnceLock::new();
        POOL.get_or_init(BrowserPool::new)
    }

    /// Opens a new `about:blank` tab in the pooled Chrome and returns a CDP
    /// session attached to it. Chrome is launched first if necessary.
    ///
    /// Waiting for one of the `MAX_TABS_PER_INSTANCE` tab permits has no
    /// deadline. The permit is then owned by the returned [`TabSession`].
    /// Connecting to Chrome and preparing the tab is limited to 30 seconds.
    ///
    /// # Errors
    ///
    /// Fails if the semaphore is closed, Chrome cannot be started or reached,
    /// any CDP call fails, or the 30-second setup deadline expires.
    pub async fn acquire_tab(&self) -> Result<TabSession> {
        // Waiting for a free slot is not covered by the 30s deadline below.
        let permit = self
            .tab_semaphore
            .clone()
            .acquire_owned()
            .await
            .map_err(|_| anyhow!("browser pool semaphore closed"))?;
        match tokio::time::timeout(Duration::from_secs(30), self.acquire_tab_inner(permit)).await
        {
            Ok(result) => result,
            Err(_) => {
                warn!("pool: acquire_tab timed out (30s), Chrome may be unresponsive");
                Err(anyhow!("browser pool: timed out connecting to Chrome"))
            }
        }
    }

    /// Does the work behind [`Self::acquire_tab`]. It creates a target over the
    /// browser-level CDP connection and finds that target's WebSocket in the
    /// `/json` listing. It then connects and enables the Page, DOM, Runtime
    /// and Network domains.
    ///
    /// After `Target.createTarget` succeeds, the tab exists in Chrome. Any later
    /// failure, or cancellation by the caller's timeout, closes the tab again
    /// instead of leaking it.
    async fn acquire_tab_inner(
        &self,
        permit: tokio::sync::OwnedSemaphorePermit,
    ) -> Result<TabSession> {
        /// Closes a just-created target when dropped, unless it was disarmed.
        struct PendingTarget {
            port: u16,
            target_id: Option<String>,
        }

        impl PendingTarget {
            /// Hands the target over to its new owner; dropping no longer closes it.
            fn disarm(mut self) {
                self.target_id = None;
            }
        }

        impl Drop for PendingTarget {
            fn drop(&mut self) {
                if let Some(target_id) = self.target_id.take() {
                    warn!(target_id = %target_id, "pool: tab setup did not complete, closing tab");
                    BrowserPool::spawn_close_target(self.port, target_id);
                }
            }
        }

        let port = self.ensure_chrome().await?;
        let discovery_url = format!("http://127.0.0.1:{port}/json");
        let version_url = format!("http://127.0.0.1:{port}/json/version");
        let version_info: Value = reqwest::get(&version_url).await?.json().await?;
        let browser_ws_url = version_info["webSocketDebuggerUrl"]
            .as_str()
            .ok_or_else(|| anyhow!("pool: no browser webSocketDebuggerUrl"))?;
        let browser_cdp = CdpClient::connect(browser_ws_url).await?;
        let create_result = browser_cdp
            .send("Target.createTarget", json!({ "url": "about:blank" }))
            .await?;
        let target_id = create_result["targetId"]
            .as_str()
            .ok_or_else(|| anyhow!("pool: Target.createTarget did not return targetId"))?
            .to_owned();

        // The tab now exists in Chrome. From here on, every early return (and
        // any cancellation by `acquire_tab`'s timeout) must close it again.
        let pending = PendingTarget {
            port,
            target_id: Some(target_id.clone()),
        };

        let targets: Vec<Value> = reqwest::get(&discovery_url).await?.json().await?;
        let tab_ws_url = targets
            .iter()
            .find(|t| t["id"].as_str() == Some(&target_id))
            .and_then(|t| t["webSocketDebuggerUrl"].as_str())
            .ok_or_else(|| anyhow!("pool: new tab {target_id} not found in target list"))?
            .to_owned();
        let cdp = CdpClient::connect(&tab_ws_url).await?;
        for domain in ["Page.enable", "DOM.enable", "Runtime.enable", "Network.enable"] {
            cdp.send(domain, json!({})).await?;
        }

        // Setup succeeded; the TabSession's own Drop now owns closing the tab.
        pending.disarm();
        self.touch();
        debug!(target_id = %target_id, "pool: tab acquired");
        Ok(TabSession {
            cdp,
            target_id,
            port,
            _permit: permit,
        })
    }

    /// Fire-and-forget, best-effort `Target.closeTarget` for `target_id` on the
    /// Chrome listening at `port`. Does nothing when called outside a Tokio
    /// runtime. All errors are ignored.
    fn spawn_close_target(port: u16, target_id: String) {
        let Ok(handle) = tokio::runtime::Handle::try_current() else {
            return;
        };
        handle.spawn(async move {
            let Ok(ws_url) = Self::fetch_browser_ws_url(port).await else {
                return;
            };
            if let Ok(browser_cdp) = CdpClient::connect(&ws_url).await {
                let _ = browser_cdp
                    .send("Target.closeTarget", json!({ "targetId": target_id }))
                    .await;
            }
        });
    }

    /// Returns the browser-level WebSocket URL of the pooled Chrome, launching
    /// a shared headless Chrome first if none is usable.
    ///
    /// # Errors
    ///
    /// Fails if Chrome cannot be started, or if `/json/version` cannot be read
    /// or lacks `webSocketDebuggerUrl`.
    pub async fn chrome_ws_url(&self) -> Result<String> {
        let port = self.ensure_chrome().await?;
        self.ws_url_for_port(port).await
    }

    /// PID of the Chrome process this pool launched, if it is still running.
    ///
    /// Returns `None` when no Chrome is attached, when the attached Chrome is
    /// external (not launched by the pool), or when the process has exited.
    pub async fn owned_chrome_pid(&self) -> Option<u32> {
        let mut guard = self.chrome.lock().await;
        if let Some(ref mut pooled) = *guard {
            if let Some(ref mut proc) = pooled.process {
                if !proc.child.try_wait().is_ok_and(|s| s.is_some()) {
                    return proc.child.id();
                }
            }
        }
        None
    }

    /// Reads `webSocketDebuggerUrl` from `http://127.0.0.1:{port}/json/version`.
    async fn ws_url_for_port(&self, port: u16) -> Result<String> {
        Self::fetch_browser_ws_url(port).await
    }

    /// Like [`Self::ws_url_for_port`], but without `&self`. This lets detached
    /// cleanup tasks call it.
    async fn fetch_browser_ws_url(port: u16) -> Result<String> {
        let version_info: Value = reqwest::get(format!("http://127.0.0.1:{port}/json/version"))
            .await?
            .json()
            .await?;
        version_info["webSocketDebuggerUrl"]
            .as_str()
            .map(String::from)
            .ok_or_else(|| anyhow!("pool: /json/version missing webSocketDebuggerUrl"))
    }

    /// `true` if `GET /json/version` on `port` answers with a 2xx status within
    /// 2 seconds.
    async fn port_alive(port: u16) -> bool {
        let url = format!("http://127.0.0.1:{port}/json/version");
        match reqwest::Client::builder()
            .timeout(std::time::Duration::from_secs(2))
            .build()
        {
            Ok(client) => client
                .get(&url)
                .send()
                .await
                .map(|r| r.status().is_success())
                .unwrap_or(false),
            Err(_) => false,
        }
    }

    /// Returns the browser WebSocket URL of the pooled Chrome. If no attached
    /// Chrome is usable, it first launches a pool-owned one from `chrome_path`.
    ///
    /// An attached Chrome is reused when it is still usable. The liveness rule
    /// is the same as in `ensure_chrome`: an external instance must answer on
    /// its debug port, and an owned process must not have exited.
    ///
    /// NOTE: on reuse, `headed` and `profile` are not compared with the running
    /// instance. They only take effect when a new Chrome is launched.
    ///
    /// # Errors
    ///
    /// Fails if launching is not permitted (`can_launch_chrome`), the launch
    /// fails, or `/json/version` cannot be read.
    pub async fn ensure_owned_chrome(
        &self,
        chrome_path: &str,
        headed: bool,
        profile: Option<&str>,
    ) -> Result<String> {
        let port = {
            let mut guard = self.chrome.lock().await;
            if let Some(ref mut pooled) = *guard {
                let alive = match pooled.process {
                    // An external Chrome can vanish without the pool noticing, so
                    // probe its debug port instead of assuming it is up.
                    None => Self::port_alive(pooled.port).await,
                    Some(ref mut proc) => !proc.child.try_wait().is_ok_and(|s| s.is_some()),
                };
                if alive {
                    let p = pooled.port;
                    self.touch();
                    drop(guard);
                    return self.ws_url_for_port(p).await;
                }
                warn!(port = pooled.port, "pool: pooled Chrome is gone, launching a new one");
                *guard = None;
            }
            can_launch_chrome()?;
            let process = ChromeProcess::launch(chrome_path, headed, profile).await?;
            let port = process.port()?;
            info!(port, headed, profile = ?profile, "pool: launched pool-owned Chrome");
            *guard = Some(PooledChrome {
                process: Some(process),
                port,
            });
            self.touch();
            port
        };
        self.ws_url_for_port(port).await
    }

    /// Attaches the pool to an already-running (external) Chrome.
    ///
    /// Any previously attached Chrome is replaced. NOTE: only the port of
    /// `ws_url` is kept. The host and path are ignored, because the pool always
    /// talks to `127.0.0.1:{port}`.
    ///
    /// # Errors
    ///
    /// Fails if `ws_url` does not parse or carries no explicit port.
    pub async fn set_chrome_ws_url(&self, ws_url: &str) -> Result<()> {
        let parsed =
            url::Url::parse(ws_url).map_err(|e| anyhow!("pool: invalid ws_url {ws_url}: {e}"))?;
        let port = parsed
            .port()
            .ok_or_else(|| anyhow!("pool: ws_url {ws_url} has no port"))?;
        let mut guard = self.chrome.lock().await;
        *guard = Some(PooledChrome {
            process: None,
            port,
        });
        self.touch();
        info!(port, "pool: using external Chrome");
        Ok(())
    }

    /// Returns the debug port of a usable pooled Chrome, launching a shared
    /// headless one if there is none.
    ///
    /// An external Chrome is reused while its port answers. An owned one is
    /// reused while its process has not exited. Otherwise a new Chrome is
    /// launched. The executable is auto-detected once and cached. The profile
    /// comes from `tools.web_browser.profile` in `rsclaw.json5`; it is cached
    /// once found, and re-read on later launches while it is unset.
    ///
    /// # Errors
    ///
    /// Fails if no Chrome executable is found, launching is not permitted, or
    /// the launch fails.
    async fn ensure_chrome(&self) -> Result<u16> {
        let mut guard = self.chrome.lock().await;
        if let Some(ref mut pooled) = *guard {
            match pooled.process {
                None => {
                    if Self::port_alive(pooled.port).await {
                        self.touch();
                        return Ok(pooled.port);
                    }
                    warn!(
                        port = pooled.port,
                        "pool: external Chrome no longer reachable, launching own"
                    );
                    *guard = None;
                }
                Some(ref mut proc) => {
                    if proc.child.try_wait().is_ok_and(|s| s.is_some()) {
                        warn!("pool: Chrome process exited, will restart");
                        *guard = None;
                    } else {
                        self.touch();
                        return Ok(pooled.port);
                    }
                }
            }
        }
        let chrome_path = {
            let mut path_guard = self.chrome_path.lock().await;
            if path_guard.is_none() {
                *path_guard = detect_chrome();
            }
            path_guard
                .clone()
                .ok_or_else(|| anyhow!("pool: Chrome not found"))?
        };
        can_launch_chrome()?;
        let profile = {
            let mut profile_guard = self.profile.lock().await;
            if profile_guard.is_none() {
                let config_path = rsclaw_config::loader::base_dir().join("rsclaw.json5");
                let cfg_profile = rsclaw_config::loader::load_json5(&config_path)
                    .ok()
                    .and_then(|c| c.tools)
                    .and_then(|t| t.web_browser)
                    .and_then(|b| b.profile);
                *profile_guard = cfg_profile;
            }
            profile_guard.clone()
        };
        // `false` = not headed: the shared pool Chrome runs headless.
        let process = ChromeProcess::launch(&chrome_path, false, profile.as_deref()).await?;
        let port = process.port()?;
        info!(port, profile = ?profile, "pool: shared headless Chrome launched");
        *guard = Some(PooledChrome {
            process: Some(process),
            port,
        });
        self.touch();
        Ok(port)
    }

    /// Returns 0, 1, 2, … on successive calls (wrapping at `u32::MAX`).
    /// NOTE(review): what an "engine index" identifies is decided by callers.
    pub fn next_engine_index(&self) -> u32 {
        self.engine_counter.fetch_add(1, Ordering::Relaxed)
    }

    /// Records pool activity now, postponing idle reaping.
    pub(crate) fn touch(&self) {
        self.last_activity.store(now_ms(), Ordering::Relaxed);
    }

    /// `true` once more than `POOL_IDLE_TIMEOUT` has elapsed since the last
    /// [`Self::touch`].
    pub fn is_idle_expired(&self) -> bool {
        let last = self.last_activity.load(Ordering::Relaxed);
        let elapsed = now_ms().saturating_sub(last);
        elapsed > POOL_IDLE_TIMEOUT.as_millis() as u64
    }

    /// `true` while at least one tab permit is held (an open or in-setup tab).
    fn tabs_in_use(&self) -> bool {
        self.tab_semaphore.available_permits() < MAX_TABS_PER_INSTANCE
    }

    /// Drops the attached Chrome if the pool has been idle for longer than
    /// `POOL_IDLE_TIMEOUT` and no tab is in use.
    ///
    /// Using a tab does not refresh the idle clock. Without the tabs-in-use
    /// check, a long-lived [`TabSession`] could have its Chrome shut down
    /// underneath it.
    pub async fn reap_if_idle(&self) {
        if !self.is_idle_expired() || self.tabs_in_use() {
            return;
        }
        let mut guard = self.chrome.lock().await;
        // Re-check under the lock. While we waited, someone may have used or
        // (re)launched Chrome, or opened a tab.
        if !self.is_idle_expired() || self.tabs_in_use() {
            return;
        }
        if guard.is_some() {
            info!("pool: idle timeout, shutting down shared Chrome");
            *guard = None;
        }
    }
}
impl TabSession {
    /// Navigates the tab to `url`, then waits up to 15 seconds for
    /// `Page.loadEventFired`.
    ///
    /// The load wait is best-effort: if it times out or fails, this still
    /// returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Fails only if the `Page.navigate` command itself fails.
    pub async fn navigate(&self, url: &str) -> Result<()> {
        self.cdp.send("Page.navigate", json!({ "url": url })).await?;
        // Outcome ignored on purpose: pages that never fire `load` must not
        // fail the navigation.
        let _ = tokio::time::timeout(
            Duration::from_secs(15),
            self.cdp.wait_event("Page.loadEventFired", 15),
        )
        .await;
        Ok(())
    }

    /// Polls the page every 200 ms until `selector` matches or roughly
    /// `timeout_secs` seconds have passed.
    ///
    /// Returns `Ok(())` whether or not the element appeared; a timeout is not
    /// reported as an error.
    ///
    /// # Errors
    ///
    /// Fails only if `selector` cannot be JSON-encoded for embedding in the script.
    pub async fn wait_for_selector(&self, selector: &str, timeout_secs: u64) -> Result<()> {
        // The deadline is checked inside the poll loop, so polling stops once
        // the wait is over. A separate `setTimeout(reject, ms)` would settle
        // the promise but leave `check` rescheduling itself in the page forever.
        let js = format!(
            r#"new Promise((resolve, reject) => {{
    const deadline = Date.now() + {ms};
    const check = () => {{
        if (document.querySelector({sel})) return resolve(true);
        if (Date.now() >= deadline) return reject('timeout');
        setTimeout(check, 200);
    }};
    check();
}})"#,
            // JSON string syntax is a valid JS string literal, so the
            // selector is safely quoted and escaped.
            sel = serde_json::to_string(selector)?,
            ms = timeout_secs.saturating_mul(1000),
        );
        // One extra second gives the in-page deadline a chance to fire first.
        // The outcome (found, rejected, or timed out) is deliberately ignored.
        let _ = tokio::time::timeout(
            Duration::from_secs(timeout_secs.saturating_add(1)),
            self.cdp.send(
                "Runtime.evaluate",
                json!({
                    "expression": js,
                    "awaitPromise": true,
                }),
            ),
        )
        .await;
        Ok(())
    }

    /// Evaluates `js` in the page and returns `result.result.value` from the
    /// `Runtime.evaluate` response (`Value::Null` if it is absent).
    ///
    /// NOTE: `exceptionDetails` is not inspected, so a script that throws does
    /// not produce an `Err` here.
    ///
    /// # Errors
    ///
    /// Fails if the CDP command itself fails.
    pub async fn evaluate(&self, js: &str) -> Result<Value> {
        let result = self
            .cdp
            .send(
                "Runtime.evaluate",
                json!({
                    "expression": js,
                    "returnByValue": true,
                }),
            )
            .await?;
        Ok(result["result"]["value"].clone())
    }

    /// Returns `document.body.innerText`, or an empty string if there is no
    /// body or the result is not a string.
    pub async fn get_text(&self) -> Result<String> {
        let result = self.evaluate("document.body?.innerText || ''").await?;
        Ok(result.as_str().unwrap_or("").to_owned())
    }

    /// Returns the serialized `document.documentElement.outerHTML`, or an empty
    /// string if it is unavailable.
    pub async fn get_html(&self) -> Result<String> {
        let result = self
            .evaluate("document.documentElement?.outerHTML || ''")
            .await?;
        Ok(result.as_str().unwrap_or("").to_owned())
    }
}
impl Drop for TabSession {
    /// Closes the tab in Chrome when the session goes away.
    ///
    /// `drop` cannot await, so the close runs as a detached task on the current
    /// Tokio runtime. If there is no runtime, the tab is left open. Every
    /// failure along the way (HTTP, JSON, WebSocket, CDP) is ignored, because
    /// this is strictly best-effort cleanup.
    fn drop(&mut self) {
        /// Looks up the browser-level WebSocket for `port` and asks it to close
        /// `target_id`. Stops at the first step that fails.
        async fn close_target(port: u16, target_id: String) -> Option<()> {
            let version_url = format!("http://127.0.0.1:{port}/json/version");
            let info = reqwest::get(&version_url)
                .await
                .ok()?
                .json::<Value>()
                .await
                .ok()?;
            let ws_url = info["webSocketDebuggerUrl"].as_str()?;
            let browser_cdp = CdpClient::connect(ws_url).await.ok()?;
            let _ = browser_cdp
                .send("Target.closeTarget", json!({ "targetId": target_id }))
                .await;
            Some(())
        }

        let (port, target_id) = (self.port, self.target_id.clone());
        debug!(target_id = %target_id, "pool: releasing tab");
        if let Ok(runtime) = tokio::runtime::Handle::try_current() {
            runtime.spawn(close_target(port, target_id));
        }
    }
}
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads earlier than the epoch. Because this is
/// wall-clock time, it can jump when the system clock is adjusted.
fn now_ms() -> u64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap_or_default();
    since_epoch.as_millis() as u64
}
#[cfg(test)]
mod tests {
use serde_json::json;
use wiremock::{Mock, MockServer, ResponseTemplate, matchers::method};
use super::*;
// Moves `last_activity` just past the idle timeout (with 1s of slack) so the
// pool reads as idle without the test having to wait.
fn backdate_past_idle(pool: &BrowserPool) {
let stale = now_ms() - POOL_IDLE_TIMEOUT.as_millis() as u64 - 1_000;
pool.last_activity.store(stale, Ordering::Relaxed);
assert!(pool.is_idle_expired(), "precondition: pool must look idle");
}
// Starts a mock HTTP server that answers every GET with a `/json/version`-style
// body whose `webSocketDebuggerUrl` points at the server's own port. Returns
// the server (keep it alive for the whole test) and that port.
async fn fake_chrome_endpoint() -> (MockServer, u16) {
let server = MockServer::start().await;
let port = server.address().port();
let body = json!({
"webSocketDebuggerUrl":
format!("ws://127.0.0.1:{port}/devtools/browser/test")
});
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(200).set_body_json(body))
.mount(&server)
.await;
(server, port)
}
// Registering an external Chrome must count as pool activity.
#[tokio::test]
async fn set_chrome_ws_url_refreshes_idle_clock() {
let pool = BrowserPool::new();
backdate_past_idle(&pool);
pool.set_chrome_ws_url("ws://127.0.0.1:9222/devtools/browser/abc")
.await
.unwrap();
assert!(
!pool.is_idle_expired(),
"registering a Chrome must reset the idle clock"
);
}
// Reusing an already-attached external Chrome (`process: None`, served by the
// mock endpoint) through `ensure_owned_chrome` must count as activity. The
// bogus chrome path is expected never to be launched on the reuse path.
#[tokio::test]
async fn ensure_owned_chrome_reuse_refreshes_idle_clock() {
let (_server, port) = fake_chrome_endpoint().await;
let pool = BrowserPool::new();
*pool.chrome.lock().await = Some(PooledChrome {
process: None,
port,
});
backdate_past_idle(&pool);
pool.ensure_owned_chrome("/nonexistent-chrome", false, None)
.await
.unwrap();
assert!(
!pool.is_idle_expired(),
"handing out the pool Chrome must reset the idle clock"
);
}
// The reaper sees an idle pool and blocks on the chrome lock, which the test
// holds. While it waits, the test attaches a Chrome and touches the clock. On
// release, the reaper's re-check under the lock must leave that Chrome alone.
// The 50ms sleep is presumably enough for the spawned reaper to reach the lock.
#[tokio::test]
async fn reap_rechecks_idle_clock_under_lock() {
let pool = std::sync::Arc::new(BrowserPool::new());
backdate_past_idle(&pool);
let mut guard = pool.chrome.lock().await;
let reaper = {
let pool = pool.clone();
tokio::spawn(async move { pool.reap_if_idle().await })
};
tokio::time::sleep(Duration::from_millis(50)).await;
*guard = Some(PooledChrome {
process: None,
port: 1,
});
pool.touch();
drop(guard);
reaper.await.unwrap();
assert!(
pool.chrome.lock().await.is_some(),
"reaper must not kill a Chrome launched while it waited for the lock"
);
}
// Reusing the attached external Chrome through `chrome_ws_url` (which goes via
// `ensure_chrome`'s port probe against the mock) must count as activity.
#[tokio::test]
async fn chrome_ws_url_reuse_refreshes_idle_clock() {
let (_server, port) = fake_chrome_endpoint().await;
let pool = BrowserPool::new();
*pool.chrome.lock().await = Some(PooledChrome {
process: None,
port,
});
backdate_past_idle(&pool);
pool.chrome_ws_url().await.unwrap();
assert!(
!pool.is_idle_expired(),
"handing out the pool Chrome must reset the idle clock"
);
}
}