use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::BrowserPool;
use crate::error::BrowserError;
use crate::page::WaitUntil;
/// How aggressively an acquisition escalates.
///
/// The mode selects the ordered stage list returned by
/// `AcquisitionRunner::strategy_ladder`. It is serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum AcquisitionMode {
/// Direct HTTP, then TLS-profiled HTTP, then a light-stealth browser.
Fast,
/// Same ladder as `Fast`, with a sticky browser session as the last resort.
Resilient,
/// Browser stages first (light-stealth, then sticky), with the HTTP stages after.
Hostile,
/// Starts with the `InvestigateEntry` marker.
/// Next comes `AcquisitionRequest::investigate_start` (default: light-stealth browser),
/// then a sticky session, then TLS-profiled HTTP.
Investigate,
}
/// One rung of the acquisition ladder. It is recorded in results and failures
/// and serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StrategyUsed {
/// Plain `reqwest` GET with a cookie store. Requires the `tls-config` feature.
DirectHttp,
/// GET through a client built from the `CHROME_131` TLS preset.
/// Requires the `tls-config` feature.
TlsProfiledHttp,
/// Page load in any browser obtained from `BrowserPool::acquire`.
BrowserLightStealth,
/// Page load in a browser obtained from `BrowserPool::acquire_for`, keyed by the
/// URL host (or `"default"`).
/// NOTE(review): proxy pinning presumably happens inside the pool; it is not visible here.
StickyProxyBrowserSession,
/// Marker stage for `Investigate` mode. It does no work and yields neither
/// success nor failure.
InvestigateEntry,
}
/// Input to `AcquisitionRunner::run`. See `Default` for baseline budgets.
#[derive(Debug, Clone)]
pub struct AcquisitionRequest {
/// Target URL, passed unchanged to both HTTP clients and the browser.
pub url: String,
/// Which strategy ladder to walk.
pub mode: AcquisitionMode,
/// Browser stages wait for this selector after navigation.
/// When set, HTTP stages fail immediately with `Extraction`.
pub wait_for_selector: Option<String>,
/// Script evaluated in the page. Its JSON value becomes `AcquisitionResult::extracted`.
/// When set, HTTP stages fail immediately with `Extraction`.
pub extraction_js: Option<String>,
/// Wall-clock budget for the whole ladder.
pub total_timeout: Duration,
/// Timeout used for browser navigation and for the selector wait.
pub navigation_timeout: Duration,
/// Per-request timeout for the HTTP stages.
pub request_timeout: Duration,
/// Maximum bytes of HTML kept in the excerpt, cut on a UTF-8 boundary.
pub html_excerpt_bytes: usize,
/// First real stage in `Investigate` mode. `None` means `BrowserLightStealth`.
pub investigate_start: Option<StrategyUsed>,
}
impl Default for AcquisitionRequest {
    /// Baseline request with the following settings:
    ///
    /// - `Resilient` mode, an empty URL, and no selector or script.
    /// - Budgets of 45 s total, 30 s navigation and 15 s per HTTP request.
    /// - A 4 KiB HTML excerpt.
    fn default() -> Self {
        let secs = Duration::from_secs;
        Self {
            mode: AcquisitionMode::Resilient,
            url: String::default(),
            total_timeout: secs(45),
            navigation_timeout: secs(30),
            request_timeout: secs(15),
            wait_for_selector: None,
            extraction_js: None,
            investigate_start: None,
            html_excerpt_bytes: 4 * 1024,
        }
    }
}
/// Coarse category of a stage failure. It is serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StageFailureKind {
/// A client or browser could not be built or acquired, or the required feature is missing.
Setup,
/// A stage or the overall wall-clock budget timed out.
Timeout,
/// A block status code (see `is_block_status`) or a selector-related navigation failure.
Blocked,
/// A network, CDP, launch or I/O failure, or another uncategorized browser error.
Transport,
/// Script evaluation failed, or an HTTP stage was asked for selector/script work.
Extraction,
}
/// Why a single ladder stage did not produce a usable page.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StageFailure {
/// Stage that failed.
pub strategy: StrategyUsed,
/// Failure category.
pub kind: StageFailureKind,
/// Human-readable detail, usually the underlying error's `Display` text.
pub message: String,
}
/// Outcome of `AcquisitionRunner::run`. Problems are reported in `failures`
/// rather than as an error.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AcquisitionResult {
/// `true` when some stage produced a non-blocked page.
pub success: bool,
/// The stage that succeeded, if any.
pub strategy_used: Option<StrategyUsed>,
/// Strategies recorded as attempted, in ladder order.
/// Includes the `InvestigateEntry` marker when it is present.
pub attempted: Vec<StrategyUsed>,
/// URL after redirects, as reported by the successful stage.
pub final_url: Option<String>,
/// HTTP status reported by the successful stage.
pub status_code: Option<u16>,
/// Leading bytes of the fetched HTML, cut on a UTF-8 boundary.
pub html_excerpt: Option<String>,
/// Value returned by `AcquisitionRequest::extraction_js`. Only browser stages set this.
pub extracted: Option<Value>,
/// One entry per failed stage, plus timeout entries.
pub failures: Vec<StageFailure>,
/// Set when the wall-clock budget was exhausted.
pub timed_out: bool,
}
impl AcquisitionResult {
/// Unsuccessful result with no attempts, failures or payload.
/// Stage outcomes are folded into it.
const fn empty() -> Self {
Self {
success: false,
strategy_used: None,
attempted: Vec::new(),
final_url: None,
status_code: None,
html_excerpt: None,
extracted: None,
failures: Vec::new(),
timed_out: false,
}
}
}
/// Payload a stage produced. It is copied into `AcquisitionResult` on success.
/// It is also used to carry a block status code out of the browser stage.
#[derive(Debug, Clone)]
struct StageSuccess {
final_url: Option<String>,
status_code: Option<u16>,
html_excerpt: Option<String>,
// Only browser stages populate this; HTTP stages always leave it `None`.
extracted: Option<Value>,
}
/// Result of executing one ladder stage.
#[derive(Debug, Clone)]
enum StageOutcome {
/// No-op stage (`InvestigateEntry`). The ladder simply continues.
Marker,
/// The stage produced a usable page. The ladder stops here.
Success(StageSuccess),
/// The stage failed. The failure is recorded and the ladder continues.
Failure(StageFailure),
}
/// Walks a strategy ladder (HTTP and browser stages) until a page is acquired.
/// Cloning only bumps the reference count of the shared `BrowserPool`.
#[derive(Clone)]
pub struct AcquisitionRunner {
pool: Arc<BrowserPool>,
}
impl AcquisitionRunner {
#[must_use]
pub const fn new(pool: Arc<BrowserPool>) -> Self {
Self { pool }
}
#[must_use]
pub fn strategy_ladder(
mode: AcquisitionMode,
investigate_start: Option<StrategyUsed>,
) -> Vec<StrategyUsed> {
let mut stages = match mode {
AcquisitionMode::Fast => vec![
StrategyUsed::DirectHttp,
StrategyUsed::TlsProfiledHttp,
StrategyUsed::BrowserLightStealth,
],
AcquisitionMode::Resilient => vec![
StrategyUsed::DirectHttp,
StrategyUsed::TlsProfiledHttp,
StrategyUsed::BrowserLightStealth,
StrategyUsed::StickyProxyBrowserSession,
],
AcquisitionMode::Hostile => vec![
StrategyUsed::BrowserLightStealth,
StrategyUsed::StickyProxyBrowserSession,
StrategyUsed::TlsProfiledHttp,
StrategyUsed::DirectHttp,
],
AcquisitionMode::Investigate => {
let start = investigate_start.unwrap_or(StrategyUsed::BrowserLightStealth);
vec![
StrategyUsed::InvestigateEntry,
start,
StrategyUsed::StickyProxyBrowserSession,
StrategyUsed::TlsProfiledHttp,
]
}
};
dedupe_preserve_order(&mut stages);
stages
}
pub async fn run(&self, request: AcquisitionRequest) -> AcquisitionResult {
let timeout = request.total_timeout;
let timeout_strategy = Self::strategy_ladder(request.mode, request.investigate_start)
.into_iter()
.find(|strategy| *strategy != StrategyUsed::InvestigateEntry)
.unwrap_or(StrategyUsed::DirectHttp);
let mut result = tokio::time::timeout(timeout, self.run_inner(&request))
.await
.unwrap_or_else(|_| {
let mut timed_out = AcquisitionResult::empty();
timed_out.timed_out = true;
timed_out.failures.push(StageFailure {
strategy: timeout_strategy,
kind: StageFailureKind::Timeout,
message: format!("acquisition timed out after {}ms", timeout.as_millis()),
});
timed_out
});
if !result.success {
if result.failures.is_empty() {
result.failures.push(StageFailure {
strategy: timeout_strategy,
kind: StageFailureKind::Transport,
message: "acquisition ended without stage output".to_string(),
});
}
}
result
}
async fn run_inner(&self, request: &AcquisitionRequest) -> AcquisitionResult {
let mut result = AcquisitionResult::empty();
let ladder = Self::strategy_ladder(request.mode, request.investigate_start);
let started = Instant::now();
for strategy in ladder {
if started.elapsed() >= request.total_timeout {
result.timed_out = true;
result.failures.push(StageFailure {
strategy,
kind: StageFailureKind::Timeout,
message: "wall-clock timeout reached before stage execution".to_string(),
});
break;
}
result.attempted.push(strategy);
match self.execute_stage(strategy, request).await {
StageOutcome::Marker => {}
StageOutcome::Success(success) => {
result.success = true;
result.strategy_used = Some(strategy);
result.final_url = success.final_url;
result.status_code = success.status_code;
result.html_excerpt = success.html_excerpt;
result.extracted = success.extracted;
break;
}
StageOutcome::Failure(failure) => result.failures.push(failure),
}
}
result
}
async fn execute_stage(
&self,
strategy: StrategyUsed,
request: &AcquisitionRequest,
) -> StageOutcome {
match strategy {
StrategyUsed::DirectHttp => self.run_http_stage(request, false).await,
StrategyUsed::TlsProfiledHttp => self.run_http_stage(request, true).await,
StrategyUsed::BrowserLightStealth => self.run_browser_stage(request, false).await,
StrategyUsed::StickyProxyBrowserSession => self.run_browser_stage(request, true).await,
StrategyUsed::InvestigateEntry => StageOutcome::Marker,
}
}
async fn run_browser_stage(&self, request: &AcquisitionRequest, sticky: bool) -> StageOutcome {
let strategy = if sticky {
StrategyUsed::StickyProxyBrowserSession
} else {
StrategyUsed::BrowserLightStealth
};
let handle_result = if sticky {
let context = host_hint(&request.url).unwrap_or_else(|| "default".to_string());
self.pool.acquire_for(&context).await
} else {
self.pool.acquire().await
};
let handle = match handle_result {
Ok(handle) => handle,
Err(err) => {
return StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Setup,
message: format!("browser acquire failed: {err}"),
});
}
};
let page_result = async {
let browser = handle.browser().ok_or_else(|| {
BrowserError::ConfigError("browser handle already released".to_string())
})?;
let mut page = browser.new_page().await?;
page.navigate(
&request.url,
WaitUntil::DomContentLoaded,
request.navigation_timeout,
)
.await?;
if let Some(selector) = &request.wait_for_selector {
page.wait_for_selector(selector, request.navigation_timeout)
.await?;
}
let extracted = match request.extraction_js.as_deref() {
Some(script) => Some(page.eval::<Value>(script).await.map_err(|err| {
BrowserError::ScriptExecutionFailed {
script: script.to_string(),
reason: err.to_string(),
}
})?),
None => None,
};
let html = page.content().await?;
let final_url = page.url().await.ok();
let status_code = page.status_code().ok().flatten();
let html_excerpt = truncate_html(&html, request.html_excerpt_bytes);
drop(page);
Ok::<StageSuccess, BrowserError>(StageSuccess {
final_url,
status_code,
html_excerpt: Some(html_excerpt),
extracted,
})
}
.await;
handle.release().await;
match page_result {
Ok(success) => {
if is_block_status(success.status_code) {
StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Blocked,
message: format!(
"blocked status during browser stage: {:?}",
success.status_code
),
})
} else {
StageOutcome::Success(success)
}
}
Err(err) => StageOutcome::Failure(StageFailure {
strategy,
kind: classify_browser_error(&err),
message: err.to_string(),
}),
}
}
async fn run_http_stage(
&self,
request: &AcquisitionRequest,
tls_profiled: bool,
) -> StageOutcome {
if request.wait_for_selector.is_some() || request.extraction_js.is_some() {
return StageOutcome::Failure(StageFailure {
strategy: if tls_profiled {
StrategyUsed::TlsProfiledHttp
} else {
StrategyUsed::DirectHttp
},
kind: StageFailureKind::Extraction,
message: "HTTP stages cannot satisfy selector/extraction requirements".to_string(),
});
}
self.run_http_stage_impl(request, tls_profiled).await
}
#[cfg(feature = "tls-config")]
async fn run_http_stage_impl(
&self,
request: &AcquisitionRequest,
tls_profiled: bool,
) -> StageOutcome {
use crate::tls::{CHROME_131, build_profiled_client_preset};
let strategy = if tls_profiled {
StrategyUsed::TlsProfiledHttp
} else {
StrategyUsed::DirectHttp
};
let client = if tls_profiled {
match build_profiled_client_preset(&CHROME_131, None) {
Ok(client) => client,
Err(err) => {
return StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Setup,
message: format!("tls-profiled client setup failed: {err}"),
});
}
}
} else {
match reqwest::Client::builder()
.timeout(request.request_timeout)
.cookie_store(true)
.build()
{
Ok(client) => client,
Err(err) => {
return StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Setup,
message: format!("http client setup failed: {err}"),
});
}
}
};
let response = match client
.get(&request.url)
.timeout(request.request_timeout)
.send()
.await
{
Ok(response) => response,
Err(err) => {
return StageOutcome::Failure(StageFailure {
strategy,
kind: if err.is_timeout() {
StageFailureKind::Timeout
} else {
StageFailureKind::Transport
},
message: err.to_string(),
});
}
};
let status_code = Some(response.status().as_u16());
let final_url = Some(response.url().to_string());
let html = match response.text().await {
Ok(text) => text,
Err(err) => {
return StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Transport,
message: format!("response body read failed: {err}"),
});
}
};
if is_block_status(status_code) {
return StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Blocked,
message: format!("blocked status from HTTP stage: {status_code:?}"),
});
}
StageOutcome::Success(StageSuccess {
final_url,
status_code,
html_excerpt: Some(truncate_html(&html, request.html_excerpt_bytes)),
extracted: None,
})
}
#[cfg(not(feature = "tls-config"))]
async fn run_http_stage_impl(
&self,
_request: &AcquisitionRequest,
tls_profiled: bool,
) -> StageOutcome {
let strategy = if tls_profiled {
StrategyUsed::TlsProfiledHttp
} else {
StrategyUsed::DirectHttp
};
StageOutcome::Failure(StageFailure {
strategy,
kind: StageFailureKind::Setup,
message: "HTTP acquisition requires the `tls-config` feature".to_string(),
})
}
}
fn dedupe_preserve_order(stages: &mut Vec<StrategyUsed>) {
let mut seen = Vec::new();
stages.retain(|stage| {
if seen.contains(stage) {
false
} else {
seen.push(*stage);
true
}
});
}
/// Maps a browser-side error to the `StageFailureKind` reported for the stage.
///
/// The match is exhaustive on purpose: a new `BrowserError` variant must be
/// classified here explicitly.
fn classify_browser_error(error: &BrowserError) -> StageFailureKind {
match error {
BrowserError::Timeout { .. } => StageFailureKind::Timeout,
// Must stay above the generic `NavigationFailed` arm below.
// NOTE(review): assumes a navigation failure mentioning "selector" means the
// expected element never appeared on a gated page; confirm against the page module.
BrowserError::NavigationFailed { reason, .. } if reason.contains("selector") => {
StageFailureKind::Blocked
}
BrowserError::ScriptExecutionFailed { .. } => StageFailureKind::Extraction,
BrowserError::ConfigError(_) | BrowserError::PoolExhausted { .. } => {
StageFailureKind::Setup
}
BrowserError::ProxyUnavailable { .. }
| BrowserError::ConnectionError { .. }
| BrowserError::CdpError { .. }
| BrowserError::LaunchFailed { .. }
| BrowserError::NavigationFailed { .. }
| BrowserError::Io(_)
| BrowserError::StaleNode { .. } => StageFailureKind::Transport,
// This variant exists only when the `extract` feature is enabled.
#[cfg(feature = "extract")]
BrowserError::ExtractionFailed(_) => StageFailureKind::Extraction,
}
}
/// Reports whether `status` is treated as a block or anti-bot response.
///
/// The codes are 401, 403, 407, 429 and 503. A missing status (`None`) is
/// never a block.
const fn is_block_status(status: Option<u16>) -> bool {
    if let Some(code) = status {
        matches!(code, 401 | 403 | 407 | 429 | 503)
    } else {
        false
    }
}
/// Returns at most `max_bytes` bytes of `html`.
///
/// The cut is moved back to the nearest UTF-8 character boundary, so the
/// result is always a valid prefix of the input.
fn truncate_html(html: &str, max_bytes: usize) -> String {
    if html.len() <= max_bytes {
        return html.to_string();
    }
    // `max_bytes < html.len()` here, so every probed index is in range.
    // Index 0 is always a boundary, so the search cannot come up empty.
    let cut = (0..=max_bytes)
        .rev()
        .find(|&idx| html.is_char_boundary(idx))
        .unwrap_or(0);
    html[..cut].to_string()
}
/// Extracts a lowercase host from `url`, used as the sticky browser-context key.
///
/// Returns `None` in these cases:
///
/// - `url` has no `scheme://` prefix.
/// - The host is empty.
/// - An IPv6 literal is missing its closing `]`.
///
/// The authority ends at the first `/`, `?` or `#`. Userinfo (`user:pass@`) and
/// the port are stripped. Bracketed IPv6 literals keep their brackets, so
/// `http://[::1]:8080/` yields `[::1]`.
fn host_hint(url: &str) -> Option<String> {
    let (_, rest) = url.split_once("://")?;
    // A query or fragment may follow the host directly, with no path.
    let authority = rest.split(|c: char| matches!(c, '/' | '?' | '#')).next()?;
    let host_port = authority.rsplit('@').next()?;
    let host = match host_port.strip_prefix('[') {
        // IPv6 literals contain ':', so they cannot be split on the port separator.
        Some(bracketed) => {
            let close = bracketed.find(']')?;
            if close == 0 {
                return None;
            }
            // `close` indexes `bracketed`; +1 maps it back into `host_port`.
            &host_port[..=close + 1]
        }
        None => host_port.split(':').next()?,
    };
    if host.is_empty() {
        None
    } else {
        Some(host.to_ascii_lowercase())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ladder_is_deterministic_for_modes() {
        let fast = AcquisitionRunner::strategy_ladder(AcquisitionMode::Fast, None);
        assert_eq!(
            fast,
            [
                StrategyUsed::DirectHttp,
                StrategyUsed::TlsProfiledHttp,
                StrategyUsed::BrowserLightStealth,
            ]
        );

        // A sticky start repeats the fixed sticky rung; dedupe must drop the repeat.
        let investigate = AcquisitionRunner::strategy_ladder(
            AcquisitionMode::Investigate,
            Some(StrategyUsed::StickyProxyBrowserSession),
        );
        assert_eq!(
            investigate,
            [
                StrategyUsed::InvestigateEntry,
                StrategyUsed::StickyProxyBrowserSession,
                StrategyUsed::TlsProfiledHttp,
            ]
        );
    }

    #[test]
    fn block_statuses_are_classified() {
        for blocked in [Some(403), Some(429)] {
            assert!(is_block_status(blocked), "{blocked:?} should block");
        }
        for allowed in [Some(200), None] {
            assert!(!is_block_status(allowed), "{allowed:?} should not block");
        }
    }

    #[test]
    fn host_hint_extracts_authority() {
        let hint = host_hint("https://user:pass@example.com:8443/path");
        assert_eq!(hint.as_deref(), Some("example.com"));
    }

    #[test]
    fn truncate_html_respects_utf8_boundaries() {
        // The 4-byte emoji would straddle the 5-byte limit, so it is dropped whole.
        assert_eq!(truncate_html("abc😀def", 5), "abc");
    }
}