use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use anyhow::{Result, anyhow};
use serde_json::{Value, json};
use tokio::sync::{Mutex, Semaphore};
use tracing::{debug, info, warn};
use super::{CdpClient, ChromeProcess, can_launch_chrome, ACTIVE_INSTANCES};
use crate::agent::platform::detect_chrome;
const MAX_TABS_PER_INSTANCE: usize = 8;
const POOL_IDLE_TIMEOUT: Duration = Duration::from_secs(600);
pub struct BrowserPool {
chrome: Mutex<Option<PooledChrome>>,
tab_semaphore: Arc<Semaphore>,
chrome_path: Mutex<Option<String>>,
profile: Mutex<Option<String>>,
last_activity: AtomicU64,
engine_counter: std::sync::atomic::AtomicU32,
}
struct PooledChrome {
process: ChromeProcess,
port: u16,
}
pub struct TabSession {
cdp: CdpClient,
target_id: String,
port: u16,
_permit: tokio::sync::OwnedSemaphorePermit,
}
impl BrowserPool {
pub fn new() -> Self {
Self {
chrome: Mutex::new(None),
tab_semaphore: Arc::new(Semaphore::new(MAX_TABS_PER_INSTANCE)),
chrome_path: Mutex::new(None),
profile: Mutex::new(None),
last_activity: AtomicU64::new(now_ms()),
engine_counter: std::sync::atomic::AtomicU32::new(0),
}
}
pub fn global() -> &'static BrowserPool {
static POOL: std::sync::OnceLock<BrowserPool> = std::sync::OnceLock::new();
POOL.get_or_init(BrowserPool::new)
}
pub async fn acquire_tab(&self) -> Result<TabSession> {
let permit = self.tab_semaphore.clone().acquire_owned().await
.map_err(|_| anyhow!("browser pool semaphore closed"))?;
let port = self.ensure_chrome().await?;
let discovery_url = format!("http://127.0.0.1:{port}/json");
let browser_ws = format!("http://127.0.0.1:{port}/json/version");
let version_info: Value = reqwest::get(&browser_ws).await?.json().await?;
let browser_ws_url = version_info["webSocketDebuggerUrl"]
.as_str()
.ok_or_else(|| anyhow!("pool: no browser webSocketDebuggerUrl"))?;
let browser_cdp = CdpClient::connect(browser_ws_url).await?;
let create_result = browser_cdp.send("Target.createTarget", json!({
"url": "about:blank"
})).await?;
let target_id = create_result["targetId"]
.as_str()
.ok_or_else(|| anyhow!("pool: Target.createTarget did not return targetId"))?
.to_owned();
let targets: Vec<Value> = reqwest::get(&discovery_url).await?.json().await?;
let tab_ws_url = targets.iter()
.find(|t| t["id"].as_str() == Some(&target_id))
.and_then(|t| t["webSocketDebuggerUrl"].as_str())
.ok_or_else(|| anyhow!("pool: new tab {target_id} not found in target list"))?
.to_owned();
let cdp = CdpClient::connect(&tab_ws_url).await?;
cdp.send("Page.enable", json!({})).await?;
cdp.send("DOM.enable", json!({})).await?;
cdp.send("Runtime.enable", json!({})).await?;
cdp.send("Network.enable", json!({})).await?;
self.touch();
debug!(target_id = %target_id, "pool: tab acquired");
Ok(TabSession {
cdp,
target_id,
port,
_permit: permit,
})
}
pub async fn chrome_ws_url(&self) -> Result<String> {
let port = self.ensure_chrome().await?;
let version_info: Value =
reqwest::get(format!("http://127.0.0.1:{port}/json/version"))
.await?
.json()
.await?;
version_info["webSocketDebuggerUrl"]
.as_str()
.map(String::from)
.ok_or_else(|| anyhow!("pool: /json/version missing webSocketDebuggerUrl"))
}
async fn ensure_chrome(&self) -> Result<u16> {
let mut guard = self.chrome.lock().await;
if let Some(ref mut pooled) = *guard {
if pooled.process.child.try_wait().is_ok_and(|s| s.is_some()) {
warn!("pool: Chrome process exited, will restart");
ACTIVE_INSTANCES.fetch_sub(1, Ordering::Relaxed);
*guard = None;
} else {
return Ok(pooled.port);
}
}
let chrome_path = {
let mut path_guard = self.chrome_path.lock().await;
if path_guard.is_none() {
*path_guard = detect_chrome();
}
path_guard.clone()
.ok_or_else(|| anyhow!("pool: Chrome not found"))?
};
can_launch_chrome()?;
let profile = {
let mut profile_guard = self.profile.lock().await;
if profile_guard.is_none() {
let config_path = crate::config::loader::base_dir().join("rsclaw.json5");
let cfg_profile = crate::config::loader::load_json5(&config_path)
.ok()
.and_then(|c| c.tools)
.and_then(|t| t.web_browser)
.and_then(|b| b.profile);
*profile_guard = cfg_profile;
}
profile_guard.clone()
};
let process = ChromeProcess::launch(&chrome_path, false, profile.as_deref()).await?;
let port = process.port()?;
info!(port, profile = ?profile, "pool: shared headless Chrome launched");
*guard = Some(PooledChrome { process, port });
Ok(port)
}
pub fn next_engine_index(&self) -> u32 {
self.engine_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
fn touch(&self) {
self.last_activity.store(now_ms(), Ordering::Relaxed);
}
pub fn is_idle_expired(&self) -> bool {
let last = self.last_activity.load(Ordering::Relaxed);
let elapsed = now_ms().saturating_sub(last);
elapsed > POOL_IDLE_TIMEOUT.as_millis() as u64
}
pub async fn reap_if_idle(&self) {
if !self.is_idle_expired() {
return;
}
let mut guard = self.chrome.lock().await;
if guard.is_some() {
info!("pool: idle timeout, shutting down shared Chrome");
*guard = None; }
}
}
impl TabSession {
pub async fn navigate(&self, url: &str) -> Result<()> {
self.cdp.send("Page.navigate", json!({"url": url})).await?;
let _ = tokio::time::timeout(
Duration::from_secs(15),
self.cdp.wait_event("Page.loadEventFired", 15),
).await;
Ok(())
}
pub async fn wait_for_selector(&self, selector: &str, timeout_secs: u64) -> Result<()> {
let js = format!(
r#"new Promise((resolve, reject) => {{
const check = () => {{
if (document.querySelector({sel})) return resolve(true);
setTimeout(check, 200);
}};
check();
setTimeout(() => reject('timeout'), {ms});
}})"#,
sel = serde_json::to_string(selector)?,
ms = timeout_secs * 1000,
);
let _ = tokio::time::timeout(
Duration::from_secs(timeout_secs + 1),
self.cdp.send("Runtime.evaluate", json!({
"expression": js,
"awaitPromise": true,
})),
).await;
Ok(())
}
pub async fn evaluate(&self, js: &str) -> Result<Value> {
let result = self.cdp.send("Runtime.evaluate", json!({
"expression": js,
"returnByValue": true,
})).await?;
Ok(result["result"]["value"].clone())
}
pub async fn get_text(&self) -> Result<String> {
let result = self.evaluate("document.body?.innerText || ''").await?;
Ok(result.as_str().unwrap_or("").to_owned())
}
pub async fn get_html(&self) -> Result<String> {
let result = self.evaluate("document.documentElement?.outerHTML || ''").await?;
Ok(result.as_str().unwrap_or("").to_owned())
}
}
impl Drop for TabSession {
fn drop(&mut self) {
let target_id = self.target_id.clone();
let port = self.port;
debug!(target_id = %target_id, "pool: releasing tab");
let Ok(handle) = tokio::runtime::Handle::try_current() else {
return;
};
handle.spawn(async move {
let browser_ws = format!("http://127.0.0.1:{port}/json/version");
if let Ok(resp) = reqwest::get(&browser_ws).await {
if let Ok(info) = resp.json::<Value>().await {
if let Some(ws_url) = info["webSocketDebuggerUrl"].as_str() {
if let Ok(browser_cdp) = CdpClient::connect(ws_url).await {
let _ = browser_cdp.send("Target.closeTarget", json!({
"targetId": target_id
})).await;
}
}
}
}
});
}
}
fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64
}