use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use std::time::Instant;
use tokio::sync::{Mutex, Semaphore};
use tokio::time::{sleep, timeout};
use tracing::{debug, info, warn};
use crate::{
BrowserConfig,
browser::BrowserInstance,
error::{BrowserError, Result},
};
/// An idle browser parked in the pool, tagged with the moment it was
/// parked so the eviction task can enforce `idle_timeout`.
struct PoolEntry {
// The pooled browser process.
instance: BrowserInstance,
// Set to `Instant::now()` whenever the browser is (re)parked; compared
// against `idle_timeout` by the background eviction task.
last_used: Instant,
}
/// Mutex-protected idle queues. Browsers are either in `shared` (usable
/// by any caller) or in a `scoped` queue keyed by context id (reserved
/// for callers of `acquire_for` with that id).
struct PoolInner {
// FIFO of idle browsers available to any caller.
shared: std::collections::VecDeque<PoolEntry>,
// Per-context FIFOs; empty queues are pruned by the eviction task.
scoped: std::collections::HashMap<String, std::collections::VecDeque<PoolEntry>>,
}
/// A bounded pool of browser instances with shared and per-context idle
/// queues, semaphore-gated launches, and background idle eviction.
pub struct BrowserPool {
// Launch configuration cloned into each `BrowserInstance::launch`.
config: Arc<BrowserConfig>,
// Gates fresh launches: a permit is `forget()`-ed when a browser is
// launched and returned via `add_permits` when one is destroyed
// (see `release`). NOTE(review): warmup and eviction paths do not
// follow this protocol symmetrically — verify permit accounting.
semaphore: Arc<Semaphore>,
// Idle-browser queues; see `PoolInner`.
inner: Arc<Mutex<PoolInner>>,
// Count of live browser processes (idle + checked out).
active_count: Arc<AtomicUsize>,
// Configured capacity (`config.pool.max_size`), cached for sync access.
max_size: usize,
}
impl BrowserPool {
/// Creates a pool, eagerly warming `min_size` browsers and spawning a
/// background task that evicts idle browsers beyond `min_size` once
/// they have been unused for `idle_timeout`.
///
/// Warmup launch failures are logged and skipped (non-fatal); the pool
/// simply starts with fewer warm browsers.
///
/// # Errors
/// Currently never fails, but returns `Result` so construction-time
/// failures can be surfaced without breaking callers.
pub async fn new(config: BrowserConfig) -> Result<Arc<Self>> {
    let max_size = config.pool.max_size;
    let min_size = config.pool.min_size;
    let pool = Self {
        config: Arc::new(config),
        semaphore: Arc::new(Semaphore::new(max_size)),
        inner: Arc::new(Mutex::new(PoolInner {
            shared: std::collections::VecDeque::new(),
            scoped: std::collections::HashMap::new(),
        })),
        active_count: Arc::new(AtomicUsize::new(0)),
        max_size,
    };
    info!("Warming browser pool: min_size={min_size}, max_size={max_size}");
    for i in 0..min_size {
        match BrowserInstance::launch((*pool.config).clone()).await {
            Ok(instance) => {
                // Account for the warmed browser exactly like an
                // on-demand launch: one forgotten permit per live
                // browser, so every destroy path can `add_permits(1)`
                // without over-crediting the semaphore.
                if let Ok(permit) = pool.semaphore.try_acquire() {
                    permit.forget();
                }
                pool.active_count.fetch_add(1, Ordering::Relaxed);
                pool.inner.lock().await.shared.push_back(PoolEntry {
                    instance,
                    last_used: Instant::now(),
                });
                debug!("Warmed browser {}/{min_size}", i + 1);
            }
            Err(e) => {
                warn!("Warmup browser {i} failed (non-fatal): {e}");
            }
        }
    }
    let eviction_inner = pool.inner.clone();
    let eviction_active = pool.active_count.clone();
    // The eviction task must return launch permits for browsers it
    // destroys; otherwise launch capacity leaks away on every eviction
    // until `acquire` times out even though the pool is not full.
    let eviction_semaphore = pool.semaphore.clone();
    let idle_timeout = pool.config.pool.idle_timeout;
    let eviction_min = min_size;
    tokio::spawn(async move {
        loop {
            sleep(idle_timeout / 2).await;
            let mut guard = eviction_inner.lock().await;
            let now = Instant::now();
            let active = eviction_active.load(Ordering::Relaxed);
            let total_idle: usize = guard.shared.len()
                + guard
                    .scoped
                    .values()
                    .map(std::collections::VecDeque::len)
                    .sum::<usize>();
            // Never shrink below `min_size` live browsers, and never
            // evict more than are currently idle.
            let evict_count = if active > eviction_min {
                (active - eviction_min).min(total_idle)
            } else {
                0
            };
            let mut evicted = 0usize;
            let mut kept: std::collections::VecDeque<PoolEntry> =
                std::collections::VecDeque::new();
            while let Some(entry) = guard.shared.pop_front() {
                if evicted < evict_count && now.duration_since(entry.last_used) >= idle_timeout
                {
                    // Shutdown is async; run it off the lock.
                    tokio::spawn(async move {
                        let _ = entry.instance.shutdown().await;
                    });
                    eviction_active.fetch_sub(1, Ordering::Relaxed);
                    // Free the launch capacity held by the destroyed browser.
                    eviction_semaphore.add_permits(1);
                    evicted += 1;
                } else {
                    kept.push_back(entry);
                }
            }
            guard.shared = kept;
            // Same sweep over every scoped queue; `values_mut` avoids
            // cloning the key set just to iterate.
            for queue in guard.scoped.values_mut() {
                let mut kept: std::collections::VecDeque<PoolEntry> =
                    std::collections::VecDeque::new();
                while let Some(entry) = queue.pop_front() {
                    if evicted < evict_count
                        && now.duration_since(entry.last_used) >= idle_timeout
                    {
                        tokio::spawn(async move {
                            let _ = entry.instance.shutdown().await;
                        });
                        eviction_active.fetch_sub(1, Ordering::Relaxed);
                        eviction_semaphore.add_permits(1);
                        evicted += 1;
                    } else {
                        kept.push_back(entry);
                    }
                }
                *queue = kept;
            }
            // Prune now-empty scoped queues so context ids don't accumulate.
            guard.scoped.retain(|_, q| !q.is_empty());
            drop(guard);
            if evicted > 0 {
                info!("Evicted {evicted} idle browsers (idle_timeout={idle_timeout:?})");
            }
        }
    });
    Ok(Arc::new(pool))
}
/// Acquires a browser from the shared pool, recording acquisition
/// latency and pool size when the `metrics` feature is enabled.
///
/// # Errors
/// Propagates `acquire_inner` failures (timeout or launch error).
pub async fn acquire(self: &Arc<Self>) -> Result<BrowserHandle> {
    #[cfg(feature = "metrics")]
    let started_at = std::time::Instant::now();
    let outcome = self.acquire_inner(None).await;
    #[cfg(feature = "metrics")]
    {
        crate::metrics::METRICS.record_acquisition(started_at.elapsed());
        let active = self.active_count.load(Ordering::Relaxed);
        crate::metrics::METRICS
            .set_pool_size(i64::try_from(active).unwrap_or(i64::MAX));
    }
    outcome
}
/// Acquires a browser reserved for `context_id`, recording acquisition
/// latency and pool size when the `metrics` feature is enabled.
///
/// # Errors
/// Propagates `acquire_inner` failures (timeout or launch error).
pub async fn acquire_for(self: &Arc<Self>, context_id: &str) -> Result<BrowserHandle> {
    #[cfg(feature = "metrics")]
    let started_at = std::time::Instant::now();
    let outcome = self.acquire_inner(Some(context_id)).await;
    #[cfg(feature = "metrics")]
    {
        crate::metrics::METRICS.record_acquisition(started_at.elapsed());
        let active = self.active_count.load(Ordering::Relaxed);
        crate::metrics::METRICS
            .set_pool_size(i64::try_from(active).unwrap_or(i64::MAX));
    }
    outcome
}
/// Core acquisition. Strategy, in order:
/// 1. reuse a cached-healthy idle browser from the relevant queue;
/// 2. launch a fresh one while under `max_size`;
/// 3. poll until another task releases a browser, bounded by
///    `acquire_timeout`.
///
/// # Errors
/// Returns `BrowserError::PoolExhausted` when no browser could be
/// obtained within `acquire_timeout`, and propagates launch failures.
#[allow(clippy::significant_drop_tightening)]
async fn acquire_inner(self: &Arc<Self>, context_id: Option<&str>) -> Result<BrowserHandle> {
    let acquire_timeout = self.config.pool.acquire_timeout;
    let max = self.max_size;
    let ctx_owned: Option<String> = context_id.map(String::from);
    // Fast path: scan the relevant idle queue for a cached-healthy
    // browser, setting aside cached-unhealthy entries for disposal.
    let (healthy, unhealthy) = {
        let mut guard = self.inner.lock().await;
        let queue = match context_id {
            Some(id) => guard.scoped.get_mut(id),
            None => Some(&mut guard.shared),
        };
        let mut healthy: Option<BrowserInstance> = None;
        let mut unhealthy: Vec<BrowserInstance> = Vec::new();
        if let Some(queue) = queue {
            while let Some(entry) = queue.pop_front() {
                if healthy.is_none() && entry.instance.is_healthy_cached() {
                    healthy = Some(entry.instance);
                } else if !entry.instance.is_healthy_cached() {
                    unhealthy.push(entry.instance);
                } else {
                    // A healthy candidate is already in hand and this
                    // entry is healthy too: put it back and stop.
                    queue.push_front(entry);
                    break;
                }
            }
        }
        (healthy, unhealthy)
    };
    for instance in unhealthy {
        #[cfg(feature = "metrics")]
        crate::metrics::METRICS.record_crash();
        let active_count = self.active_count.clone();
        tokio::spawn(async move {
            let _ = instance.shutdown().await;
            active_count.fetch_sub(1, Ordering::Relaxed);
        });
        // Return the destroyed browser's launch permit (mirrors the
        // destroy path in `release`); without this the pool's launch
        // capacity would leak on every crashed browser.
        self.semaphore.add_permits(1);
    }
    if let Some(instance) = healthy {
        debug!(
            context = context_id.unwrap_or("shared"),
            "Reusing idle browser (uptime={:?})",
            instance.uptime()
        );
        return Ok(BrowserHandle::new(instance, Arc::clone(self), ctx_owned));
    }
    // Read the live count *after* disposing unhealthy entries so the
    // capacity they freed is visible to the launch gate below.
    let active = self.active_count.load(Ordering::Relaxed);
    if active < max {
        timeout(acquire_timeout, self.semaphore.acquire())
            .await
            .map_err(|_| BrowserError::PoolExhausted { active, max })?
            .map_err(|_| BrowserError::PoolExhausted { active, max })?
            .forget();
        self.active_count.fetch_add(1, Ordering::Relaxed);
        let instance = match BrowserInstance::launch((*self.config).clone()).await {
            Ok(i) => i,
            Err(e) => {
                // Roll back the reservation taken above.
                self.active_count.fetch_sub(1, Ordering::Relaxed);
                self.semaphore.add_permits(1);
                return Err(e);
            }
        };
        info!(
            context = context_id.unwrap_or("shared"),
            "Launched fresh browser (pool active={})",
            self.active_count.load(Ordering::Relaxed)
        );
        return Ok(BrowserHandle::new(instance, Arc::clone(self), ctx_owned));
    }
    // Saturated: poll the idle queue until another task releases a
    // browser or the timeout fires.
    let ctx_for_poll = context_id.map(String::from);
    timeout(acquire_timeout, async {
        loop {
            sleep(std::time::Duration::from_millis(50)).await;
            let mut guard = self.inner.lock().await;
            let queue = match ctx_for_poll.as_deref() {
                Some(id) => guard.scoped.get_mut(id),
                None => Some(&mut guard.shared),
            };
            if let Some(queue) = queue
                && let Some(entry) = queue.pop_front()
            {
                drop(guard);
                if entry.instance.is_healthy_cached() {
                    return Ok(BrowserHandle::new(
                        entry.instance,
                        Arc::clone(self),
                        ctx_for_poll.clone(),
                    ));
                }
                #[cfg(feature = "metrics")]
                crate::metrics::METRICS.record_crash();
                let active_count = self.active_count.clone();
                tokio::spawn(async move {
                    let _ = entry.instance.shutdown().await;
                    active_count.fetch_sub(1, Ordering::Relaxed);
                });
                // Same permit hygiene as the fast path above.
                self.semaphore.add_permits(1);
            }
        }
    })
    .await
    .map_err(|_| BrowserError::PoolExhausted { active, max })?
}
async fn release(&self, instance: BrowserInstance, context_id: Option<&str>) {
if instance.is_healthy_cached() {
let mut guard = self.inner.lock().await;
let total_idle: usize = guard.shared.len()
+ guard
.scoped
.values()
.map(std::collections::VecDeque::len)
.sum::<usize>();
if total_idle < self.max_size {
let queue = match context_id {
Some(id) => guard.scoped.entry(id.to_owned()).or_default(),
None => &mut guard.shared,
};
queue.push_back(PoolEntry {
instance,
last_used: Instant::now(),
});
debug!(
context = context_id.unwrap_or("shared"),
"Returned browser to idle pool"
);
return;
}
drop(guard);
}
#[cfg(feature = "metrics")]
if !instance.is_healthy_cached() {
crate::metrics::METRICS.record_crash();
}
let active_count = self.active_count.clone();
tokio::spawn(async move {
let _ = instance.shutdown().await;
active_count.fetch_sub(1, Ordering::Relaxed);
});
self.semaphore.add_permits(1);
}
pub async fn release_context(&self, context_id: &str) -> usize {
let mut guard = self.inner.lock().await;
let entries = guard.scoped.remove(context_id).unwrap_or_default();
drop(guard);
let count = entries.len();
for entry in entries {
let active_count = self.active_count.clone();
tokio::spawn(async move {
let _ = entry.instance.shutdown().await;
active_count.fetch_sub(1, Ordering::Relaxed);
});
self.semaphore.add_permits(1);
}
if count > 0 {
info!("Released {count} browsers for context '{context_id}'");
}
count
}
/// Lists the context ids that currently have a scoped idle queue.
pub async fn context_ids(&self) -> Vec<String> {
    self.inner
        .lock()
        .await
        .scoped
        .keys()
        .cloned()
        .collect::<Vec<_>>()
}
/// Returns a point-in-time snapshot of pool occupancy.
///
/// `idle` is best-effort: this method is synchronous and cannot await
/// the pool lock, so it uses `try_lock` and reports 0 when the lock is
/// contended rather than blocking (the previous implementation always
/// reported 0).
pub fn stats(&self) -> PoolStats {
    let active = self.active_count.load(Ordering::Relaxed);
    let idle = self.inner.try_lock().map_or(0, |guard| {
        guard.shared.len()
            + guard
                .scoped
                .values()
                .map(std::collections::VecDeque::len)
                .sum::<usize>()
    });
    PoolStats {
        active,
        max: self.max_size,
        available: self.max_size.saturating_sub(active),
        idle,
    }
}
}
/// RAII handle to a checked-out browser. Return it with [`release`]
/// (preferred) or let `Drop` return it best-effort via a spawned task.
pub struct BrowserHandle {
// `None` once the browser has been given back to the pool.
instance: Option<BrowserInstance>,
// Owning pool; used to route the release.
pool: Arc<BrowserPool>,
// `Some` when acquired via `acquire_for`; routes the release back to
// the matching scoped queue.
context_id: Option<String>,
}
impl BrowserHandle {
/// Wraps a freshly checked-out instance; only the pool constructs handles.
const fn new(
instance: BrowserInstance,
pool: Arc<BrowserPool>,
context_id: Option<String>,
) -> Self {
Self {
instance: Some(instance),
pool,
context_id,
}
}
/// Borrows the browser; `None` after the handle has been released.
pub const fn browser(&self) -> Option<&BrowserInstance> {
self.instance.as_ref()
}
/// Mutably borrows the browser; `None` after the handle has been released.
pub const fn browser_mut(&mut self) -> Option<&mut BrowserInstance> {
self.instance.as_mut()
}
/// The context id this handle was acquired for, if any.
pub fn context_id(&self) -> Option<&str> {
self.context_id.as_deref()
}
/// Explicitly returns the browser to the pool. Prefer this over
/// dropping the handle: `Drop` cannot await and must spawn a task.
pub async fn release(mut self) {
if let Some(instance) = self.instance.take() {
self.pool
.release(instance, self.context_id.as_deref())
.await;
}
}
}
impl Drop for BrowserHandle {
fn drop(&mut self) {
if let Some(instance) = self.instance.take() {
let pool = Arc::clone(&self.pool);
let context_id = self.context_id.clone();
tokio::spawn(async move {
pool.release(instance, context_id.as_deref()).await;
});
}
}
}
/// Point-in-time snapshot of pool occupancy, as reported by
/// `BrowserPool::stats`.
#[derive(Debug, Clone)]
pub struct PoolStats {
/// Live browser processes (idle + checked out).
pub active: usize,
/// Configured capacity (`pool.max_size`).
pub max: usize,
/// `max - active`, saturating at zero.
pub available: usize,
/// Browsers currently parked in the idle queues.
pub idle: usize,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{PoolConfig, StealthLevel};
    use std::time::Duration;

    /// Config with `min_size = 0` so no test ever launches a real
    /// browser process.
    fn test_config() -> BrowserConfig {
        BrowserConfig::builder()
            .stealth_level(StealthLevel::None)
            .pool(PoolConfig {
                min_size: 0,
                max_size: 5,
                idle_timeout: Duration::from_secs(300),
                acquire_timeout: Duration::from_millis(100),
            })
            .build()
    }

    #[test]
    fn pool_stats_reflects_max() {
        let config = test_config();
        assert_eq!(config.pool.max_size, 5);
        assert_eq!(config.pool.min_size, 0);
    }

    #[test]
    fn pool_stats_available_saturates() {
        let stats = PoolStats {
            active: 10,
            max: 10,
            available: 0,
            idle: 0,
        };
        assert_eq!(stats.available, 0);
        assert_eq!(stats.active, stats.max);
    }

    #[test]
    fn pool_stats_partial_usage() {
        let stats = PoolStats {
            active: 3,
            max: 10,
            available: 7,
            idle: 2,
        };
        assert_eq!(stats.available, 7);
    }

    // Pure config check — no await, so no runtime (was #[tokio::test]).
    #[test]
    fn pool_new_with_zero_min_size_ok() {
        let config = test_config();
        assert_eq!(config.pool.min_size, 0);
    }

    #[test]
    fn pool_stats_available_is_max_minus_active() {
        let stats = PoolStats {
            active: 6,
            max: 10,
            available: 4,
            idle: 3,
        };
        assert_eq!(stats.available, stats.max - stats.active);
    }

    #[test]
    fn pool_stats_available_cannot_underflow() {
        // Mirrors the `max.saturating_sub(active)` computation in `stats()`.
        let stats = PoolStats {
            active: 12,
            max: 10,
            available: 10_usize.saturating_sub(12),
            idle: 0,
        };
        assert_eq!(stats.available, 0);
    }

    #[test]
    fn pool_config_acquire_timeout_respected() {
        let cfg = BrowserConfig::builder()
            .pool(PoolConfig {
                min_size: 0,
                max_size: 1,
                idle_timeout: Duration::from_secs(300),
                acquire_timeout: Duration::from_millis(10),
            })
            .build();
        assert_eq!(cfg.pool.acquire_timeout, Duration::from_millis(10));
    }

    #[test]
    fn pool_config_idle_timeout_respected() {
        let cfg = BrowserConfig::builder()
            .pool(PoolConfig {
                min_size: 1,
                max_size: 5,
                idle_timeout: Duration::from_secs(60),
                acquire_timeout: Duration::from_secs(5),
            })
            .build();
        assert_eq!(cfg.pool.idle_timeout, Duration::from_secs(60));
    }

    // Renamed from `browser_handle_drop_does_not_panic_without_runtime`:
    // the body never dropped a handle — it is a compile-time Send/Sync
    // check, so the name now says so.
    #[test]
    fn pool_types_are_send_and_sync() {
        fn assert_send<T: Send>() {}
        fn assert_sync<T: Sync>() {}
        assert_send::<BrowserPool>();
        assert_send::<PoolStats>();
        assert_sync::<BrowserPool>();
        assert_sync::<PoolStats>();
    }

    #[test]
    fn pool_stats_zero_active_means_full_availability() {
        let stats = PoolStats {
            active: 0,
            max: 8,
            available: 8,
            idle: 0,
        };
        assert_eq!(stats.available, stats.max);
    }

    #[test]
    fn pool_entry_last_used_ordering() {
        let now = std::time::Instant::now();
        // `checked_sub` can fail on platforms where Instant's epoch is
        // recent; only assert when a genuinely older instant exists
        // (the old fallback to `now` made the assertion false).
        if let Some(older) = now.checked_sub(Duration::from_secs(400)) {
            let idle_timeout = Duration::from_secs(300);
            assert!(now.duration_since(older) >= idle_timeout);
        }
    }

    #[test]
    fn pool_stats_debug_format() {
        let stats = PoolStats {
            active: 2,
            max: 10,
            available: 8,
            idle: 1,
        };
        let dbg = format!("{stats:?}");
        assert!(dbg.contains("active"));
        assert!(dbg.contains("max"));
    }

    #[test]
    fn pool_inner_scoped_default_is_empty() {
        let inner = PoolInner {
            shared: std::collections::VecDeque::new(),
            scoped: std::collections::HashMap::new(),
        };
        assert!(inner.shared.is_empty());
        assert!(inner.scoped.is_empty());
    }

    #[test]
    fn pool_inner_scoped_insert_and_retrieve() {
        let mut inner = PoolInner {
            shared: std::collections::VecDeque::new(),
            scoped: std::collections::HashMap::new(),
        };
        inner.scoped.entry("bot-a".to_owned()).or_default();
        inner.scoped.entry("bot-b".to_owned()).or_default();
        assert_eq!(inner.scoped.len(), 2);
        assert!(inner.scoped.contains_key("bot-a"));
        assert!(inner.scoped.contains_key("bot-b"));
        assert!(inner.shared.is_empty());
    }

    #[test]
    fn pool_inner_scoped_retain_removes_empty() {
        let mut inner = PoolInner {
            shared: std::collections::VecDeque::new(),
            scoped: std::collections::HashMap::new(),
        };
        inner.scoped.entry("empty".to_owned()).or_default();
        assert_eq!(inner.scoped.len(), 1);
        // Same cleanup the eviction task performs.
        inner.scoped.retain(|_, q| !q.is_empty());
        assert!(inner.scoped.is_empty());
    }

    // Pure config check — no await, so no runtime (was #[tokio::test]).
    #[test]
    fn pool_context_ids_empty_by_default() {
        let config = test_config();
        assert_eq!(config.pool.min_size, 0);
    }

    #[test]
    fn browser_handle_context_id_none_for_shared() {
        // Compile-time check of the `context_id` accessor signature.
        fn _check_context_api(handle: &BrowserHandle) {
            let _: Option<&str> = handle.context_id();
        }
    }

    #[test]
    fn pool_inner_total_idle_calculation() {
        fn total_idle(inner: &PoolInner) -> usize {
            inner.shared.len()
                + inner
                    .scoped
                    .values()
                    .map(std::collections::VecDeque::len)
                    .sum::<usize>()
        }
        let mut inner = PoolInner {
            shared: std::collections::VecDeque::new(),
            scoped: std::collections::HashMap::new(),
        };
        assert_eq!(total_idle(&inner), 0);
        // Empty scoped queues contribute nothing to the idle total.
        inner.scoped.entry("a".to_owned()).or_default();
        inner.scoped.entry("b".to_owned()).or_default();
        assert_eq!(total_idle(&inner), 0);
    }
}