use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use chromiumoxide::browser::{Browser, BrowserConfig};
use chromiumoxide::cdp::browser_protocol::network::SetUserAgentOverrideParams;
use futures::StreamExt;
use tokio::sync::{Mutex, Semaphore};
use tracing::{debug, warn};
use crate::fetcher::{PageFetcher, WaitStrategy};
use crate::{Result, SearchError};
/// Settings that control how the shared browser is launched and how many
/// tabs may be open at the same time.
#[derive(Debug, Clone)]
pub struct BrowserPoolConfig {
    /// Upper bound on concurrently open tabs; used to size the pool's tab
    /// semaphore.
    pub max_tabs: usize,
    /// Whether the browser should run without a visible window.
    pub headless: bool,
    /// Explicit path to the Chrome/Chromium executable. When `None`, no
    /// executable override is handed to the launcher.
    pub chrome_path: Option<String>,
    /// Proxy to route traffic through, passed to the browser as
    /// `--proxy-server=<value>`.
    pub proxy_url: Option<String>,
    /// Additional command-line switches, appended after the built-in ones.
    pub launch_args: Vec<String>,
}
impl Default for BrowserPoolConfig {
fn default() -> Self {
Self {
max_tabs: 4,
headless: true,
chrome_path: None,
proxy_url: None,
launch_args: Vec::new(),
}
}
}
/// Owns a single, lazily launched browser instance that is shared between
/// fetchers, together with a semaphore that caps the number of open tabs.
pub struct BrowserPool {
    /// Launch settings captured at construction time.
    config: BrowserPoolConfig,
    /// The shared browser handle; `None` until the first successful launch
    /// and again after `shutdown`.
    browser: Mutex<Option<Arc<Browser>>>,
    /// One permit per tab that is allowed to be open concurrently.
    tab_semaphore: Arc<Semaphore>,
}
impl BrowserPool {
    /// Creates a pool whose browser is launched lazily on first use.
    ///
    /// `config.max_tabs` determines the number of tab permits. A value of `0`
    /// is treated as `1`, because a semaphore without permits would make every
    /// fetch wait forever.
    pub fn new(config: BrowserPoolConfig) -> Self {
        let max_tabs = config.max_tabs.max(1);
        Self {
            config,
            browser: Mutex::new(None),
            tab_semaphore: Arc::new(Semaphore::new(max_tabs)),
        }
    }

    /// Returns the semaphore that limits how many tabs may be open at once.
    pub fn tab_semaphore(&self) -> &Arc<Semaphore> {
        &self.tab_semaphore
    }

    /// Returns the shared browser, launching it first if it is not running.
    ///
    /// The internal lock is held for the whole launch, so concurrent callers
    /// wait for the first launch instead of starting a second browser.
    ///
    /// # Errors
    ///
    /// Returns [`SearchError::Browser`] when the browser configuration cannot
    /// be built or the browser process fails to launch.
    pub async fn acquire_browser(&self) -> Result<Arc<Browser>> {
        let mut guard = self.browser.lock().await;
        if let Some(browser) = guard.as_ref() {
            return Ok(Arc::clone(browser));
        }

        debug!("Launching browser (headless: {})", self.config.headless);
        let mut builder = BrowserConfig::builder();
        // The chromiumoxide builder launches headless unless told otherwise,
        // so a headed browser has to be requested explicitly; otherwise
        // `headless: false` would be silently ignored.
        builder = if self.config.headless {
            builder.arg("--headless=new")
        } else {
            builder.with_head()
        };
        if let Some(ref path) = self.config.chrome_path {
            builder = builder.chrome_executable(path);
        }
        builder = builder
            .arg("--disable-gpu")
            .arg("--no-sandbox")
            .arg("--disable-dev-shm-usage")
            .arg("--disable-extensions")
            .arg("--disable-background-networking")
            .arg("--disable-default-apps")
            .arg("--disable-sync")
            .arg("--disable-translate")
            .arg("--mute-audio")
            .arg("--no-first-run");
        if let Some(ref proxy) = self.config.proxy_url {
            builder = builder.arg(format!("--proxy-server={}", proxy));
        }
        // User-supplied switches go last, after all built-in flags.
        for arg in &self.config.launch_args {
            builder = builder.arg(arg);
        }

        let browser_config = builder
            .build()
            .map_err(|e| SearchError::Browser(format!("Failed to build browser config: {}", e)))?;
        let (browser, mut handler) = Browser::launch(browser_config)
            .await
            .map_err(|e| SearchError::Browser(format!("Failed to launch browser: {}", e)))?;

        // The handler stream has to be polled continuously for the browser
        // connection to make progress, so it is driven on a background task.
        tokio::spawn(async move {
            while let Some(event) = handler.next().await {
                if let Err(e) = event {
                    warn!("Browser CDP handler error: {}", e);
                }
            }
            debug!("Browser CDP handler exited");
        });

        let browser = Arc::new(browser);
        *guard = Some(Arc::clone(&browser));
        Ok(browser)
    }

    /// Releases the pool's browser, if one was launched.
    ///
    /// When the pool holds the only reference, the browser is asked to close
    /// and its process is awaited. When other handles are still alive (for
    /// example a fetch in progress), only the pool's reference is dropped.
    /// A later call to [`BrowserPool::acquire_browser`] launches a fresh
    /// browser.
    pub async fn shutdown(&self) {
        let mut guard = self.browser.lock().await;
        if let Some(shared) = guard.take() {
            match Arc::try_unwrap(shared) {
                Ok(mut browser) => match browser.close().await {
                    Ok(_) => {
                        if let Err(e) = browser.wait().await {
                            warn!("Failed to wait for browser exit: {}", e);
                        }
                    }
                    Err(e) => warn!("Failed to close browser gracefully: {}", e),
                },
                Err(_still_shared) => {
                    debug!("Browser handle still in use elsewhere; dropped the pool's reference only");
                }
            }
            debug!("Browser pool shut down");
        }
    }
}
/// A page fetcher that renders pages in a tab of the pooled browser.
pub struct BrowserFetcher {
    /// Pool that supplies the browser and the tab permits.
    pool: Arc<BrowserPool>,
    /// Condition to wait for before the page content is read.
    wait: WaitStrategy,
    /// Optional user-agent override applied to the tab.
    user_agent: Option<String>,
}
impl BrowserFetcher {
pub fn new(pool: Arc<BrowserPool>) -> Self {
Self {
pool,
wait: WaitStrategy::default(),
user_agent: None,
}
}
pub fn with_wait(mut self, wait: WaitStrategy) -> Self {
self.wait = wait;
self
}
pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
self.user_agent = Some(user_agent.into());
self
}
}
#[async_trait]
impl PageFetcher for BrowserFetcher {
async fn fetch(&self, url: &str) -> Result<String> {
let _permit = self
.pool
.tab_semaphore()
.acquire()
.await
.map_err(|e| SearchError::Browser(format!("Tab semaphore closed: {}", e)))?;
let browser = self.pool.acquire_browser().await?;
let page = browser
.new_page(url)
.await
.map_err(|e| SearchError::Browser(format!("Failed to open tab: {}", e)))?;
if let Some(ref ua) = self.user_agent {
page.set_user_agent(SetUserAgentOverrideParams::new(ua))
.await
.map_err(|e| SearchError::Browser(format!("Failed to set user agent: {}", e)))?;
}
match &self.wait {
WaitStrategy::Load => {
page.wait_for_navigation()
.await
.map_err(|e| SearchError::Browser(format!("Navigation wait failed: {}", e)))?;
}
WaitStrategy::NetworkIdle { idle_ms } => {
page.wait_for_navigation()
.await
.map_err(|e| SearchError::Browser(format!("Navigation wait failed: {}", e)))?;
tokio::time::sleep(Duration::from_millis(*idle_ms)).await;
}
WaitStrategy::Selector { css, timeout_ms } => {
tokio::time::timeout(Duration::from_millis(*timeout_ms), async {
page.find_element(css.as_str())
.await
.map_err(|e| SearchError::Browser(format!("Selector wait failed: {}", e)))
})
.await
.map_err(|_| {
SearchError::Browser(format!(
"Timed out waiting for selector '{}' after {}ms",
css, timeout_ms
))
})??;
}
WaitStrategy::Delay { ms } => {
page.wait_for_navigation()
.await
.map_err(|e| SearchError::Browser(format!("Navigation wait failed: {}", e)))?;
tokio::time::sleep(Duration::from_millis(*ms)).await;
}
}
let html = page
.content()
.await
.map_err(|e| SearchError::Browser(format!("Failed to get page content: {}", e)))?;
if let Err(e) = page.close().await {
warn!("Failed to close browser tab: {}", e);
}
Ok(html)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a fetcher backed by a pool with the default configuration.
    fn fetcher_on_default_pool() -> BrowserFetcher {
        let pool = BrowserPool::new(BrowserPoolConfig::default());
        BrowserFetcher::new(Arc::new(pool))
    }

    // The default configuration exposes the documented baseline values.
    #[test]
    fn test_browser_pool_config_default() {
        let BrowserPoolConfig {
            max_tabs,
            headless,
            chrome_path,
            proxy_url,
            launch_args,
        } = BrowserPoolConfig::default();
        assert_eq!(max_tabs, 4);
        assert!(headless);
        assert!(chrome_path.is_none());
        assert!(proxy_url.is_none());
        assert!(launch_args.is_empty());
    }

    // Every field can be overridden and is stored as given.
    #[test]
    fn test_browser_pool_config_custom() {
        let chrome = "/usr/bin/chromium";
        let proxy = "http://localhost:8080";
        let config = BrowserPoolConfig {
            max_tabs: 8,
            headless: false,
            chrome_path: Some(chrome.to_string()),
            proxy_url: Some(proxy.to_string()),
            launch_args: vec!["--disable-web-security".to_string()],
        };
        assert_eq!(config.max_tabs, 8);
        assert!(!config.headless);
        assert_eq!(config.chrome_path.as_deref(), Some(chrome));
        assert_eq!(config.proxy_url.as_deref(), Some(proxy));
        assert_eq!(config.launch_args.len(), 1);
    }

    // A default pool starts with one permit per default tab.
    #[test]
    fn test_browser_pool_new() {
        let pool = BrowserPool::new(BrowserPoolConfig::default());
        let permits = pool.tab_semaphore().available_permits();
        assert_eq!(permits, 4);
    }

    // A custom tab limit is reflected in the semaphore.
    #[test]
    fn test_browser_pool_custom_tabs() {
        let pool = BrowserPool::new(BrowserPoolConfig {
            max_tabs: 2,
            ..Default::default()
        });
        let permits = pool.tab_semaphore().available_permits();
        assert_eq!(permits, 2);
    }

    // A new fetcher waits for page load and has no user-agent override.
    #[test]
    fn test_browser_fetcher_new() {
        let fetcher = fetcher_on_default_pool();
        assert!(matches!(fetcher.wait, WaitStrategy::Load));
        assert!(fetcher.user_agent.is_none());
    }

    // `with_wait` replaces the wait strategy.
    #[test]
    fn test_browser_fetcher_with_wait() {
        let strategy = WaitStrategy::Selector {
            css: "div.g".to_string(),
            timeout_ms: 5000,
        };
        let fetcher = fetcher_on_default_pool().with_wait(strategy);
        assert!(matches!(fetcher.wait, WaitStrategy::Selector { .. }));
    }

    // `with_user_agent` stores the override.
    #[test]
    fn test_browser_fetcher_with_user_agent() {
        let fetcher = fetcher_on_default_pool().with_user_agent("CustomBot/1.0");
        assert_eq!(fetcher.user_agent.as_deref(), Some("CustomBot/1.0"));
    }
}