1use std::future::Future;
4use std::pin::Pin;
5use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
6use std::time::Duration;
7#[cfg(any(feature = "chrome", feature = "webdriver"))]
8use std::time::Instant;
9
10use crate::configuration::{
11 BackendEndpoint, BackendEngine, BackendProtocol, ParallelBackendsConfig, ProxyIgnore,
12 RequestProxy,
13};
14use crate::page::AntiBotTech;
15use reqwest::StatusCode;
16
/// Process-wide total of response bytes currently held by live
/// [`BackendBytesGuard`]s.
static BACKEND_BYTES_IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);

/// RAII reservation of `self.0` bytes in [`BACKEND_BYTES_IN_FLIGHT`].
///
/// The bytes are added when the guard is created and subtracted again when it
/// is dropped, so the global counter reflects how much backend response data
/// is alive at any moment.
pub struct BackendBytesGuard(usize);

impl BackendBytesGuard {
    /// Reserves `n` bytes if the new total would not exceed `limit`.
    ///
    /// A `limit` of `0` means "unlimited": the reservation always succeeds.
    /// Returns `None` when the reservation would exceed `limit` or would
    /// overflow `usize`. The check-and-add is a single atomic update, so
    /// concurrent callers can never jointly overshoot the limit.
    pub fn try_acquire(n: usize, limit: usize) -> Option<Self> {
        if limit == 0 {
            return Some(Self::acquire_unchecked(n));
        }
        BACKEND_BYTES_IN_FLIGHT
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
                // `checked_add` (not `saturating_add`) so an overflowing request
                // is refused instead of wrapping/panicking when limit == usize::MAX.
                current.checked_add(n).filter(|&next| next <= limit)
            })
            .ok()
            .map(|_| Self(n))
    }

    /// Reserves `n` bytes without checking any limit.
    pub fn acquire_unchecked(n: usize) -> Self {
        BACKEND_BYTES_IN_FLIGHT.fetch_add(n, Ordering::Relaxed);
        Self(n)
    }

    /// Current total of reserved bytes across all live guards.
    pub fn in_flight() -> usize {
        BACKEND_BYTES_IN_FLIGHT.load(Ordering::Relaxed)
    }
}

impl Drop for BackendBytesGuard {
    /// Releases this guard's reservation.
    fn drop(&mut self) {
        BACKEND_BYTES_IN_FLIGHT.fetch_sub(self.0, Ordering::Relaxed);
    }
}
80
/// Returns `true` when a `Content-Type` value denotes binary (non-HTML) data.
///
/// Parameters such as `; charset=...` and surrounding whitespace are ignored,
/// and the comparison is ASCII-case-insensitive because media types are
/// case-insensitive (`Image/PNG` is the same type as `image/png`).
/// Whole families (`image/*`, `audio/*`, `video/*`, `font/*`) plus a fixed list
/// of archive/document/font types are treated as binary.
pub fn is_binary_content_type(ct: &str) -> bool {
    const BINARY_PREFIXES: [&str; 4] = ["image/", "audio/", "video/", "font/"];
    const BINARY_TYPES: [&str; 13] = [
        "application/pdf",
        "application/zip",
        "application/gzip",
        "application/x-gzip",
        "application/octet-stream",
        "application/wasm",
        "application/x-tar",
        "application/x-bzip2",
        "application/x-7z-compressed",
        "application/x-rar-compressed",
        "application/vnd.ms-fontobject",
        "application/x-font-ttf",
        "application/x-font-woff",
    ];

    // Drop parameters; `split` always yields at least one item.
    let essence = ct.split(';').next().unwrap_or(ct).trim();

    BINARY_PREFIXES.iter().any(|prefix| {
        essence
            .get(..prefix.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(prefix))
    }) || BINARY_TYPES.iter().any(|ty| essence.eq_ignore_ascii_case(ty))
}
108
109pub fn should_skip_backend_for_url(
112 url: &str,
113 extra_extensions: &[crate::compact_str::CompactString],
114) -> bool {
115 if crate::page::is_asset_url(url) {
117 return true;
118 }
119 if !extra_extensions.is_empty() {
121 if let Some(pos) = url.rfind('.') {
122 let ext = &url[pos + 1..];
123 if ext.len() >= 2 {
124 let ext_lower = ext.to_ascii_lowercase();
125 for skip in extra_extensions {
126 if skip.eq_ignore_ascii_case(&ext_lower) {
127 return true;
128 }
129 }
130 }
131 }
132 }
133 false
134}
135
/// Verdict returned by a [`QualityValidator`] for one backend response.
///
/// `html_quality_score_validated` applies the fields in priority order:
/// `reject` forces a score of 0; otherwise `score_override` replaces the
/// heuristic score (capped at 100); otherwise `score_adjust` is added to the
/// heuristic score (clamped to `0..=100`). `Default` is a no-op verdict.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ValidationResult {
    /// Replace the heuristic score with this value (capped at 100).
    pub score_override: Option<u16>,
    /// Signed adjustment added to the heuristic score when no override is set.
    pub score_adjust: i16,
    /// Reject the response outright (score 0).
    pub reject: bool,
}
153
154pub type QualityValidator = std::sync::Arc<
160 dyn Fn(
161 Option<&[u8]>, StatusCode, &str, &str, ) -> ValidationResult
166 + Send
167 + Sync,
168>;
169
170pub fn html_quality_score_validated(
177 content: Option<&[u8]>,
178 status_code: StatusCode,
179 anti_bot: &AntiBotTech,
180 url: &str,
181 source: &str,
182 validator: Option<&QualityValidator>,
183) -> u16 {
184 let base = html_quality_score(content, status_code, anti_bot);
185
186 if let Some(v) = validator {
187 let result = v(content, status_code, url, source);
188 if result.reject {
189 return 0;
190 }
191 if let Some(ov) = result.score_override {
192 return ov.min(100);
193 }
194 let adjusted = (base as i16).saturating_add(result.score_adjust);
195 return (adjusted.max(0) as u16).min(100);
196 }
197
198 base
199}
200
201pub fn html_quality_score(
206 content: Option<&[u8]>,
207 status_code: StatusCode,
208 anti_bot: &AntiBotTech,
209) -> u16 {
210 let mut score: u16 = 0;
211
212 if status_code == StatusCode::OK {
214 score += 30;
215 } else if status_code.is_success() {
216 score += 20;
217 } else if status_code.is_redirection() {
218 score += 5;
219 }
220 if let Some(body) = content {
223 let len = body.len();
224
225 if len > 0 {
227 score += 5;
228 }
229 if len > 512 {
230 score += 10;
231 }
232 if len > 4096 {
233 score += 10;
234 }
235
236 if memchr::memmem::find(body, b"<body").is_some()
238 || memchr::memmem::find(body, b"<BODY").is_some()
239 {
240 score += 15;
241 }
242
243 if !crate::utils::is_cacheable_body_empty(body) {
245 score += 10;
246 }
247 }
248
249 if *anti_bot == AntiBotTech::None {
251 score += 20;
252 }
253
254 score.min(100)
255}
256
/// Lock-free counters for a single backend.
///
/// Every field is an independent atomic updated with `Relaxed` ordering, so
/// readers may observe the fields at slightly different moments.
struct BackendStats {
    /// Races this backend won.
    wins: AtomicU64,
    /// Races this backend took part in.
    races: AtomicU64,
    /// Moving average of response time, in milliseconds.
    ema_ms: AtomicU64,
    /// Failures since the last success.
    consecutive_errors: AtomicU64,
    /// Set once the backend has been auto-disabled.
    disabled: AtomicBool,
}

impl BackendStats {
    /// All counters zeroed, backend enabled.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            wins: zero(),
            races: zero(),
            ema_ms: zero(),
            consecutive_errors: zero(),
            disabled: AtomicBool::new(false),
        }
    }
}

impl Clone for BackendStats {
    /// Independent snapshot of the current counter values.
    fn clone(&self) -> Self {
        let snapshot = |src: &AtomicU64| AtomicU64::new(src.load(Ordering::Relaxed));
        Self {
            wins: snapshot(&self.wins),
            races: snapshot(&self.races),
            ema_ms: snapshot(&self.ema_ms),
            consecutive_errors: snapshot(&self.consecutive_errors),
            disabled: AtomicBool::new(self.disabled.load(Ordering::Relaxed)),
        }
    }
}
293
294pub struct BackendTracker {
299 stats: Vec<BackendStats>,
300 max_consecutive_errors: u64,
301}
302
303impl BackendTracker {
304 pub fn new(count: usize, max_consecutive_errors: u16) -> Self {
306 let mut stats = Vec::with_capacity(count);
307 for _ in 0..count {
308 stats.push(BackendStats::new());
309 }
310 Self {
311 stats,
312 max_consecutive_errors: max_consecutive_errors as u64,
313 }
314 }
315
316 pub fn record_win(&self, idx: usize) {
318 if let Some(s) = self.stats.get(idx) {
319 s.wins.fetch_add(1, Ordering::Relaxed);
320 }
321 }
322
323 pub fn record_race(&self, idx: usize) {
325 if let Some(s) = self.stats.get(idx) {
326 s.races.fetch_add(1, Ordering::Relaxed);
327 }
328 }
329
330 pub fn record_duration(&self, idx: usize, dur: Duration) {
332 if let Some(s) = self.stats.get(idx) {
333 let ms = dur.as_millis() as u64;
334 let count = s.races.load(Ordering::Relaxed);
335 if count <= 1 {
336 s.ema_ms.store(ms, Ordering::Relaxed);
337 } else {
338 let _ = s
340 .ema_ms
341 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |old| {
342 Some((old * 4 + ms) / 5)
343 });
344 }
345 }
346 }
347
348 pub fn record_error(&self, idx: usize) {
356 if let Some(s) = self.stats.get(idx) {
357 let prev = s.consecutive_errors.fetch_add(1, Ordering::Relaxed);
358
359 let never_succeeded =
362 s.wins.load(Ordering::Relaxed) == 0 && s.races.load(Ordering::Relaxed) <= 1;
363
364 if never_succeeded || prev + 1 >= self.max_consecutive_errors {
365 s.disabled.store(true, Ordering::Relaxed);
366 if never_succeeded {
367 log::info!(
368 "[parallel_backends] backend {} failed on probe (first request) — auto-disabled",
369 idx
370 );
371 }
372 }
373 }
374 }
375
376 pub fn record_success(&self, idx: usize) {
378 if let Some(s) = self.stats.get(idx) {
379 s.consecutive_errors.store(0, Ordering::Relaxed);
380 }
381 }
382
383 pub fn is_disabled(&self, idx: usize) -> bool {
385 self.stats
386 .get(idx)
387 .is_none_or(|s| s.disabled.load(Ordering::Relaxed))
388 }
389
390 pub fn wins(&self, idx: usize) -> u64 {
392 self.stats
393 .get(idx)
394 .map_or(0, |s| s.wins.load(Ordering::Relaxed))
395 }
396
397 pub fn races(&self, idx: usize) -> u64 {
399 self.stats
400 .get(idx)
401 .map_or(0, |s| s.races.load(Ordering::Relaxed))
402 }
403
404 pub fn ema_ms(&self, idx: usize) -> u64 {
406 self.stats
407 .get(idx)
408 .map_or(0, |s| s.ema_ms.load(Ordering::Relaxed))
409 }
410
411 pub fn consecutive_errors(&self, idx: usize) -> u64 {
413 self.stats
414 .get(idx)
415 .map_or(0, |s| s.consecutive_errors.load(Ordering::Relaxed))
416 }
417
418 pub fn win_rate_pct(&self, idx: usize) -> u64 {
420 let r = self.races(idx);
421 if r == 0 {
422 return 0;
423 }
424 self.wins(idx) * 100 / r
425 }
426
427 pub fn len(&self) -> usize {
429 self.stats.len()
430 }
431
432 pub fn is_empty(&self) -> bool {
434 self.stats.is_empty()
435 }
436}
437
438impl Clone for BackendTracker {
439 fn clone(&self) -> Self {
440 Self {
441 stats: self.stats.clone(),
442 max_consecutive_errors: self.max_consecutive_errors,
443 }
444 }
445}
446
447pub struct BackendResponse {
453 pub page: crate::page::Page,
455 pub quality_score: u16,
457 pub backend_index: usize,
459 pub duration: Duration,
461 pub _bytes_guard: Option<BackendBytesGuard>,
465}
466
467pub struct BackendResult {
470 pub backend_index: usize,
472 pub response: Option<BackendResponse>,
474}
475
476pub fn backend_source_name(endpoint: &BackendEndpoint) -> &'static str {
478 match endpoint.engine {
479 BackendEngine::Cdp => "cdp",
480 BackendEngine::Servo => "servo",
481 BackendEngine::Custom => "custom",
482 }
483}
484
485pub fn resolve_protocol(endpoint: &BackendEndpoint) -> BackendProtocol {
487 if let Some(ref p) = endpoint.protocol {
488 return p.clone();
489 }
490 match endpoint.engine {
491 BackendEngine::Cdp => BackendProtocol::Cdp,
492 BackendEngine::Servo => BackendProtocol::WebDriver,
493 BackendEngine::Custom => {
494 if let Some(ref ep) = endpoint.endpoint {
496 if ep.starts_with("ws://") || ep.starts_with("wss://") {
497 return BackendProtocol::Cdp;
498 }
499 }
500 BackendProtocol::WebDriver }
502 }
503}
504
505#[inline]
507pub fn tag_page_source(page: &mut crate::page::Page, source: &str) {
508 page.backend_source = Some(crate::compact_str::CompactString::from(source));
509}
510
511pub async fn race_backends(
528 primary: Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>>,
529 alternatives: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>>,
530 config: &ParallelBackendsConfig,
531 tracker: &BackendTracker,
532) -> Option<BackendResponse> {
533 if !config.enabled || alternatives.is_empty() {
534 let resp = primary.await;
536 if let Some(ref r) = resp {
537 tracker.record_race(r.backend_index);
538 tracker.record_win(r.backend_index);
539 tracker.record_duration(r.backend_index, r.duration);
540 tracker.record_success(r.backend_index);
541 }
542 return resp;
543 }
544
545 let total = 1 + alternatives.len();
546
547 let primary_jitter_us = {
552 use std::collections::hash_map::DefaultHasher;
553 use std::hash::{Hash, Hasher};
554 let mut h = DefaultHasher::new();
555 std::time::SystemTime::now().hash(&mut h);
556 0u16.hash(&mut h); h.finish() % 1000 };
559
560 let primary_wrapped: Pin<Box<dyn Future<Output = BackendResult> + Send>> =
561 Box::pin(async move {
562 if primary_jitter_us > 0 {
563 tokio::time::sleep(Duration::from_micros(primary_jitter_us)).await;
564 }
565 let response = primary.await;
566 BackendResult {
567 backend_index: 0,
568 response,
569 }
570 });
571
572 let mut futs = tokio::task::JoinSet::new();
573 futs.spawn(primary_wrapped);
574 for alt in alternatives {
575 futs.spawn(alt);
576 }
577
578 let grace = {
581 let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
582 if mem_state >= 2 {
583 Duration::ZERO
584 } else if mem_state >= 1 {
585 Duration::from_millis(config.grace_period_ms / 2)
586 } else {
587 Duration::from_millis(config.grace_period_ms)
588 }
589 };
590 let threshold = config.fast_accept_threshold;
591
592 let mut best: Option<BackendResponse> = None;
593 let mut completed = 0usize;
594 let mut grace_deadline: Option<tokio::time::Instant> = None;
595
596 loop {
597 if completed >= total {
598 break;
599 }
600
601 let result = if let Some(deadline) = grace_deadline {
602 tokio::select! {
603 biased;
604 res = futs.join_next() => res,
605 _ = tokio::time::sleep_until(deadline) => break,
606 }
607 } else {
608 futs.join_next().await
609 };
610
611 match result {
612 Some(Ok(br)) => {
613 completed += 1;
614 let idx = br.backend_index;
615
616 match br.response {
617 Some(resp) => {
618 tracker.record_race(idx);
619 tracker.record_duration(idx, resp.duration);
620 tracker.record_success(idx);
621
622 let score = resp.quality_score;
623
624 if best.is_none() && score >= threshold {
626 tracker.record_win(idx);
627 return Some(resp);
628 }
629
630 let dominated = match &best {
631 Some(b) => score > b.quality_score,
632 None => true,
633 };
634 if dominated {
635 best = Some(resp);
636 }
637
638 if grace_deadline.is_none() {
639 grace_deadline = Some(tokio::time::Instant::now() + grace);
640 }
641 }
642 None => {
643 tracker.record_race(idx);
645 tracker.record_error(idx);
646 }
647 }
648 }
649 Some(Err(_join_err)) => {
650 completed += 1;
651 }
652 None => {
653 break;
654 }
655 }
656 }
657
658 futs.abort_all();
663 drop(futs);
664
665 if let Some(ref b) = best {
666 tracker.record_win(b.backend_index);
667 }
668
669 best
670}
671
672pub struct ProxyRotator {
681 cdp_addrs: Vec<String>,
683 wd_addrs: Vec<String>,
685 cdp_index: AtomicUsize,
686 wd_index: AtomicUsize,
687}
688
689impl ProxyRotator {
690 pub fn new(proxies: &Option<Vec<RequestProxy>>) -> Self {
692 let (mut cdp, mut wd) = (Vec::new(), Vec::new());
693
694 if let Some(proxies) = proxies {
695 for p in proxies {
696 if p.ignore != ProxyIgnore::Chrome {
697 cdp.push(p.addr.clone());
698 }
699 if p.ignore != ProxyIgnore::Http {
700 wd.push(p.addr.clone());
701 }
702 }
703 }
704
705 Self {
706 cdp_addrs: cdp,
707 wd_addrs: wd,
708 cdp_index: AtomicUsize::new(0),
709 wd_index: AtomicUsize::new(0),
710 }
711 }
712
713 pub fn next_cdp(&self) -> Option<&str> {
715 let len = self.cdp_addrs.len();
716 if len == 0 {
717 return None;
718 }
719 let idx = self.cdp_index.fetch_add(1, Ordering::Relaxed) % len;
720 self.cdp_addrs.get(idx).map(|s| s.as_str())
721 }
722
723 pub fn next_webdriver(&self) -> Option<&str> {
725 let len = self.wd_addrs.len();
726 if len == 0 {
727 return None;
728 }
729 let idx = self.wd_index.fetch_add(1, Ordering::Relaxed) % len;
730 self.wd_addrs.get(idx).map(|s| s.as_str())
731 }
732
733 pub fn cdp_count(&self) -> usize {
735 self.cdp_addrs.len()
736 }
737
738 pub fn webdriver_count(&self) -> usize {
740 self.wd_addrs.len()
741 }
742}
743
744impl Clone for ProxyRotator {
745 fn clone(&self) -> Self {
746 Self {
747 cdp_addrs: self.cdp_addrs.clone(),
748 wd_addrs: self.wd_addrs.clone(),
749 cdp_index: AtomicUsize::new(self.cdp_index.load(Ordering::Relaxed)),
750 wd_index: AtomicUsize::new(self.wd_index.load(Ordering::Relaxed)),
751 }
752 }
753}
754
#[cfg(feature = "chrome")]
/// Fetches `url` through a remote Chrome DevTools Protocol endpoint.
///
/// Steps, in order:
/// 1. Connect to `endpoint` within `connect_timeout`.
/// 2. If `proxy` is set, create a browser context routed through it.
/// 3. Reuse the browser's first page, or open a new one.
/// 4. Apply the crawl's event and interception setup.
/// 5. Navigate within `config.request_timeout` (default 15 s).
/// 6. Honour any configured `wait_for` delay.
/// 7. Capture the rendered HTML, with a 10 s cap.
///
/// Returns `None` on any connect, page, navigation or capture failure; each
/// failure is logged at `warn`. On success the page is tagged with
/// `source_name` and scored with [`html_quality_score`], assuming a 200 status
/// and no anti-bot. Its byte size is charged to the global in-flight counter.
///
/// The CDP handler task is aborted as soon as it is no longer needed, and also
/// whenever this future is cancelled mid-flight (e.g. aborted as a race loser
/// or cut off by an outer timeout).
pub async fn fetch_cdp(
    url: &str,
    endpoint: &str,
    config: &std::sync::Arc<crate::configuration::Configuration>,
    backend_index: usize,
    connect_timeout: Duration,
    proxy: Option<String>,
    source_name: &str,
) -> Option<BackendResponse> {
    /// Aborts the wrapped task when dropped.
    ///
    /// Dropping a bare `JoinHandle` only detaches its task, which would leave
    /// the CDP handler (and its connection) running after cancellation.
    struct AbortOnDrop(tokio::task::JoinHandle<()>);

    impl Drop for AbortOnDrop {
        fn drop(&mut self) {
            self.0.abort();
        }
    }

    let start = Instant::now();
    let timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));

    let handler_config = crate::features::chrome::create_handler_config(config);

    let connect_result = tokio::time::timeout(
        connect_timeout,
        chromiumoxide::Browser::connect_with_config(endpoint, handler_config),
    )
    .await;

    let (mut browser, handler_guard) = match connect_result {
        Ok(Ok((browser, mut handler))) => {
            // The handler stream must be polled for the browser to make progress.
            let task = tokio::spawn(async move {
                use crate::tokio_stream::StreamExt;
                while let Some(_) = handler.next().await {}
            });
            (browser, AbortOnDrop(task))
        }
        Ok(Err(e)) => {
            log::warn!("{} CDP connect failed ({}): {:?}", source_name, endpoint, e);
            return None;
        }
        Err(_) => {
            log::warn!("{} CDP connect timed out ({})", source_name, endpoint);
            return None;
        }
    };

    // From here on, every early return drops `handler_guard`, aborting the handler.

    if let Some(ref proxy_addr) = proxy {
        let mut ctx_params =
            chromiumoxide::cdp::browser_protocol::target::CreateBrowserContextParams::default();
        ctx_params.dispose_on_detach = Some(true);
        ctx_params.proxy_server = Some(proxy_addr.clone());
        if let Ok(ctx) = browser.create_browser_context(ctx_params).await {
            let _ = browser.send_new_context(ctx).await;
        } else {
            log::warn!(
                "{} proxy browser context failed for {}, continuing without proxy",
                source_name,
                proxy_addr
            );
        }
    }

    let page = match browser.pages().await {
        Ok(mut p) if !p.is_empty() => p.swap_remove(0),
        _ => match browser.new_page(url).await {
            Ok(p) => p,
            Err(e) => {
                log::warn!("{} page failed: {:?}", source_name, e);
                return None;
            }
        },
    };

    crate::features::chrome::setup_chrome_events(&page, config).await;

    let _intercept_handle = crate::features::chrome::setup_chrome_interception_base(
        &page,
        config.chrome_intercept.enabled,
        &config.auth_challenge_response,
        config.chrome_intercept.block_visuals,
        "",
    )
    .await;

    match tokio::time::timeout(timeout, page.goto(url)).await {
        Ok(Ok(_)) => {}
        Ok(Err(e)) => {
            log::warn!("{} navigate failed for {}: {:?}", source_name, url, e);
            return None;
        }
        Err(_) => {
            log::warn!("{} navigate timed out for {}", source_name, url);
            return None;
        }
    }

    if let Some(ref wf) = config.wait_for {
        if let Some(ref delay) = wf.delay {
            if let Some(ms) = delay.timeout {
                tokio::time::sleep(ms).await;
            }
        }
    }

    let html_result = tokio::time::timeout(Duration::from_secs(10), page.outer_html_bytes()).await;

    // The handler is no longer needed once the HTML has been captured.
    drop(handler_guard);

    let html_bytes: Vec<u8> = match html_result {
        Ok(Ok(b)) => b.to_vec(),
        Ok(Err(e)) => {
            log::warn!(
                "{} outer_html_bytes() failed for {}: {:?}",
                source_name,
                url,
                e
            );
            return None;
        }
        Err(_) => {
            log::warn!("{} outer_html_bytes() timed out for {}", source_name, url);
            return None;
        }
    };

    let dur = start.elapsed();
    let status = StatusCode::OK;

    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
    let byte_len = html_bytes.len();
    let res = crate::utils::PageResponse {
        content: Some(html_bytes),
        status_code: status,
        ..Default::default()
    };
    let mut page = crate::page::build(url, res);
    tag_page_source(&mut page, source_name);

    Some(BackendResponse {
        page,
        quality_score: score,
        backend_index,
        duration: dur,
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
    })
}
921
#[cfg(feature = "webdriver")]
/// Fetches `url` through a remote WebDriver server at `endpoint`.
///
/// Steps, in order:
/// 1. Start a headless session (Chrome browser capability) within
///    `connect_timeout`, passing the crawl's proxy, user agent, viewport and
///    certificate settings.
/// 2. Navigate within `config.request_timeout` (default 15 s).
/// 3. Read the page source, with a 10 s cap.
/// 4. Dispose the session.
///
/// Returns `None` on any failure; each failure is logged at `warn`. On success
/// the page is tagged with `source_name` and scored with
/// [`html_quality_score`], assuming a 200 status and no anti-bot. Its byte size
/// is charged to the global in-flight counter.
///
/// NOTE(review): if this future is cancelled before `dispose()` runs, whether
/// the session is released depends on the controller's own drop behaviour,
/// which is not visible here — confirm.
/// NOTE(review): the Chrome browser capability is requested for every engine
/// (including Servo) — confirm that suits every endpoint.
pub async fn fetch_webdriver(
    url: &str,
    endpoint: &str,
    config: &std::sync::Arc<crate::configuration::Configuration>,
    backend_index: usize,
    connect_timeout: Duration,
    proxy: Option<String>,
    source_name: &str,
) -> Option<BackendResponse> {
    use crate::features::webdriver_common::{WebDriverBrowser, WebDriverConfig};

    let start = Instant::now();
    let nav_timeout = config.request_timeout.unwrap_or(Duration::from_secs(15));

    let wd_config = WebDriverConfig {
        server_url: endpoint.to_string(),
        browser: WebDriverBrowser::Chrome,
        headless: true,
        timeout: Some(connect_timeout),
        proxy,
        user_agent: config.user_agent.as_ref().map(|ua| ua.to_string()),
        viewport_width: config.viewport.as_ref().map(|v| v.width),
        viewport_height: config.viewport.as_ref().map(|v| v.height),
        accept_insecure_certs: config.accept_invalid_certs,
        ..Default::default()
    };

    let launched = tokio::time::timeout(
        connect_timeout,
        crate::features::webdriver::launch_driver_base(&wd_config, config),
    )
    .await;

    let mut controller = match launched {
        Ok(Some(c)) => c,
        Ok(None) => {
            log::warn!("{} WebDriver connect failed ({})", source_name, endpoint);
            return None;
        }
        Err(_) => {
            log::warn!("{} WebDriver connect timed out ({})", source_name, endpoint);
            return None;
        }
    };

    let driver = controller.driver().clone();

    // Navigate, then read the source; every failure is logged and yields `None`.
    let fetched: Option<String> = async {
        match tokio::time::timeout(nav_timeout, driver.goto(url)).await {
            Ok(Ok(_)) => {}
            Ok(Err(e)) => {
                log::warn!(
                    "{} WebDriver navigate failed for {}: {:?}",
                    source_name,
                    url,
                    e
                );
                return None;
            }
            Err(_) => {
                log::warn!("{} WebDriver navigate timed out for {}", source_name, url);
                return None;
            }
        }

        match tokio::time::timeout(Duration::from_secs(10), driver.source()).await {
            Ok(Ok(s)) => Some(s),
            Ok(Err(e)) => {
                log::warn!(
                    "{} WebDriver source failed for {}: {:?}",
                    source_name,
                    url,
                    e
                );
                None
            }
            Err(_) => {
                log::warn!("{} WebDriver source timed out for {}", source_name, url);
                None
            }
        }
    }
    .await;

    // The session is released exactly once, whichever way the fetch went.
    controller.dispose();
    let source = fetched?;

    let dur = start.elapsed();
    let html_bytes = source.into_bytes();
    let status = StatusCode::OK;

    let score = html_quality_score(Some(&html_bytes), status, &AntiBotTech::None);
    let byte_len = html_bytes.len();
    let mut page = crate::page::build(
        url,
        crate::utils::PageResponse {
            content: Some(html_bytes),
            status_code: status,
            ..Default::default()
        },
    );
    tag_page_source(&mut page, source_name);

    Some(BackendResponse {
        page,
        quality_score: score,
        backend_index,
        duration: dur,
        _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(byte_len)),
    })
}
1039
1040#[allow(unused_variables)]
1052pub fn build_backend_futures(
1053 url: &str,
1054 config: &ParallelBackendsConfig,
1055 crawl_config: &std::sync::Arc<crate::configuration::Configuration>,
1056 tracker: &BackendTracker,
1057 proxy_rotator: &Option<std::sync::Arc<ProxyRotator>>,
1058 semaphore: &Option<std::sync::Arc<tokio::sync::Semaphore>>,
1059) -> Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> {
1060 if should_skip_backend_for_url(url, &config.skip_extensions) {
1063 log::debug!(
1064 "[parallel_backends] skipping backends for asset URL: {}",
1065 url
1066 );
1067 return Vec::new();
1068 }
1069
1070 let byte_limit = config.max_backend_bytes_in_flight;
1074 if byte_limit > 0 && BackendBytesGuard::in_flight() >= byte_limit {
1075 log::debug!(
1076 "[parallel_backends] skipping backends — in-flight bytes ({}) >= limit ({})",
1077 BackendBytesGuard::in_flight(),
1078 byte_limit,
1079 );
1080 return Vec::new();
1081 }
1082
1083 let mem_state = crate::utils::detect_system::get_process_memory_state_sync();
1087 if mem_state >= 2 {
1088 log::debug!("[parallel_backends] skipping all backends — process memory critical");
1089 return Vec::new();
1090 }
1091 let mem_pressure = mem_state >= 1;
1092
1093 let outer_timeout = if config.backend_timeout_ms > 0 {
1097 Some(Duration::from_millis(config.backend_timeout_ms))
1098 } else {
1099 None
1100 };
1101 let backend_timeout_ms_log = config.backend_timeout_ms;
1102
1103 #[allow(unused_mut)]
1104 let mut futs: Vec<Pin<Box<dyn Future<Output = BackendResult> + Send>>> = Vec::new();
1105
1106 for (i, backend) in config.backends.iter().enumerate() {
1107 let backend_index = i + 1; if mem_pressure && !futs.is_empty() {
1111 break;
1112 }
1113
1114 if tracker.is_disabled(backend_index) {
1115 continue;
1116 }
1117
1118 #[allow(unused_variables)]
1121 let resolved_endpoint = if let Some(ref ep) = backend.endpoint {
1122 ep.clone()
1123 } else if backend.binary_path.is_some() {
1124 log::debug!(
1125 "{:?} local mode not yet implemented, skipping",
1126 backend.engine
1127 );
1128 continue;
1129 } else {
1130 log::debug!(
1131 "{:?} backend has no endpoint or binary_path, skipping",
1132 backend.engine
1133 );
1134 continue;
1135 };
1136
1137 let proto = resolve_protocol(backend);
1138 let _source_name = backend_source_name(backend);
1139
1140 #[allow(unused_variables)]
1142 let resolved_proxy: Option<String> = if backend.proxy.is_some() {
1143 backend.proxy.clone()
1144 } else if let Some(ref rotator) = proxy_rotator {
1145 match proto {
1146 BackendProtocol::Cdp => rotator.next_cdp().map(|s| s.to_string()),
1147 BackendProtocol::WebDriver => rotator.next_webdriver().map(|s| s.to_string()),
1148 }
1149 } else {
1150 None
1151 };
1152
1153 let jitter_us = {
1157 use std::collections::hash_map::DefaultHasher;
1158 use std::hash::{Hash, Hasher};
1159 let mut hasher = DefaultHasher::new();
1160 url.hash(&mut hasher);
1161 backend_index.hash(&mut hasher);
1162 std::time::SystemTime::now().hash(&mut hasher);
1163 hasher.finish() % 1000 };
1165
1166 let connect_timeout = Duration::from_millis(config.connect_timeout_ms);
1167
1168 let sem = semaphore.clone();
1170
1171 match proto {
1172 #[cfg(feature = "chrome")]
1173 BackendProtocol::Cdp => {
1174 let url = url.to_string();
1175 let cfg = crawl_config.clone(); let proxy = resolved_proxy.clone();
1177 let source = backend_source_name(backend).to_string();
1178 futs.push(Box::pin(async move {
1179 let work = async {
1185 let _permit = if let Some(ref s) = sem {
1189 match tokio::time::timeout(
1190 Duration::from_secs(10),
1191 s.acquire(),
1192 )
1193 .await
1194 {
1195 Ok(Ok(p)) => Some(p),
1196 _ => {
1197 log::warn!(
1198 "[parallel_backends] {} backend {} semaphore timeout for {}",
1199 source, backend_index, url
1200 );
1201 return BackendResult {
1202 backend_index,
1203 response: None,
1204 };
1205 }
1206 }
1207 } else {
1208 None
1209 };
1210 tokio::time::sleep(Duration::from_micros(jitter_us)).await;
1211 let response = fetch_cdp(
1212 &url,
1213 &resolved_endpoint,
1214 &cfg,
1215 backend_index,
1216 connect_timeout,
1217 proxy,
1218 &source,
1219 )
1220 .await;
1221 BackendResult {
1222 backend_index,
1223 response,
1224 }
1225 };
1226 match outer_timeout {
1227 Some(deadline) => match tokio::time::timeout(deadline, work).await {
1228 Ok(r) => r,
1229 Err(_) => {
1230 log::warn!(
1231 "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
1232 source, backend_index, backend_timeout_ms_log, url
1233 );
1234 BackendResult {
1235 backend_index,
1236 response: None,
1237 }
1238 }
1239 },
1240 None => work.await,
1241 }
1242 }));
1243 }
1244 #[cfg(feature = "webdriver")]
1245 BackendProtocol::WebDriver => {
1246 let url = url.to_string();
1247 let cfg = crawl_config.clone(); let proxy = resolved_proxy.clone();
1249 let source = backend_source_name(backend).to_string();
1250 futs.push(Box::pin(async move {
1251 let work = async {
1254 let _permit = if let Some(ref s) = sem {
1256 match tokio::time::timeout(
1257 Duration::from_secs(10),
1258 s.acquire(),
1259 )
1260 .await
1261 {
1262 Ok(Ok(p)) => Some(p),
1263 _ => {
1264 log::warn!(
1265 "[parallel_backends] {} backend {} semaphore timeout for {}",
1266 source, backend_index, url
1267 );
1268 return BackendResult {
1269 backend_index,
1270 response: None,
1271 };
1272 }
1273 }
1274 } else {
1275 None
1276 };
1277 tokio::time::sleep(Duration::from_micros(jitter_us)).await;
1278 let response = fetch_webdriver(
1279 &url,
1280 &resolved_endpoint,
1281 &cfg,
1282 backend_index,
1283 connect_timeout,
1284 proxy,
1285 &source,
1286 )
1287 .await;
1288 BackendResult {
1289 backend_index,
1290 response,
1291 }
1292 };
1293 match outer_timeout {
1294 Some(deadline) => match tokio::time::timeout(deadline, work).await {
1295 Ok(r) => r,
1296 Err(_) => {
1297 log::warn!(
1298 "[parallel_backends] {} backend {} hard timeout ({}ms) for {}",
1299 source, backend_index, backend_timeout_ms_log, url
1300 );
1301 BackendResult {
1302 backend_index,
1303 response: None,
1304 }
1305 }
1306 },
1307 None => work.await,
1308 }
1309 }));
1310 }
1311 #[allow(unreachable_patterns)]
1313 _ => {}
1314 }
1315 }
1316
1317 futs
1318}
1319
1320#[cfg(test)]
1325mod tests {
1326 use super::*;
1327 use std::sync::Arc;
1328
1329 fn make_html(body_content: &str) -> Vec<u8> {
1332 format!(
1333 "<html><head><title>T</title></head><body>{}</body></html>",
1334 body_content
1335 )
1336 .into_bytes()
1337 }
1338
1339 #[test]
1340 fn test_quality_score_perfect_response() {
1341 let body = make_html(&"x".repeat(5000));
1342 let score = html_quality_score(Some(&body), StatusCode::OK, &AntiBotTech::None);
1343 assert_eq!(score, 100);
1345 }
1346
1347 #[test]
1348 fn test_quality_score_empty_body() {
1349 let score = html_quality_score(Some(&[]), StatusCode::OK, &AntiBotTech::None);
1350 assert_eq!(score, 50);
1352 }
1353
1354 #[test]
1355 fn test_quality_score_none_content() {
1356 let score = html_quality_score(None, StatusCode::OK, &AntiBotTech::None);
1357 assert_eq!(score, 50);
1359 }
1360
1361 #[test]
1362 fn test_quality_score_empty_html_shell() {
1363 let body = b"<html><head></head><body></body></html>";
1364 let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
1365 assert_eq!(score, 70);
1367 }
1368
1369 #[test]
1370 fn test_quality_score_antibot_cloudflare() {
1371 let body = make_html("blocked");
1372 let score =
1373 html_quality_score(Some(&body), StatusCode::FORBIDDEN, &AntiBotTech::Cloudflare);
1374 assert_eq!(score, 30);
1376 }
1377
1378 #[test]
1379 fn test_quality_score_server_error() {
1380 let body = make_html("error");
1381 let score = html_quality_score(
1382 Some(&body),
1383 StatusCode::INTERNAL_SERVER_ERROR,
1384 &AntiBotTech::None,
1385 );
1386 assert_eq!(score, 50);
1388 }
1389
1390 #[test]
1391 fn test_quality_score_redirect() {
1392 let score = html_quality_score(None, StatusCode::MOVED_PERMANENTLY, &AntiBotTech::None);
1393 assert_eq!(score, 25);
1395 }
1396
1397 #[test]
1398 fn test_quality_score_small_body_with_body_tag() {
1399 let body = b"<html><body>hi</body></html>";
1400 let score = html_quality_score(Some(body), StatusCode::OK, &AntiBotTech::None);
1401 assert_eq!(score, 80);
1403 }
1404
1405 #[test]
1406 fn test_quality_score_large_body_no_body_tag() {
1407 let body = "x".repeat(5000);
1408 let score = html_quality_score(Some(body.as_bytes()), StatusCode::OK, &AntiBotTech::None);
1409 assert_eq!(score, 85);
1411 }
1412
1413 #[test]
1416 fn test_tracker_new_defaults() {
1417 let t = BackendTracker::new(3, 10);
1418 assert_eq!(t.len(), 3);
1419 assert!(!t.is_empty());
1420 for i in 0..3 {
1421 assert_eq!(t.wins(i), 0);
1422 assert_eq!(t.races(i), 0);
1423 assert_eq!(t.ema_ms(i), 0);
1424 assert_eq!(t.consecutive_errors(i), 0);
1425 assert!(!t.is_disabled(i));
1426 }
1427 assert!(t.is_disabled(99));
1429 assert_eq!(t.wins(99), 0);
1430 }
1431
1432 #[test]
1433 fn test_tracker_record_win() {
1434 let t = BackendTracker::new(2, 10);
1435 t.record_win(0);
1436 t.record_win(0);
1437 t.record_win(1);
1438 assert_eq!(t.wins(0), 2);
1439 assert_eq!(t.wins(1), 1);
1440 }
1441
1442 #[test]
1443 fn test_tracker_ema_duration() {
1444 let t = BackendTracker::new(1, 10);
1445 t.record_race(0);
1446 t.record_duration(0, Duration::from_millis(100));
1447 assert_eq!(t.ema_ms(0), 100);
1448
1449 t.record_race(0);
1450 t.record_duration(0, Duration::from_millis(200));
1451 assert_eq!(t.ema_ms(0), 120);
1453
1454 t.record_race(0);
1455 t.record_duration(0, Duration::from_millis(100));
1456 assert_eq!(t.ema_ms(0), 116);
1458 }
1459
1460 #[test]
1461 fn test_tracker_probe_first_error_disables() {
1462 let t = BackendTracker::new(1, 10);
1465 assert!(!t.is_disabled(0));
1466 t.record_race(0);
1467 t.record_error(0); assert!(t.is_disabled(0));
1469 }
1470
1471 #[test]
1472 fn test_tracker_consecutive_errors_disables() {
1473 let t = BackendTracker::new(1, 3);
1475 t.record_race(0);
1477 t.record_win(0);
1478 t.record_success(0);
1479 assert!(!t.is_disabled(0));
1480 t.record_race(0);
1481 t.record_error(0);
1482 t.record_race(0);
1483 t.record_error(0);
1484 assert!(!t.is_disabled(0));
1485 t.record_race(0);
1486 t.record_error(0); assert!(t.is_disabled(0));
1488 }
1489
#[test]
fn test_tracker_success_resets_errors() {
    let tracker = BackendTracker::new(1, 5);
    tracker.record_race(0);
    tracker.record_win(0);
    tracker.record_success(0);
    for _ in 0..2 {
        tracker.record_race(0);
        tracker.record_error(0);
    }
    assert_eq!(tracker.consecutive_errors(0), 2);
    // A success clears the running error streak.
    tracker.record_success(0);
    assert_eq!(tracker.consecutive_errors(0), 0);
}
1505
#[test]
fn test_tracker_clone_independence() {
    let original = BackendTracker::new(1, 10);
    original.record_win(0);
    // Take a copy after one win; later updates to `original` must not show up in it.
    let snapshot = original.clone();
    original.record_win(0);
    assert_eq!((original.wins(0), snapshot.wins(0)), (2, 1));
}
1515
#[test]
fn test_tracker_win_rate() {
    let tracker = BackendTracker::new(1, 10);
    // With no races recorded the rate reads as 0.
    assert_eq!(tracker.win_rate_pct(0), 0);
    for _ in 0..4 {
        tracker.record_race(0);
    }
    for _ in 0..3 {
        tracker.record_win(0);
    }
    // 3 wins out of 4 races.
    assert_eq!(tracker.win_rate_pct(0), 75);
}
1529
1530 fn mock_primary(
1534 score: u16,
1535 delay_ms: u64,
1536 ) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
1537 Box::pin(async move {
1538 if delay_ms > 0 {
1539 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1540 }
1541 Some(BackendResponse {
1542 page: crate::page::Page::default(),
1543 quality_score: score,
1544 backend_index: 0,
1545 duration: Duration::from_millis(delay_ms),
1546 _bytes_guard: None,
1547 })
1548 })
1549 }
1550
1551 fn mock_alt(
1553 idx: usize,
1554 score: u16,
1555 delay_ms: u64,
1556 ) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
1557 Box::pin(async move {
1558 if delay_ms > 0 {
1559 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1560 }
1561 BackendResult {
1562 backend_index: idx,
1563 response: Some(BackendResponse {
1564 page: crate::page::Page::default(),
1565 quality_score: score,
1566 backend_index: idx,
1567 duration: Duration::from_millis(delay_ms),
1568 _bytes_guard: None,
1569 }),
1570 }
1571 })
1572 }
1573
1574 fn mock_alt_none(
1576 idx: usize,
1577 delay_ms: u64,
1578 ) -> Pin<Box<dyn Future<Output = BackendResult> + Send>> {
1579 Box::pin(async move {
1580 if delay_ms > 0 {
1581 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1582 }
1583 BackendResult {
1584 backend_index: idx,
1585 response: None,
1586 }
1587 })
1588 }
1589
1590 fn mock_primary_none(
1592 delay_ms: u64,
1593 ) -> Pin<Box<dyn Future<Output = Option<BackendResponse>> + Send>> {
1594 Box::pin(async move {
1595 if delay_ms > 0 {
1596 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
1597 }
1598 None
1599 })
1600 }
1601
1602 fn test_config(grace_ms: u64, threshold: u16) -> ParallelBackendsConfig {
1603 ParallelBackendsConfig {
1604 backends: vec![],
1605 grace_period_ms: grace_ms,
1606 enabled: true,
1607 fast_accept_threshold: threshold,
1608 max_consecutive_errors: 10,
1609 connect_timeout_ms: 5000,
1610 skip_binary_content_types: true,
1611 max_concurrent_sessions: 0,
1612 skip_extensions: Vec::new(),
1613 max_backend_bytes_in_flight: 0, backend_timeout_ms: 0, }
1616 }
1617
1618 #[tokio::test]
1619 async fn test_race_primary_fast_accept() {
1620 let tracker = BackendTracker::new(3, 10);
1621 let cfg = test_config(500, 80);
1622 let primary = mock_primary(95, 10);
1623 let alts = vec![mock_alt(1, 100, 1000), mock_alt(2, 100, 1000)];
1624
1625 let result = race_backends(primary, alts, &cfg, &tracker).await;
1626 let r = result.unwrap();
1627 assert_eq!(r.backend_index, 0); assert_eq!(r.quality_score, 95);
1629 assert_eq!(tracker.wins(0), 1);
1630 }
1631
1632 #[tokio::test]
1633 async fn test_race_alternative_wins_during_grace() {
1634 let tracker = BackendTracker::new(3, 10);
1635 let cfg = test_config(500, 80); let primary = mock_primary(50, 10); let alts = vec![
1638 mock_alt(1, 90, 100), mock_alt(2, 30, 1000),
1640 ];
1641
1642 let result = race_backends(primary, alts, &cfg, &tracker).await;
1643 let r = result.unwrap();
1644 assert_eq!(r.backend_index, 1); assert_eq!(r.quality_score, 90);
1646 }
1647
1648 #[tokio::test]
1649 async fn test_race_primary_wins_after_grace() {
1650 let tracker = BackendTracker::new(2, 10);
1651 let cfg = test_config(50, 80); let primary = mock_primary(60, 10); let alts = vec![
1654 mock_alt(1, 40, 5000), ];
1656
1657 let result = race_backends(primary, alts, &cfg, &tracker).await;
1658 let r = result.unwrap();
1659 assert_eq!(r.backend_index, 0); assert_eq!(r.quality_score, 60);
1661 }
1662
1663 #[tokio::test]
1664 async fn test_race_all_none() {
1665 let tracker = BackendTracker::new(2, 10);
1666 let cfg = test_config(50, 80);
1667 let primary = mock_primary_none(10);
1668 let alts = vec![mock_alt_none(1, 10)];
1669
1670 let result = race_backends(primary, alts, &cfg, &tracker).await;
1671 assert!(result.is_none());
1672 assert_eq!(tracker.consecutive_errors(1), 1);
1674 }
1675
1676 #[tokio::test]
1677 async fn test_race_primary_none_alt_some() {
1678 let tracker = BackendTracker::new(2, 10);
1679 let cfg = test_config(200, 80);
1680 let primary = mock_primary_none(10);
1681 let alts = vec![mock_alt(1, 85, 50)];
1682
1683 let result = race_backends(primary, alts, &cfg, &tracker).await;
1684 let r = result.unwrap();
1685 assert_eq!(r.backend_index, 1);
1686 }
1687
1688 #[tokio::test]
1689 async fn test_race_disabled_noop() {
1690 let tracker = BackendTracker::new(2, 10);
1691 let mut cfg = test_config(50, 80);
1692 cfg.enabled = false;
1693 let primary = mock_primary(70, 10);
1694 let alts = vec![mock_alt(1, 100, 10)];
1695
1696 let result = race_backends(primary, alts, &cfg, &tracker).await;
1697 let r = result.unwrap();
1698 assert_eq!(r.backend_index, 0); }
1700
1701 #[tokio::test]
1702 async fn test_race_single_alternative() {
1703 let tracker = BackendTracker::new(2, 10);
1704 let cfg = test_config(200, 80);
1705 let primary = mock_primary(50, 100);
1706 let alts = vec![mock_alt(1, 90, 20)]; let result = race_backends(primary, alts, &cfg, &tracker).await;
1709 let r = result.unwrap();
1710 assert_eq!(r.backend_index, 1);
1712 assert_eq!(r.quality_score, 90);
1713 }
1714
1715 #[tokio::test]
1716 async fn test_race_three_alternatives_best_during_grace() {
1717 let tracker = BackendTracker::new(4, 10);
1718 let cfg = test_config(300, 95); let primary = mock_primary(40, 10); let alts = vec![
1722 mock_alt(1, 60, 50), mock_alt(2, 85, 100), mock_alt(3, 70, 200), ];
1726
1727 let result = race_backends(primary, alts, &cfg, &tracker).await;
1728 let r = result.unwrap();
1729 assert_eq!(r.backend_index, 2);
1730 assert_eq!(r.quality_score, 85);
1731 }
1732
1733 #[tokio::test]
1734 async fn test_race_grace_period_zero() {
1735 let tracker = BackendTracker::new(2, 10);
1736 let cfg = test_config(0, 101); let primary = mock_primary(50, 10); let alts = vec![mock_alt(1, 99, 50)]; let result = race_backends(primary, alts, &cfg, &tracker).await;
1742 let r = result.unwrap();
1743 assert_eq!(r.backend_index, 0);
1744 }
1745
1746 #[tokio::test]
1747 async fn test_race_cancellation_verified() {
1748 let finished = Arc::new(AtomicBool::new(false));
1749 let f = finished.clone();
1750
1751 let tracker = BackendTracker::new(2, 10);
1752 let cfg = test_config(50, 80);
1753
1754 let primary = mock_primary(95, 10); let slow_alt: Pin<Box<dyn Future<Output = BackendResult> + Send>> = Box::pin(async move {
1757 tokio::time::sleep(Duration::from_secs(10)).await;
1758 f.store(true, Ordering::SeqCst);
1759 BackendResult {
1760 backend_index: 1,
1761 response: None,
1762 }
1763 });
1764
1765 let _result = race_backends(primary, vec![slow_alt], &cfg, &tracker).await;
1766
1767 tokio::time::sleep(Duration::from_millis(50)).await;
1768 assert!(!finished.load(Ordering::SeqCst));
1769 }
1770
1771 #[tokio::test]
1772 async fn test_race_failed_alt_records_error() {
1773 let tracker = BackendTracker::new(3, 5);
1774 let cfg = test_config(200, 80);
1775 let primary = mock_primary(50, 10);
1776 let alts = vec![
1777 mock_alt_none(1, 20), mock_alt_none(2, 30), ];
1780
1781 let result = race_backends(primary, alts, &cfg, &tracker).await;
1782 let r = result.unwrap();
1783 assert_eq!(r.backend_index, 0); assert_eq!(tracker.consecutive_errors(1), 1);
1785 assert_eq!(tracker.consecutive_errors(2), 1);
1786 }
1787
1788 #[tokio::test]
1789 async fn test_race_auto_disable_after_errors() {
1790 let tracker = BackendTracker::new(2, 2); let cfg = test_config(100, 80);
1792
1793 for _ in 0..2 {
1795 let primary = mock_primary(50, 5);
1796 let alts = vec![mock_alt_none(1, 10)];
1797 let _ = race_backends(primary, alts, &cfg, &tracker).await;
1798 }
1799
1800 assert!(tracker.is_disabled(1));
1802 assert_eq!(tracker.consecutive_errors(1), 2);
1803 }
1804
#[test]
fn test_proxy_rotator_round_robin_cdp() {
    let proxies: Vec<RequestProxy> = ["http://p1", "http://p2"]
        .iter()
        .map(|&addr| RequestProxy {
            addr: addr.into(),
            ignore: ProxyIgnore::No,
        })
        .collect();
    let rotator = ProxyRotator::new(&Some(proxies));
    assert_eq!(rotator.cdp_count(), 2);
    // Hands out proxies in list order and wraps back to the start.
    for expected in ["http://p1", "http://p2", "http://p1"] {
        assert_eq!(rotator.next_cdp(), Some(expected));
    }
}
1825
/// The WebDriver rotation hands out proxies in list order and, like the CDP
/// rotation, wraps back to the first entry after the last.
#[test]
fn test_proxy_rotator_round_robin_wd() {
    let proxies = vec![
        RequestProxy {
            addr: "http://p1".into(),
            ignore: ProxyIgnore::No,
        },
        RequestProxy {
            addr: "http://p2".into(),
            ignore: ProxyIgnore::No,
        },
    ];
    let r = ProxyRotator::new(&Some(proxies));
    assert_eq!(r.webdriver_count(), 2);
    assert_eq!(r.next_webdriver(), Some("http://p1"));
    assert_eq!(r.next_webdriver(), Some("http://p2"));
    // Wrap-around: the rotation cycles back to the first proxy.
    assert_eq!(r.next_webdriver(), Some("http://p1"));
}
1843
#[test]
fn test_proxy_rotator_filters_ignore() {
    let proxy = |addr: &str, ignore: ProxyIgnore| RequestProxy {
        addr: addr.into(),
        ignore,
    };
    // Per their names, each of the first two entries is expected to be usable
    // by only one of the two rotations; the last one by both.
    let rotator = ProxyRotator::new(&Some(vec![
        proxy("http://cdp-only", ProxyIgnore::Http),
        proxy("http://wd-only", ProxyIgnore::Chrome),
        proxy("http://both", ProxyIgnore::No),
    ]));
    assert_eq!(rotator.cdp_count(), 2);
    assert_eq!(rotator.webdriver_count(), 2);
}
1866
#[test]
fn test_proxy_rotator_empty_proxies() {
    // With no proxy list at all, both rotations are empty and yield nothing.
    let rotator = ProxyRotator::new(&None);
    assert_eq!((rotator.cdp_count(), rotator.webdriver_count()), (0, 0));
    assert!(rotator.next_cdp().is_none());
    assert!(rotator.next_webdriver().is_none());
}
1875
#[test]
fn test_proxy_rotator_concurrent_access() {
    const ADDRS: [&str; 3] = ["http://p1", "http://p2", "http://p3"];
    let proxies: Vec<RequestProxy> = ADDRS
        .iter()
        .map(|&addr| RequestProxy {
            addr: addr.into(),
            ignore: ProxyIgnore::No,
        })
        .collect();
    let rotator = Arc::new(ProxyRotator::new(&Some(proxies)));

    // Ten threads each pull 100 proxies. Every address handed out must be one
    // of the configured three.
    let workers: Vec<_> = (0..10)
        .map(|_| {
            let rotator = Arc::clone(&rotator);
            std::thread::spawn(move || {
                for _ in 0..100 {
                    let addr = rotator.next_cdp().unwrap();
                    assert!(ADDRS.iter().any(|&known| known == addr));
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }
}
1910
#[test]
fn test_is_binary_content_type_images() {
    // Everything under `image/` is binary, including text-based SVG.
    for mime in [
        "image/png",
        "image/jpeg",
        "image/webp",
        "image/svg+xml",
        "image/gif",
    ] {
        assert!(is_binary_content_type(mime), "{mime} should be binary");
    }
}
1921
#[test]
fn test_is_binary_content_type_with_charset() {
    // Parameters after `;` are ignored; only the bare media type is classified.
    for header in [
        "image/png; charset=utf-8",
        "application/pdf; boundary=something",
        "font/woff2; charset=binary",
    ] {
        assert!(is_binary_content_type(header), "{header} should be binary");
    }
}
1931
#[test]
fn test_is_binary_content_type_fonts() {
    // Both the `font/` family and the legacy `application/*font*` aliases.
    for mime in [
        "font/woff",
        "font/woff2",
        "font/ttf",
        "application/vnd.ms-fontobject",
        "application/x-font-ttf",
        "application/x-font-woff",
    ] {
        assert!(is_binary_content_type(mime), "{mime} should be binary");
    }
}
1941
#[test]
fn test_is_binary_content_type_archives() {
    // Documents, archives and raw byte streams served as `application/*`.
    let binary_apps = [
        "application/pdf",
        "application/zip",
        "application/gzip",
        "application/x-gzip",
        "application/octet-stream",
        "application/wasm",
        "application/x-tar",
        "application/x-bzip2",
        "application/x-7z-compressed",
        "application/x-rar-compressed",
    ];
    for mime in binary_apps {
        assert!(is_binary_content_type(mime), "{mime} should be binary");
    }
}
1955
#[test]
fn test_is_binary_content_type_audio_video() {
    for mime in ["audio/mpeg", "audio/ogg", "video/mp4", "video/webm"] {
        assert!(is_binary_content_type(mime), "{mime} should be binary");
    }
}
1963
#[test]
fn test_is_binary_content_type_html_not_binary() {
    // Text-like media types must keep going through the backends.
    let textual = [
        "text/html",
        "text/html; charset=utf-8",
        "text/plain",
        "application/json",
        "application/javascript",
        "text/css",
        "application/xml",
    ];
    for mime in textual {
        assert!(!is_binary_content_type(mime), "{mime} should not be binary");
    }
}
1974
#[test]
fn test_should_skip_backend_for_asset_urls() {
    // Static-asset extensions are skipped even with no extra extensions configured.
    let asset_urls = [
        "https://example.com/photo.jpg",
        "https://example.com/photo.png",
        "https://example.com/font.woff2",
        "https://example.com/doc.pdf",
        "https://example.com/video.mp4",
    ];
    for url in asset_urls {
        assert!(should_skip_backend_for_url(url, &[]), "{url} should be skipped");
    }
}
2000
#[test]
fn test_should_not_skip_backend_for_html_urls() {
    // HTML pages, extensionless routes and the site root all stay eligible.
    let page_urls = [
        "https://example.com/page.html",
        "https://example.com/about",
        "https://example.com/api/data",
        "https://example.com/",
    ];
    for url in page_urls {
        assert!(!should_skip_backend_for_url(url, &[]), "{url} should not be skipped");
    }
}
2017
#[test]
fn test_should_skip_backend_custom_extensions() {
    use crate::compact_str::CompactString;

    let extras: Vec<CompactString> = ["xml", "rss"]
        .iter()
        .map(|&ext| CompactString::from(ext))
        .collect();
    // Extra extensions apply on top of the defaults and match regardless of
    // case (`.RSS` is caught by `rss`).
    for url in [
        "https://example.com/feed.xml",
        "https://example.com/feed.rss",
        "https://example.com/feed.RSS",
    ] {
        assert!(should_skip_backend_for_url(url, &extras), "{url} should be skipped");
    }
    assert!(!should_skip_backend_for_url(
        "https://example.com/page.html",
        &extras
    ));
}
2041
/// Exercises `BackendBytesGuard` end to end in a single test.
///
/// Every check shares the process-wide in-flight counter, so the checks run
/// sequentially and compare against a `baseline` captured up front instead of
/// assuming the counter starts at zero.
#[test]
fn test_bytes_guard_all() {
    let baseline = BackendBytesGuard::in_flight();

    // Unchecked acquire adds to the counter; dropping the guard subtracts it.
    {
        let guard = BackendBytesGuard::acquire_unchecked(1000);
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 1000);
        drop(guard);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // Checked acquire succeeds while the total stays within the limit.
    {
        let granted = BackendBytesGuard::try_acquire(500, baseline + 1000);
        assert!(granted.is_some());
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 500);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // Checked acquire is refused, and reserves nothing, when it would overshoot.
    {
        let held = BackendBytesGuard::acquire_unchecked(800);
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 800);
        let refused = BackendBytesGuard::try_acquire(300, baseline + 1000);
        assert!(refused.is_none(), "should reject when would exceed limit");
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 800);
        drop(held);
        assert_eq!(BackendBytesGuard::in_flight(), baseline);
    }

    // A limit of 0 disables the cap entirely.
    {
        let unlimited = BackendBytesGuard::try_acquire(1_000_000, 0);
        assert!(unlimited.is_some(), "limit=0 means unlimited");
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 1_000_000);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // Guards release correctly regardless of drop order.
    {
        let small = BackendBytesGuard::acquire_unchecked(100);
        let medium = BackendBytesGuard::acquire_unchecked(200);
        let large = BackendBytesGuard::acquire_unchecked(300);
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 600);
        drop(medium);
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 400);
        drop(small);
        drop(large);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // A guard stored in a `BackendResponse` is released when the response drops.
    {
        let response = BackendResponse {
            page: crate::page::Page::default(),
            quality_score: 90,
            backend_index: 1,
            duration: Duration::from_millis(50),
            _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(5000)),
        };
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 5000);
        drop(response);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // Moving `page` out of a response still releases the guard at scope end.
    {
        let response = BackendResponse {
            page: crate::page::Page::default(),
            quality_score: 90,
            backend_index: 0,
            duration: Duration::from_millis(10),
            _bytes_guard: Some(BackendBytesGuard::acquire_unchecked(2000)),
        };
        assert_eq!(BackendBytesGuard::in_flight(), baseline + 2000);
        let _page = response.page;
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // `build_backend_futures` yields no futures once the byte budget is exhausted.
    {
        let hog = BackendBytesGuard::acquire_unchecked(1_000_000);
        let cfg = ParallelBackendsConfig {
            backends: vec![crate::configuration::BackendEndpoint {
                engine: crate::configuration::BackendEngine::Cdp,
                endpoint: Some("ws://localhost:9222".to_string()),
                binary_path: None,
                protocol: None,
                proxy: None,
            }],
            max_backend_bytes_in_flight: baseline + 500,
            ..Default::default()
        };
        let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
        let tracker = BackendTracker::new(2, 10);
        let futs = build_backend_futures(
            "https://example.com",
            &cfg,
            &crawl_cfg,
            &tracker,
            &None,
            &None,
        );
        assert!(
            futs.is_empty(),
            "should skip backends when byte limit exceeded"
        );
        drop(hog);
    }
    assert_eq!(BackendBytesGuard::in_flight(), baseline);

    // Concurrent acquire/release from many threads must net out to zero.
    let workers: Vec<_> = (0..8)
        .map(|_| {
            std::thread::spawn(|| {
                for _ in 0..1000 {
                    let guard = BackendBytesGuard::acquire_unchecked(100);
                    std::thread::yield_now();
                    drop(guard);
                }
            })
        })
        .collect();
    workers
        .into_iter()
        .for_each(|worker| worker.join().unwrap());
    assert_eq!(
        BackendBytesGuard::in_flight(),
        baseline,
        "counter must return to baseline after concurrent thread usage"
    );
}
2171
2172 #[tokio::test]
2173 async fn test_race_grace_zero_under_pressure_no_deadlock() {
2174 let tracker = BackendTracker::new(3, 10);
2177 let cfg = ParallelBackendsConfig {
2178 grace_period_ms: 0,
2179 ..Default::default()
2180 };
2181 let primary = mock_primary(50, 5);
2182 let alt = mock_alt(1, 95, 1);
2183 let result = race_backends(primary, vec![alt], &cfg, &tracker).await;
2184 assert!(result.is_some());
2185 }
2186
2187 #[tokio::test]
2188 async fn test_race_backends_drops_futs_before_return() {
2189 let tracker = BackendTracker::new(2, 10);
2193 let cfg = test_config(200, 80);
2194
2195 let primary = mock_primary(90, 1);
2197 let alt = mock_alt(1, 50, 500);
2199
2200 let result = race_backends(primary, vec![alt], &cfg, &tracker).await;
2201 assert!(result.is_some());
2202 let winner = result.unwrap();
2203 assert_eq!(winner.backend_index, 0);
2205 assert_eq!(winner.quality_score, 90);
2206 }
2208
2209 #[tokio::test]
2210 async fn test_race_backends_winner_replaces_losers() {
2211 let tracker = BackendTracker::new(4, 10);
2214 let cfg = test_config(500, 95); let primary = mock_primary(40, 1); let alt1 = mock_alt(1, 60, 5);
2218 let alt2 = mock_alt(2, 80, 10);
2219 let alt3 = mock_alt(3, 70, 15);
2220
2221 let result = race_backends(primary, vec![alt1, alt2, alt3], &cfg, &tracker).await;
2222 assert!(result.is_some());
2223 let winner = result.unwrap();
2224 assert_eq!(winner.backend_index, 2);
2226 assert_eq!(winner.quality_score, 80);
2227 }
2230
/// With an effectively unlimited byte budget (`usize::MAX`), the byte limit
/// must not be the reason the configured CDP backend is skipped.
#[test]
fn test_build_backend_futures_allows_when_byte_limit_not_exceeded() {
    let cdp_endpoint = crate::configuration::BackendEndpoint {
        engine: crate::configuration::BackendEngine::Cdp,
        endpoint: Some("ws://localhost:9222".to_string()),
        binary_path: None,
        protocol: None,
        proxy: None,
    };
    let cfg = ParallelBackendsConfig {
        backends: vec![cdp_endpoint],
        max_backend_bytes_in_flight: usize::MAX,
        ..Default::default()
    };
    let crawl_cfg = Arc::new(crate::configuration::Configuration::default());
    let tracker = BackendTracker::new(2, 10);
    // NOTE(review): the returned futures are never inspected. Whether one is
    // built here may also depend on other gates (e.g. enabled engine
    // features), so this only checks that the call completes without panicking.
    let _unpolled = build_backend_futures(
        "https://example.com",
        &cfg,
        &crawl_cfg,
        &tracker,
        &None,
        &None,
    );
}
2256}