use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use drission::prelude::*;
/// Returns the scratch-file path
/// `<CARGO_MANIFEST_DIR>/target/test-tmp/pool_crawl-<name>-<pid>.jsonl`.
///
/// The current process id is embedded in the file name. This function only
/// builds the path; it does not create the directory or the file.
fn tmp_path(name: &str) -> PathBuf {
    let file_name = format!("pool_crawl-{name}-{pid}.jsonl", pid = std::process::id());
    PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("target")
        .join("test-tmp")
        .join(file_name)
}
/// Self-verifying demo of the concurrent browser pool.
///
/// Runs three checks against one `BrowserPool` and prints a summary for each:
///
/// * **A** – `map` over six items: every item must succeed, results must come
///   back in input order with the expected page text, and at least one of the
///   sampled signals (language, time zone, inner width) must take two or more
///   distinct values across the results.
/// * **B** – `run` with a task that fails on its first attempt: the final
///   result must be `2` and exactly two attempts must have been made.
/// * **C** – `map_resumable` run twice against the same checkpoint: the first
///   run over `0..4` must return four results, the second over `0..6` must
///   return only two, with `done_count` going 4 → 6.
///
/// # Errors
///
/// Returns an error if the pool cannot be launched, the checkpoint cannot be
/// loaded, shutdown fails, or any of the three checks does not pass.
#[tokio::main]
async fn main() -> drission::Result<()> {
    // Use the filter from the default environment source; fall back to "warn"
    // when it cannot be built.
    tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| "warn".into()),
        )
        .init();

    println!("[*] 启动并发池(2 worker × 2 标签,headless,指纹池=presets)…");
    let pool = BrowserPool::launch(
        PoolOptions::new()
            .size(2)
            .tabs_per_worker(2)
            .base_options(BrowserOptions::new().headless(true))
            .fingerprints(FingerprintPool::presets())
            .retry(RetryPolicy::new(2)),
    )
    .await?;
    println!(
        "    workers={} concurrency={}",
        pool.worker_count(),
        pool.concurrency()
    );

    // ---- Check A: concurrent map, ordering, content, fingerprint variety ----
    let items: Vec<u32> = (0..6).collect();
    // Remember the input size so the checks below do not rely on a magic number.
    let total = items.len();
    let results = pool
        .map(items, |i, tab| async move {
            let url = format!("data:text/html,<html><body><h1 id=t>item-{i}</h1></body></html>");
            tab.get(&url).await?;
            // Sample three browser-visible signals as a JSON array of strings.
            let sig = tab
                .run_js(
                    "JSON.stringify([\
                     navigator.language||'', \
                     (Intl.DateTimeFormat().resolvedOptions().timeZone)||'', \
                     ''+window.innerWidth])",
                )
                .await?;
            // A non-string or unparsable result degrades to an empty signature
            // instead of failing the item.
            let arr: Vec<String> =
                serde_json::from_str(sig.as_str().unwrap_or("[]")).unwrap_or_default();
            let txt = tab.ele("#t").await?.text().await?;
            Ok::<(String, Vec<String>), drission::Error>((txt, arr))
        })
        .await;

    // Surface the first failure (if any) to make a failed run diagnosable.
    if let Some((_, Err(e))) = results.iter().find(|(_, r)| r.is_err()) {
        println!("    [debug] A 首个错误: {e}");
    }

    let all_ok = results.len() == total && results.iter().all(|(_, r)| r.is_ok());
    // Each result must sit at the index of its input and carry that input's text.
    let content_ok = results.iter().enumerate().all(|(idx, (i, r))| {
        *i == idx as u32
            && r.as_ref()
                .map(|(t, _)| t == &format!("item-{i}"))
                .unwrap_or(false)
    });

    // Collects the k-th signature component from every successful result.
    let sig_at = |k: usize| -> Vec<String> {
        results
            .iter()
            .filter_map(|(_, r)| r.as_ref().ok().and_then(|(_, v)| v.get(k).cloned()))
            .collect()
    };
    let langs = sig_at(0);
    let tzs = sig_at(1);
    let vws = sig_at(2);
    let distinct = |v: &[String]| -> usize { v.iter().collect::<HashSet<_>>().len() };
    println!(
        "    [diag] language: distinct={} {:?}",
        distinct(&langs),
        langs
    );
    println!("    [diag] timezone: distinct={} {:?}", distinct(&tzs), tzs);
    println!(
        "    [diag] innerWidth: distinct={} {:?}",
        distinct(&vws),
        vws
    );
    // Rotation counts as observed when any one signal shows two or more values.
    let rotate_ok = distinct(&langs).max(distinct(&tzs)).max(distinct(&vws)) >= 2;
    println!(
        "[A] 并发 map: {}/{total} 成功; 顺序&内容 ok={content_ok}; 指纹轮换 ok={rotate_ok}",
        results.iter().filter(|(_, r)| r.is_ok()).count(),
    );
    let a_ok = all_ok && content_ok && rotate_ok;

    // ---- Check B: retry after an injected first-attempt failure ----
    let attempts = Arc::new(AtomicUsize::new(0));
    let r_retry = {
        let attempts = attempts.clone();
        pool.run(move |tab| {
            // Each invocation of the task gets its own handle to the counter.
            let attempts = attempts.clone();
            async move {
                let n = attempts.fetch_add(1, Ordering::SeqCst) + 1;
                tab.get("data:text/html,<h1>retry</h1>").await?;
                if n < 2 {
                    return Err(drission::Error::msg("注入的首次失败"));
                }
                Ok::<usize, drission::Error>(n)
            }
        })
        .await
    };
    let b_ok =
        r_retry.as_ref().map(|n| *n == 2).unwrap_or(false) && attempts.load(Ordering::SeqCst) == 2;
    println!(
        "[B] 重试: 结果={:?} 总尝试={} (ok={b_ok})",
        r_retry,
        attempts.load(Ordering::SeqCst)
    );

    // ---- Check C: resumable crawl backed by a checkpoint file ----
    let ckpt_path = tmp_path("resume");
    // Nothing else in this file creates the scratch directory, so make sure it
    // exists. Best-effort: a real problem will surface from the checkpoint below.
    if let Some(dir) = ckpt_path.parent() {
        let _ = tokio::fs::create_dir_all(dir).await;
    }
    // Start from a clean slate; a missing file is fine.
    let _ = tokio::fs::remove_file(&ckpt_path).await;
    // NOTE(review): an error here returns before `pool.shutdown()` runs; confirm
    // that dropping the pool is enough to clean up.
    let ckpt = Checkpoint::load(&ckpt_path).await?;
    let crawl = |i: u32, tab: Tab| async move {
        tab.get("data:text/html,<h1>ok</h1>").await?;
        Ok::<u32, drission::Error>(i)
    };
    let r1 = pool
        .map_resumable(
            (0..4).collect::<Vec<_>>(),
            |i| format!("k{i}"),
            &ckpt,
            crawl,
        )
        .await;
    let first_ok = r1.len() == 4 && r1.iter().all(|(_, r)| r.is_ok());
    let done1 = ckpt.done_count().await;
    // Second run covers a superset of the keys; only the two new ones should
    // produce results.
    let r2 = pool
        .map_resumable(
            (0..6).collect::<Vec<_>>(),
            |i| format!("k{i}"),
            &ckpt,
            crawl,
        )
        .await;
    let resume_ok = r2.len() == 2 && r2.iter().all(|(_, r)| r.is_ok());
    let done2 = ckpt.done_count().await;
    let c_ok = first_ok && done1 == 4 && resume_ok && done2 == 6;
    println!(
        "[C] 断点续抓: 首跑 {} 项(done={done1}); 续跑只补 {} 项(done={done2}) (ok={c_ok})",
        r1.len(),
        r2.len()
    );
    // Best-effort cleanup of the scratch file.
    let _ = tokio::fs::remove_file(&ckpt_path).await;

    // ---- Summary ----
    let pass = a_ok && b_ok && c_ok;
    println!(
        "\n==== {} ====",
        if pass {
            "ALL CHECKS PASSED"
        } else {
            "SOME CHECKS FAILED"
        }
    );
    pool.shutdown().await?;
    // Report a failed check through the return value.
    if pass {
        Ok(())
    } else {
        Err(drission::Error::msg("pool_crawl 自验证未通过"))
    }
}