use std::{
collections::VecDeque,
env, fmt,
sync::{
Arc, Mutex as StdMutex, PoisonError,
atomic::{AtomicUsize, Ordering},
},
time::{Duration, Instant},
};
use futures::future;
use tokio::{
sync::{Mutex, Semaphore},
task::JoinHandle,
time::{sleep, timeout},
};
use crate::{
error::{Result, VoidCrawlError},
page::Page,
session::BrowserSession,
};
/// Sizing and recycling knobs for a [`BrowserPool`].
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of browsers the pool is sized for; total capacity is
    /// `browsers * tabs_per_browser` (`from_env` sets this to the number of sessions).
    pub browsers: usize,
    /// Tabs per browser; multiplied by `browsers` to get the pool's capacity.
    pub tabs_per_browser: usize,
    /// A tab whose `use_count` has reached this value is replaced with a fresh
    /// page when it is next acquired.
    pub tab_max_uses: u32,
    /// Ready tabs idle longer than this many seconds are recycled by
    /// `evict_idle`; the background eviction task wakes every half of this
    /// (at least once per second).
    pub tab_max_idle_secs: u64,
    /// How long `acquire` waits for a free slot; `0` means wait indefinitely.
    pub acquire_timeout_secs: u64,
    /// Not read by the pool code in this module — presumably consulted by the
    /// caller to decide whether to start the eviction task (TODO confirm).
    pub auto_evict: bool,
}

impl Default for PoolConfig {
    /// One browser with four tabs, each recycled after 50 uses or 60 idle
    /// seconds, a 30 second acquire timeout, and auto-eviction enabled.
    fn default() -> Self {
        PoolConfig {
            auto_evict: true,
            acquire_timeout_secs: 30,
            tab_max_idle_secs: 60,
            tab_max_uses: 50,
            tabs_per_browser: 4,
            browsers: 1,
        }
    }
}
pub struct PooledTab {
pub page: Page,
pub use_count: u32,
pub last_used: Instant,
pub(crate) browser_idx: usize,
}
impl fmt::Debug for PooledTab {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PooledTab")
.field("page", &self.page)
.field("use_count", &self.use_count)
.field("last_used", &self.last_used)
.field("browser_idx", &self.browser_idx)
.finish()
}
}
/// A fixed-capacity pool of browser tabs spread over one or more browser sessions.
///
/// Capacity is tracked by a semaphore: checking a tab out consumes a permit
/// and releasing it gives the permit back. Tabs are opened lazily when the
/// ready queue is empty, or eagerly via `warmup`.
pub struct BrowserPool {
    /// Browser sessions tabs are opened on; indexed by `PooledTab::browser_idx`.
    sessions: Vec<BrowserSession>,
    /// Idle tabs: taken from the front on acquire, appended at the back on release.
    ready: Mutex<VecDeque<PooledTab>>,
    /// One permit per free capacity slot.
    semaphore: Arc<Semaphore>,
    /// Configuration the pool was built with.
    config: PoolConfig,
    /// Round-robin counter choosing the session for lazily created tabs.
    next_session: AtomicUsize,
    /// Handle of the background eviction task, if one has been started.
    eviction_task: StdMutex<Option<JoinHandle<()>>>,
}

impl fmt::Debug for BrowserPool {
    /// Prints the configuration and the session count; the remaining
    /// (runtime-state) fields are elided.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let session_count = self.sessions.len();
        let mut out = f.debug_struct("BrowserPool");
        out.field("config", &self.config);
        out.field("sessions", &session_count);
        out.finish_non_exhaustive()
    }
}
impl BrowserPool {
/// Wraps already-connected browser sessions in a pool.
///
/// Capacity is `config.browsers * config.tabs_per_browser`, taken from
/// `config` rather than from `sessions.len()`. No tabs are opened here: call
/// `warmup`, or let `acquire` open them on demand.
pub fn new(config: PoolConfig, sessions: Vec<BrowserSession>) -> Self {
    let capacity = config.browsers * config.tabs_per_browser;
    let ready = Mutex::new(VecDeque::with_capacity(capacity));
    let semaphore = Arc::new(Semaphore::new(capacity));
    Self {
        sessions,
        ready,
        semaphore,
        config,
        next_session: AtomicUsize::new(0),
        eviction_task: StdMutex::new(None),
    }
}
pub async fn from_env() -> Result<Self> {
let tabs_per_browser: usize =
env::var("TABS_PER_BROWSER").ok().and_then(|v| v.parse().ok()).unwrap_or(4);
let tab_max_uses: u32 =
env::var("TAB_MAX_USES").ok().and_then(|v| v.parse().ok()).unwrap_or(50);
let tab_max_idle_secs: u64 =
env::var("TAB_MAX_IDLE_SECS").ok().and_then(|v| v.parse().ok()).unwrap_or(60);
let acquire_timeout_secs: u64 =
env::var("ACQUIRE_TIMEOUT_SECS").ok().and_then(|v| v.parse().ok()).unwrap_or(30);
let no_sandbox = env::var("CHROME_NO_SANDBOX").ok().is_some_and(|v| v == "1");
let headless = env::var("CHROME_HEADLESS").ok().is_none_or(|v| v != "0");
let viewport_width: Option<u32> =
env::var("VIEWPORT_WIDTH").ok().and_then(|v| v.parse().ok());
let viewport_height: Option<u32> =
env::var("VIEWPORT_HEIGHT").ok().and_then(|v| v.parse().ok());
let cdp_port_base: Option<u16> =
env::var("CDP_PORT_BASE").ok().and_then(|v| v.parse().ok());
let sessions = if let Ok(urls) = env::var("CHROME_WS_URLS") {
let futs: Vec<_> = urls
.split(',')
.map(str::trim)
.filter(|u| !u.is_empty())
.map(|url| BrowserSession::connect(url.to_string()))
.collect();
if futs.is_empty() {
return Err(VoidCrawlError::Other(
"CHROME_WS_URLS is set but contains no valid URLs".into(),
));
}
let results = future::join_all(futs).await;
results.into_iter().collect::<Result<Vec<_>>>()?
} else {
let browser_count: usize =
env::var("BROWSER_COUNT").ok().and_then(|v| v.parse().ok()).unwrap_or(1);
let futs: Vec<_> = (0..browser_count)
.map(|i| {
let mut builder = if headless {
BrowserSession::builder().headless()
} else {
BrowserSession::builder().headful()
};
if no_sandbox {
builder = builder.no_sandbox();
}
if let (Some(w), Some(h)) = (viewport_width, viewport_height) {
builder = builder.viewport(w, h);
} else if let Some(w) = viewport_width {
builder = builder.viewport(w, 1080);
} else if let Some(h) = viewport_height {
builder = builder.viewport(1920, h);
}
if let Some(base) = cdp_port_base {
builder =
builder.port(base.saturating_add(u16::try_from(i).unwrap_or(u16::MAX)));
}
builder.launch()
})
.collect();
let results = future::join_all(futs).await;
results.into_iter().collect::<Result<Vec<_>>>()?
};
let config = PoolConfig {
browsers: sessions.len(),
tabs_per_browser,
tab_max_uses,
tab_max_idle_secs,
acquire_timeout_secs,
auto_evict: true,
};
Ok(Self::new(config, sessions))
}
/// Chooses the session for the next lazily created tab, round-robin.
///
/// Panics (remainder by zero) when there are no sessions; callers must check first.
fn next_browser_idx(&self) -> usize {
    match self.sessions.len() {
        // Nothing to balance: skip the shared counter entirely.
        1 => 0,
        count => self.next_session.fetch_add(1, Ordering::Relaxed) % count,
    }
}
async fn create_tab(&self) -> Result<PooledTab> {
let idx = self.next_browser_idx();
let page = self.sessions[idx].new_blank_page().await?;
Ok(PooledTab { page, use_count: 0, last_used: Instant::now(), browser_idx: idx })
}
/// Pre-opens `tabs_per_browser` blank tabs on every session concurrently and
/// queues the ones that opened successfully.
///
/// The semaphore is not touched. NOTE: every call appends a full new set of
/// tabs, so calling it more than once overfills the ready queue.
///
/// # Errors
///
/// Returns the first page-creation error, in session/tab order. Tabs that did
/// open are still queued.
pub async fn warmup(&self) -> Result<()> {
    let per_browser = self.config.tabs_per_browser;
    let openings: Vec<_> = self
        .sessions
        .iter()
        .enumerate()
        .flat_map(move |(browser_idx, session)| {
            (0..per_browser).map(move |_| async move {
                let page = session.new_blank_page().await?;
                Ok::<_, VoidCrawlError>(PooledTab {
                    page,
                    use_count: 0,
                    last_used: Instant::now(),
                    browser_idx,
                })
            })
        })
        .collect();
    let outcomes = future::join_all(openings).await;
    let mut first_err = None;
    {
        let mut ready = self.ready.lock().await;
        for outcome in outcomes {
            match outcome {
                Ok(tab) => ready.push_back(tab),
                Err(err) => {
                    first_err.get_or_insert(err);
                }
            }
        }
    }
    first_err.map_or(Ok(()), Err)
}
/// Checks a tab out of the pool, discarding the wait-time measurement.
///
/// Same as `acquire_timed` without the milliseconds-waited value; see it for
/// the waiting, reuse and error rules.
pub async fn acquire(&self) -> Result<PooledTab> {
    let (tab, _waited_ms) = self.acquire_timed().await?;
    Ok(tab)
}
/// Checks a tab out of the pool and reports how long the caller waited for a slot.
///
/// Steps:
/// - Waits for a free slot: up to `acquire_timeout_secs`, or indefinitely when that is 0.
/// - Takes the oldest ready tab. If none is ready, opens a new one on the next
///   browser in round-robin order.
/// - If the tab has reached `tab_max_uses`, closes it (best effort) and hands
///   out a fresh blank page on the same browser instead.
///
/// Returns the tab and the wait time in milliseconds (saturating at `u64::MAX`).
/// Give the tab back with `release`, which restores the slot.
///
/// The slot is held as an RAII permit and only detached from the semaphore
/// right before returning successfully. Any early exit gives the slot back
/// automatically instead of leaking it. That covers an error, and also the
/// caller dropping this future mid-way (e.g. under `tokio::time::timeout` or
/// `select!`).
///
/// # Errors
///
/// - `VoidCrawlError::Timeout` if no slot frees up within the timeout.
/// - `VoidCrawlError::Other` if the pool semaphore has been closed.
/// - Any error from opening a new page.
pub async fn acquire_timed(&self) -> Result<(PooledTab, u64)> {
    let wait_start = Instant::now();
    let permit = if self.config.acquire_timeout_secs == 0 {
        self.semaphore
            .acquire()
            .await
            .map_err(|_| VoidCrawlError::Other("pool semaphore closed".into()))?
    } else {
        let deadline = Duration::from_secs(self.config.acquire_timeout_secs);
        match timeout(deadline, self.semaphore.acquire()).await {
            Ok(Ok(permit)) => permit,
            Ok(Err(_)) => {
                return Err(VoidCrawlError::Other("pool semaphore closed".into()));
            }
            Err(_) => {
                return Err(VoidCrawlError::Timeout(format!(
                    "pool.acquire() timed out after {}s — all {} tabs are checked out",
                    self.config.acquire_timeout_secs,
                    self.config.browsers * self.config.tabs_per_browser,
                )));
            }
        }
    };
    let waited_ms = u64::try_from(wait_start.elapsed().as_millis()).unwrap_or(u64::MAX);
    let reused = self.ready.lock().await.pop_front();
    // From here on, `?` drops `permit`, which returns the slot to the semaphore.
    let tab = match reused {
        Some(tab) => tab,
        None => self.create_tab().await?,
    };
    let tab = if tab.use_count >= self.config.tab_max_uses {
        // Worn-out tab: close it (best effort) and replace it with a fresh
        // page on the same browser.
        let browser_idx = tab.browser_idx;
        let _ = tab.page.close().await;
        let page = self.sessions[browser_idx].new_blank_page().await?;
        PooledTab { page, use_count: 0, last_used: Instant::now(), browser_idx }
    } else {
        tab
    };
    // Success: the slot now travels with the tab and is restored by `release`.
    permit.forget();
    Ok((tab, waited_ms))
}
/// Returns a checked-out tab to the pool and frees its slot.
///
/// Steps:
/// - Increments `use_count` and stamps `last_used` with the current time.
/// - If `is_download_armed()` is true, awaits `reset_download_behavior()` on the page.
/// - Appends the tab to the back of the ready queue.
/// - Only then adds one semaphore permit.
///
/// Re-queueing before the permit is released means a waiter woken by that
/// permit finds this tab, rather than an empty queue that would make it open
/// a new one. NOTE: every call adds a permit unconditionally. Pass only tabs
/// obtained from `acquire`/`acquire_timed`, and each one exactly once.
pub async fn release(&self, mut tab: PooledTab) {
    tab.use_count += 1;
    tab.last_used = Instant::now();
    let armed = tab.page.is_download_armed();
    if armed {
        tab.page.reset_download_behavior().await;
    }
    {
        let mut ready = self.ready.lock().await;
        ready.push_back(tab);
    }
    self.semaphore.add_permits(1);
}
/// Recycles every ready tab that has been idle longer than `tab_max_idle_secs`.
///
/// Each idle tab is closed (best effort) and replaced by a fresh blank page on
/// the same browser. If that browser's session is no longer alive, the tab is
/// put back unchanged, keeping its old `last_used`. Checked-out tabs are never touched.
///
/// While a tab is being recycled it is absent from the ready queue, so a
/// semaphore permit is reserved for it first. Without that reservation, a
/// concurrent `acquire` could find the queue empty and open an extra tab. The
/// pool would then hold more open tabs than its capacity once the recycled
/// tabs were pushed back. Idle tabs for which no permit is currently free are
/// left for the next pass. Reservations are RAII permits, so they are returned
/// even if this future is dropped or its task aborted mid-way.
///
/// # Errors
///
/// Returns the first error from opening a replacement page. A tab whose
/// replacement failed leaves the pool. Its slot is freed, so a later
/// `acquire` opens a new tab on demand.
pub async fn evict_idle(&self) -> Result<()> {
    let max_idle = Duration::from_secs(self.config.tab_max_idle_secs);
    let now = Instant::now();
    let (to_evict, reservations) = {
        let mut ready = self.ready.lock().await;
        let mut keep = VecDeque::with_capacity(ready.len());
        let mut evict = Vec::new();
        let mut permits = Vec::new();
        while let Some(tab) = ready.pop_front() {
            if now.duration_since(tab.last_used) > max_idle {
                if let Ok(permit) = self.semaphore.try_acquire() {
                    permits.push(permit);
                    evict.push(tab);
                    continue;
                }
            }
            keep.push_back(tab);
        }
        *ready = keep;
        (evict, permits)
    };
    if to_evict.is_empty() {
        return Ok(());
    }
    let futs: Vec<_> = to_evict
        .into_iter()
        .map(|tab| {
            let browser_idx = tab.browser_idx;
            let session = &self.sessions[browser_idx];
            async move {
                if !session.is_alive() {
                    return Ok::<_, VoidCrawlError>(tab);
                }
                let _ = tab.page.close().await;
                session.new_blank_page().await.map(|page| PooledTab {
                    page,
                    use_count: 0,
                    last_used: Instant::now(),
                    browser_idx,
                })
            }
        })
        .collect();
    let results = future::join_all(futs).await;
    let mut first_err: Option<VoidCrawlError> = None;
    {
        let mut ready = self.ready.lock().await;
        for result in results {
            match result {
                Ok(tab) => ready.push_back(tab),
                Err(e) => {
                    first_err.get_or_insert(e);
                }
            }
        }
    }
    // Release the reservations only after the refreshed tabs are back in
    // `ready`, so no acquirer can see a free permit alongside an empty queue.
    drop(reservations);
    first_err.map_or(Ok(()), Err)
}
pub fn config(&self) -> &PoolConfig {
&self.config
}
pub fn available_permits(&self) -> usize {
self.semaphore.available_permits()
}
pub fn start_eviction_task(self: Arc<Self>) {
let mut slot = self.eviction_task.lock().unwrap_or_else(PoisonError::into_inner);
if slot.is_some() {
return; }
let pool = Arc::clone(&self);
let interval = Duration::from_secs((self.config.tab_max_idle_secs / 2).max(1));
let handle = tokio::spawn(async move {
loop {
sleep(interval).await;
let _ = pool.evict_idle().await;
}
});
*slot = Some(handle);
}
/// Aborts the background eviction task, if one is registered, and clears the slot.
///
/// Safe to call repeatedly. A poisoned lock is recovered rather than propagated.
pub fn stop_eviction_task(&self) {
    let mut registered = self.eviction_task.lock().unwrap_or_else(PoisonError::into_inner);
    let Some(handle) = registered.take() else {
        return;
    };
    handle.abort();
}
/// Shuts the pool down.
///
/// Steps:
/// - Stops the eviction task.
/// - Closes every tab currently in the ready queue, concurrently.
/// - Closes every browser session, concurrently.
///
/// Tabs that are checked out are not closed individually here.
///
/// # Errors
///
/// Every close is attempted regardless of failures. The first error is
/// returned, with tab errors ranked ahead of session errors.
pub async fn close(&self) -> Result<()> {
    self.stop_eviction_task();
    let idle_tabs: Vec<PooledTab> = self.ready.lock().await.drain(..).collect();
    let tab_outcomes = future::join_all(idle_tabs.into_iter().map(|tab| tab.page.close())).await;
    let session_outcomes =
        future::join_all(self.sessions.iter().map(BrowserSession::close)).await;
    let first_err = tab_outcomes
        .into_iter()
        .filter_map(|outcome| outcome.err())
        .chain(session_outcomes.into_iter().filter_map(|outcome| outcome.err()))
        .next();
    match first_err {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
}