use std::future::Future;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use futures_util::StreamExt;
use futures_util::stream;
use tokio::sync::{Mutex, Semaphore};
use crate::cdp::{ChromiumBrowser, ChromiumContextOverride, ChromiumOptions, ChromiumTab};
use crate::pool::rotate::{RotateStrategy, Rotator, hash_key};
use crate::pool::{Checkpoint, RetryPolicy, is_worker_dead};
use crate::{Error, Result};
/// Configuration consumed by `ChromiumPool::launch`.
pub struct ChromiumPoolOptions {
/// Number of browser workers to launch; `launch` raises values below 1 to 1.
pub size: usize,
/// Multiplied by `size` in `launch` to obtain the pool's concurrency limit;
/// values below 1 are raised to 1.
pub tabs_per_worker: usize,
/// Options used for every worker index that has no entry in `worker_options`.
pub base_options: ChromiumOptions,
/// Per-worker options, looked up by worker index in `launch`.
pub worker_options: Vec<ChromiumOptions>,
/// Candidate proxies; the pool's proxy rotator picks an index into this list per attempt.
pub proxies: Vec<String>,
/// Candidate user agents; the pool's user-agent rotator picks an index into this list per attempt.
pub user_agents: Vec<String>,
/// Strategy handed to both the proxy rotator and the user-agent rotator.
pub rotate: RotateStrategy,
/// Retry settings (`max_retries`, `backoff`, `backoff_factor`) read by `run_keyed`.
pub retry: RetryPolicy,
/// When true, the tab is closed after each task attempt.
pub close_tab_after_task: bool,
}
impl Default for ChromiumPoolOptions {
fn default() -> Self {
Self {
size: 4,
tabs_per_worker: 1,
base_options: ChromiumOptions::default(),
worker_options: Vec::new(),
proxies: Vec::new(),
user_agents: Vec::new(),
rotate: RotateStrategy::default(),
retry: RetryPolicy::default(),
close_tab_after_task: true,
}
}
}
impl ChromiumPoolOptions {
    /// Creates options populated with the `Default` values.
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Sets the number of browser workers.
    pub fn size(self, n: usize) -> Self {
        Self { size: n, ..self }
    }

    /// Sets the number of tabs each worker contributes to the concurrency limit.
    pub fn tabs_per_worker(self, n: usize) -> Self {
        Self {
            tabs_per_worker: n,
            ..self
        }
    }

    /// Sets the options used for workers without a per-worker entry.
    pub fn base_options(self, opts: ChromiumOptions) -> Self {
        Self {
            base_options: opts,
            ..self
        }
    }

    /// Sets the per-worker options, indexed by worker position.
    pub fn worker_options(self, opts: Vec<ChromiumOptions>) -> Self {
        Self {
            worker_options: opts,
            ..self
        }
    }

    /// Sets the list of proxies to rotate through.
    pub fn proxies(self, proxies: Vec<String>) -> Self {
        Self { proxies, ..self }
    }

    /// Sets the list of user agents to rotate through.
    pub fn user_agents(self, uas: Vec<String>) -> Self {
        Self {
            user_agents: uas,
            ..self
        }
    }

    /// Sets the rotation strategy shared by the proxy and user-agent rotators.
    pub fn rotate(self, s: RotateStrategy) -> Self {
        Self { rotate: s, ..self }
    }

    /// Sets the retry policy applied to failed tasks.
    pub fn retry(self, policy: RetryPolicy) -> Self {
        Self {
            retry: policy,
            ..self
        }
    }

    /// Chooses whether the tab is closed after each task attempt.
    pub fn close_tab_after_task(self, yes: bool) -> Self {
        Self {
            close_tab_after_task: yes,
            ..self
        }
    }
}
/// One browser worker of the pool: a browser handle plus what is needed to relaunch it.
struct CdpWorker {
/// Current browser handle; `handle` replaces it under this lock when the worker is unhealthy.
browser: Mutex<Arc<ChromiumBrowser>>,
/// Launch options, reused by `handle` when the browser is relaunched.
options: ChromiumOptions,
/// Cleared by `mark_unhealthy`; set again by `handle` after a successful relaunch.
healthy: AtomicBool,
}
impl CdpWorker {
    /// Returns a handle to this worker's browser, relaunching it first when
    /// the worker has been marked unhealthy.
    ///
    /// # Errors
    /// Propagates the error from `ChromiumBrowser::launch` if the relaunch
    /// fails; the worker then stays marked unhealthy.
    async fn handle(&self) -> Result<Arc<ChromiumBrowser>> {
        let looks_healthy = self.healthy.load(Ordering::Acquire);
        if !looks_healthy {
            let mut slot = self.browser.lock().await;
            // Re-check under the lock: another caller may have finished the
            // relaunch while this one was waiting.
            let still_unhealthy = !self.healthy.load(Ordering::Acquire);
            if still_unhealthy {
                tracing::warn!("CDP worker 不健康,重建 Chrome 进程");
                let relaunched = ChromiumBrowser::launch(self.options.clone()).await?;
                *slot = Arc::new(relaunched);
                self.healthy.store(true, Ordering::Release);
            }
            return Ok(Arc::clone(&*slot));
        }
        // Healthy: hand out the current browser.
        let current = Arc::clone(&*self.browser.lock().await);
        Ok(current)
    }

    /// Flags the worker so that the next `handle` call relaunches the browser.
    fn mark_unhealthy(&self) {
        let healthy = false;
        self.healthy.store(healthy, Ordering::Release);
    }
}
/// A pool of browser workers that runs tab-based tasks with bounded concurrency,
/// proxy/user-agent rotation and retries.
pub struct ChromiumPool {
/// Launched workers, one per browser.
workers: Vec<Arc<CdpWorker>>,
/// Created with `concurrency` permits; `run_keyed` holds one permit for the whole call.
sem: Arc<Semaphore>,
/// Counter used by `pick_worker` to choose a worker when no key is given.
worker_cursor: AtomicU64,
/// `size * tabs_per_worker`, computed in `launch`.
concurrency: usize,
/// Candidate proxies for per-attempt context overrides.
proxies: Vec<String>,
/// Candidate user agents for per-attempt context overrides.
user_agents: Vec<String>,
/// Picks an index into `proxies`.
proxy_rotator: Rotator,
/// Picks an index into `user_agents`.
ua_rotator: Rotator,
/// Retry settings read by `run_keyed`.
retry: RetryPolicy,
/// When true, `try_once` closes the tab after the task finishes.
close_tab_after_task: bool,
}
impl ChromiumPool {
pub async fn launch(opts: ChromiumPoolOptions) -> Result<Self> {
let ChromiumPoolOptions {
size,
tabs_per_worker,
base_options,
worker_options,
proxies,
user_agents,
rotate,
retry,
close_tab_after_task,
} = opts;
let size = size.max(1);
let tabs_per_worker = tabs_per_worker.max(1);
let worker_opts: Vec<ChromiumOptions> = (0..size)
.map(|i| {
worker_options
.get(i)
.cloned()
.unwrap_or_else(|| base_options.clone())
})
.collect();
let workers =
futures_util::future::try_join_all(worker_opts.into_iter().map(|o| async move {
let browser = ChromiumBrowser::launch(o.clone()).await?;
Ok::<Arc<CdpWorker>, Error>(Arc::new(CdpWorker {
browser: Mutex::new(Arc::new(browser)),
options: o,
healthy: AtomicBool::new(true),
}))
}))
.await?;
let concurrency = size * tabs_per_worker;
Ok(Self {
workers,
sem: Arc::new(Semaphore::new(concurrency)),
worker_cursor: AtomicU64::new(0),
concurrency,
proxies,
user_agents,
proxy_rotator: Rotator::new(rotate),
ua_rotator: Rotator::new(rotate),
retry,
close_tab_after_task,
})
}
/// Returns the pool's concurrency limit as computed at launch
/// (`size * tabs_per_worker`).
pub fn concurrency(&self) -> usize {
self.concurrency
}
/// Returns the number of browser workers held by the pool.
pub fn worker_count(&self) -> usize {
self.workers.len()
}
/// Chooses the worker for one attempt.
///
/// With a key, the worker index is `hash_key(key)` modulo the worker count,
/// so equal keys map to the same worker. Without a key, a shared counter is
/// incremented and reduced modulo the worker count.
fn pick_worker(&self, key: Option<&str>) -> Arc<CdpWorker> {
    let total = self.workers.len() as u64;
    let ticket = if let Some(k) = key {
        hash_key(k)
    } else {
        self.worker_cursor.fetch_add(1, Ordering::Relaxed)
    };
    let slot = (ticket % total) as usize;
    Arc::clone(&self.workers[slot])
}
/// Builds the context override for one attempt.
///
/// The proxy rotator is consulted first, then the user-agent rotator; each
/// setting is applied only when its rotator returns an index.
fn build_override(&self, key: Option<&str>) -> ChromiumContextOverride {
    let base = ChromiumContextOverride::new();
    let with_proxy = match self.proxy_rotator.pick(self.proxies.len(), key) {
        Some(idx) => base.proxy(self.proxies[idx].clone()),
        None => base,
    };
    match self.ua_rotator.pick(self.user_agents.len(), key) {
        Some(idx) => with_proxy.user_agent(self.user_agents[idx].clone()),
        None => with_proxy,
    }
}
/// Runs `task` on a pooled tab without a routing key.
///
/// Delegates to `run_keyed` with `key = None`, so the worker is chosen by
/// the pool's counter rather than by hashing a key.
pub async fn run<F, Fut, T>(&self, task: F) -> Result<T>
where
F: Fn(ChromiumTab) -> Fut,
Fut: Future<Output = Result<T>>,
{
self.run_keyed(None, task).await
}
pub async fn run_keyed<F, Fut, T>(&self, key: Option<&str>, task: F) -> Result<T>
where
F: Fn(ChromiumTab) -> Fut,
Fut: Future<Output = Result<T>>,
{
let _permit = self
.sem
.acquire()
.await
.map_err(|_| Error::Other("并发池信号量已关闭".into()))?;
let mut attempt = 0u32;
let mut backoff = self.retry.backoff;
loop {
let worker = self.pick_worker(key);
match self.try_once(&worker, key, &task).await {
Ok(v) => return Ok(v),
Err(e) => {
if is_worker_dead(&e) {
worker.mark_unhealthy();
}
if attempt >= self.retry.max_retries {
return Err(e);
}
attempt += 1;
tracing::debug!(attempt, error = %e, "CDP 任务失败,重试");
if !backoff.is_zero() {
tokio::time::sleep(backoff).await;
backoff = Duration::from_secs_f64(
backoff.as_secs_f64() * self.retry.backoff_factor,
);
}
}
}
}
}
/// Performs a single attempt of `task` on `worker`.
///
/// Obtains the worker's browser, opens a tab with the override built for
/// `key`, runs the task on a clone of that tab, and closes the tab afterwards
/// when `close_tab_after_task` is set. A failure to open the tab that
/// satisfies `is_worker_dead` marks the worker unhealthy before the error is
/// returned.
///
/// # Errors
/// Returns errors from obtaining the browser, opening the tab, or the task
/// itself. The result of closing the tab is ignored.
async fn try_once<F, Fut, T>(
    &self,
    worker: &Arc<CdpWorker>,
    key: Option<&str>,
    task: &F,
) -> Result<T>
where
    F: Fn(ChromiumTab) -> Fut,
    Fut: Future<Output = Result<T>>,
{
    let browser = worker.handle().await?;
    let overrides = self.build_override(key);

    let opened = browser.new_tab_with(&overrides).await;
    if let Err(cause) = &opened {
        if is_worker_dead(cause) {
            worker.mark_unhealthy();
        }
    }
    let tab = opened?;

    let outcome = task(tab.clone()).await;
    if !self.close_tab_after_task {
        return outcome;
    }
    // Closing is best-effort; the task's outcome is what the caller gets.
    let _ = tab.close().await;
    outcome
}
/// Runs `task` for every item, with at most `concurrency` items in flight,
/// and returns each item paired with its result in the input order.
///
/// Every item goes through `run`, so the pool's retry policy applies per item.
pub async fn map<I, F, Fut, T>(&self, items: Vec<I>, task: F) -> Vec<(I, Result<T>)>
where
    I: Clone,
    F: Fn(I, ChromiumTab) -> Fut + Clone,
    Fut: Future<Output = Result<T>>,
{
    let limit = self.concurrency.max(1);
    let jobs = items.into_iter().enumerate().map(|(pos, item)| {
        let job = task.clone();
        async move {
            // `run` may call the closure again on retry, so it needs its own
            // copy of the item to clone from on each call.
            let seed = item.clone();
            let outcome = self.run(move |tab| job(seed.clone(), tab)).await;
            (pos, item, outcome)
        }
    });
    let mut finished: Vec<(usize, I, Result<T>)> =
        stream::iter(jobs).buffer_unordered(limit).collect().await;
    // Completion order is arbitrary; restore the input order.
    finished.sort_by_key(|entry| entry.0);
    finished
        .into_iter()
        .map(|(_, item, outcome)| (item, outcome))
        .collect()
}
/// Like `map`, but skips items the checkpoint already reports as done and
/// records each successful item in the checkpoint.
///
/// `key_of` produces the checkpoint key for an item. Skipped items do not
/// appear in the returned vector; the remaining items are returned with their
/// results in their original relative order. The result of `mark_done` is
/// ignored.
pub async fn map_resumable<I, K, F, Fut, T>(
    &self,
    items: Vec<I>,
    key_of: K,
    ckpt: &Checkpoint,
    task: F,
) -> Vec<(I, Result<T>)>
where
    I: Clone,
    K: Fn(&I) -> String,
    F: Fn(I, ChromiumTab) -> Fut + Clone,
    Fut: Future<Output = Result<T>>,
{
    // Keep only the items that are not yet checkpointed.
    let mut todo: Vec<(String, I)> = Vec::new();
    for item in items {
        let id = key_of(&item);
        if ckpt.is_done(&id).await {
            continue;
        }
        todo.push((id, item));
    }

    let limit = self.concurrency.max(1);
    let jobs = todo.into_iter().enumerate().map(|(pos, (id, item))| {
        let job = task.clone();
        async move {
            let seed = item.clone();
            let outcome = self.run(move |tab| job(seed.clone(), tab)).await;
            if outcome.is_ok() {
                let _ = ckpt.mark_done(&id, None).await;
            }
            (pos, item, outcome)
        }
    });
    let mut finished: Vec<(usize, I, Result<T>)> =
        stream::iter(jobs).buffer_unordered(limit).collect().await;
    // Completion order is arbitrary; restore the pending order.
    finished.sort_by_key(|entry| entry.0);
    finished
        .into_iter()
        .map(|(_, item, outcome)| (item, outcome))
        .collect()
}
pub async fn shutdown(self) -> Result<()> {
let mut futs = Vec::new();
for w in &self.workers {
let b = w.browser.lock().await.clone();
futs.push(async move {
let _ = b.quit().await;
});
}
futures_util::future::join_all(futs).await;
Ok(())
}
}