use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::time::Duration;
#[cfg(any(feature = "chrome", feature = "webdriver"))]
use std::time::Instant;
use crate::configuration::{
BackendEndpoint, BackendEngine, BackendProtocol, ParallelBackendsConfig, ProxyIgnore,
RequestProxy,
};
use crate::page::AntiBotTech;
use reqwest::StatusCode;
/// Global count of backend response bytes currently held in memory.
static BACKEND_BYTES_IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);

/// RAII guard that accounts `n` bytes against the global in-flight
/// counter and releases them when dropped.
pub struct BackendBytesGuard(usize);

impl BackendBytesGuard {
    /// Reserve `n` bytes if doing so keeps the total at or below `limit`.
    ///
    /// A `limit` of 0 means "unlimited": the bytes are always accounted
    /// and the reservation always succeeds.
    pub fn try_acquire(n: usize, limit: usize) -> Option<Self> {
        if limit == 0 {
            // Unlimited mode — just record the bytes.
            BACKEND_BYTES_IN_FLIGHT.fetch_add(n, Ordering::Relaxed);
            return Some(Self(n));
        }
        // CAS loop via fetch_update: abort the reservation when it would
        // push the running total past the limit.
        BACKEND_BYTES_IN_FLIGHT
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                if current.saturating_add(n) > limit {
                    None
                } else {
                    Some(current + n)
                }
            })
            .ok()
            .map(|_| Self(n))
    }

    /// Record `n` bytes without enforcing any limit.
    pub fn acquire_unchecked(n: usize) -> Self {
        BACKEND_BYTES_IN_FLIGHT.fetch_add(n, Ordering::Relaxed);
        Self(n)
    }

    /// Current total of reserved (in-flight) bytes.
    pub fn in_flight() -> usize {
        BACKEND_BYTES_IN_FLIGHT.load(Ordering::Relaxed)
    }
}

impl Drop for BackendBytesGuard {
    fn drop(&mut self) {
        // Return this guard's reservation to the pool.
        BACKEND_BYTES_IN_FLIGHT.fetch_sub(self.0, Ordering::Relaxed);
    }
}
/// Returns true when the (possibly parameterized) content-type denotes a
/// binary payload that should not be treated as HTML.
///
/// Comparison is exact-case, after stripping any `; charset=...`-style
/// parameters and surrounding whitespace.
pub fn is_binary_content_type(ct: &str) -> bool {
    // Keep only the media type itself, without parameters.
    let mime = ct.split(';').next().unwrap_or(ct).trim();
    let has_binary_prefix = ["image/", "audio/", "video/", "font/"]
        .into_iter()
        .any(|prefix| mime.starts_with(prefix));
    has_binary_prefix
        || matches!(
            mime,
            "application/pdf"
                | "application/zip"
                | "application/gzip"
                | "application/x-gzip"
                | "application/octet-stream"
                | "application/wasm"
                | "application/x-tar"
                | "application/x-bzip2"
                | "application/x-7z-compressed"
                | "application/x-rar-compressed"
                | "application/vnd.ms-fontobject"
                | "application/x-font-ttf"
                | "application/x-font-woff"
        )
}
/// Whether parallel backends should be skipped for `url`.
///
/// Skips known asset URLs and any URL whose trailing extension matches one
/// of `extra_extensions` (case-insensitive).
pub fn should_skip_backend_for_url(
    url: &str,
    extra_extensions: &[crate::compact_str::CompactString],
) -> bool {
    if crate::page::is_asset_url(url) {
        return true;
    }
    if !extra_extensions.is_empty() {
        if let Some(pos) = url.rfind('.') {
            let ext = &url[pos + 1..];
            // Ignore 0/1-char "extensions" — unlikely to be a real suffix.
            if ext.len() >= 2 {
                // eq_ignore_ascii_case compares case-insensitively on its
                // own, so no lowercased copy of `ext` needs to be allocated.
                if extra_extensions
                    .iter()
                    .any(|skip| skip.eq_ignore_ascii_case(ext))
                {
                    return true;
                }
            }
        }
    }
    false
}
/// Outcome of a [`QualityValidator`] callback.
#[derive(Default)]
pub struct ValidationResult {
    /// When set, replaces the heuristic score entirely (clamped to 100).
    pub score_override: Option<u16>,
    /// Signed delta applied to the heuristic score (result clamped to 0..=100).
    pub score_adjust: i16,
    /// Hard rejection: the response scores 0 regardless of other fields.
    pub reject: bool,
}

/// Caller-supplied hook that inspects a fetched response and influences its
/// quality score. Must be `Send + Sync` because backend races run concurrently.
pub type QualityValidator = std::sync::Arc<
    dyn Fn(
            Option<&[u8]>, // html content
            StatusCode,    // status code
            &str,          // url
            &str,          // backend source name
        ) -> ValidationResult
        + Send
        + Sync,
>;
/// Heuristic HTML quality score with an optional caller-supplied validator.
///
/// Without a validator, this is just [`html_quality_score`]. The validator
/// may reject the response outright (score 0), replace the score, or nudge
/// it up/down; the final value is always clamped to 0..=100.
pub fn html_quality_score_validated(
    content: Option<&[u8]>,
    status_code: StatusCode,
    anti_bot: &AntiBotTech,
    url: &str,
    source: &str,
    validator: Option<&QualityValidator>,
) -> u16 {
    let base = html_quality_score(content, status_code, anti_bot);
    let v = match validator {
        Some(v) => v,
        None => return base,
    };
    let result = v(content, status_code, url, source);
    if result.reject {
        return 0;
    }
    match result.score_override {
        Some(ov) => ov.min(100),
        None => {
            // Apply the signed adjustment, clamped into 0..=100.
            let adjusted = (base as i16).saturating_add(result.score_adjust);
            (adjusted.max(0) as u16).min(100)
        }
    }
}
/// Score a fetched response's likely usefulness as HTML, 0..=100.
///
/// Points are awarded for: HTTP status (OK 30 / other 2xx 20 / 3xx 5),
/// body presence and size (up to 25), a `<body`/`<BODY` marker (15), a
/// non-empty cacheable body (10), and no detected anti-bot tech (20).
pub fn html_quality_score(
    content: Option<&[u8]>,
    status_code: StatusCode,
    anti_bot: &AntiBotTech,
) -> u16 {
    let mut score: u16 = if status_code == StatusCode::OK {
        30
    } else if status_code.is_success() {
        20
    } else if status_code.is_redirection() {
        5
    } else {
        0
    };
    if let Some(body) = content {
        let len = body.len();
        // Size tiers: any content at all, >512 bytes, >4 KiB.
        for (threshold, pts) in [(0usize, 5u16), (512, 10), (4096, 10)] {
            if len > threshold {
                score += pts;
            }
        }
        let has_body_tag = memchr::memmem::find(body, b"<body").is_some()
            || memchr::memmem::find(body, b"<BODY").is_some();
        if has_body_tag {
            score += 15;
        }
        if !crate::utils::is_cacheable_body_empty(body) {
            score += 10;
        }
    }
    if *anti_bot == AntiBotTech::None {
        score += 20;
    }
    score.min(100)
}
/// Per-backend racing statistics; all fields are lock-free counters.
struct BackendStats {
    // Races this backend won.
    wins: AtomicU64,
    // Races this backend participated in.
    races: AtomicU64,
    // Exponential moving average of response time, in milliseconds.
    ema_ms: AtomicU64,
    // Errors since the last success; drives auto-disable.
    consecutive_errors: AtomicU64,
    // Once set, the backend is skipped for future requests.
    disabled: AtomicBool,
}

impl BackendStats {
    /// Fresh, zeroed stats for an enabled backend.
    fn new() -> Self {
        Self {
            wins: AtomicU64::new(0),
            races: AtomicU64::new(0),
            ema_ms: AtomicU64::new(0),
            consecutive_errors: AtomicU64::new(0),
            disabled: AtomicBool::new(false),
        }
    }
}

impl Clone for BackendStats {
    // Atomics are not `Clone`; snapshot each counter into a fresh atomic,
    // yielding a copy that evolves independently of the original.
    fn clone(&self) -> Self {
        let snap = |a: &AtomicU64| AtomicU64::new(a.load(Ordering::Relaxed));
        Self {
            wins: snap(&self.wins),
            races: snap(&self.races),
            ema_ms: snap(&self.ema_ms),
            consecutive_errors: snap(&self.consecutive_errors),
            disabled: AtomicBool::new(self.disabled.load(Ordering::Relaxed)),
        }
    }
}
/// Lock-free per-backend race bookkeeping with automatic disabling of
/// backends that keep failing.
pub struct BackendTracker {
    stats: Vec<BackendStats>,
    // Consecutive-error count at which a backend is disabled.
    max_consecutive_errors: u64,
}

impl BackendTracker {
    /// Tracker for `count` backends, all initially enabled and zeroed.
    pub fn new(count: usize, max_consecutive_errors: u16) -> Self {
        Self {
            stats: (0..count).map(|_| BackendStats::new()).collect(),
            max_consecutive_errors: u64::from(max_consecutive_errors),
        }
    }

    /// Count a race won by backend `idx`. Out-of-range indices are ignored.
    pub fn record_win(&self, idx: usize) {
        if let Some(s) = self.stats.get(idx) {
            s.wins.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Count a race participation for backend `idx`.
    pub fn record_race(&self, idx: usize) {
        if let Some(s) = self.stats.get(idx) {
            s.races.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Fold `dur` into the backend's latency EMA. The first race seeds the
    /// average directly; later samples blend 4:1 (old:new).
    pub fn record_duration(&self, idx: usize, dur: Duration) {
        let Some(s) = self.stats.get(idx) else {
            return;
        };
        let ms = dur.as_millis() as u64;
        if s.races.load(Ordering::Relaxed) <= 1 {
            s.ema_ms.store(ms, Ordering::Relaxed);
        } else {
            let _ = s
                .ema_ms
                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
                    Some((old * 4 + ms) / 5)
                });
        }
    }

    /// Count an error for backend `idx`; disables it when it has never
    /// succeeded (probe failure) or the consecutive-error threshold is hit.
    pub fn record_error(&self, idx: usize) {
        let Some(s) = self.stats.get(idx) else {
            return;
        };
        let errors_now = s.consecutive_errors.fetch_add(1, Ordering::Relaxed) + 1;
        let never_succeeded =
            s.wins.load(Ordering::Relaxed) == 0 && s.races.load(Ordering::Relaxed) <= 1;
        if never_succeeded || errors_now >= self.max_consecutive_errors {
            s.disabled.store(true, Ordering::Relaxed);
            if never_succeeded {
                log::info!(
                    "[parallel_backends] backend {} failed on probe (first request) — auto-disabled",
                    idx
                );
            }
        }
    }

    /// Reset the consecutive-error counter after a successful fetch.
    pub fn record_success(&self, idx: usize) {
        if let Some(s) = self.stats.get(idx) {
            s.consecutive_errors.store(0, Ordering::Relaxed);
        }
    }

    /// Whether backend `idx` is disabled; unknown indices count as disabled.
    pub fn is_disabled(&self, idx: usize) -> bool {
        match self.stats.get(idx) {
            Some(s) => s.disabled.load(Ordering::Relaxed),
            None => true,
        }
    }

    /// Total wins for backend `idx` (0 for unknown indices).
    pub fn wins(&self, idx: usize) -> u64 {
        match self.stats.get(idx) {
            Some(s) => s.wins.load(Ordering::Relaxed),
            None => 0,
        }
    }

    /// Total races for backend `idx` (0 for unknown indices).
    pub fn races(&self, idx: usize) -> u64 {
        match self.stats.get(idx) {
            Some(s) => s.races.load(Ordering::Relaxed),
            None => 0,
        }
    }

    /// Current latency EMA in milliseconds (0 for unknown indices).
    pub fn ema_ms(&self, idx: usize) -> u64 {
        match self.stats.get(idx) {
            Some(s) => s.ema_ms.load(Ordering::Relaxed),
            None => 0,
        }
    }

    /// Errors since the last success (0 for unknown indices).
    pub fn consecutive_errors(&self, idx: usize) -> u64 {
        match self.stats.get(idx) {
            Some(s) => s.consecutive_errors.load(Ordering::Relaxed),
            None => 0,
        }
    }

    /// Integer win percentage (`wins * 100 / races`); 0 when raceless.
    pub fn win_rate_pct(&self, idx: usize) -> u64 {
        match self.races(idx) {
            0 => 0,
            r => self.wins(idx) * 100 / r,
        }
    }

    /// Number of tracked backends.
    pub fn len(&self) -> usize {
        self.stats.len()
    }

    /// Whether no backends are tracked.
    pub fn is_empty(&self) -> bool {
        self.stats.is_empty()
    }
}

impl Clone for BackendTracker {
    // `BackendStats` has a snapshot-style manual Clone, so a plain field
    // clone yields an independent copy of every counter.
    fn clone(&self) -> Self {
        Self {
            stats: self.stats.clone(),
            max_consecutive_errors: self.max_consecutive_errors,
        }
    }
}
/// A completed backend fetch plus its computed quality score.
pub struct BackendResponse {
    /// The page built from the fetched content.
    pub page: crate::page::Page,
    /// Heuristic quality score, 0..=100 (see [`html_quality_score`]).
    pub quality_score: u16,
    /// Index in the race: 0 is the primary, 1.. are alternatives.
    pub backend_index: usize,
    /// Wall-clock duration of the fetch.
    pub duration: Duration,
    /// Holds this response's byte reservation; released on drop.
    pub _bytes_guard: Option<BackendBytesGuard>,
}

/// Completion record of one raced backend.
pub struct BackendResult {
    /// Index of the backend that finished.
    pub backend_index: usize,
    /// `None` when the backend failed or timed out.
    pub response: Option<BackendResponse>,
}
/// Stable, human-readable name for a backend's engine; used in logs and
/// for tagging pages with their source.
pub fn backend_source_name(endpoint: &BackendEndpoint) -> &'static str {
    match endpoint.engine {
        BackendEngine::Cdp => "cdp",
        BackendEngine::Servo => "servo",
        BackendEngine::Custom => "custom",
    }
}
pub fn resolve_protocol(endpoint: &BackendEndpoint) -> BackendProtocol {
if let Some(ref p) = endpoint.protocol {
return p.clone();
}
match endpoint.engine {
BackendEngine::Cdp => BackendProtocol::Cdp,
BackendEngine::Servo => BackendProtocol::WebDriver,
BackendEngine::Custom => {
if let Some(ref ep) = endpoint.endpoint {
if ep.starts_with("ws://") || ep.starts_with("wss://") {
return BackendProtocol::Cdp;
}
}
BackendProtocol::WebDriver }
}
}
/// Record which backend produced `page` (see [`backend_source_name`]).
#[inline]
pub fn tag_page_source(page: &mut crate::page::Page, source: &str) {
    page.backend_source = Some(crate::compact_str::CompactString::from(source));
}
/// Race the primary fetch against alternative backend fetches and return
/// the best response.
///
/// When racing is disabled or there are no alternatives, only the primary
/// runs (its stats are still recorded). Otherwise all futures run in a
/// `JoinSet`; the first response whose quality score reaches
/// `config.fast_accept_threshold` wins immediately. A below-threshold
/// response opens a grace window (shortened or zeroed under memory
/// pressure) during which a higher-scoring response may still replace the
/// current best. Remaining tasks are aborted before returning.
pub async fn race_backends(
    primary: Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>>,
    alternatives: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>>,
    config: &ParallelBackendsConfig,
    tracker: &BackendTracker,
) -> Option<BackendResponse> {
    if !config.enabled || alternatives.is_empty() {
        let resp = primary.await;
        if let Some(ref r) = resp {
            // Record the solo run as a race the primary won.
            tracker.record_race(r.backend_index);
            tracker.record_win(r.backend_index);
            tracker.record_duration(r.backend_index, r.duration);
            tracker.record_success(r.backend_index);
        }
        return resp;
    }
    let total = 1 + alternatives.len();
    // Sub-millisecond start jitter (0..1000µs) derived from the clock, to
    // avoid thundering-herd connects when many races start at once.
    let primary_jitter_us = {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut h = DefaultHasher::new();
        std::time::SystemTime::now().hash(&mut h);
        0u16.hash(&mut h);
        h.finish() % 1000
    };
    // Wrap the primary so it yields a BackendResult like the alternatives
    // (index 0 is reserved for the primary).
    let primary_wrapped: Pin<Box<dyn Future<Output = BackendResult> + Send>> =
        Box::pin(async move {
            if primary_jitter_us > 0 {
                tokio::time::sleep(Duration::from_micros(primary_jitter_us)).await;
            }
            let response = primary.await;
            BackendResult {
                backend_index: 0,
                response,
            }
        });
    let mut futs = tokio::task::JoinSet::new();
    futs.spawn(primary_wrapped);
    for alt in alternatives {
        futs.spawn(alt);
    }
    // Shrink (or zero) the grace window under process memory pressure.
    let grace = {
        let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
        if mem_state >= 2 {
            Duration::ZERO
        } else if mem_state >= 1 {
            Duration::from_millis(config.grace_period_ms / 2)
        } else {
            Duration::from_millis(config.grace_period_ms)
        }
    };
    let threshold = config.fast_accept_threshold;
    let mut best: Option<BackendResponse> = None;
    let mut completed = 0usize;
    // Set on the first usable response; once it elapses we stop waiting.
    let mut grace_deadline: Option<tokio::time::Instant> = None;
    loop {
        if completed >= total {
            break;
        }
        let result = if let Some(deadline) = grace_deadline {
            tokio::select! {
                biased;
                res = futs.join_next() => res,
                _ = tokio::time::sleep_until(deadline) => break,
            }
        } else {
            futs.join_next().await
        };
        match result {
            Some(Ok(br)) => {
                completed += 1;
                let idx = br.backend_index;
                match br.response {
                    Some(resp) => {
                        tracker.record_race(idx);
                        tracker.record_duration(idx, resp.duration);
                        tracker.record_success(idx);
                        let score = resp.quality_score;
                        // Fast-accept: the first response at/above the
                        // threshold wins without waiting for slower peers.
                        if best.is_none() && score >= threshold {
                            tracker.record_win(idx);
                            return Some(resp);
                        }
                        // Otherwise keep the highest-scoring response seen.
                        let dominated = match &best {
                            Some(b) => score > b.quality_score,
                            None => true,
                        };
                        if dominated {
                            best = Some(resp);
                        }
                        if grace_deadline.is_none() {
                            grace_deadline = Some(tokio::time::Instant::now() + grace);
                        }
                    }
                    None => {
                        // Backend produced nothing — count as an error.
                        tracker.record_race(idx);
                        tracker.record_error(idx);
                    }
                }
            }
            Some(Err(_join_err)) => {
                // Task panicked or was cancelled; just count it as done.
                completed += 1;
            }
            None => {
                break;
            }
        }
    }
    // Cancel any still-running backends before returning.
    futs.abort_all();
    drop(futs);
    if let Some(ref b) = best {
        tracker.record_win(b.backend_index);
    }
    best
}
/// Round-robin rotator over proxy addresses, split by protocol family.
pub struct ProxyRotator {
    // Proxies usable for CDP (Chrome) connections.
    cdp_addrs: Vec<String>,
    // Proxies usable for WebDriver/HTTP connections.
    wd_addrs: Vec<String>,
    cdp_index: AtomicUsize,
    wd_index: AtomicUsize,
}

impl ProxyRotator {
    /// Partition the configured proxies by their ignore flag.
    pub fn new(proxies: &Option<Vec<RequestProxy>>) -> Self {
        let mut cdp_addrs = Vec::new();
        let mut wd_addrs = Vec::new();
        for p in proxies.iter().flatten() {
            if p.ignore != ProxyIgnore::Chrome {
                cdp_addrs.push(p.addr.clone());
            }
            if p.ignore != ProxyIgnore::Http {
                wd_addrs.push(p.addr.clone());
            }
        }
        Self {
            cdp_addrs,
            wd_addrs,
            cdp_index: AtomicUsize::new(0),
            wd_index: AtomicUsize::new(0),
        }
    }

    // Shared round-robin step over one address list.
    fn next_from<'a>(addrs: &'a [String], index: &AtomicUsize) -> Option<&'a str> {
        if addrs.is_empty() {
            return None;
        }
        let idx = index.fetch_add(1, Ordering::Relaxed) % addrs.len();
        addrs.get(idx).map(String::as_str)
    }

    /// Next proxy for a CDP connection, if any are configured.
    pub fn next_cdp(&self) -> Option<&str> {
        Self::next_from(&self.cdp_addrs, &self.cdp_index)
    }

    /// Next proxy for a WebDriver connection, if any are configured.
    pub fn next_webdriver(&self) -> Option<&str> {
        Self::next_from(&self.wd_addrs, &self.wd_index)
    }

    /// Number of CDP-capable proxies.
    pub fn cdp_count(&self) -> usize {
        self.cdp_addrs.len()
    }

    /// Number of WebDriver-capable proxies.
    pub fn webdriver_count(&self) -> usize {
        self.wd_addrs.len()
    }
}

impl Clone for ProxyRotator {
    // Atomics are not `Clone`; snapshot the rotation cursors.
    fn clone(&self) -> Self {
        Self {
            cdp_addrs: self.cdp_addrs.clone(),
            wd_addrs: self.wd_addrs.clone(),
            cdp_index: AtomicUsize::new(self.cdp_index.load(Ordering::Relaxed)),
            wd_index: AtomicUsize::new(self.wd_index.load(Ordering::Relaxed)),
        }
    }
}
/// Fetch `url` through an already-running CDP (Chrome DevTools Protocol)
/// endpoint and score the resulting HTML.
///
/// Connects within `connect_timeout`, optionally routes through `proxy`
/// via a dedicated browser context (degrading to a direct fetch if that
/// fails), navigates with the configured request timeout, and returns
/// `None` (after logging) on any failure.
#[cfg(feature = "chrome")]
pub async fn fetch_cdp(
    url: &str,
    endpoint: &str,
    config: &std::sync::Arc<crate::configuration::Configuration>,
    backend_index: usize,
    connect_timeout: Duration,
    proxy: Option<String>,
    source_name: &str,
) -> Option<BackendResponse> {
    let start = Instant::now();
    let timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));
    let handler_config = crate::features::chrome::create_handler_config(config);
    let connect_result = tokio::time::timeout(
        connect_timeout,
        chromiumoxide::Browser::connect_with_config(endpoint, handler_config),
    )
    .await;
    let (mut browser, handler_handle) = match connect_result {
        Ok(Ok((browser, mut handler))) => {
            // Drive the CDP event stream in the background; aborted on exit.
            let h = tokio::spawn(async move {
                use crate::tokio_stream::StreamExt;
                while let Some(_) = handler.next().await {}
            });
            (browser, h)
        }
        Ok(Err(e)) => {
            log::warn!("{} CDP connect failed ({}): {:?}", source_name, endpoint, e);
            return None;
        }
        Err(_) => {
            log::warn!("{} CDP connect timed out ({})", source_name, endpoint);
            return None;
        }
    };
    if let Some(ref proxy_addr) = proxy {
        // The proxy is applied via a disposable browser context; failure
        // here degrades to an un-proxied fetch rather than aborting.
        let mut ctx_params =
            chromiumoxide::cdp::browser_protocol::target::CreateBrowserContextParams::default();
        ctx_params.dispose_on_detach = Some(true);
        ctx_params.proxy_server = Some(proxy_addr.clone());
        if let Ok(ctx) = browser.create_browser_context(ctx_params).await {
            let _ = browser.send_new_context(ctx).await;
        } else {
            log::warn!(
                "{} proxy browser context failed for {}, continuing without proxy",
                source_name,
                proxy_addr
            );
        }
    }
    // Reuse an existing tab when available, otherwise open a new one.
    let page = match browser.pages().await {
        Ok(mut p) if !p.is_empty() => p.swap_remove(0),
        _ => match browser.new_page(url).await {
            Ok(p) => p,
            Err(e) => {
                log::warn!("{} page failed: {:?}", source_name, e);
                handler_handle.abort();
                return None;
            }
        },
    };
    crate::features::chrome::setup_chrome_events(&page, config).await;
    let _intercept_handle = crate::features::chrome::setup_chrome_interception_base(
        &page,
        config.chrome_intercept.enabled,
        &config.auth_challenge_response,
        config.chrome_intercept.block_visuals,
        "",
    )
    .await;
    match tokio::time::timeout(timeout, page.goto(url)).await {
        Ok(Ok(_)) => {}
        Ok(Err(e)) => {
            log::warn!("{} navigate failed for {}: {:?}", source_name, url, e);
            handler_handle.abort();
            return None;
        }
        Err(_) => {
            log::warn!("{} navigate timed out for {}", source_name, url);
            handler_handle.abort();
            return None;
        }
    }
    // Optional post-navigation settle delay from the wait_for config.
    #[cfg(feature = "chrome")]
    if let Some(ref wf) = config.wait_for {
        if let Some(ref delay) = wf.delay {
            if let Some(ms) = delay.timeout {
                tokio::time::sleep(ms).await;
            }
        }
    }
    // HTML extraction gets its own fixed 10s budget.
    let html_result = tokio::time::timeout(Duration::from_secs(10), page.outer_html_bytes()).await;
    handler_handle.abort();
    let html_bytes: Vec<u8> = match html_result {
        Ok(Ok(b)) => b.to_vec(),
        Ok(Err(e)) => {
            log::warn!(
                "{} outer_html_bytes() failed for {}: {:?}",
                source_name,
                url,
                e
            );
            return None;
        }
        Err(_) => {
            log::warn!("{} outer_html_bytes() timed out for {}", source_name, url);
            return None;
        }
    };
    let dur = start.elapsed();
    // CDP does not surface the HTTP status here; assume 200 for scoring.
    let status = StatusCode::OK;
    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
    let byte_len = html_bytes.len();
    let res = crate::utils::PageResponse {
        content: Some(html_bytes),
        status_code: status,
        ..Default::default()
    };
    let mut page = crate::page::build(url, res);
    tag_page_source(&mut page, source_name);
    Some(BackendResponse {
        page,
        quality_score: score,
        backend_index,
        duration: dur,
        // Account the response bytes against the global in-flight budget.
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
    })
}
/// Fetch `url` through a remote WebDriver endpoint and score the HTML.
///
/// Builds a headless Chrome-flavored `WebDriverConfig` from the crawl
/// configuration, connects within `connect_timeout`, navigates with the
/// configured request timeout, and returns `None` (after logging and
/// disposing the session) on any failure.
#[cfg(feature = "webdriver")]
pub async fn fetch_webdriver(
    url: &str,
    endpoint: &str,
    config: &std::sync::Arc<crate::configuration::Configuration>,
    backend_index: usize,
    connect_timeout: Duration,
    proxy: Option<String>,
    source_name: &str,
) -> Option<BackendResponse> {
    use crate::features::webdriver_common::{WebDriverBrowser, WebDriverConfig};
    let start = Instant::now();
    let timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));
    let wd_config = WebDriverConfig {
        server_url: endpoint.to_string(),
        browser: WebDriverBrowser::Chrome,
        headless: true,
        timeout: Some(connect_timeout),
        proxy,
        user_agent: config.user_agent.as_ref().map(|ua| ua.to_string()),
        viewport_width: config.viewport.as_ref().map(|v| v.width),
        viewport_height: config.viewport.as_ref().map(|v| v.height),
        accept_insecure_certs: config.accept_invalid_certs,
        ..Default::default()
    };
    let controller_opt = tokio::time::timeout(
        connect_timeout,
        crate::features::webdriver::launch_driver_base(&wd_config, config),
    )
    .await;
    let mut controller = match controller_opt {
        Ok(Some(c)) => c,
        Ok(None) => {
            log::warn!("{} WebDriver connect failed ({})", source_name, endpoint);
            return None;
        }
        Err(_) => {
            log::warn!("{} WebDriver connect timed out ({})", source_name, endpoint);
            return None;
        }
    };
    let driver = controller.driver().clone();
    match tokio::time::timeout(timeout, driver.goto(url)).await {
        Ok(Ok(_)) => {}
        Ok(Err(e)) => {
            log::warn!(
                "{} WebDriver navigate failed for {}: {:?}",
                source_name,
                url,
                e
            );
            controller.dispose();
            return None;
        }
        Err(_) => {
            log::warn!("{} WebDriver navigate timed out for {}", source_name, url);
            controller.dispose();
            return None;
        }
    }
    // Page-source retrieval gets its own fixed 10s budget.
    let source = match tokio::time::timeout(Duration::from_secs(10), driver.source()).await {
        Ok(Ok(s)) => s,
        Ok(Err(e)) => {
            log::warn!(
                "{} WebDriver source failed for {}: {:?}",
                source_name,
                url,
                e
            );
            controller.dispose();
            return None;
        }
        Err(_) => {
            log::warn!("{} WebDriver source timed out for {}", source_name, url);
            controller.dispose();
            return None;
        }
    };
    controller.dispose();
    let dur = start.elapsed();
    let html_bytes = source.into_bytes();
    // WebDriver does not expose the HTTP status; assume 200 for scoring.
    let status = StatusCode::OK;
    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
    let byte_len = html_bytes.len();
    let res = crate::utils::PageResponse {
        content: Some(html_bytes),
        status_code: status,
        ..Default::default()
    };
    let mut page = crate::page::build(url, res);
    tag_page_source(&mut page, source_name);
    Some(BackendResponse {
        page,
        quality_score: score,
        backend_index,
        duration: dur,
        // Account the response bytes against the global in-flight budget.
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
    })
}
/// Build the alternative-backend futures for one URL.
///
/// Returns an empty vec (no racing) when the URL looks like an asset, the
/// global in-flight byte budget is exhausted, or process memory is
/// critical; under moderate memory pressure at most one backend is built.
/// Each produced future acquires the optional session semaphore (bounded
/// wait), sleeps a small per-backend jitter, performs the fetch (CDP or
/// WebDriver), and is wrapped in the configured hard timeout when
/// `backend_timeout_ms > 0`.
#[allow(unused_variables)]
pub fn build_backend_futures(
    url: &str,
    config: &ParallelBackendsConfig,
    crawl_config: &std::sync::Arc<crate::configuration::Configuration>,
    tracker: &BackendTracker,
    proxy_rotator: &Option<std::sync::Arc<ProxyRotator>>,
    semaphore: &Option<std::sync::Arc<tokio::sync::Semaphore>>,
) -> Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> {
    if should_skip_backend_for_url(url, &config.skip_extensions) {
        log::debug!(
            "[parallel_backends] skipping backends for asset URL: {}",
            url
        );
        return Vec::new();
    }
    // Byte budget check: racing duplicates response bodies in memory.
    let byte_limit = config.max_backend_bytes_in_flight;
    if byte_limit > 0 && BackendBytesGuard::in_flight() >= byte_limit {
        log::debug!(
            "[parallel_backends] skipping backends — in-flight bytes ({}) >= limit ({})",
            BackendBytesGuard::in_flight(),
            byte_limit,
        );
        return Vec::new();
    }
    let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
    if mem_state >= 2 {
        log::debug!("[parallel_backends] skipping all backends — process memory critical");
        return Vec::new();
    }
    let mem_pressure = mem_state >= 1;
    // backend_timeout_ms == 0 disables the per-backend hard timeout.
    let outer_timeout = if config.backend_timeout_ms > 0 {
        Some(Duration::from_millis(config.backend_timeout_ms))
    } else {
        None
    };
    let backend_timeout_ms_log = config.backend_timeout_ms;
    #[allow(unused_mut)]
    let mut futs: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> = Vec::new();
    for (i, backend) in config.backends.iter().enumerate() {
        // Index 0 is reserved for the primary fetch.
        let backend_index = i + 1;
        if mem_pressure && !futs.is_empty() {
            // Under pressure, race at most one alternative.
            break;
        }
        if tracker.is_disabled(backend_index) {
            continue;
        }
        #[allow(unused_variables)]
        let resolved_endpoint = if let Some(ref ep) = backend.endpoint {
            ep.clone()
        } else if backend.binary_path.is_some() {
            log::debug!(
                "{:?} local mode not yet implemented, skipping",
                backend.engine
            );
            continue;
        } else {
            log::debug!(
                "{:?} backend has no endpoint or binary_path, skipping",
                backend.engine
            );
            continue;
        };
        let proto = resolve_protocol(backend);
        let _source_name = backend_source_name(backend);
        // A per-backend proxy wins over the shared rotator.
        #[allow(unused_variables)]
        let resolved_proxy: Option<String> = if backend.proxy.is_some() {
            backend.proxy.clone()
        } else if let Some(ref rotator) = proxy_rotator {
            match proto {
                BackendProtocol::Cdp => rotator.next_cdp().map(|s| s.to_string()),
                BackendProtocol::WebDriver => rotator.next_webdriver().map(|s| s.to_string()),
            }
        } else {
            None
        };
        // Sub-millisecond start jitter, seeded from url + index + clock.
        let jitter_us = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};
            let mut hasher = DefaultHasher::new();
            url.hash(&mut hasher);
            backend_index.hash(&mut hasher);
            std::time::SystemTime::now().hash(&mut hasher);
            hasher.finish() % 1000
        };
        let connect_timeout = Duration::from_millis(config.connect_timeout_ms);
        let sem = semaphore.clone();
        match proto {
            #[cfg(feature = "chrome")]
            BackendProtocol::Cdp => {
                let url = url.to_string();
                let cfg = crawl_config.clone();
                let proxy = resolved_proxy.clone();
                let source = backend_source_name(backend).to_string();
                futs.push(Box::pin(async move {
                    let work = async {
                        // Bounded wait on the shared session semaphore.
                        let _permit = if let Some(ref s) = sem {
                            match tokio::time::timeout(
                                Duration::from_secs(10),
                                s.acquire(),
                            )
                            .await
                            {
                                Ok(Ok(p)) => Some(p),
                                _ => {
                                    log::warn!(
                                        "[parallel_backends] {} backend {} semaphore timeout for {}",
                                        source, backend_index, url
                                    );
                                    return BackendResult {
                                        backend_index,
                                        response: None,
                                    };
                                }
                            }
                        } else {
                            None
                        };
                        tokio::time::sleep(Duration::from_micros(jitter_us)).await;
                        let response = fetch_cdp(
                            &url,
                            &resolved_endpoint,
                            &cfg,
                            backend_index,
                            connect_timeout,
                            proxy,
                            &source,
                        )
                        .await;
                        BackendResult {
                            backend_index,
                            response,
                        }
                    };
                    // Enforce the optional whole-fetch hard timeout.
                    match outer_timeout {
                        Some(deadline) => match tokio::time::timeout(deadline, work).await {
                            Ok(r) => r,
                            Err(_) => {
                                log::warn!(
                                    "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
                                    source, backend_index, backend_timeout_ms_log, url
                                );
                                BackendResult {
                                    backend_index,
                                    response: None,
                                }
                            }
                        },
                        None => work.await,
                    }
                }));
            }
            #[cfg(feature = "webdriver")]
            BackendProtocol::WebDriver => {
                let url = url.to_string();
                let cfg = crawl_config.clone();
                let proxy = resolved_proxy.clone();
                let source = backend_source_name(backend).to_string();
                futs.push(Box::pin(async move {
                    let work = async {
                        // Bounded wait on the shared session semaphore.
                        let _permit = if let Some(ref s) = sem {
                            match tokio::time::timeout(
                                Duration::from_secs(10),
                                s.acquire(),
                            )
                            .await
                            {
                                Ok(Ok(p)) => Some(p),
                                _ => {
                                    log::warn!(
                                        "[parallel_backends] {} backend {} semaphore timeout for {}",
                                        source, backend_index, url
                                    );
                                    return BackendResult {
                                        backend_index,
                                        response: None,
                                    };
                                }
                            }
                        } else {
                            None
                        };
                        tokio::time::sleep(Duration::from_micros(jitter_us)).await;
                        let response = fetch_webdriver(
                            &url,
                            &resolved_endpoint,
                            &cfg,
                            backend_index,
                            connect_timeout,
                            proxy,
                            &source,
                        )
                        .await;
                        BackendResult {
                            backend_index,
                            response,
                        }
                    };
                    // Enforce the optional whole-fetch hard timeout.
                    match outer_timeout {
                        Some(deadline) => match tokio::time::timeout(deadline, work).await {
                            Ok(r) => r,
                            Err(_) => {
                                log::warn!(
                                    "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
                                    source, backend_index, backend_timeout_ms_log, url
                                );
                                BackendResult {
                                    backend_index,
                                    response: None,
                                }
                            }
                        },
                        None => work.await,
                    }
                }));
            }
            // Reached only when a protocol's feature is compiled out.
            #[allow(unreachable_patterns)]
            _ => {}
        }
    }
    futs
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
fn make_html(body_content: &str) -> Vec<u8> {
format!(
"<html><head><title>T</title></head><body>{}</body></html>",
body_content
)
.into_bytes()
}
#[test]
fn test_quality_score_perfect_response() {
let body = make_html(&"x".repeat(5000));
let score = html_quality_score(Some(&body), StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 100);
}
#[test]
fn test_quality_score_empty_body() {
let score = html_quality_score(Some(&[]), StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 50);
}
#[test]
fn test_quality_score_none_content() {
let score = html_quality_score(None, StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 50);
}
#[test]
fn test_quality_score_empty_html_shell() {
let body = b"<html><head></head><body></body></html>";
let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 70);
}
#[test]
fn test_quality_score_antibot_cloudflare() {
let body = make_html("blocked");
let score =
html_quality_score(Some(&body), StatusCode::FORBIDDEN, &AntiBotTech::Cloudflare);
assert_eq!(score, 30);
}
#[test]
fn test_quality_score_server_error() {
let body = make_html("error");
let score = html_quality_score(
Some(&body),
StatusCode::INTERNAL_SERVER_ERROR,
&AntiBotTech::None,
);
assert_eq!(score, 50);
}
#[test]
fn test_quality_score_redirect() {
let score = html_quality_score(None, StatusCode::MOVED_PERMANENTLY, &AntiBotTech::None);
assert_eq!(score, 25);
}
#[test]
fn test_quality_score_small_body_with_body_tag() {
let body = b"<html><body>hi</body></html>";
let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 80);
}
#[test]
fn test_quality_score_large_body_no_body_tag() {
let body = "x".repeat(5000);
let score = html_quality_score(Some(body.as_bytes()), StatusCode::OK, &AntiBotTech::None);
assert_eq!(score, 85);
}
#[test]
fn test_tracker_new_defaults() {
let t = BackendTracker::new(3, 10);
assert_eq!(t.len(), 3);
assert!(!t.is_empty());
for i in 0..3 {
assert_eq!(t.wins(i), 0);
assert_eq!(t.races(i), 0);
assert_eq!(t.ema_ms(i), 0);
assert_eq!(t.consecutive_errors(i), 0);
assert!(!t.is_disabled(i));
}
assert!(t.is_disabled(99));
assert_eq!(t.wins(99), 0);
}
#[test]
fn test_tracker_record_win() {
let t = BackendTracker::new(2, 10);
t.record_win(0);
t.record_win(0);
t.record_win(1);
assert_eq!(t.wins(0), 2);
assert_eq!(t.wins(1), 1);
}
#[test]
fn test_tracker_ema_duration() {
let t = BackendTracker::new(1, 10);
t.record_race(0);
t.record_duration(0, Duration::from_millis(100));
assert_eq!(t.ema_ms(0), 100);
t.record_race(0);
t.record_duration(0, Duration::from_millis(200));
assert_eq!(t.ema_ms(0), 120);
t.record_race(0);
t.record_duration(0, Duration::from_millis(100));
assert_eq!(t.ema_ms(0), 116);
}
#[test]
fn test_tracker_probe_first_error_disables() {
let t = BackendTracker::new(1, 10);
assert!(!t.is_disabled(0));
t.record_race(0);
t.record_error(0); assert!(t.is_disabled(0));
}
#[test]
fn test_tracker_consecutive_errors_disables() {
let t = BackendTracker::new(1, 3);
t.record_race(0);
t.record_win(0);
t.record_success(0);
assert!(!t.is_disabled(0));
t.record_race(0);
t.record_error(0);
t.record_race(0);
t.record_error(0);
assert!(!t.is_disabled(0));
t.record_race(0);
t.record_error(0); assert!(t.is_disabled(0));
}
#[test]
fn test_tracker_success_resets_errors() {
let t = BackendTracker::new(1, 5);
t.record_race(0);
t.record_win(0);
t.record_success(0);
t.record_race(0);
t.record_error(0);
t.record_race(0);
t.record_error(0);
assert_eq!(t.consecutive_errors(0), 2);
t.record_success(0);
assert_eq!(t.consecutive_errors(0), 0);
}
#[test]
fn test_tracker_clone_independence() {
let t = BackendTracker::new(1, 10);
t.record_win(0);
let t2 = t.clone();
t.record_win(0);
assert_eq!(t.wins(0), 2);
assert_eq!(t2.wins(0), 1);
}
#[test]
fn test_tracker_win_rate() {
let t = BackendTracker::new(1, 10);
assert_eq!(t.win_rate_pct(0), 0); t.record_race(0);
t.record_race(0);
t.record_race(0);
t.record_race(0);
t.record_win(0);
t.record_win(0);
t.record_win(0);
assert_eq!(t.win_rate_pct(0), 75);
}
fn mock_primary(
score: u16,
delay_ms: u64,
) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
Box::pin(async move {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
Some(BackendResponse {
page: crate::page::Page::default(),
quality_score: score,
backend_index: 0,
duration: Duration::from_millis(delay_ms),
_bytes_guard: None,
})
})
}
fn mock_alt(
idx: usize,
score: u16,
delay_ms: u64,
) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
Box::pin(async move {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
BackendResult {
backend_index: idx,
response: Some(BackendResponse {
page: crate::page::Page::default(),
quality_score: score,
backend_index: idx,
duration: Duration::from_millis(delay_ms),
_bytes_guard: None,
}),
}
})
}
fn mock_alt_none(
idx: usize,
delay_ms: u64,
) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
Box::pin(async move {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
BackendResult {
backend_index: idx,
response: None,
}
})
}
fn mock_primary_none(
delay_ms: u64,
) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
Box::pin(async move {
if delay_ms > 0 {
tokio::time::sleep(Duration::from_millis(delay_ms)).await;
}
None
})
}
fn test_config(grace_ms: u64, threshold: u16) -> ParallelBackendsConfig {
ParallelBackendsConfig {
backends: vec![],
grace_period_ms: grace_ms,
enabled: true,
fast_accept_threshold: threshold,
max_consecutive_errors: 10,
connect_timeout_ms: 5000,
skip_binary_content_types: true,
max_concurrent_sessions: 0,
skip_extensions: Vec::new(),
max_backend_bytes_in_flight: 0, backend_timeout_ms: 0, }
}
#[tokio::test]
async fn test_race_primary_fast_accept() {
let tracker = BackendTracker::new(3, 10);
let cfg = test_config(500, 80);
let primary = mock_primary(95, 10);
let alts = vec![mock_alt(1, 100, 1000), mock_alt(2, 100, 1000)];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 0); assert_eq!(r.quality_score, 95);
assert_eq!(tracker.wins(0), 1);
}
#[tokio::test]
async fn test_race_alternative_wins_during_grace() {
let tracker = BackendTracker::new(3, 10);
let cfg = test_config(500, 80); let primary = mock_primary(50, 10); let alts = vec![
mock_alt(1, 90, 100), mock_alt(2, 30, 1000),
];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 1); assert_eq!(r.quality_score, 90);
}
#[tokio::test]
async fn test_race_primary_wins_after_grace() {
let tracker = BackendTracker::new(2, 10);
let cfg = test_config(50, 80); let primary = mock_primary(60, 10); let alts = vec![
mock_alt(1, 40, 5000), ];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 0); assert_eq!(r.quality_score, 60);
}
#[tokio::test]
async fn test_race_all_none() {
let tracker = BackendTracker::new(2, 10);
let cfg = test_config(50, 80);
let primary = mock_primary_none(10);
let alts = vec![mock_alt_none(1, 10)];
let result = race_backends(primary, alts, &cfg, &tracker).await;
assert!(result.is_none());
assert_eq!(tracker.consecutive_errors(1), 1);
}
#[tokio::test]
async fn test_race_primary_none_alt_some() {
let tracker = BackendTracker::new(2, 10);
let cfg = test_config(200, 80);
let primary = mock_primary_none(10);
let alts = vec![mock_alt(1, 85, 50)];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 1);
}
#[tokio::test]
async fn test_race_disabled_noop() {
let tracker = BackendTracker::new(2, 10);
let mut cfg = test_config(50, 80);
cfg.enabled = false;
let primary = mock_primary(70, 10);
let alts = vec![mock_alt(1, 100, 10)];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 0); }
#[tokio::test]
async fn test_race_single_alternative() {
let tracker = BackendTracker::new(2, 10);
let cfg = test_config(200, 80);
let primary = mock_primary(50, 100);
let alts = vec![mock_alt(1, 90, 20)];
let result = race_backends(primary, alts, &cfg, &tracker).await;
let r = result.unwrap();
assert_eq!(r.backend_index, 1);
assert_eq!(r.quality_score, 90);
}
#[tokio::test]
async fn test_race_three_alternatives_best_during_grace() {
    // Among three alternatives arriving during the grace window, the one
    // with the highest quality score (index 2, score 85) must win.
    let stats = BackendTracker::new(4, 10);
    let race_cfg = test_config(300, 95);
    let challengers = vec![mock_alt(1, 60, 50), mock_alt(2, 85, 100), mock_alt(3, 70, 200)];
    let winner = race_backends(mock_primary(40, 10), challengers, &race_cfg, &stats)
        .await
        .expect("race should produce a winner");
    assert_eq!(winner.backend_index, 2);
    assert_eq!(winner.quality_score, 85);
}
#[tokio::test]
async fn test_race_grace_period_zero() {
    // A zero grace period takes the primary's answer immediately, even
    // though a higher-scoring alternative would arrive shortly after.
    let stats = BackendTracker::new(2, 10);
    let race_cfg = test_config(0, 101);
    let winner = race_backends(mock_primary(50, 10), vec![mock_alt(1, 99, 50)], &race_cfg, &stats)
        .await
        .expect("primary should resolve the race");
    assert_eq!(winner.backend_index, 0);
}
#[tokio::test]
async fn test_race_cancellation_verified() {
    // A losing alternative must be dropped (cancelled), not left running in
    // the background: the flag set after its 10s sleep must never flip.
    let finished = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&finished);
    let stats = BackendTracker::new(2, 10);
    let race_cfg = test_config(50, 80);
    let slow_alt: Pin<Box<dyn Future<Output = BackendResult> + Send>> = Box::pin(async move {
        tokio::time::sleep(Duration::from_secs(10)).await;
        flag.store(true, Ordering::SeqCst);
        BackendResult {
            backend_index: 1,
            response: None,
        }
    });
    let _ = race_backends(mock_primary(95, 10), vec![slow_alt], &race_cfg, &stats).await;
    // Give a cancelled-but-leaked task a moment to run, then confirm it didn't.
    tokio::time::sleep(Duration::from_millis(50)).await;
    assert!(!finished.load(Ordering::SeqCst));
}
#[tokio::test]
async fn test_race_failed_alt_records_error() {
    // The primary wins while each failing alternative gets exactly one
    // consecutive-error tick recorded against it.
    let stats = BackendTracker::new(3, 5);
    let race_cfg = test_config(200, 80);
    let challengers = vec![mock_alt_none(1, 20), mock_alt_none(2, 30)];
    let winner = race_backends(mock_primary(50, 10), challengers, &race_cfg, &stats)
        .await
        .expect("primary should still win");
    assert_eq!(winner.backend_index, 0);
    assert_eq!(stats.consecutive_errors(1), 1);
    assert_eq!(stats.consecutive_errors(2), 1);
}
#[tokio::test]
async fn test_race_auto_disable_after_errors() {
    // Two consecutive failures — the configured threshold — must disable
    // alternative #1 automatically.
    let stats = BackendTracker::new(2, 2);
    let race_cfg = test_config(100, 80);
    for _ in 0..2 {
        let _ =
            race_backends(mock_primary(50, 5), vec![mock_alt_none(1, 10)], &race_cfg, &stats).await;
    }
    assert!(stats.is_disabled(1));
    assert_eq!(stats.consecutive_errors(1), 2);
}
#[test]
fn test_proxy_rotator_round_robin_cdp() {
    // Two CDP-eligible proxies are handed out in a repeating cycle:
    // p1, p2, then back to p1.
    let pool: Vec<RequestProxy> = ["http://p1", "http://p2"]
        .iter()
        .map(|a| RequestProxy {
            addr: (*a).into(),
            ignore: ProxyIgnore::No,
        })
        .collect();
    let rotator = ProxyRotator::new(&Some(pool));
    assert_eq!(rotator.cdp_count(), 2);
    for expected in ["http://p1", "http://p2", "http://p1"] {
        assert_eq!(rotator.next_cdp(), Some(expected));
    }
}
#[test]
fn test_proxy_rotator_round_robin_wd() {
    // WebDriver rotation cycles through the pool in order and wraps around,
    // mirroring the CDP round-robin behavior.
    let proxies = vec![
        RequestProxy {
            addr: "http://p1".into(),
            ignore: ProxyIgnore::No,
        },
        RequestProxy {
            addr: "http://p2".into(),
            ignore: ProxyIgnore::No,
        },
    ];
    let r = ProxyRotator::new(&Some(proxies));
    assert_eq!(r.webdriver_count(), 2);
    assert_eq!(r.next_webdriver(), Some("http://p1"));
    assert_eq!(r.next_webdriver(), Some("http://p2"));
    // Previously untested: the third pull must wrap back to the first proxy.
    assert_eq!(r.next_webdriver(), Some("http://p1"));
}
#[test]
fn test_proxy_rotator_filters_ignore() {
    // Each transport's rotation list excludes the proxy whose `ignore` flag
    // rules that transport out, leaving two of the three entries per list.
    let pool = vec![
        RequestProxy {
            addr: "http://cdp-only".into(),
            ignore: ProxyIgnore::Http,
        },
        RequestProxy {
            addr: "http://wd-only".into(),
            ignore: ProxyIgnore::Chrome,
        },
        RequestProxy {
            addr: "http://both".into(),
            ignore: ProxyIgnore::No,
        },
    ];
    let rotator = ProxyRotator::new(&Some(pool));
    assert_eq!(rotator.cdp_count(), 2);
    assert_eq!(rotator.webdriver_count(), 2);
}
#[test]
fn test_proxy_rotator_empty_proxies() {
    // With no proxy configuration at all, both rotation lists are empty and
    // yield nothing.
    let r = ProxyRotator::new(&None);
    assert_eq!(r.cdp_count(), 0);
    assert_eq!(r.webdriver_count(), 0);
    assert_eq!(r.next_cdp(), None);
    assert_eq!(r.next_webdriver(), None);
    // Previously untested: an explicitly empty list must behave exactly like `None`.
    let r = ProxyRotator::new(&Some(Vec::new()));
    assert_eq!(r.cdp_count(), 0);
    assert_eq!(r.webdriver_count(), 0);
    assert_eq!(r.next_cdp(), None);
    assert_eq!(r.next_webdriver(), None);
}
#[test]
fn test_proxy_rotator_concurrent_access() {
    // Ten threads pulling 100 addresses each through a shared rotator must
    // only ever observe one of the three configured proxies.
    let pool: Vec<RequestProxy> = ["http://p1", "http://p2", "http://p3"]
        .iter()
        .map(|a| RequestProxy {
            addr: (*a).into(),
            ignore: ProxyIgnore::No,
        })
        .collect();
    let rotator = Arc::new(ProxyRotator::new(&Some(pool)));
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let rotator = Arc::clone(&rotator);
            std::thread::spawn(move || {
                for _ in 0..100 {
                    let addr = rotator.next_cdp().unwrap();
                    assert!(addr == "http://p1" || addr == "http://p2" || addr == "http://p3");
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().unwrap();
    }
}
#[test]
fn test_is_binary_content_type_images() {
    // Every image subtype — raster and vector alike — classifies as binary.
    let images = ["image/png", "image/jpeg", "image/webp", "image/svg+xml", "image/gif"];
    for ct in images {
        assert!(is_binary_content_type(ct), "{} should be binary", ct);
    }
}
#[test]
fn test_is_binary_content_type_with_charset() {
    // Media-type parameters after ';' must not affect classification.
    let tagged = [
        "image/png; charset=utf-8",
        "application/pdf; boundary=something",
        "font/woff2; charset=binary",
    ];
    for ct in tagged {
        assert!(is_binary_content_type(ct), "{} should be binary", ct);
    }
}
#[test]
fn test_is_binary_content_type_fonts() {
    // Font media types, both the modern font/* family and the legacy
    // application/* aliases, classify as binary.
    let fonts = [
        "font/woff",
        "font/woff2",
        "font/ttf",
        "application/vnd.ms-fontobject",
        "application/x-font-ttf",
        "application/x-font-woff",
    ];
    for ct in fonts {
        assert!(is_binary_content_type(ct), "{} should be binary", ct);
    }
}
#[test]
fn test_is_binary_content_type_archives() {
    // Documents, archives, and generic binary payload types all classify as
    // binary.
    let blobs = [
        "application/pdf",
        "application/zip",
        "application/gzip",
        "application/x-gzip",
        "application/octet-stream",
        "application/wasm",
        "application/x-tar",
        "application/x-bzip2",
        "application/x-7z-compressed",
        "application/x-rar-compressed",
    ];
    for ct in blobs {
        assert!(is_binary_content_type(ct), "{} should be binary", ct);
    }
}
#[test]
fn test_is_binary_content_type_audio_video() {
    // Audio and video media types classify as binary.
    for ct in ["audio/mpeg", "audio/ogg", "video/mp4", "video/webm"] {
        assert!(is_binary_content_type(ct), "{} should be binary", ct);
    }
}
#[test]
fn test_is_binary_content_type_html_not_binary() {
    // Textual types — with or without charset parameters — are never binary.
    let textual = [
        "text/html",
        "text/html; charset=utf-8",
        "text/plain",
        "application/json",
        "application/javascript",
        "text/css",
        "application/xml",
    ];
    for ct in textual {
        assert!(!is_binary_content_type(ct), "{} should not be binary", ct);
    }
}
#[test]
fn test_should_skip_backend_for_asset_urls() {
    // URLs ending in binary-asset extensions are skipped even with no extra
    // extensions configured.
    let assets = [
        "https://example.com/photo.jpg",
        "https://example.com/photo.png",
        "https://example.com/font.woff2",
        "https://example.com/doc.pdf",
        "https://example.com/video.mp4",
    ];
    for url in assets {
        assert!(should_skip_backend_for_url(url, &[]), "{} should be skipped", url);
    }
}
#[test]
fn test_should_not_skip_backend_for_html_urls() {
    // Page-like URLs — explicit HTML, extensionless paths, API routes, and
    // the site root — must still be routed through the backends.
    let pages = [
        "https://example.com/page.html",
        "https://example.com/about",
        "https://example.com/api/data",
        "https://example.com/",
    ];
    for url in pages {
        assert!(!should_skip_backend_for_url(url, &[]), "{} should not be skipped", url);
    }
}
#[test]
fn test_should_skip_backend_custom_extensions() {
    // Caller-supplied extensions are honored case-insensitively, while
    // unlisted extensions still go through to the backends.
    let extras = [
        crate::compact_str::CompactString::from("xml"),
        crate::compact_str::CompactString::from("rss"),
    ];
    let feeds = [
        "https://example.com/feed.xml",
        "https://example.com/feed.rss",
        "https://example.com/feed.RSS",
    ];
    for url in feeds {
        assert!(should_skip_backend_for_url(url, &extras), "{} should be skipped", url);
    }
    assert!(!should_skip_backend_for_url("https://example.com/page.html", &extras));
}
#[test]
fn test_bytes_guard_all() {
    // End-to-end exercise of BackendBytesGuard's process-wide in-flight byte
    // counter: acquire/release accounting, limit enforcement, the limit=0
    // "unlimited" escape hatch, guard ownership inside BackendResponse,
    // interaction with build_backend_futures, and concurrent balance. Kept as
    // one test because the counter is a global static and the sections must
    // not interleave with each other.
    //
    // `base` snapshots the counter so assertions tolerate bytes already in
    // flight elsewhere in the process.
    let base = BackendBytesGuard::in_flight();
    // Unchecked acquire bumps the counter; Drop restores it.
    {
        let g = BackendBytesGuard::acquire_unchecked(1000);
        assert_eq!(BackendBytesGuard::in_flight(), base + 1000);
        drop(g);
    }
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // try_acquire succeeds while under the limit...
    let g = BackendBytesGuard::try_acquire(500, base + 1000);
    assert!(g.is_some());
    assert_eq!(BackendBytesGuard::in_flight(), base + 500);
    drop(g);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // ...and rejects — without touching the counter — when the request would
    // push the total past the limit (800 + 300 > 1000).
    let hold = BackendBytesGuard::acquire_unchecked(800);
    assert_eq!(BackendBytesGuard::in_flight(), base + 800);
    let g = BackendBytesGuard::try_acquire(300, base + 1000);
    assert!(g.is_none(), "should reject when would exceed limit");
    assert_eq!(BackendBytesGuard::in_flight(), base + 800);
    drop(hold);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // A limit of 0 disables enforcement entirely (see try_acquire).
    let g = BackendBytesGuard::try_acquire(1_000_000, 0);
    assert!(g.is_some(), "limit=0 means unlimited");
    assert_eq!(BackendBytesGuard::in_flight(), base + 1_000_000);
    drop(g);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // Independent guards account separately and may drop in any order.
    let g1 = BackendBytesGuard::acquire_unchecked(100);
    let g2 = BackendBytesGuard::acquire_unchecked(200);
    let g3 = BackendBytesGuard::acquire_unchecked(300);
    assert_eq!(BackendBytesGuard::in_flight(), base + 600);
    drop(g2);
    assert_eq!(BackendBytesGuard::in_flight(), base + 400);
    drop(g1);
    drop(g3);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // A guard owned by a BackendResponse is released when the response drops.
    let resp = BackendResponse {
        page: crate::page::Page::default(),
        quality_score: 90,
        backend_index: 1,
        duration: Duration::from_millis(50),
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(5000)),
    };
    assert_eq!(BackendBytesGuard::in_flight(), base + 5000);
    drop(resp);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // Moving one field out of the response still releases the guard when the
    // remainder of the struct goes out of scope.
    {
        let resp = BackendResponse {
            page: crate::page::Page::default(),
            quality_score: 90,
            backend_index: 0,
            duration: Duration::from_millis(10),
            _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(2000)),
        };
        assert_eq!(BackendBytesGuard::in_flight(), base + 2000);
        let _page = resp.page;
    }
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // While a large hold keeps the counter above the configured limit,
    // build_backend_futures must refuse to schedule any backends.
    let _hold = BackendBytesGuard::acquire_unchecked(1_000_000);
    let cfg = ParallelBackendsConfig {
        backends: vec![crate::configuration::BackendEndpoint {
            engine: crate::configuration::BackendEngine::Cdp,
            endpoint: Some("ws://localhost:9222".to_string()),
            binary_path: None,
            protocol: None,
            proxy: None,
        }],
        max_backend_bytes_in_flight: base + 500, ..Default::default()
    };
    let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
    let tracker = BackendTracker::new(2, 10);
    let futs = build_backend_futures(
        "https://example.com",
        &cfg,
        &crawl_cfg,
        &tracker,
        &None,
        &None,
    );
    assert!(
        futs.is_empty(),
        "should skip backends when byte limit exceeded"
    );
    drop(_hold);
    assert_eq!(BackendBytesGuard::in_flight(), base);
    // 8 threads x 1000 acquire/release cycles: the counter must balance back
    // to the baseline, proving the atomic accounting loses no updates.
    let handles: Vec<_> = (0..8)
        .map(|_| {
            std::thread::spawn(|| {
                for _ in 0..1000 {
                    let g = BackendBytesGuard::acquire_unchecked(100);
                    std::thread::yield_now();
                    drop(g);
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    assert_eq!(
        BackendBytesGuard::in_flight(),
        base,
        "counter must return to baseline after concurrent thread usage"
    );
}
#[tokio::test]
async fn test_race_grace_zero_under_pressure_no_deadlock() {
    // A default config with grace_period_ms = 0 must still resolve promptly
    // (no deadlock) when both backends produce answers.
    let stats = BackendTracker::new(3, 10);
    let race_cfg = ParallelBackendsConfig {
        grace_period_ms: 0,
        ..Default::default()
    };
    let outcome =
        race_backends(mock_primary(50, 5), vec![mock_alt(1, 95, 1)], &race_cfg, &stats).await;
    assert!(outcome.is_some());
}
#[tokio::test]
async fn test_race_backends_drops_futs_before_return() {
    // The fast primary wins well inside the grace window; the slow
    // alternative is discarded and the call still returns a usable result.
    let stats = BackendTracker::new(2, 10);
    let race_cfg = test_config(200, 80);
    let winner = race_backends(mock_primary(90, 1), vec![mock_alt(1, 50, 500)], &race_cfg, &stats)
        .await
        .expect("primary should win the race");
    assert_eq!(winner.backend_index, 0);
    assert_eq!(winner.quality_score, 90);
}
#[tokio::test]
async fn test_race_backends_winner_replaces_losers() {
    // Among three alternatives arriving during the long grace window, the
    // best score (backend 2, score 80) replaces the primary and the others.
    let stats = BackendTracker::new(4, 10);
    let race_cfg = test_config(500, 95);
    let challengers = vec![mock_alt(1, 60, 5), mock_alt(2, 80, 10), mock_alt(3, 70, 15)];
    let winner = race_backends(mock_primary(40, 1), challengers, &race_cfg, &stats)
        .await
        .expect("race should produce a winner");
    assert_eq!(winner.backend_index, 2);
    assert_eq!(winner.quality_score, 80);
}
#[test]
fn test_build_backend_futures_allows_when_byte_limit_not_exceeded() {
    // Smoke test for the happy path: with an effectively unlimited byte
    // budget, building the backend futures must not panic.
    let backend = crate::configuration::BackendEndpoint {
        engine: crate::configuration::BackendEngine::Cdp,
        endpoint: Some("ws://localhost:9222".to_string()),
        binary_path: None,
        protocol: None,
        proxy: None,
    };
    let cfg = ParallelBackendsConfig {
        backends: vec![backend],
        max_backend_bytes_in_flight: usize::MAX,
        ..Default::default()
    };
    let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
    let tracker = BackendTracker::new(2, 10);
    let _futs =
        build_backend_futures("https://example.com", &cfg, &crawl_cfg, &tracker, &None, &None);
}
}