Skip to main content

spider/utils/
parallel_backends.rs

//! Parallel crawl backends — race alternative browser engines alongside the
//! primary crawl path. Lock-free, panic-free, zero overhead when disabled.
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
6use std::time::Duration;
7#[cfg(any(feature = "chrome", feature = "webdriver"))]
8use std::time::Instant;
9
10use crate::configuration::{
11    BackendEndpoint, BackendEngine, BackendProtocol, ParallelBackendsConfig, ProxyIgnore,
12    RequestProxy,
13};
14use crate::page::AntiBotTech;
15use reqwest::StatusCode;
16
17// ---------------------------------------------------------------------------
18// Global In-Flight Byte Tracking
19// ---------------------------------------------------------------------------
20
/// Total HTML bytes currently held by in-flight backend responses across all
/// concurrent races.
///
/// This is a proactive memory safeguard that works without the `balance`
/// feature — it caps the aggregate memory footprint of racing backend pages
/// regardless of system memory monitoring. Updated through
/// [`BackendBytesGuard`], which pairs each increment with a decrement on drop.
static BACKEND_BYTES_IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);

/// RAII guard that decrements [`BACKEND_BYTES_IN_FLIGHT`] on drop.
///
/// Carried by [`BackendResponse`] (its `_bytes_guard` field). When the
/// response is consumed (winner extracted by caller) or discarded (losers
/// dropped after race), the tracked bytes are released automatically.
///
/// The type is `#[must_use]`: discarding a freshly acquired guard releases
/// its bytes immediately, which would silently defeat the accounting.
#[must_use = "dropping the guard immediately releases the tracked bytes"]
#[derive(Debug)]
pub struct BackendBytesGuard(usize);

impl BackendBytesGuard {
    /// Register `n` bytes as in-flight and return a guard that will release
    /// them on drop.
    ///
    /// `limit` caps the global in-flight total; `0` means unlimited. Returns
    /// `None` if adding `n` bytes would exceed `limit` (or overflow `usize`),
    /// indicating the caller should skip this backend fetch.
    #[must_use = "dropping the guard immediately releases the tracked bytes"]
    pub fn try_acquire(n: usize, limit: usize) -> Option<Self> {
        if limit == 0 {
            // Unlimited — always succeed, but still track for observability.
            return Some(Self::acquire_unchecked(n));
        }
        // Reserve atomically only if the new total stays within `limit`.
        // `fetch_update` retries on contention like a hand-written CAS loop,
        // and `checked_add` guards against overflow when `limit` is huge.
        BACKEND_BYTES_IN_FLIGHT
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                current.checked_add(n).filter(|&total| total <= limit)
            })
            .ok()
            .map(|_| Self(n))
    }

    /// Register bytes unconditionally (no limit check). Used when the response
    /// is already committed (e.g. primary path).
    pub fn acquire_unchecked(n: usize) -> Self {
        BACKEND_BYTES_IN_FLIGHT.fetch_add(n, Ordering::Relaxed);
        Self(n)
    }

    /// Current total in-flight bytes (for testing / diagnostics).
    pub fn in_flight() -> usize {
        BACKEND_BYTES_IN_FLIGHT.load(Ordering::Relaxed)
    }
}

impl Drop for BackendBytesGuard {
    /// Release exactly the bytes this guard registered.
    fn drop(&mut self) {
        BACKEND_BYTES_IN_FLIGHT.fetch_sub(self.0, Ordering::Relaxed);
    }
}
80
81// ---------------------------------------------------------------------------
82// Asset / Binary Content-Type Detection
83// ---------------------------------------------------------------------------
84
/// Returns `true` for `Content-Type` values where HTML quality racing is
/// pointless — binary resources (images, fonts, video, archives, etc.)
/// will be identical across all backends.
///
/// Only the media-type essence (the part before any `;` parameters, with
/// surrounding whitespace trimmed) is inspected. Media types are
/// case-insensitive, so `Image/PNG` is treated like `image/png`.
pub fn is_binary_content_type(ct: &str) -> bool {
    // Top-level types that are always binary.
    const BINARY_TOP_LEVEL: [&str; 4] = ["image/", "audio/", "video/", "font/"];
    // Individual `application/*` subtypes that are binary.
    const BINARY_EXACT: [&str; 13] = [
        "application/pdf",
        "application/zip",
        "application/gzip",
        "application/x-gzip",
        "application/octet-stream",
        "application/wasm",
        "application/x-tar",
        "application/x-bzip2",
        "application/x-7z-compressed",
        "application/x-rar-compressed",
        "application/vnd.ms-fontobject",
        "application/x-font-ttf",
        "application/x-font-woff",
    ];

    let essence = ct
        .split_once(';')
        .map_or(ct, |(media_type, _params)| media_type)
        .trim();

    BINARY_TOP_LEVEL.iter().any(|prefix| {
        essence
            .get(..prefix.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(prefix))
    }) || BINARY_EXACT
        .iter()
        .any(|exact| essence.eq_ignore_ascii_case(exact))
}
108
109/// Returns `true` when the URL extension indicates a binary asset or matches
110/// a user-supplied skip extension. Backends should not be spawned for these.
111pub fn should_skip_backend_for_url(
112    url: &str,
113    extra_extensions: &[crate::compact_str::CompactString],
114) -> bool {
115    // Check the built-in asset list first.
116    if crate::page::is_asset_url(url) {
117        return true;
118    }
119    // Check user-supplied extra extensions.
120    if !extra_extensions.is_empty() {
121        if let Some(pos) = url.rfind('.') {
122            let ext = &url[pos + 1..];
123            if ext.len() >= 2 {
124                let ext_lower = ext.to_ascii_lowercase();
125                for skip in extra_extensions {
126                    if skip.eq_ignore_ascii_case(&ext_lower) {
127                        return true;
128                    }
129                }
130            }
131        }
132    }
133    false
134}
135
136// ---------------------------------------------------------------------------
137// Custom Validator
138// ---------------------------------------------------------------------------
139
/// The result of a custom quality validation.
///
/// `Default` yields a no-op result: no override, zero adjustment, not
/// rejected. [`html_quality_score_validated`] applies the fields in this
/// order of precedence: `reject`, then `score_override`, then `score_adjust`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ValidationResult {
    /// Override the built-in score entirely. When `Some`, the built-in
    /// scorer's value is ignored and this value is used directly, capped at
    /// 100.
    pub score_override: Option<u16>,
    /// Additive adjustment applied on top of the built-in score.
    /// Positive values boost, negative values penalise. Applied after
    /// the built-in scorer, before clamping to 0–100.
    pub score_adjust: i16,
    /// When `true`, reject this response outright (treat as score 0).
    pub reject: bool,
}
153
154/// User-supplied quality validator. Called after the built-in scorer for
155/// every backend response. Receives the raw HTML bytes, status code, URL,
156/// and the backend source name ("primary", "cdp", "servo", "custom").
157///
158/// Must be `Send + Sync` so it can be shared across async tasks.
159pub type QualityValidator = std::sync::Arc<
160    dyn Fn(
161            Option<&[u8]>, // html content
162            StatusCode,    // status code
163            &str,          // url
164            &str,          // backend source name
165        ) -> ValidationResult
166        + Send
167        + Sync,
168>;
169
170// ---------------------------------------------------------------------------
171// HTML Quality Scorer
172// ---------------------------------------------------------------------------
173
174/// Score an HTML response with both the built-in scorer and an optional
175/// custom validator. Returns the final clamped score (0–100).
176pub fn html_quality_score_validated(
177    content: Option<&[u8]>,
178    status_code: StatusCode,
179    anti_bot: &AntiBotTech,
180    url: &str,
181    source: &str,
182    validator: Option<&QualityValidator>,
183) -> u16 {
184    let base = html_quality_score(content, status_code, anti_bot);
185
186    if let Some(v) = validator {
187        let result = v(content, status_code, url, source);
188        if result.reject {
189            return 0;
190        }
191        if let Some(ov) = result.score_override {
192            return ov.min(100);
193        }
194        let adjusted = (base as i16).saturating_add(result.score_adjust);
195        return (adjusted.max(0) as u16).min(100);
196    }
197
198    base
199}
200
201/// Score an HTML response for quality (0–100). Higher is better.
202///
203/// Used by [`race_backends`] to pick the best response when multiple backends
204/// complete within the grace period.
205pub fn html_quality_score(
206    content: Option<&[u8]>,
207    status_code: StatusCode,
208    anti_bot: &AntiBotTech,
209) -> u16 {
210    let mut score: u16 = 0;
211
212    // Status code contribution (max 30).
213    if status_code == StatusCode::OK {
214        score += 30;
215    } else if status_code.is_success() {
216        score += 20;
217    } else if status_code.is_redirection() {
218        score += 5;
219    }
220    // 4xx / 5xx contribute 0.
221
222    if let Some(body) = content {
223        let len = body.len();
224
225        // Content length contribution (max 25).
226        if len > 0 {
227            score += 5;
228        }
229        if len > 512 {
230            score += 10;
231        }
232        if len > 4096 {
233            score += 10;
234        }
235
236        // Has a <body tag (max 15). Fast memchr scan.
237        if memchr::memmem::find(body, b"<body").is_some()
238            || memchr::memmem::find(body, b"<BODY").is_some()
239        {
240            score += 15;
241        }
242
243        // Not an empty HTML shell (max 10).
244        if !crate::utils::is_cacheable_body_empty(body) {
245            score += 10;
246        }
247    }
248
249    // Anti-bot contribution (max 20).
250    if *anti_bot == AntiBotTech::None {
251        score += 20;
252    }
253
254    score.min(100)
255}
256
257// ---------------------------------------------------------------------------
258// Backend Tracker — lock-free per-backend statistics
259// ---------------------------------------------------------------------------
260
/// Lock-free counters kept for a single backend.
///
/// Every field is atomic, so the counters can be updated through `&self`
/// from concurrent tasks.
struct BackendStats {
    /// Races this backend won.
    wins: AtomicU64,
    /// Races recorded for this backend.
    races: AtomicU64,
    /// Moving average of fetch duration, in milliseconds.
    ema_ms: AtomicU64,
    /// Errors recorded since the last success.
    consecutive_errors: AtomicU64,
    /// Set once the backend has been auto-disabled.
    disabled: AtomicBool,
}

impl BackendStats {
    /// All counters start at zero and the backend starts enabled.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            wins: zero(),
            races: zero(),
            ema_ms: zero(),
            consecutive_errors: zero(),
            disabled: AtomicBool::new(false),
        }
    }
}

// Atomics are not `Clone`, so cloning takes a relaxed snapshot of each
// counter (per field, not atomically across fields).
impl Clone for BackendStats {
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));
        Self {
            wins: snapshot(&self.wins),
            races: snapshot(&self.races),
            ema_ms: snapshot(&self.ema_ms),
            consecutive_errors: snapshot(&self.consecutive_errors),
            disabled: AtomicBool::new(self.disabled.load(Ordering::Relaxed)),
        }
    }
}
293
294/// Tracks per-backend performance across a crawl session.
295///
296/// Fully lock-free — uses atomics only. Index 0 is the primary backend,
297/// 1..N are the alternatives in the order they appear in the config.
298pub struct BackendTracker {
299    stats: Vec<BackendStats>,
300    max_consecutive_errors: u64,
301}
302
303impl BackendTracker {
304    /// Create a new tracker for `count` backends (primary + alternatives).
305    pub fn new(count: usize, max_consecutive_errors: u16) -> Self {
306        let mut stats = Vec::with_capacity(count);
307        for _ in 0..count {
308            stats.push(BackendStats::new());
309        }
310        Self {
311            stats,
312            max_consecutive_errors: max_consecutive_errors as u64,
313        }
314    }
315
316    /// Record a win for backend at `idx`.
317    pub fn record_win(&self, idx: usize) {
318        if let Some(s) = self.stats.get(idx) {
319            s.wins.fetch_add(1, Ordering::Relaxed);
320        }
321    }
322
323    /// Record that backend `idx` participated in a race.
324    pub fn record_race(&self, idx: usize) {
325        if let Some(s) = self.stats.get(idx) {
326            s.races.fetch_add(1, Ordering::Relaxed);
327        }
328    }
329
330    /// Record a fetch duration for backend `idx` (EMA with alpha ~0.2).
331    pub fn record_duration(&self, idx: usize, dur: Duration) {
332        if let Some(s) = self.stats.get(idx) {
333            let ms = dur.as_millis() as u64;
334            let count = s.races.load(Ordering::Relaxed);
335            if count <= 1 {
336                s.ema_ms.store(ms, Ordering::Relaxed);
337            } else {
338                // CAS loop ensures concurrent record_duration() calls don't lose updates.
339                let _ = s
340                    .ema_ms
341                    .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
342                        Some((old * 4 + ms) / 5)
343                    });
344            }
345        }
346    }
347
348    /// Record a retryable error for backend `idx`.
349    ///
350    /// The first request to each backend acts as a probe: if the backend
351    /// has never succeeded (zero prior successes / zero wins), the first
352    /// error disables it immediately so a down backend is never retried.
353    /// After at least one success, the normal `max_consecutive_errors`
354    /// threshold applies.
355    pub fn record_error(&self, idx: usize) {
356        if let Some(s) = self.stats.get(idx) {
357            let prev = s.consecutive_errors.fetch_add(1, Ordering::Relaxed);
358
359            // Probe behaviour: backend has never produced a successful
360            // response → treat the very first error as fatal.
361            let never_succeeded =
362                s.wins.load(Ordering::Relaxed) == 0 && s.races.load(Ordering::Relaxed) <= 1;
363
364            if never_succeeded || prev + 1 >= self.max_consecutive_errors {
365                s.disabled.store(true, Ordering::Relaxed);
366                if never_succeeded {
367                    log::info!(
368                        "[parallel_backends] backend {} failed on probe (first request) — auto-disabled",
369                        idx
370                    );
371                }
372            }
373        }
374    }
375
376    /// Record a successful fetch — resets the consecutive error counter.
377    pub fn record_success(&self, idx: usize) {
378        if let Some(s) = self.stats.get(idx) {
379            s.consecutive_errors.store(0, Ordering::Relaxed);
380        }
381    }
382
383    /// Check whether backend `idx` has been auto-disabled.
384    pub fn is_disabled(&self, idx: usize) -> bool {
385        self.stats
386            .get(idx)
387            .is_none_or(|s| s.disabled.load(Ordering::Relaxed))
388    }
389
390    /// Get the win count for backend `idx`.
391    pub fn wins(&self, idx: usize) -> u64 {
392        self.stats
393            .get(idx)
394            .map_or(0, |s| s.wins.load(Ordering::Relaxed))
395    }
396
397    /// Get the race count for backend `idx`.
398    pub fn races(&self, idx: usize) -> u64 {
399        self.stats
400            .get(idx)
401            .map_or(0, |s| s.races.load(Ordering::Relaxed))
402    }
403
404    /// Get the EMA response time in ms for backend `idx`.
405    pub fn ema_ms(&self, idx: usize) -> u64 {
406        self.stats
407            .get(idx)
408            .map_or(0, |s| s.ema_ms.load(Ordering::Relaxed))
409    }
410
411    /// Get the consecutive error count for backend `idx`.
412    pub fn consecutive_errors(&self, idx: usize) -> u64 {
413        self.stats
414            .get(idx)
415            .map_or(0, |s| s.consecutive_errors.load(Ordering::Relaxed))
416    }
417
418    /// Win rate percentage (0–100) for backend `idx`. Returns 0 if no races.
419    pub fn win_rate_pct(&self, idx: usize) -> u64 {
420        let r = self.races(idx);
421        if r == 0 {
422            return 0;
423        }
424        self.wins(idx) * 100 / r
425    }
426
427    /// Number of tracked backends.
428    pub fn len(&self) -> usize {
429        self.stats.len()
430    }
431
432    /// Returns true if no backends are tracked.
433    pub fn is_empty(&self) -> bool {
434        self.stats.is_empty()
435    }
436}
437
438impl Clone for BackendTracker {
439    fn clone(&self) -> Self {
440        Self {
441            stats: self.stats.clone(),
442            max_consecutive_errors: self.max_consecutive_errors,
443        }
444    }
445}
446
447// ---------------------------------------------------------------------------
448// Backend Response
449// ---------------------------------------------------------------------------
450
/// The result of a backend page fetch, carrying quality metadata.
///
/// Fields are dropped in declaration order. When a response is discarded,
/// `page` is therefore released before `_bytes_guard` decrements the global
/// in-flight byte counter. Keep `_bytes_guard` last.
pub struct BackendResponse {
    /// The fetched page.
    pub page: crate::page::Page,
    /// Quality score (0–100), e.g. from [`html_quality_score`].
    /// [`race_backends`] keeps the highest-scoring response.
    pub quality_score: u16,
    /// Backend index: 0 = primary, 1..N = alternatives.
    pub backend_index: usize,
    /// Wall-clock duration of the fetch.
    pub duration: Duration,
    /// RAII guard that releases the tracked in-flight bytes on drop.
    /// When a losing response is discarded, or the caller drops the winning
    /// response, the bytes are freed automatically. `None` means these bytes
    /// were not tracked.
    pub _bytes_guard: Option<BackendBytesGuard>,
}
466
/// Wrapper returned by backend futures — always carries the backend index
/// so that failures can be tracked for auto-disable.
///
/// [`race_backends`] records a `None` response as an error for
/// `backend_index` via [`BackendTracker::record_error`].
pub struct BackendResult {
    /// The backend index (0 = primary, 1..N = alternatives).
    pub backend_index: usize,
    /// `Some` on success, `None` on failure.
    pub response: Option<BackendResponse>,
}
475
476/// Return a human-readable backend source name for the given config entry.
477pub fn backend_source_name(endpoint: &BackendEndpoint) -> &'static str {
478    match endpoint.engine {
479        BackendEngine::Cdp => "cdp",
480        BackendEngine::Servo => "servo",
481        BackendEngine::Custom => "custom",
482    }
483}
484
485/// Resolve the protocol for a backend endpoint. Falls back to engine defaults.
486pub fn resolve_protocol(endpoint: &BackendEndpoint) -> BackendProtocol {
487    if let Some(ref p) = endpoint.protocol {
488        return p.clone();
489    }
490    match endpoint.engine {
491        BackendEngine::Cdp => BackendProtocol::Cdp,
492        BackendEngine::Servo => BackendProtocol::WebDriver,
493        BackendEngine::Custom => {
494            // Infer from URL scheme if possible.
495            if let Some(ref ep) = endpoint.endpoint {
496                if ep.starts_with("ws://") || ep.starts_with("wss://") {
497                    return BackendProtocol::Cdp;
498                }
499            }
500            BackendProtocol::WebDriver // default fallback
501        }
502    }
503}
504
505/// Set the `backend_source` field on a page (feature-gated).
506#[inline]
507pub fn tag_page_source(page: &mut crate::page::Page, source: &str) {
508    page.backend_source = Some(crate::compact_str::CompactString::from(source));
509}
510
511// ---------------------------------------------------------------------------
512// Race Orchestrator
513// ---------------------------------------------------------------------------
514
/// Race the primary crawl against alternative backend futures.
///
/// 1. All futures start immediately: each is spawned onto a
///    [`tokio::task::JoinSet`], and the primary is first delayed by a small
///    0–999 µs jitter.
/// 2. When the first `Some` result arrives:
///    - If `quality_score >= fast_accept_threshold`, return immediately.
///    - Otherwise, start the grace period timer.
/// 3. During the grace period, collect additional results.
/// 4. After the grace period (or when all futures complete), pick the
///    highest-scoring result. On a tie, the earlier arrival is kept.
/// 5. Remaining futures are cancelled via drop.
///
/// The grace period is `config.grace_period_ms`. It is halved when
/// `get_process_memory_state_sync()` reports `1` or more, and reduced to zero
/// at `2` or more.
///
/// Every received result updates `tracker`:
/// - `record_race` for each result;
/// - `record_duration` and `record_success` for successes;
/// - `record_error` for failures;
/// - `record_win` for the response that is returned.
///
/// Tasks that end in a `JoinError` (panic or cancellation) count as completed
/// but are not attributed to any backend, because their index is unavailable.
///
/// When `config.enabled` is false or there are no alternatives, only
/// `primary` is awaited; `alternatives` is dropped without being polled.
///
/// Returns `None` only when no future produced a `Some` response.
pub async fn race_backends(
    primary: Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>>,
    alternatives: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>>,
    config: &ParallelBackendsConfig,
    tracker: &BackendTracker,
) -> Option<BackendResponse> {
    if !config.enabled || alternatives.is_empty() {
        // Disabled or no alternatives — just run primary.
        let resp = primary.await;
        if let Some(ref r) = resp {
            tracker.record_race(r.backend_index);
            tracker.record_win(r.backend_index);
            tracker.record_duration(r.backend_index, r.duration);
            tracker.record_success(r.backend_index);
        }
        return resp;
    }

    // Primary plus every alternative: the most results the loop can receive.
    let total = 1 + alternatives.len();

    // Randomise launch order: sometimes the primary goes first, sometimes
    // a backend does. This prevents predictable timing patterns that could
    // be fingerprinted. Uses a tighter jitter range (0–1ms) than backends
    // so the primary is rarely meaningfully delayed. Hashing the current
    // time with `DefaultHasher` is a cheap pseudo-random source, not a
    // cryptographic one.
    let primary_jitter_us = {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};
        let mut h = DefaultHasher::new();
        std::time::SystemTime::now().hash(&mut h);
        0u16.hash(&mut h); // primary marker
        h.finish() % 1000 // 0–999µs (~0–1ms)
    };

    // Tag the primary's result with index 0 so it flows through the same
    // `BackendResult` path as the alternatives.
    let primary_wrapped: Pin<Box<dyn Future<Output = BackendResult> + Send>> =
        Box::pin(async move {
            if primary_jitter_us > 0 {
                tokio::time::sleep(Duration::from_micros(primary_jitter_us)).await;
            }
            let response = primary.await;
            BackendResult {
                backend_index: 0,
                response,
            }
        });

    // All tasks run concurrently; `join_next` yields them in completion order.
    let mut futs = tokio::task::JoinSet::new();
    futs.spawn(primary_wrapped);
    for alt in alternatives {
        futs.spawn(alt);
    }

    // Under memory pressure, halve the grace period so losing responses
    // are freed sooner. Under critical pressure, skip grace entirely.
    let grace = {
        let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
        if mem_state >= 2 {
            Duration::ZERO
        } else if mem_state >= 1 {
            Duration::from_millis(config.grace_period_ms / 2)
        } else {
            Duration::from_millis(config.grace_period_ms)
        }
    };
    let threshold = config.fast_accept_threshold;

    let mut best: Option<BackendResponse> = None;
    // Counts every finished task: success, failure, or JoinError.
    let mut completed = 0usize;
    // `None` until the first non-fast-accepted success starts the grace window.
    let mut grace_deadline: Option<tokio::time::Instant> = None;

    loop {
        if completed >= total {
            break;
        }

        // Once the grace window is running, wait for the next result or the
        // deadline, whichever comes first. `biased;` polls `join_next` before
        // the timer, so an already-available result is still taken when the
        // deadline has also passed. The timer arm `break`s the outer loop.
        let result = if let Some(deadline) = grace_deadline {
            tokio::select! {
                biased;
                res = futs.join_next() => res,
                _ = tokio::time::sleep_until(deadline) => break,
            }
        } else {
            futs.join_next().await
        };

        match result {
            Some(Ok(br)) => {
                completed += 1;
                let idx = br.backend_index;

                match br.response {
                    Some(resp) => {
                        tracker.record_race(idx);
                        tracker.record_duration(idx, resp.duration);
                        tracker.record_success(idx);

                        let score = resp.quality_score;

                        // Fast-accept: high-quality first response skips grace.
                        // Only the very first success is eligible. Returning
                        // drops `futs`, which aborts the remaining tasks.
                        if best.is_none() && score >= threshold {
                            tracker.record_win(idx);
                            return Some(resp);
                        }

                        // True when the new response beats the current best.
                        // Strictly greater, so ties keep the earlier arrival.
                        let dominated = match &best {
                            Some(b) => score > b.quality_score,
                            None => true,
                        };
                        if dominated {
                            best = Some(resp);
                        }

                        // The grace window starts once and is never extended.
                        if grace_deadline.is_none() {
                            grace_deadline = Some(tokio::time::Instant::now() + grace);
                        }
                    }
                    None => {
                        // Backend failed — track error for auto-disable.
                        tracker.record_race(idx);
                        tracker.record_error(idx);
                    }
                }
            }
            Some(Err(_join_err)) => {
                // Task panicked or was cancelled. Its backend index is
                // unknown, so nothing is recorded in `tracker`.
                completed += 1;
            }
            None => {
                // The JoinSet has no tasks left.
                break;
            }
        }
    }

    // Abort all remaining in-flight tasks immediately, then drop the JoinSet
    // so completed-but-unread responses (carrying full Page data) are freed.
    // abort_all() sends cancel signals eagerly — drop alone would do this too,
    // but being explicit avoids relying on drop ordering.
    futs.abort_all();
    drop(futs);

    if let Some(ref b) = best {
        tracker.record_win(b.backend_index);
    }

    best
}
671
672// ---------------------------------------------------------------------------
673// Proxy Rotator — lock-free round-robin
674// ---------------------------------------------------------------------------
675
676/// Round-robin proxy address selector for parallel backends.
677///
678/// Pre-filters proxy lists for CDP and WebDriver backends
679/// based on [`ProxyIgnore`]. Lock-free via [`AtomicUsize`].
680pub struct ProxyRotator {
681    /// Proxies for CDP backends (filtered: `ProxyIgnore != Chrome`).
682    cdp_addrs: Vec<String>,
683    /// Proxies for WebDriver backends (filtered: `ProxyIgnore != Http`).
684    wd_addrs: Vec<String>,
685    cdp_index: AtomicUsize,
686    wd_index: AtomicUsize,
687}
688
689impl ProxyRotator {
690    /// Build from the crawler's proxy list.
691    pub fn new(proxies: &Option<Vec<RequestProxy>>) -> Self {
692        let (mut cdp, mut wd) = (Vec::new(), Vec::new());
693
694        if let Some(proxies) = proxies {
695            for p in proxies {
696                if p.ignore != ProxyIgnore::Chrome {
697                    cdp.push(p.addr.clone());
698                }
699                if p.ignore != ProxyIgnore::Http {
700                    wd.push(p.addr.clone());
701                }
702            }
703        }
704
705        Self {
706            cdp_addrs: cdp,
707            wd_addrs: wd,
708            cdp_index: AtomicUsize::new(0),
709            wd_index: AtomicUsize::new(0),
710        }
711    }
712
713    /// Get the next CDP proxy address (round-robin). Returns `None` if empty.
714    pub fn next_cdp(&self) -> Option<&str> {
715        let len = self.cdp_addrs.len();
716        if len == 0 {
717            return None;
718        }
719        let idx = self.cdp_index.fetch_add(1, Ordering::Relaxed) % len;
720        self.cdp_addrs.get(idx).map(|s| s.as_str())
721    }
722
723    /// Get the next WebDriver proxy address (round-robin). Returns `None` if empty.
724    pub fn next_webdriver(&self) -> Option<&str> {
725        let len = self.wd_addrs.len();
726        if len == 0 {
727            return None;
728        }
729        let idx = self.wd_index.fetch_add(1, Ordering::Relaxed) % len;
730        self.wd_addrs.get(idx).map(|s| s.as_str())
731    }
732
733    /// Number of CDP proxies available.
734    pub fn cdp_count(&self) -> usize {
735        self.cdp_addrs.len()
736    }
737
738    /// Number of WebDriver proxies available.
739    pub fn webdriver_count(&self) -> usize {
740        self.wd_addrs.len()
741    }
742}
743
744impl Clone for ProxyRotator {
745    fn clone(&self) -> Self {
746        Self {
747            cdp_addrs: self.cdp_addrs.clone(),
748            wd_addrs: self.wd_addrs.clone(),
749            cdp_index: AtomicUsize::new(self.cdp_index.load(Ordering::Relaxed)),
750            wd_index: AtomicUsize::new(self.wd_index.load(Ordering::Relaxed)),
751        }
752    }
753}
754
755// ---------------------------------------------------------------------------
756// Backend Fetch Functions
757// ---------------------------------------------------------------------------
758
759/// Fetch a page via a remote CDP endpoint (any CDP-speaking browser).
760///
761/// Fresh CDP connection per fetch with the **same handler config** as the
762/// primary Chrome path — network interception, resource blocking, viewport,
763/// timeouts all pass through transparently via `connect_with_config()`.
764#[cfg(feature = "chrome")]
765pub async fn fetch_cdp(
766    url: &str,
767    endpoint: &str,
768    config: &std::sync::Arc<crate::configuration::Configuration>,
769    backend_index: usize,
770    connect_timeout: Duration,
771    proxy: Option<String>,
772    source_name: &str,
773) -> Option<BackendResponse> {
774    let start = Instant::now();
775    let timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));
776
777    // Build the same handler config as the primary Chrome crawl path.
778    // This gives CDP backends identical network interception: block_visuals,
779    // block_javascript, block_stylesheets, block_ads, block_analytics,
780    // whitelist/blacklist patterns, extra headers, viewport, etc.
781    let handler_config = crate::features::chrome::create_handler_config(config);
782
783    // Connect with a short timeout so down backends fail fast.
784    let connect_result = tokio::time::timeout(
785        connect_timeout,
786        chromiumoxide::Browser::connect_with_config(endpoint, handler_config),
787    )
788    .await;
789
790    let (mut browser, handler_handle) = match connect_result {
791        Ok(Ok((browser, mut handler))) => {
792            let h = tokio::spawn(async move {
793                use crate::tokio_stream::StreamExt;
794                while let Some(_) = handler.next().await {}
795            });
796            (browser, h)
797        }
798        Ok(Err(e)) => {
799            log::warn!("{} CDP connect failed ({}): {:?}", source_name, endpoint, e);
800            return None;
801        }
802        Err(_) => {
803            log::warn!("{} CDP connect timed out ({})", source_name, endpoint);
804            return None;
805        }
806    };
807
808    // If a proxy is configured, create an isolated browser context with
809    // proxy_server so this backend's requests route through it.
810    if let Some(ref proxy_addr) = proxy {
811        let mut ctx_params =
812            chromiumoxide::cdp::browser_protocol::target::CreateBrowserContextParams::default();
813        ctx_params.dispose_on_detach = Some(true);
814        ctx_params.proxy_server = Some(proxy_addr.clone());
815        if let Ok(ctx) = browser.create_browser_context(ctx_params).await {
816            let _ = browser.send_new_context(ctx).await;
817        } else {
818            log::warn!(
819                "{} proxy browser context failed for {}, continuing without proxy",
820                source_name,
821                proxy_addr
822            );
823        }
824    }
825
826    // Get the default page.
827    let page = match browser.pages().await {
828        Ok(mut p) if !p.is_empty() => p.swap_remove(0),
829        _ => match browser.new_page(url).await {
830            Ok(p) => p,
831            Err(e) => {
832                log::warn!("{} page failed: {:?}", source_name, e);
833                handler_handle.abort();
834                return None;
835            }
836        },
837    };
838
839    // Apply the same page-level config as the primary Chrome path.
840    crate::features::chrome::setup_chrome_events(&page, config).await;
841
842    // Auth challenge interception if enabled.
843    let _intercept_handle = crate::features::chrome::setup_chrome_interception_base(
844        &page,
845        config.chrome_intercept.enabled,
846        &config.auth_challenge_response,
847        config.chrome_intercept.block_visuals,
848        "",
849    )
850    .await;
851
852    // Navigate.
853    match tokio::time::timeout(timeout, page.goto(url)).await {
854        Ok(Ok(_)) => {}
855        Ok(Err(e)) => {
856            log::warn!("{} navigate failed for {}: {:?}", source_name, url, e);
857            handler_handle.abort();
858            return None;
859        }
860        Err(_) => {
861            log::warn!("{} navigate timed out for {}", source_name, url);
862            handler_handle.abort();
863            return None;
864        }
865    }
866
867    // Wait for load event if configured.
868    #[cfg(feature = "chrome")]
869    if let Some(ref wf) = config.wait_for {
870        if let Some(ref delay) = wf.delay {
871            if let Some(ms) = delay.timeout {
872                tokio::time::sleep(ms).await;
873            }
874        }
875    }
876
877    // Get the outer HTML.
878    let html_result = tokio::time::timeout(Duration::from_secs(10), page.outer_html_bytes()).await;
879
880    // Clean up.
881    handler_handle.abort();
882
883    let html_bytes: Vec<u8> = match html_result {
884        Ok(Ok(b)) => b.to_vec(),
885        Ok(Err(e)) => {
886            log::warn!(
887                "{} outer_html_bytes() failed for {}: {:?}",
888                source_name,
889                url,
890                e
891            );
892            return None;
893        }
894        Err(_) => {
895            log::warn!("{} outer_html_bytes() timed out for {}", source_name, url);
896            return None;
897        }
898    };
899
900    let dur = start.elapsed();
901    let status = StatusCode::OK;
902
903    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
904    let byte_len = html_bytes.len();
905    let res = crate::utils::PageResponse {
906        content: Some(html_bytes),
907        status_code: status,
908        ..Default::default()
909    };
910    let mut page = crate::page::build(url, res);
911    tag_page_source(&mut page, source_name);
912
913    Some(BackendResponse {
914        page,
915        quality_score: score,
916        backend_index,
917        duration: dur,
918        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
919    })
920}
921
/// Fetch a page via a remote WebDriver endpoint (Servo, custom, or any WebDriver-speaking browser).
///
/// Reuses the existing `thirtyfour` / `webdriver.rs` infrastructure.
///
/// Steps:
/// 1. Open a session against `endpoint`, bounded by `connect_timeout`.
/// 2. Navigate to `url`, bounded by `config.request_timeout` (15s when unset).
/// 3. Read the page source, bounded by 10s.
/// 4. Dispose the session, exactly once, whatever the outcome of steps 2–3.
///
/// Any failure is logged with `source_name` and yields `None`. This path never
/// observes the real HTTP status, so the page is recorded as `200 OK` and
/// scored by `html_quality_score` with no anti-bot signal. The page is tagged
/// with `source_name`, and its byte length is registered through
/// `BackendBytesGuard::acquire_unchecked`.
#[cfg(feature = "webdriver")]
pub async fn fetch_webdriver(
    url: &str,
    endpoint: &str,
    config: &std::sync::Arc<crate::configuration::Configuration>,
    backend_index: usize,
    connect_timeout: Duration,
    proxy: Option<String>,
    source_name: &str,
) -> Option<BackendResponse> {
    use crate::features::webdriver_common::{WebDriverBrowser, WebDriverConfig};

    let start = Instant::now();
    let nav_timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));

    // Session configuration aimed at the remote endpoint.
    let wd_config = WebDriverConfig {
        server_url: endpoint.to_string(),
        browser: WebDriverBrowser::Chrome, // Servo's WebDriver is browser-agnostic
        headless: true,
        timeout: Some(connect_timeout),
        proxy, // Per-backend proxy or ProxyRotator fallback
        user_agent: config.user_agent.as_ref().map(|ua| ua.to_string()),
        viewport_width: config.viewport.as_ref().map(|v| v.width),
        viewport_height: config.viewport.as_ref().map(|v| v.height),
        accept_insecure_certs: config.accept_invalid_certs,
        ..Default::default()
    };

    // A short connect deadline lets an unreachable backend fail fast.
    let launch = crate::features::webdriver::launch_driver_base(&wd_config, config);
    let mut controller = match tokio::time::timeout(connect_timeout, launch).await {
        Ok(Some(c)) => c,
        Ok(None) => {
            log::warn!("{} WebDriver connect failed ({})", source_name, endpoint);
            return None;
        }
        Err(_) => {
            log::warn!("{} WebDriver connect timed out ({})", source_name, endpoint);
            return None;
        }
    };

    let driver = controller.driver().clone();

    // Navigate, then read the source. Every outcome leaves this block so the
    // single `dispose()` call below releases the session.
    let fetched = async {
        match tokio::time::timeout(nav_timeout, driver.goto(url)).await {
            Err(_) => {
                log::warn!("{} WebDriver navigate timed out for {}", source_name, url);
                return None;
            }
            Ok(Err(e)) => {
                log::warn!(
                    "{} WebDriver navigate failed for {}: {:?}",
                    source_name,
                    url,
                    e
                );
                return None;
            }
            Ok(Ok(_)) => {}
        }

        match tokio::time::timeout(Duration::from_secs(10), driver.source()).await {
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                log::warn!(
                    "{} WebDriver source failed for {}: {:?}",
                    source_name,
                    url,
                    e
                );
                None
            }
            Err(_) => {
                log::warn!("{} WebDriver source timed out for {}", source_name, url);
                None
            }
        }
    }
    .await;

    controller.dispose();

    let source = fetched?;
    let dur = start.elapsed();
    let html_bytes = source.into_bytes();
    let status = StatusCode::OK;

    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
    let byte_len = html_bytes.len();
    let mut page = crate::page::build(
        url,
        crate::utils::PageResponse {
            content: Some(html_bytes),
            status_code: status,
            ..Default::default()
        },
    );
    tag_page_source(&mut page, source_name);

    Some(BackendResponse {
        page,
        quality_score: score,
        backend_index,
        duration: dur,
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
    })
}
1039
1040// ---------------------------------------------------------------------------
1041// Builder Helper
1042// ---------------------------------------------------------------------------
1043
1044/// Build alternative backend futures for a given URL from config.
1045///
1046/// Skips backends that have been auto-disabled by the tracker.
1047/// Build alternative backend futures for a given URL from config.
1048///
1049/// Skips backends that have been auto-disabled by the tracker.
1050/// Accepts `Arc<Configuration>` to avoid per-URL deep clones.
1051#[allow(unused_variables)]
1052pub fn build_backend_futures(
1053    url: &str,
1054    config: &ParallelBackendsConfig,
1055    crawl_config: &std::sync::Arc<crate::configuration::Configuration>,
1056    tracker: &BackendTracker,
1057    proxy_rotator: &Option<std::sync::Arc<ProxyRotator>>,
1058    semaphore: &Option<std::sync::Arc<tokio::sync::Semaphore>>,
1059) -> Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> {
1060    // Fast-path: skip backends for binary/asset URLs. There is no HTML
1061    // quality variance for images, fonts, PDFs, etc.
1062    if should_skip_backend_for_url(url, &config.skip_extensions) {
1063        log::debug!(
1064            "[parallel_backends] skipping backends for asset URL: {}",
1065            url
1066        );
1067        return Vec::new();
1068    }
1069
1070    // Proactive byte-level guard: skip backends when the aggregate in-flight
1071    // HTML from all concurrent races exceeds the configured cap. Works without
1072    // the `balance` feature.
1073    let byte_limit = config.max_backend_bytes_in_flight;
1074    if byte_limit > 0 && BackendBytesGuard::in_flight() >= byte_limit {
1075        log::debug!(
1076            "[parallel_backends] skipping backends — in-flight bytes ({}) >= limit ({})",
1077            BackendBytesGuard::in_flight(),
1078            byte_limit,
1079        );
1080        return Vec::new();
1081    }
1082
1083    // Reactive memory pressure guard (requires `balance` feature, otherwise
1084    // no-op returning 0). State 2 (critical) skips all backends; state 1
1085    // (pressure) limits to at most 1 alternative backend.
1086    let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
1087    if mem_state >= 2 {
1088        log::debug!("[parallel_backends] skipping all backends — process memory critical");
1089        return Vec::new();
1090    }
1091    let mem_pressure = mem_state >= 1;
1092
1093    // Hard outer deadline: prevents a stalled backend from blocking the
1094    // primary result during the grace window. 0 = disabled (phase timeouts
1095    // still apply). Computed once outside the loop — pure Copy value.
1096    let outer_timeout = if config.backend_timeout_ms > 0 {
1097        Some(Duration::from_millis(config.backend_timeout_ms))
1098    } else {
1099        None
1100    };
1101    let backend_timeout_ms_log = config.backend_timeout_ms;
1102
1103    #[allow(unused_mut)]
1104    let mut futs: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> = Vec::new();
1105
1106    for (i, backend) in config.backends.iter().enumerate() {
1107        let backend_index = i + 1; // 0 is primary
1108
1109        // Under memory pressure, only keep the first enabled backend.
1110        if mem_pressure && !futs.is_empty() {
1111            break;
1112        }
1113
1114        if tracker.is_disabled(backend_index) {
1115            continue;
1116        }
1117
1118        // Resolve the endpoint — remote uses `endpoint`, local uses `binary_path`
1119        // (local mode spawns the engine and connects to localhost).
1120        #[allow(unused_variables)]
1121        let resolved_endpoint = if let Some(ref ep) = backend.endpoint {
1122            ep.clone()
1123        } else if backend.binary_path.is_some() {
1124            log::debug!(
1125                "{:?} local mode not yet implemented, skipping",
1126                backend.engine
1127            );
1128            continue;
1129        } else {
1130            log::debug!(
1131                "{:?} backend has no endpoint or binary_path, skipping",
1132                backend.engine
1133            );
1134            continue;
1135        };
1136
1137        let proto = resolve_protocol(backend);
1138        let _source_name = backend_source_name(backend);
1139
1140        // Resolve proxy: per-backend proxy takes priority, then ProxyRotator fallback.
1141        #[allow(unused_variables)]
1142        let resolved_proxy: Option<String> = if backend.proxy.is_some() {
1143            backend.proxy.clone()
1144        } else if let Some(ref rotator) = proxy_rotator {
1145            match proto {
1146                BackendProtocol::Cdp => rotator.next_cdp().map(|s| s.to_string()),
1147                BackendProtocol::WebDriver => rotator.next_webdriver().map(|s| s.to_string()),
1148            }
1149        } else {
1150            None
1151        };
1152
1153        // Sub-ms jitter before each backend launch to prevent fingerprint
1154        // correlation from simultaneous requests. Kept small so backends
1155        // start quickly — the value of hedging drops with launch delay.
1156        let jitter_us = {
1157            use std::collections::hash_map::DefaultHasher;
1158            use std::hash::{Hash, Hasher};
1159            let mut hasher = DefaultHasher::new();
1160            url.hash(&mut hasher);
1161            backend_index.hash(&mut hasher);
1162            std::time::SystemTime::now().hash(&mut hasher);
1163            hasher.finish() % 1000 // 0–999µs (~0–1ms)
1164        };
1165
1166        let connect_timeout = Duration::from_millis(config.connect_timeout_ms);
1167
1168        // Clone semaphore Arc for the spawned future (cheap Arc clone).
1169        let sem = semaphore.clone();
1170
1171        match proto {
1172            #[cfg(feature = "chrome")]
1173            BackendProtocol::Cdp => {
1174                let url = url.to_string();
1175                let cfg = crawl_config.clone(); // Arc clone — cheap
1176                let proxy = resolved_proxy.clone();
1177                let source = backend_source_name(backend).to_string();
1178                futs.push(Box::pin(async move {
1179                    // Wrap the ENTIRE backend future (including semaphore
1180                    // acquire) in the outer timeout.  Previously the timeout
1181                    // only covered fetch_cdp — if all permits were held by
1182                    // other backends or leaked tabs, acquire() would block
1183                    // forever and stall the crawl.
1184                    let work = async {
1185                        // Acquire semaphore permit with a bounded wait.
1186                        // 10s is generous — if permits are still held after
1187                        // that, the backends are likely stalled or leaked.
1188                        let _permit = if let Some(ref s) = sem {
1189                            match tokio::time::timeout(
1190                                Duration::from_secs(10),
1191                                s.acquire(),
1192                            )
1193                            .await
1194                            {
1195                                Ok(Ok(p)) => Some(p),
1196                                _ => {
1197                                    log::warn!(
1198                                        "[parallel_backends] {} backend {} semaphore timeout for {}",
1199                                        source, backend_index, url
1200                                    );
1201                                    return BackendResult {
1202                                        backend_index,
1203                                        response: None,
1204                                    };
1205                                }
1206                            }
1207                        } else {
1208                            None
1209                        };
1210                        tokio::time::sleep(Duration::from_micros(jitter_us)).await;
1211                        let response = fetch_cdp(
1212                            &url,
1213                            &resolved_endpoint,
1214                            &cfg,
1215                            backend_index,
1216                            connect_timeout,
1217                            proxy,
1218                            &source,
1219                        )
1220                        .await;
1221                        BackendResult {
1222                            backend_index,
1223                            response,
1224                        }
1225                    };
1226                    match outer_timeout {
1227                        Some(deadline) => match tokio::time::timeout(deadline, work).await {
1228                            Ok(r) => r,
1229                            Err(_) => {
1230                                log::warn!(
1231                                    "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
1232                                    source, backend_index, backend_timeout_ms_log, url
1233                                );
1234                                BackendResult {
1235                                    backend_index,
1236                                    response: None,
1237                                }
1238                            }
1239                        },
1240                        None => work.await,
1241                    }
1242                }));
1243            }
1244            #[cfg(feature = "webdriver")]
1245            BackendProtocol::WebDriver => {
1246                let url = url.to_string();
1247                let cfg = crawl_config.clone(); // Arc clone — cheap
1248                let proxy = resolved_proxy.clone();
1249                let source = backend_source_name(backend).to_string();
1250                futs.push(Box::pin(async move {
1251                    // Wrap the ENTIRE backend future (including semaphore
1252                    // acquire) in the outer timeout — same fix as CDP path.
1253                    let work = async {
1254                        // Acquire semaphore permit with a bounded wait.
1255                        let _permit = if let Some(ref s) = sem {
1256                            match tokio::time::timeout(
1257                                Duration::from_secs(10),
1258                                s.acquire(),
1259                            )
1260                            .await
1261                            {
1262                                Ok(Ok(p)) => Some(p),
1263                                _ => {
1264                                    log::warn!(
1265                                        "[parallel_backends] {} backend {} semaphore timeout for {}",
1266                                        source, backend_index, url
1267                                    );
1268                                    return BackendResult {
1269                                        backend_index,
1270                                        response: None,
1271                                    };
1272                                }
1273                            }
1274                        } else {
1275                            None
1276                        };
1277                        tokio::time::sleep(Duration::from_micros(jitter_us)).await;
1278                        let response = fetch_webdriver(
1279                            &url,
1280                            &resolved_endpoint,
1281                            &cfg,
1282                            backend_index,
1283                            connect_timeout,
1284                            proxy,
1285                            &source,
1286                        )
1287                        .await;
1288                        BackendResult {
1289                            backend_index,
1290                            response,
1291                        }
1292                    };
1293                    match outer_timeout {
1294                        Some(deadline) => match tokio::time::timeout(deadline, work).await {
1295                            Ok(r) => r,
1296                            Err(_) => {
1297                                log::warn!(
1298                                    "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
1299                                    source, backend_index, backend_timeout_ms_log, url
1300                                );
1301                                BackendResult {
1302                                    backend_index,
1303                                    response: None,
1304                                }
1305                            }
1306                        },
1307                        None => work.await,
1308                    }
1309                }));
1310            }
1311            // When the specific feature is not enabled, skip silently.
1312            #[allow(unreachable_patterns)]
1313            _ => {}
1314        }
1315    }
1316
1317    futs
1318}
1319
1320// ---------------------------------------------------------------------------
1321// Tests
1322// ---------------------------------------------------------------------------
1323
1324#[cfg(test)]
1325mod tests {
1326    use super::*;
1327    use std::sync::Arc;
1328
1329    // ---- Quality Scorer ----
1330
1331    fn make_html(body_content: &str) -> Vec<u8> {
1332        format!(
1333            "<html><head><title>T</title></head><body>{}</body></html>",
1334            body_content
1335        )
1336        .into_bytes()
1337    }
1338
1339    #[test]
1340    fn test_quality_score_perfect_response() {
1341        let body = make_html(&"x".repeat(5000));
1342        let score = html_quality_score(Some(&body), StatusCode::OK, &AntiBotTech::None);
1343        // 30 (200) + 5 (>0) + 10 (>512) + 10 (>4096) + 15 (<body>) + 10 (not empty) + 20 (no bot) = 100
1344        assert_eq!(score, 100);
1345    }
1346
1347    #[test]
1348    fn test_quality_score_empty_body() {
1349        let score = html_quality_score(Some(&[]), StatusCode::OK, &AntiBotTech::None);
1350        // 30 (200) + 0 (empty) + 0 + 0 + 0 + 0 (is_cacheable_body_empty → true for empty) + 20 = 50
1351        assert_eq!(score, 50);
1352    }
1353
1354    #[test]
1355    fn test_quality_score_none_content() {
1356        let score = html_quality_score(None, StatusCode::OK, &AntiBotTech::None);
1357        // 30 (200) + 20 (no bot) = 50
1358        assert_eq!(score, 50);
1359    }
1360
1361    #[test]
1362    fn test_quality_score_empty_html_shell() {
1363        let body = b"<html><head></head><body></body></html>";
1364        let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
1365        // 30 + 5 (>0) + 0 (38 bytes, <512) + 0 + 15 (<body) + 0 (empty shell) + 20 = 70
1366        assert_eq!(score, 70);
1367    }
1368
1369    #[test]
1370    fn test_quality_score_antibot_cloudflare() {
1371        let body = make_html("blocked");
1372        let score =
1373            html_quality_score(Some(&body), StatusCode::FORBIDDEN, &AntiBotTech::Cloudflare);
1374        // 0 (403) + 5 + 0 + 0 + 15 + 10 + 0 (bot!) = 30
1375        assert_eq!(score, 30);
1376    }
1377
1378    #[test]
1379    fn test_quality_score_server_error() {
1380        let body = make_html("error");
1381        let score = html_quality_score(
1382            Some(&body),
1383            StatusCode::INTERNAL_SERVER_ERROR,
1384            &AntiBotTech::None,
1385        );
1386        // 0 (500) + 5 + 0 + 0 + 15 + 10 + 20 = 50
1387        assert_eq!(score, 50);
1388    }
1389
1390    #[test]
1391    fn test_quality_score_redirect() {
1392        let score = html_quality_score(None, StatusCode::MOVED_PERMANENTLY, &AntiBotTech::None);
1393        // 5 (301) + 20 = 25
1394        assert_eq!(score, 25);
1395    }
1396
1397    #[test]
1398    fn test_quality_score_small_body_with_body_tag() {
1399        let body = b"<html><body>hi</body></html>";
1400        let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
1401        // 30 + 5 (>0) + 0 (<512) + 0 + 15 (<body) + 10 (not empty) + 20 = 80
1402        assert_eq!(score, 80);
1403    }
1404
1405    #[test]
1406    fn test_quality_score_large_body_no_body_tag() {
1407        let body = "x".repeat(5000);
1408        let score = html_quality_score(Some(body.as_bytes()), StatusCode::OK, &AntiBotTech::None);
1409        // 30 + 5 + 10 + 10 + 0 (no <body) + 10 (not empty) + 20 = 85
1410        assert_eq!(score, 85);
1411    }
1412
1413    // ---- Backend Tracker ----
1414
1415    #[test]
1416    fn test_tracker_new_defaults() {
1417        let t = BackendTracker::new(3, 10);
1418        assert_eq!(t.len(), 3);
1419        assert!(!t.is_empty());
1420        for i in 0..3 {
1421            assert_eq!(t.wins(i), 0);
1422            assert_eq!(t.races(i), 0);
1423            assert_eq!(t.ema_ms(i), 0);
1424            assert_eq!(t.consecutive_errors(i), 0);
1425            assert!(!t.is_disabled(i));
1426        }
1427        // Out-of-bounds returns safe defaults.
1428        assert!(t.is_disabled(99));
1429        assert_eq!(t.wins(99), 0);
1430    }
1431
1432    #[test]
1433    fn test_tracker_record_win() {
1434        let t = BackendTracker::new(2, 10);
1435        t.record_win(0);
1436        t.record_win(0);
1437        t.record_win(1);
1438        assert_eq!(t.wins(0), 2);
1439        assert_eq!(t.wins(1), 1);
1440    }
1441
1442    #[test]
1443    fn test_tracker_ema_duration() {
1444        let t = BackendTracker::new(1, 10);
1445        t.record_race(0);
1446        t.record_duration(0, Duration::from_millis(100));
1447        assert_eq!(t.ema_ms(0), 100);
1448
1449        t.record_race(0);
1450        t.record_duration(0, Duration::from_millis(200));
1451        // EMA = (100 * 4 + 200) / 5 = 120
1452        assert_eq!(t.ema_ms(0), 120);
1453
1454        t.record_race(0);
1455        t.record_duration(0, Duration::from_millis(100));
1456        // EMA = (120 * 4 + 100) / 5 = 116
1457        assert_eq!(t.ema_ms(0), 116);
1458    }
1459
1460    #[test]
1461    fn test_tracker_probe_first_error_disables() {
1462        // A backend that has never succeeded is disabled on first error
1463        // (probe behaviour).
1464        let t = BackendTracker::new(1, 10);
1465        assert!(!t.is_disabled(0));
1466        t.record_race(0);
1467        t.record_error(0); // first ever error, no prior wins → probe disable
1468        assert!(t.is_disabled(0));
1469    }
1470
1471    #[test]
1472    fn test_tracker_consecutive_errors_disables() {
1473        // After at least one success, max_consecutive_errors threshold applies.
1474        let t = BackendTracker::new(1, 3);
1475        // Simulate a successful first request so probe mode doesn't kick in.
1476        t.record_race(0);
1477        t.record_win(0);
1478        t.record_success(0);
1479        assert!(!t.is_disabled(0));
1480        t.record_race(0);
1481        t.record_error(0);
1482        t.record_race(0);
1483        t.record_error(0);
1484        assert!(!t.is_disabled(0));
1485        t.record_race(0);
1486        t.record_error(0); // third consecutive error
1487        assert!(t.is_disabled(0));
1488    }
1489
1490    #[test]
1491    fn test_tracker_success_resets_errors() {
1492        let t = BackendTracker::new(1, 5);
1493        // Give it a prior win so probe mode doesn't trigger.
1494        t.record_race(0);
1495        t.record_win(0);
1496        t.record_success(0);
1497        t.record_race(0);
1498        t.record_error(0);
1499        t.record_race(0);
1500        t.record_error(0);
1501        assert_eq!(t.consecutive_errors(0), 2);
1502        t.record_success(0);
1503        assert_eq!(t.consecutive_errors(0), 0);
1504    }
1505
1506    #[test]
1507    fn test_tracker_clone_independence() {
1508        let t = BackendTracker::new(1, 10);
1509        t.record_win(0);
1510        let t2 = t.clone();
1511        t.record_win(0);
1512        assert_eq!(t.wins(0), 2);
1513        assert_eq!(t2.wins(0), 1);
1514    }
1515
1516    #[test]
1517    fn test_tracker_win_rate() {
1518        let t = BackendTracker::new(1, 10);
1519        assert_eq!(t.win_rate_pct(0), 0); // 0 races
1520        t.record_race(0);
1521        t.record_race(0);
1522        t.record_race(0);
1523        t.record_race(0);
1524        t.record_win(0);
1525        t.record_win(0);
1526        t.record_win(0);
1527        assert_eq!(t.win_rate_pct(0), 75);
1528    }
1529
1530    // ---- Race Orchestrator ----
1531
1532    /// Mock a successful primary response (returns Option<BackendResponse>).
1533    fn mock_primary(
1534        score: u16,
1535        delay_ms: u64,
1536    ) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
1537        Box::pin(async move {
1538            if delay_ms > 0 {
1539                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1540            }
1541            Some(BackendResponse {
1542                page: crate::page::Page::default(),
1543                quality_score: score,
1544                backend_index: 0,
1545                duration: Duration::from_millis(delay_ms),
1546                _bytes_guard: None,
1547            })
1548        })
1549    }
1550
1551    /// Mock an alternative backend response (returns BackendResult).
1552    fn mock_alt(
1553        idx: usize,
1554        score: u16,
1555        delay_ms: u64,
1556    ) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
1557        Box::pin(async move {
1558            if delay_ms > 0 {
1559                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1560            }
1561            BackendResult {
1562                backend_index: idx,
1563                response: Some(BackendResponse {
1564                    page: crate::page::Page::default(),
1565                    quality_score: score,
1566                    backend_index: idx,
1567                    duration: Duration::from_millis(delay_ms),
1568                    _bytes_guard: None,
1569                }),
1570            }
1571        })
1572    }
1573
1574    /// Mock a failing alternative backend (returns BackendResult with None).
1575    fn mock_alt_none(
1576        idx: usize,
1577        delay_ms: u64,
1578    ) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
1579        Box::pin(async move {
1580            if delay_ms > 0 {
1581                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1582            }
1583            BackendResult {
1584                backend_index: idx,
1585                response: None,
1586            }
1587        })
1588    }
1589
1590    /// Mock a failing primary (returns None).
1591    fn mock_primary_none(
1592        delay_ms: u64,
1593    ) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
1594        Box::pin(async move {
1595            if delay_ms > 0 {
1596                tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1597            }
1598            None
1599        })
1600    }
1601
1602    fn test_config(grace_ms: u64, threshold: u16) -> ParallelBackendsConfig {
1603        ParallelBackendsConfig {
1604            backends: vec![],
1605            grace_period_ms: grace_ms,
1606            enabled: true,
1607            fast_accept_threshold: threshold,
1608            max_consecutive_errors: 10,
1609            connect_timeout_ms: 5000,
1610            skip_binary_content_types: true,
1611            max_concurrent_sessions: 0,
1612            skip_extensions: Vec::new(),
1613            max_backend_bytes_in_flight: 0, // unlimited for test mocks
1614            backend_timeout_ms: 0,          // disabled for test mocks
1615        }
1616    }
1617
1618    #[tokio::test]
1619    async fn test_race_primary_fast_accept() {
1620        let tracker = BackendTracker::new(3, 10);
1621        let cfg = test_config(500, 80);
1622        let primary = mock_primary(95, 10);
1623        let alts = vec![mock_alt(1, 100, 1000), mock_alt(2, 100, 1000)];
1624
1625        let result = race_backends(primary, alts, &cfg, &tracker).await;
1626        let r = result.unwrap();
1627        assert_eq!(r.backend_index, 0); // primary won via fast-accept
1628        assert_eq!(r.quality_score, 95);
1629        assert_eq!(tracker.wins(0), 1);
1630    }
1631
1632    #[tokio::test]
1633    async fn test_race_alternative_wins_during_grace() {
1634        let tracker = BackendTracker::new(3, 10);
1635        let cfg = test_config(500, 80); // 500ms grace, threshold 80
1636        let primary = mock_primary(50, 10); // arrives first, low quality
1637        let alts = vec![
1638            mock_alt(1, 90, 100), // arrives during grace, high quality
1639            mock_alt(2, 30, 1000),
1640        ];
1641
1642        let result = race_backends(primary, alts, &cfg, &tracker).await;
1643        let r = result.unwrap();
1644        assert_eq!(r.backend_index, 1); // alt won with higher score
1645        assert_eq!(r.quality_score, 90);
1646    }
1647
1648    #[tokio::test]
1649    async fn test_race_primary_wins_after_grace() {
1650        let tracker = BackendTracker::new(2, 10);
1651        let cfg = test_config(50, 80); // 50ms grace
1652        let primary = mock_primary(60, 10); // below threshold
1653        let alts = vec![
1654            mock_alt(1, 40, 5000), // too slow, won't arrive during grace
1655        ];
1656
1657        let result = race_backends(primary, alts, &cfg, &tracker).await;
1658        let r = result.unwrap();
1659        assert_eq!(r.backend_index, 0); // primary best during grace
1660        assert_eq!(r.quality_score, 60);
1661    }
1662
1663    #[tokio::test]
1664    async fn test_race_all_none() {
1665        let tracker = BackendTracker::new(2, 10);
1666        let cfg = test_config(50, 80);
1667        let primary = mock_primary_none(10);
1668        let alts = vec![mock_alt_none(1, 10)];
1669
1670        let result = race_backends(primary, alts, &cfg, &tracker).await;
1671        assert!(result.is_none());
1672        // Failed alt should have error recorded.
1673        assert_eq!(tracker.consecutive_errors(1), 1);
1674    }
1675
1676    #[tokio::test]
1677    async fn test_race_primary_none_alt_some() {
1678        let tracker = BackendTracker::new(2, 10);
1679        let cfg = test_config(200, 80);
1680        let primary = mock_primary_none(10);
1681        let alts = vec![mock_alt(1, 85, 50)];
1682
1683        let result = race_backends(primary, alts, &cfg, &tracker).await;
1684        let r = result.unwrap();
1685        assert_eq!(r.backend_index, 1);
1686    }
1687
1688    #[tokio::test]
1689    async fn test_race_disabled_noop() {
1690        let tracker = BackendTracker::new(2, 10);
1691        let mut cfg = test_config(50, 80);
1692        cfg.enabled = false;
1693        let primary = mock_primary(70, 10);
1694        let alts = vec![mock_alt(1, 100, 10)];
1695
1696        let result = race_backends(primary, alts, &cfg, &tracker).await;
1697        let r = result.unwrap();
1698        assert_eq!(r.backend_index, 0); // disabled → primary only
1699    }
1700
1701    #[tokio::test]
1702    async fn test_race_single_alternative() {
1703        let tracker = BackendTracker::new(2, 10);
1704        let cfg = test_config(200, 80);
1705        let primary = mock_primary(50, 100);
1706        let alts = vec![mock_alt(1, 90, 20)]; // alt is faster AND better
1707
1708        let result = race_backends(primary, alts, &cfg, &tracker).await;
1709        let r = result.unwrap();
1710        // Alt arrives first with score 90 >= threshold 80 → fast accept
1711        assert_eq!(r.backend_index, 1);
1712        assert_eq!(r.quality_score, 90);
1713    }
1714
1715    #[tokio::test]
1716    async fn test_race_three_alternatives_best_during_grace() {
1717        let tracker = BackendTracker::new(4, 10);
1718        let cfg = test_config(300, 95); // high threshold
1719
1720        let primary = mock_primary(40, 10); // first, low quality
1721        let alts = vec![
1722            mock_alt(1, 60, 50),  // medium
1723            mock_alt(2, 85, 100), // best
1724            mock_alt(3, 70, 200), // arrives within grace but lower
1725        ];
1726
1727        let result = race_backends(primary, alts, &cfg, &tracker).await;
1728        let r = result.unwrap();
1729        assert_eq!(r.backend_index, 2);
1730        assert_eq!(r.quality_score, 85);
1731    }
1732
1733    #[tokio::test]
1734    async fn test_race_grace_period_zero() {
1735        let tracker = BackendTracker::new(2, 10);
1736        let cfg = test_config(0, 101); // threshold impossibly high, grace = 0
1737
1738        let primary = mock_primary(50, 10); // arrives first
1739        let alts = vec![mock_alt(1, 99, 50)]; // better but slower
1740
1741        let result = race_backends(primary, alts, &cfg, &tracker).await;
1742        let r = result.unwrap();
1743        assert_eq!(r.backend_index, 0);
1744    }
1745
1746    #[tokio::test]
1747    async fn test_race_cancellation_verified() {
1748        let finished = Arc::new(AtomicBool::new(false));
1749        let f = finished.clone();
1750
1751        let tracker = BackendTracker::new(2, 10);
1752        let cfg = test_config(50, 80);
1753
1754        let primary = mock_primary(95, 10); // fast-accept
1755
1756        let slow_alt: Pin<Box<dyn Future<Output = BackendResult> + Send>> = Box::pin(async move {
1757            tokio::time::sleep(Duration::from_secs(10)).await;
1758            f.store(true, Ordering::SeqCst);
1759            BackendResult {
1760                backend_index: 1,
1761                response: None,
1762            }
1763        });
1764
1765        let _result = race_backends(primary, vec![slow_alt], &cfg, &tracker).await;
1766
1767        tokio::time::sleep(Duration::from_millis(50)).await;
1768        assert!(!finished.load(Ordering::SeqCst));
1769    }
1770
1771    #[tokio::test]
1772    async fn test_race_failed_alt_records_error() {
1773        let tracker = BackendTracker::new(3, 5);
1774        let cfg = test_config(200, 80);
1775        let primary = mock_primary(50, 10);
1776        let alts = vec![
1777            mock_alt_none(1, 20), // fails
1778            mock_alt_none(2, 30), // fails
1779        ];
1780
1781        let result = race_backends(primary, alts, &cfg, &tracker).await;
1782        let r = result.unwrap();
1783        assert_eq!(r.backend_index, 0); // primary wins since alts failed
1784        assert_eq!(tracker.consecutive_errors(1), 1);
1785        assert_eq!(tracker.consecutive_errors(2), 1);
1786    }
1787
1788    #[tokio::test]
1789    async fn test_race_auto_disable_after_errors() {
1790        let tracker = BackendTracker::new(2, 2); // disable after 2 errors
1791        let cfg = test_config(100, 80);
1792
1793        // Run two races where alt fails
1794        for _ in 0..2 {
1795            let primary = mock_primary(50, 5);
1796            let alts = vec![mock_alt_none(1, 10)];
1797            let _ = race_backends(primary, alts, &cfg, &tracker).await;
1798        }
1799
1800        // Backend 1 should now be auto-disabled.
1801        assert!(tracker.is_disabled(1));
1802        assert_eq!(tracker.consecutive_errors(1), 2);
1803    }
1804
1805    // ---- Proxy Rotator ----
1806
1807    #[test]
1808    fn test_proxy_rotator_round_robin_cdp() {
1809        let proxies = vec![
1810            RequestProxy {
1811                addr: "http://p1".into(),
1812                ignore: ProxyIgnore::No,
1813            },
1814            RequestProxy {
1815                addr: "http://p2".into(),
1816                ignore: ProxyIgnore::No,
1817            },
1818        ];
1819        let r = ProxyRotator::new(&Some(proxies));
1820        assert_eq!(r.cdp_count(), 2);
1821        assert_eq!(r.next_cdp(), Some("http://p1"));
1822        assert_eq!(r.next_cdp(), Some("http://p2"));
1823        assert_eq!(r.next_cdp(), Some("http://p1")); // wraps
1824    }
1825
1826    #[test]
1827    fn test_proxy_rotator_round_robin_wd() {
1828        let proxies = vec![
1829            RequestProxy {
1830                addr: "http://p1".into(),
1831                ignore: ProxyIgnore::No,
1832            },
1833            RequestProxy {
1834                addr: "http://p2".into(),
1835                ignore: ProxyIgnore::No,
1836            },
1837        ];
1838        let r = ProxyRotator::new(&Some(proxies));
1839        assert_eq!(r.webdriver_count(), 2);
1840        assert_eq!(r.next_webdriver(), Some("http://p1"));
1841        assert_eq!(r.next_webdriver(), Some("http://p2"));
1842    }
1843
1844    #[test]
1845    fn test_proxy_rotator_filters_ignore() {
1846        let proxies = vec![
1847            RequestProxy {
1848                addr: "http://cdp-only".into(),
1849                ignore: ProxyIgnore::Http, // only for CDP, ignored for WebDriver
1850            },
1851            RequestProxy {
1852                addr: "http://wd-only".into(),
1853                ignore: ProxyIgnore::Chrome, // only for WebDriver, ignored for CDP
1854            },
1855            RequestProxy {
1856                addr: "http://both".into(),
1857                ignore: ProxyIgnore::No,
1858            },
1859        ];
1860        let r = ProxyRotator::new(&Some(proxies));
1861        // CDP: "cdp-only" (Http → not Chrome → included) + "both"
1862        assert_eq!(r.cdp_count(), 2);
1863        // WebDriver: "wd-only" (Chrome → not Http → included) + "both"
1864        assert_eq!(r.webdriver_count(), 2);
1865    }
1866
1867    #[test]
1868    fn test_proxy_rotator_empty_proxies() {
1869        let r = ProxyRotator::new(&None);
1870        assert_eq!(r.cdp_count(), 0);
1871        assert_eq!(r.webdriver_count(), 0);
1872        assert_eq!(r.next_cdp(), None);
1873        assert_eq!(r.next_webdriver(), None);
1874    }
1875
1876    #[test]
1877    fn test_proxy_rotator_concurrent_access() {
1878        let proxies = vec![
1879            RequestProxy {
1880                addr: "http://p1".into(),
1881                ignore: ProxyIgnore::No,
1882            },
1883            RequestProxy {
1884                addr: "http://p2".into(),
1885                ignore: ProxyIgnore::No,
1886            },
1887            RequestProxy {
1888                addr: "http://p3".into(),
1889                ignore: ProxyIgnore::No,
1890            },
1891        ];
1892        let r = Arc::new(ProxyRotator::new(&Some(proxies)));
1893
1894        let handles: Vec<_> = (0..10)
1895            .map(|_| {
1896                let r = r.clone();
1897                std::thread::spawn(move || {
1898                    for _ in 0..100 {
1899                        let addr = r.next_cdp().unwrap();
1900                        assert!(addr == "http://p1" || addr == "http://p2" || addr == "http://p3");
1901                    }
1902                })
1903            })
1904            .collect();
1905
1906        for h in handles {
1907            h.join().unwrap();
1908        }
1909    }
1910
1911    // ---- Binary Content-Type Detection ----
1912
1913    #[test]
1914    fn test_is_binary_content_type_images() {
1915        assert!(is_binary_content_type("image/png"));
1916        assert!(is_binary_content_type("image/jpeg"));
1917        assert!(is_binary_content_type("image/webp"));
1918        assert!(is_binary_content_type("image/svg+xml"));
1919        assert!(is_binary_content_type("image/gif"));
1920    }
1921
1922    #[test]
1923    fn test_is_binary_content_type_with_charset() {
1924        // Content-Type values often include charset or other params.
1925        assert!(is_binary_content_type("image/png; charset=utf-8"));
1926        assert!(is_binary_content_type(
1927            "application/pdf; boundary=something"
1928        ));
1929        assert!(is_binary_content_type("font/woff2; charset=binary"));
1930    }
1931
1932    #[test]
1933    fn test_is_binary_content_type_fonts() {
1934        assert!(is_binary_content_type("font/woff"));
1935        assert!(is_binary_content_type("font/woff2"));
1936        assert!(is_binary_content_type("font/ttf"));
1937        assert!(is_binary_content_type("application/vnd.ms-fontobject"));
1938        assert!(is_binary_content_type("application/x-font-ttf"));
1939        assert!(is_binary_content_type("application/x-font-woff"));
1940    }
1941
1942    #[test]
1943    fn test_is_binary_content_type_archives() {
1944        assert!(is_binary_content_type("application/pdf"));
1945        assert!(is_binary_content_type("application/zip"));
1946        assert!(is_binary_content_type("application/gzip"));
1947        assert!(is_binary_content_type("application/x-gzip"));
1948        assert!(is_binary_content_type("application/octet-stream"));
1949        assert!(is_binary_content_type("application/wasm"));
1950        assert!(is_binary_content_type("application/x-tar"));
1951        assert!(is_binary_content_type("application/x-bzip2"));
1952        assert!(is_binary_content_type("application/x-7z-compressed"));
1953        assert!(is_binary_content_type("application/x-rar-compressed"));
1954    }
1955
1956    #[test]
1957    fn test_is_binary_content_type_audio_video() {
1958        assert!(is_binary_content_type("audio/mpeg"));
1959        assert!(is_binary_content_type("audio/ogg"));
1960        assert!(is_binary_content_type("video/mp4"));
1961        assert!(is_binary_content_type("video/webm"));
1962    }
1963
1964    #[test]
1965    fn test_is_binary_content_type_html_not_binary() {
1966        assert!(!is_binary_content_type("text/html"));
1967        assert!(!is_binary_content_type("text/html; charset=utf-8"));
1968        assert!(!is_binary_content_type("text/plain"));
1969        assert!(!is_binary_content_type("application/json"));
1970        assert!(!is_binary_content_type("application/javascript"));
1971        assert!(!is_binary_content_type("text/css"));
1972        assert!(!is_binary_content_type("application/xml"));
1973    }
1974
1975    // ---- Asset URL Skip ----
1976
1977    #[test]
1978    fn test_should_skip_backend_for_asset_urls() {
1979        assert!(should_skip_backend_for_url(
1980            "https://example.com/photo.jpg",
1981            &[]
1982        ));
1983        assert!(should_skip_backend_for_url(
1984            "https://example.com/photo.png",
1985            &[]
1986        ));
1987        assert!(should_skip_backend_for_url(
1988            "https://example.com/font.woff2",
1989            &[]
1990        ));
1991        assert!(should_skip_backend_for_url(
1992            "https://example.com/doc.pdf",
1993            &[]
1994        ));
1995        assert!(should_skip_backend_for_url(
1996            "https://example.com/video.mp4",
1997            &[]
1998        ));
1999    }
2000
2001    #[test]
2002    fn test_should_not_skip_backend_for_html_urls() {
2003        assert!(!should_skip_backend_for_url(
2004            "https://example.com/page.html",
2005            &[]
2006        ));
2007        assert!(!should_skip_backend_for_url(
2008            "https://example.com/about",
2009            &[]
2010        ));
2011        assert!(!should_skip_backend_for_url(
2012            "https://example.com/api/data",
2013            &[]
2014        ));
2015        assert!(!should_skip_backend_for_url("https://example.com/", &[]));
2016    }
2017
2018    #[test]
2019    fn test_should_skip_backend_custom_extensions() {
2020        let extras = vec![
2021            crate::compact_str::CompactString::from("xml"),
2022            crate::compact_str::CompactString::from("rss"),
2023        ];
2024        assert!(should_skip_backend_for_url(
2025            "https://example.com/feed.xml",
2026            &extras
2027        ));
2028        assert!(should_skip_backend_for_url(
2029            "https://example.com/feed.rss",
2030            &extras
2031        ));
2032        assert!(should_skip_backend_for_url(
2033            "https://example.com/feed.RSS",
2034            &extras
2035        ));
2036        assert!(!should_skip_backend_for_url(
2037            "https://example.com/page.html",
2038            &extras
2039        ));
2040    }
2041
2042    // ---- BackendBytesGuard ----
2043    //
2044    // BACKEND_BYTES_IN_FLIGHT is a process-wide global atomic. Since tests
2045    // run in parallel threads, we consolidate all counter-sensitive assertions
2046    // into a single test function to guarantee sequential execution.
2047
    /// Exercises `BackendBytesGuard` accounting against the process-wide
    /// in-flight counter: unchecked acquire, limit-checked acquire (within,
    /// over, and `limit == 0`), selective drops, guards embedded in
    /// `BackendResponse`, `build_backend_futures` refusing work over budget,
    /// and concurrent acquire/release from several OS threads.
    ///
    /// Every expectation is written relative to `base` (the counter value at
    /// entry), so a non-zero starting value does not break the arithmetic.
    /// The steps are order-dependent: each section assumes the previous one
    /// left the counter back at `base`.
    #[test]
    fn test_bytes_guard_all() {
        // --- acquire_unchecked + Drop ---
        let base = BackendBytesGuard::in_flight();
        {
            let g = BackendBytesGuard::acquire_unchecked(1000);
            assert_eq!(BackendBytesGuard::in_flight(), base + 1000);
            drop(g);
        }
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- try_acquire within limit ---
        // Limits are offset by `base` so that only this test's bytes count
        // against the headroom.
        let g = BackendBytesGuard::try_acquire(500, base + 1000);
        assert!(g.is_some());
        assert_eq!(BackendBytesGuard::in_flight(), base + 500);
        // Dropping the `Option` drops the guard inside it.
        drop(g);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- try_acquire exceeds limit ---
        // 800 held + 300 requested > base + 1000, so the request is refused
        // and the counter must be left untouched.
        let hold = BackendBytesGuard::acquire_unchecked(800);
        assert_eq!(BackendBytesGuard::in_flight(), base + 800);
        let g = BackendBytesGuard::try_acquire(300, base + 1000);
        assert!(g.is_none(), "should reject when would exceed limit");
        assert_eq!(BackendBytesGuard::in_flight(), base + 800);
        drop(hold);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- try_acquire with limit=0 (unlimited) ---
        // Even with no limit, the bytes are still added to the counter.
        let g = BackendBytesGuard::try_acquire(1_000_000, 0);
        assert!(g.is_some(), "limit=0 means unlimited");
        assert_eq!(BackendBytesGuard::in_flight(), base + 1_000_000);
        drop(g);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- multiple guards, selective drops ---
        // Each guard releases exactly its own amount, in any drop order.
        let g1 = BackendBytesGuard::acquire_unchecked(100);
        let g2 = BackendBytesGuard::acquire_unchecked(200);
        let g3 = BackendBytesGuard::acquire_unchecked(300);
        assert_eq!(BackendBytesGuard::in_flight(), base + 600);
        drop(g2);
        assert_eq!(BackendBytesGuard::in_flight(), base + 400);
        drop(g1);
        drop(g3);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- guard inside BackendResponse: full drop ---
        let resp = BackendResponse {
            page: crate::page::Page::default(),
            quality_score: 90,
            backend_index: 1,
            duration: Duration::from_millis(50),
            _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(5000)),
        };
        assert_eq!(BackendBytesGuard::in_flight(), base + 5000);
        drop(resp);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- guard inside BackendResponse: partial move (page extracted) ---
        // Mirrors a caller taking only the winning page: the bytes must still
        // be released when the rest of the struct goes out of scope.
        {
            let resp = BackendResponse {
                page: crate::page::Page::default(),
                quality_score: 90,
                backend_index: 0,
                duration: Duration::from_millis(10),
                _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(2000)),
            };
            assert_eq!(BackendBytesGuard::in_flight(), base + 2000);
            let _page = resp.page;
            // Remaining fields (including _bytes_guard) dropped at end of block.
        }
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- build_backend_futures: skips when byte limit exceeded ---
        // Hold 1_000_000 bytes so the configured budget (base + 500) is
        // already exhausted before any backend future is built. Note that
        // `_hold` is dropped explicitly below despite its underscore prefix.
        let _hold = BackendBytesGuard::acquire_unchecked(1_000_000);
        let cfg = ParallelBackendsConfig {
            backends: vec![crate::configuration::BackendEndpoint {
                engine: crate::configuration::BackendEngine::Cdp,
                endpoint: Some("ws://localhost:9222".to_string()),
                binary_path: None,
                protocol: None,
                proxy: None,
            }],
            max_backend_bytes_in_flight: base + 500, // well below current
            ..Default::default()
        };
        let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
        let tracker = BackendTracker::new(2, 10);
        let futs = build_backend_futures(
            "https://example.com",
            &cfg,
            &crawl_cfg,
            &tracker,
            &None,
            &None,
        );
        assert!(
            futs.is_empty(),
            "should skip backends when byte limit exceeded"
        );
        drop(_hold);
        assert_eq!(BackendBytesGuard::in_flight(), base);

        // --- thread safety: hammer the counter, verify no underflow ---
        // 8 OS threads each do 1000 acquire/yield/release cycles of 100
        // bytes; the counter must net out to `base` afterwards.
        let handles: Vec<_> = (0..8)
            .map(|_| {
                std::thread::spawn(|| {
                    for _ in 0..1000 {
                        let g = BackendBytesGuard::acquire_unchecked(100);
                        std::thread::yield_now();
                        drop(g);
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        assert_eq!(
            BackendBytesGuard::in_flight(),
            base,
            "counter must return to baseline after concurrent thread usage"
        );
    }
2171
2172    #[tokio::test]
2173    async fn test_race_grace_zero_under_pressure_no_deadlock() {
2174        // Simulate what happens when grace=0 (critical memory pressure path).
2175        // This must not deadlock or panic.
2176        let tracker = BackendTracker::new(3, 10);
2177        let cfg = ParallelBackendsConfig {
2178            grace_period_ms: 0,
2179            ..Default::default()
2180        };
2181        let primary = mock_primary(50, 5);
2182        let alt = mock_alt(1, 95, 1);
2183        let result = race_backends(primary, vec![alt], &cfg, &tracker).await;
2184        assert!(result.is_some());
2185    }
2186
2187    #[tokio::test]
2188    async fn test_race_backends_drops_futs_before_return() {
2189        // Verify that race_backends doesn't hold losing responses after
2190        // returning. The primary fast-accepts; the slow alt should be
2191        // aborted and dropped inside race_backends (not leaked to caller).
2192        let tracker = BackendTracker::new(2, 10);
2193        let cfg = test_config(200, 80);
2194
2195        // Primary: score=90 (above threshold=80), returns in 1ms.
2196        let primary = mock_primary(90, 1);
2197        // Alt: score=50, returns in 500ms — should be aborted.
2198        let alt = mock_alt(1, 50, 500);
2199
2200        let result = race_backends(primary, vec![alt], &cfg, &tracker).await;
2201        assert!(result.is_some());
2202        let winner = result.unwrap();
2203        // Primary wins via fast-accept.
2204        assert_eq!(winner.backend_index, 0);
2205        assert_eq!(winner.quality_score, 90);
2206        // race_backends returned in ~1ms, not 500ms — alt was aborted.
2207    }
2208
2209    #[tokio::test]
2210    async fn test_race_backends_winner_replaces_losers() {
2211        // When multiple alts complete during grace, only the best is returned.
2212        // Others are dropped (not leaked).
2213        let tracker = BackendTracker::new(4, 10);
2214        let cfg = test_config(500, 95); // high threshold so grace period is used
2215
2216        let primary = mock_primary(40, 1); // low score, triggers grace
2217        let alt1 = mock_alt(1, 60, 5);
2218        let alt2 = mock_alt(2, 80, 10);
2219        let alt3 = mock_alt(3, 70, 15);
2220
2221        let result = race_backends(primary, vec![alt1, alt2, alt3], &cfg, &tracker).await;
2222        assert!(result.is_some());
2223        let winner = result.unwrap();
2224        // Best alt (score=80) should win.
2225        assert_eq!(winner.backend_index, 2);
2226        assert_eq!(winner.quality_score, 80);
2227        // The other responses (primary=40, alt1=60, alt3=70) were dropped
2228        // inside race_backends when futs was dropped before return.
2229    }
2230
2231    #[test]
2232    fn test_build_backend_futures_allows_when_byte_limit_not_exceeded() {
2233        let cfg = ParallelBackendsConfig {
2234            backends: vec![crate::configuration::BackendEndpoint {
2235                engine: crate::configuration::BackendEngine::Cdp,
2236                endpoint: Some("ws://localhost:9222".to_string()),
2237                binary_path: None,
2238                protocol: None,
2239                proxy: None,
2240            }],
2241            max_backend_bytes_in_flight: usize::MAX,
2242            ..Default::default()
2243        };
2244        let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
2245        let tracker = BackendTracker::new(2, 10);
2246        // Should not panic or deadlock regardless of feature flags.
2247        let _futs = build_backend_futures(
2248            "https://example.com",
2249            &cfg,
2250            &crawl_cfg,
2251            &tracker,
2252            &None,
2253            &None,
2254        );
2255    }
2256}