1use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12
13use crate::BrowserPool;
14use crate::error::BrowserError;
15use crate::page::WaitUntil;
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "snake_case")]
20pub enum AcquisitionMode {
21 Fast,
23 Resilient,
25 Hostile,
27 Investigate,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
33#[serde(rename_all = "snake_case")]
34pub enum StrategyUsed {
35 DirectHttp,
37 TlsProfiledHttp,
39 BrowserLightStealth,
41 StickyProxyBrowserSession,
43 InvestigateEntry,
45}
46
47#[derive(Debug, Clone)]
49pub struct AcquisitionRequest {
50 pub url: String,
52 pub mode: AcquisitionMode,
54 pub wait_for_selector: Option<String>,
56 pub extraction_js: Option<String>,
58 pub total_timeout: Duration,
60 pub navigation_timeout: Duration,
62 pub request_timeout: Duration,
64 pub html_excerpt_bytes: usize,
66 pub investigate_start: Option<StrategyUsed>,
68}
69
70impl Default for AcquisitionRequest {
71 fn default() -> Self {
72 Self {
73 url: String::new(),
74 mode: AcquisitionMode::Resilient,
75 wait_for_selector: None,
76 extraction_js: None,
77 total_timeout: Duration::from_secs(45),
78 navigation_timeout: Duration::from_secs(30),
79 request_timeout: Duration::from_secs(15),
80 html_excerpt_bytes: 4_096,
81 investigate_start: None,
82 }
83 }
84}
85
86#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
88#[serde(rename_all = "snake_case")]
89pub enum StageFailureKind {
90 Setup,
92 Timeout,
94 Blocked,
96 Transport,
98 Extraction,
100}
101
102#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
104pub struct StageFailure {
105 pub strategy: StrategyUsed,
107 pub kind: StageFailureKind,
109 pub message: String,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct AcquisitionResult {
116 pub success: bool,
118 pub strategy_used: Option<StrategyUsed>,
120 pub attempted: Vec<StrategyUsed>,
122 pub final_url: Option<String>,
124 pub status_code: Option<u16>,
126 pub html_excerpt: Option<String>,
128 pub extracted: Option<Value>,
130 pub failures: Vec<StageFailure>,
132 pub timed_out: bool,
134}
135
136impl AcquisitionResult {
137 const fn empty() -> Self {
138 Self {
139 success: false,
140 strategy_used: None,
141 attempted: Vec::new(),
142 final_url: None,
143 status_code: None,
144 html_excerpt: None,
145 extracted: None,
146 failures: Vec::new(),
147 timed_out: false,
148 }
149 }
150}
151
152#[derive(Debug, Clone)]
153struct StageSuccess {
154 final_url: Option<String>,
155 status_code: Option<u16>,
156 html_excerpt: Option<String>,
157 extracted: Option<Value>,
158}
159
160#[derive(Debug, Clone)]
161enum StageOutcome {
162 Marker,
163 Success(StageSuccess),
164 Failure(StageFailure),
165}
166
167#[derive(Clone)]
169pub struct AcquisitionRunner {
170 pool: Arc<BrowserPool>,
171}
172
173impl AcquisitionRunner {
174 #[must_use]
188 pub const fn new(pool: Arc<BrowserPool>) -> Self {
189 Self { pool }
190 }
191
192 #[must_use]
196 pub fn strategy_ladder(
197 mode: AcquisitionMode,
198 investigate_start: Option<StrategyUsed>,
199 ) -> Vec<StrategyUsed> {
200 let mut stages = match mode {
201 AcquisitionMode::Fast => vec![
202 StrategyUsed::DirectHttp,
203 StrategyUsed::TlsProfiledHttp,
204 StrategyUsed::BrowserLightStealth,
205 ],
206 AcquisitionMode::Resilient => vec![
207 StrategyUsed::DirectHttp,
208 StrategyUsed::TlsProfiledHttp,
209 StrategyUsed::BrowserLightStealth,
210 StrategyUsed::StickyProxyBrowserSession,
211 ],
212 AcquisitionMode::Hostile => vec![
213 StrategyUsed::BrowserLightStealth,
214 StrategyUsed::StickyProxyBrowserSession,
215 StrategyUsed::TlsProfiledHttp,
216 StrategyUsed::DirectHttp,
217 ],
218 AcquisitionMode::Investigate => {
219 let start = investigate_start.unwrap_or(StrategyUsed::BrowserLightStealth);
220 vec![
221 StrategyUsed::InvestigateEntry,
222 start,
223 StrategyUsed::StickyProxyBrowserSession,
224 StrategyUsed::TlsProfiledHttp,
225 ]
226 }
227 };
228
229 dedupe_preserve_order(&mut stages);
230 stages
231 }
232
233 pub async fn run(&self, request: AcquisitionRequest) -> AcquisitionResult {
256 let timeout = request.total_timeout;
257 let timeout_strategy = Self::strategy_ladder(request.mode, request.investigate_start)
258 .into_iter()
259 .find(|strategy| *strategy != StrategyUsed::InvestigateEntry)
260 .unwrap_or(StrategyUsed::DirectHttp);
261 let mut result = tokio::time::timeout(timeout, self.run_inner(&request))
262 .await
263 .unwrap_or_else(|_| {
264 let mut timed_out = AcquisitionResult::empty();
265 timed_out.timed_out = true;
266 timed_out.failures.push(StageFailure {
267 strategy: timeout_strategy,
268 kind: StageFailureKind::Timeout,
269 message: format!("acquisition timed out after {}ms", timeout.as_millis()),
270 });
271 timed_out
272 });
273
274 if !result.success {
275 if result.failures.is_empty() {
277 result.failures.push(StageFailure {
278 strategy: timeout_strategy,
279 kind: StageFailureKind::Transport,
280 message: "acquisition ended without stage output".to_string(),
281 });
282 }
283 }
284
285 result
286 }
287
288 async fn run_inner(&self, request: &AcquisitionRequest) -> AcquisitionResult {
289 let mut result = AcquisitionResult::empty();
290 let ladder = Self::strategy_ladder(request.mode, request.investigate_start);
291 let started = Instant::now();
292
293 for strategy in ladder {
294 if started.elapsed() >= request.total_timeout {
295 result.timed_out = true;
296 result.failures.push(StageFailure {
297 strategy,
298 kind: StageFailureKind::Timeout,
299 message: "wall-clock timeout reached before stage execution".to_string(),
300 });
301 break;
302 }
303
304 result.attempted.push(strategy);
305 match self.execute_stage(strategy, request).await {
306 StageOutcome::Marker => {}
307 StageOutcome::Success(success) => {
308 result.success = true;
309 result.strategy_used = Some(strategy);
310 result.final_url = success.final_url;
311 result.status_code = success.status_code;
312 result.html_excerpt = success.html_excerpt;
313 result.extracted = success.extracted;
314 break;
315 }
316 StageOutcome::Failure(failure) => result.failures.push(failure),
317 }
318 }
319
320 result
321 }
322
323 async fn execute_stage(
324 &self,
325 strategy: StrategyUsed,
326 request: &AcquisitionRequest,
327 ) -> StageOutcome {
328 match strategy {
329 StrategyUsed::DirectHttp => self.run_http_stage(request, false).await,
330 StrategyUsed::TlsProfiledHttp => self.run_http_stage(request, true).await,
331 StrategyUsed::BrowserLightStealth => self.run_browser_stage(request, false).await,
332 StrategyUsed::StickyProxyBrowserSession => self.run_browser_stage(request, true).await,
333 StrategyUsed::InvestigateEntry => StageOutcome::Marker,
334 }
335 }
336
337 async fn run_browser_stage(&self, request: &AcquisitionRequest, sticky: bool) -> StageOutcome {
338 let strategy = if sticky {
339 StrategyUsed::StickyProxyBrowserSession
340 } else {
341 StrategyUsed::BrowserLightStealth
342 };
343
344 let handle_result = if sticky {
345 let context = host_hint(&request.url).unwrap_or_else(|| "default".to_string());
346 self.pool.acquire_for(&context).await
347 } else {
348 self.pool.acquire().await
349 };
350
351 let handle = match handle_result {
352 Ok(handle) => handle,
353 Err(err) => {
354 return StageOutcome::Failure(StageFailure {
355 strategy,
356 kind: StageFailureKind::Setup,
357 message: format!("browser acquire failed: {err}"),
358 });
359 }
360 };
361
362 let page_result = async {
363 let browser = handle.browser().ok_or_else(|| {
364 BrowserError::ConfigError("browser handle already released".to_string())
365 })?;
366 let mut page = browser.new_page().await?;
367 page.navigate(
368 &request.url,
369 WaitUntil::DomContentLoaded,
370 request.navigation_timeout,
371 )
372 .await?;
373
374 if let Some(selector) = &request.wait_for_selector {
375 page.wait_for_selector(selector, request.navigation_timeout)
376 .await?;
377 }
378
379 let extracted = match request.extraction_js.as_deref() {
380 Some(script) => Some(page.eval::<Value>(script).await.map_err(|err| {
381 BrowserError::ScriptExecutionFailed {
382 script: script.to_string(),
383 reason: err.to_string(),
384 }
385 })?),
386 None => None,
387 };
388
389 let html = page.content().await?;
390 let final_url = page.url().await.ok();
391 let status_code = page.status_code().ok().flatten();
392 let html_excerpt = truncate_html(&html, request.html_excerpt_bytes);
393
394 drop(page);
395
396 Ok::<StageSuccess, BrowserError>(StageSuccess {
397 final_url,
398 status_code,
399 html_excerpt: Some(html_excerpt),
400 extracted,
401 })
402 }
403 .await;
404
405 handle.release().await;
406
407 match page_result {
408 Ok(success) => {
409 if is_block_status(success.status_code) {
410 StageOutcome::Failure(StageFailure {
411 strategy,
412 kind: StageFailureKind::Blocked,
413 message: format!(
414 "blocked status during browser stage: {:?}",
415 success.status_code
416 ),
417 })
418 } else {
419 StageOutcome::Success(success)
420 }
421 }
422 Err(err) => StageOutcome::Failure(StageFailure {
423 strategy,
424 kind: classify_browser_error(&err),
425 message: err.to_string(),
426 }),
427 }
428 }
429
430 async fn run_http_stage(
431 &self,
432 request: &AcquisitionRequest,
433 tls_profiled: bool,
434 ) -> StageOutcome {
435 if request.wait_for_selector.is_some() || request.extraction_js.is_some() {
436 return StageOutcome::Failure(StageFailure {
437 strategy: if tls_profiled {
438 StrategyUsed::TlsProfiledHttp
439 } else {
440 StrategyUsed::DirectHttp
441 },
442 kind: StageFailureKind::Extraction,
443 message: "HTTP stages cannot satisfy selector/extraction requirements".to_string(),
444 });
445 }
446
447 self.run_http_stage_impl(request, tls_profiled).await
448 }
449
450 #[cfg(feature = "tls-config")]
451 async fn run_http_stage_impl(
452 &self,
453 request: &AcquisitionRequest,
454 tls_profiled: bool,
455 ) -> StageOutcome {
456 use crate::tls::{CHROME_131, build_profiled_client_preset};
457
458 let strategy = if tls_profiled {
459 StrategyUsed::TlsProfiledHttp
460 } else {
461 StrategyUsed::DirectHttp
462 };
463
464 let client = if tls_profiled {
465 match build_profiled_client_preset(&CHROME_131, None) {
466 Ok(client) => client,
467 Err(err) => {
468 return StageOutcome::Failure(StageFailure {
469 strategy,
470 kind: StageFailureKind::Setup,
471 message: format!("tls-profiled client setup failed: {err}"),
472 });
473 }
474 }
475 } else {
476 match reqwest::Client::builder()
477 .timeout(request.request_timeout)
478 .cookie_store(true)
479 .build()
480 {
481 Ok(client) => client,
482 Err(err) => {
483 return StageOutcome::Failure(StageFailure {
484 strategy,
485 kind: StageFailureKind::Setup,
486 message: format!("http client setup failed: {err}"),
487 });
488 }
489 }
490 };
491
492 let response = match client
493 .get(&request.url)
494 .timeout(request.request_timeout)
495 .send()
496 .await
497 {
498 Ok(response) => response,
499 Err(err) => {
500 return StageOutcome::Failure(StageFailure {
501 strategy,
502 kind: if err.is_timeout() {
503 StageFailureKind::Timeout
504 } else {
505 StageFailureKind::Transport
506 },
507 message: err.to_string(),
508 });
509 }
510 };
511
512 let status_code = Some(response.status().as_u16());
513 let final_url = Some(response.url().to_string());
514 let html = match response.text().await {
515 Ok(text) => text,
516 Err(err) => {
517 return StageOutcome::Failure(StageFailure {
518 strategy,
519 kind: StageFailureKind::Transport,
520 message: format!("response body read failed: {err}"),
521 });
522 }
523 };
524
525 if is_block_status(status_code) {
526 return StageOutcome::Failure(StageFailure {
527 strategy,
528 kind: StageFailureKind::Blocked,
529 message: format!("blocked status from HTTP stage: {status_code:?}"),
530 });
531 }
532
533 StageOutcome::Success(StageSuccess {
534 final_url,
535 status_code,
536 html_excerpt: Some(truncate_html(&html, request.html_excerpt_bytes)),
537 extracted: None,
538 })
539 }
540
541 #[cfg(not(feature = "tls-config"))]
542 async fn run_http_stage_impl(
543 &self,
544 _request: &AcquisitionRequest,
545 tls_profiled: bool,
546 ) -> StageOutcome {
547 let strategy = if tls_profiled {
548 StrategyUsed::TlsProfiledHttp
549 } else {
550 StrategyUsed::DirectHttp
551 };
552 StageOutcome::Failure(StageFailure {
553 strategy,
554 kind: StageFailureKind::Setup,
555 message: "HTTP acquisition requires the `tls-config` feature".to_string(),
556 })
557 }
558}
559
560fn dedupe_preserve_order(stages: &mut Vec<StrategyUsed>) {
561 let mut seen = Vec::new();
562 stages.retain(|stage| {
563 if seen.contains(stage) {
564 false
565 } else {
566 seen.push(*stage);
567 true
568 }
569 });
570}
571
572fn classify_browser_error(error: &BrowserError) -> StageFailureKind {
573 match error {
574 BrowserError::Timeout { .. } => StageFailureKind::Timeout,
575 BrowserError::NavigationFailed { reason, .. } if reason.contains("selector") => {
576 StageFailureKind::Blocked
577 }
578 BrowserError::ScriptExecutionFailed { .. } => StageFailureKind::Extraction,
579 BrowserError::ConfigError(_) | BrowserError::PoolExhausted { .. } => {
580 StageFailureKind::Setup
581 }
582 BrowserError::ProxyUnavailable { .. }
583 | BrowserError::ConnectionError { .. }
584 | BrowserError::CdpError { .. }
585 | BrowserError::LaunchFailed { .. }
586 | BrowserError::NavigationFailed { .. }
587 | BrowserError::Io(_)
588 | BrowserError::StaleNode { .. } => StageFailureKind::Transport,
589 #[cfg(feature = "extract")]
590 BrowserError::ExtractionFailed(_) => StageFailureKind::Extraction,
591 }
592}
593
/// Reports whether `status` is one of the HTTP codes treated as "blocked":
/// 401, 403, 407, 429 or 503. A missing status is never a block.
const fn is_block_status(status: Option<u16>) -> bool {
    match status {
        Some(401 | 403 | 407 | 429 | 503) => true,
        Some(_) | None => false,
    }
}
597
/// Returns the longest prefix of `html` that is at most `max_bytes` bytes long
/// and ends on a UTF-8 character boundary.
///
/// Input that already fits is returned unchanged. A character that would
/// straddle the limit is dropped entirely, so the result may be shorter than
/// `max_bytes`.
fn truncate_html(html: &str, max_bytes: usize) -> String {
    if html.len() <= max_bytes {
        return html.to_string();
    }

    // `max_bytes < html.len()` here, so `cut` is always a valid index. Index 0 is
    // always a boundary, so the loop terminates (after at most three steps, since
    // a UTF-8 character is at most four bytes long).
    let mut cut = max_bytes;
    while !html.is_char_boundary(cut) {
        cut -= 1;
    }
    html[..cut].to_string()
}
612
/// Extracts the lower-cased host from `url` without fully parsing it.
///
/// The authority is the text after `://` up to the first `/`, `?` or `#`.
/// Userinfo (`user:pass@`) and a trailing `:port` are stripped. A bracketed IPv6
/// literal is returned with its brackets, e.g. `[::1]`.
///
/// Returns `None` when `url` has no `://`, when the host is empty, or when an
/// IPv6 bracket is not closed.
fn host_hint(url: &str) -> Option<String> {
    let (_scheme, after_scheme) = url.split_once("://")?;

    // Cut at the first path, query or fragment delimiter so that `@` or `:`
    // appearing later in the URL cannot be mistaken for part of the authority.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()?;
    let host_port = authority.rsplit('@').next()?;

    let host = match host_port.strip_prefix('[') {
        // IPv6 literals contain ':' themselves, so the port cannot be found by
        // splitting on the first colon.
        Some(bracketed) => {
            let (literal, _port) = bracketed.split_once(']')?;
            if literal.is_empty() {
                return None;
            }
            format!("[{}]", literal.to_ascii_lowercase())
        }
        None => host_port.split(':').next()?.to_ascii_lowercase(),
    };

    if host.is_empty() { None } else { Some(host) }
}
623
#[cfg(test)]
mod tests {
    use super::*;

    /// The ladder for a given mode and start stage is fixed, and a start stage
    /// that repeats a later rung appears only once.
    #[test]
    fn ladder_is_deterministic_for_modes() {
        let fast_ladder = AcquisitionRunner::strategy_ladder(AcquisitionMode::Fast, None);
        let expected_fast = vec![
            StrategyUsed::DirectHttp,
            StrategyUsed::TlsProfiledHttp,
            StrategyUsed::BrowserLightStealth,
        ];
        assert_eq!(fast_ladder, expected_fast);

        let investigate_ladder = AcquisitionRunner::strategy_ladder(
            AcquisitionMode::Investigate,
            Some(StrategyUsed::StickyProxyBrowserSession),
        );
        let expected_investigate = vec![
            StrategyUsed::InvestigateEntry,
            StrategyUsed::StickyProxyBrowserSession,
            StrategyUsed::TlsProfiledHttp,
        ];
        assert_eq!(investigate_ladder, expected_investigate);
    }

    /// Blocking codes are recognised; an ordinary or missing status is not.
    #[test]
    fn block_statuses_are_classified() {
        for blocked in [403, 429] {
            assert!(is_block_status(Some(blocked)));
        }
        assert!(!is_block_status(Some(200)));
        assert!(!is_block_status(None));
    }

    /// Userinfo and port are stripped from the authority.
    #[test]
    fn host_hint_extracts_authority() {
        let hinted = host_hint("https://user:pass@example.com:8443/path");
        assert_eq!(hinted, Some("example.com".to_string()));
    }

    /// A multi-byte character that would straddle the limit is dropped whole.
    #[test]
    fn truncate_html_respects_utf8_boundaries() {
        let truncated = truncate_html("abc😀def", 5);
        assert_eq!(truncated, "abc");
    }
}