use std::collections::{HashMap, HashSet};
use std::time::Duration;
use tracing::{debug, error, info, warn};
use crate::config::{BrowserConfig, FetchParams, ResolvedFetchParams, StealthConfig, WaitState};
use crate::engine::{build_launch_options, launch_playwright};
use crate::error::{BrowserError, Result};
use crate::intercept::should_block_request;
use crate::page_pool::PagePool;
use crate::response_factory;
use scrapling_fetch::Response;
/// Applies per-fetch settings to a freshly opened page.
///
/// Sets both the default and the navigation timeout to `timeout_ms`, installs
/// `extra_headers` when the map is non-empty, and — when resource blocking is
/// requested or `blocked_domains` is non-empty — registers a catch-all route
/// that aborts every request `should_block_request` rejects and lets the rest
/// continue unchanged.
///
/// # Errors
/// Propagates failures from setting the extra headers or registering the route.
async fn setup_page(
    page: &playwright_rs::Page,
    timeout_ms: f64,
    extra_headers: &HashMap<String, String>,
    disable_resources: bool,
    blocked_domains: &HashSet<String>,
) -> Result<()> {
    page.set_default_timeout(timeout_ms).await;
    page.set_default_navigation_timeout(timeout_ms).await;

    if !extra_headers.is_empty() {
        page.set_extra_http_headers(extra_headers.clone()).await?;
    }

    // Nothing to intercept: skip installing the route handler altogether.
    let needs_interception = disable_resources || !blocked_domains.is_empty();
    if !needs_interception {
        return Ok(());
    }

    let block_resources = disable_resources;
    let blocklist = blocked_domains.clone();
    page.route("**/*", move |route| {
        // The handler runs once per request, so each invocation gets its own
        // copy of the blocklist to move into the future.
        let blocklist = blocklist.clone();
        async move {
            let request = route.request();
            let kind = request.resource_type();
            let target = request.url();
            if should_block_request(kind, target, block_resources, &blocklist) {
                route.abort(Some("blockedbyclient")).await
            } else {
                route.continue_(None).await
            }
        }
    })
    .await?;

    Ok(())
}
/// Waits for the page to settle after navigation on a best-effort basis.
///
/// Always waits for the `Load` state, then additionally for `DomContentLoaded`
/// when `load_dom` is set and for `NetworkIdle` when `network_idle` is set.
/// Every wait result is deliberately discarded, so this function always
/// returns `Ok(())`.
async fn wait_for_stability(
    page: &playwright_rs::Page,
    load_dom: bool,
    network_idle: bool,
) -> Result<()> {
    let mut states = vec![playwright_rs::WaitUntil::Load];
    if load_dom {
        states.push(playwright_rs::WaitUntil::DomContentLoaded);
    }
    if network_idle {
        states.push(playwright_rs::WaitUntil::NetworkIdle);
    }

    for state in states {
        // Best effort: a failed wait must not abort the fetch.
        let _ = page.wait_for_load_state(Some(state)).await;
    }
    Ok(())
}
/// Waits until `selector` satisfies the locator's wait condition, or until
/// `timeout_ms` elapses.
///
/// NOTE(review): `_state` is accepted but never forwarded to the wait options,
/// so the locator's default wait state is always used — confirm whether the
/// requested `WaitState` should be mapped onto `WaitForOptions`.
///
/// # Errors
/// Any failure of the underlying wait is reported as `BrowserError::Timeout`,
/// with the selector and the original error in the message.
async fn wait_for_selector(
    page: &playwright_rs::Page,
    selector: &str,
    _state: WaitState,
    timeout_ms: f64,
) -> Result<()> {
    let options = playwright_rs::protocol::WaitForOptions::builder()
        .timeout(timeout_ms)
        .build();
    let target = page.locator(selector).await;

    match target.wait_for(Some(options)).await {
        Ok(_) => Ok(()),
        Err(e) => Err(BrowserError::Timeout(format!(
            "wait_for_selector({selector}): {e}"
        ))),
    }
}
/// Prepares a new browser context from the session configuration.
///
/// When `config.init_script` is set, the file is read and registered as an
/// init script on the context. When `config.cookies` is non-empty, each
/// configured cookie is converted to a Playwright cookie and added; a missing
/// domain becomes the default value, a missing path becomes `/`, and the
/// remaining attributes are fixed (`expires = -1.0`, not HTTP-only, not
/// secure, no same-site policy).
///
/// # Errors
/// `BrowserError::Config` when the init script cannot be read; otherwise any
/// error from registering the script or adding the cookies.
async fn initialize_context(
    context: &playwright_rs::BrowserContext,
    config: &BrowserConfig,
) -> Result<()> {
    if let Some(script_path) = config.init_script.as_ref() {
        // NOTE(review): this is a blocking read inside an async fn — confirm it
        // is acceptable for the expected script sizes.
        let source = std::fs::read_to_string(script_path)
            .map_err(|e| BrowserError::Config(format!("failed to read init_script: {e}")))?;
        context.add_init_script(&source).await?;
    }

    if config.cookies.is_empty() {
        return Ok(());
    }

    let mut jar: Vec<playwright_rs::protocol::Cookie> = Vec::new();
    for cookie in config.cookies.iter() {
        jar.push(playwright_rs::protocol::Cookie {
            name: cookie.name.clone(),
            value: cookie.value.clone(),
            domain: cookie.domain.clone().unwrap_or_default(),
            path: cookie.path.clone().unwrap_or_else(|| "/".into()),
            expires: -1.0,
            http_only: false,
            secure: false,
            same_site: None,
        });
    }
    context.add_cookies(&jar).await?;
    Ok(())
}
/// Builds the navigation options for a `goto` call.
///
/// The timeout is `timeout_ms` truncated to whole milliseconds. When
/// `network_idle` is set, navigation additionally waits for the `NetworkIdle`
/// state; otherwise no `wait_until` is set on the options.
fn make_goto_options(timeout_ms: f64, network_idle: bool) -> playwright_rs::GotoOptions {
    let timeout = Duration::from_millis(timeout_ms as u64);
    let base = playwright_rs::GotoOptions::new().timeout(timeout);
    if network_idle {
        base.wait_until(playwright_rs::WaitUntil::NetworkIdle)
    } else {
        base
    }
}
/// A browser session that fetches pages through a Playwright-driven Chromium
/// browser.
///
/// The session is created idle by `new`, becomes usable after `start`, and is
/// torn down by `close`.
pub struct DynamicSession {
/// Validated configuration the session was built from.
config: BrowserConfig,
/// Playwright handle; `None` until `start` succeeds and again after `close`.
playwright: Option<playwright_rs::Playwright>,
/// Launched or CDP-connected browser; `None` while the session is not started.
browser: Option<playwright_rs::Browser>,
/// Shared context created by `start` when no proxy rotator is configured;
/// stays `None` in proxy-rotator mode.
context: Option<playwright_rs::BrowserContext>,
/// Pool sized from `config.max_pages`; the visible code only reads it via
/// `pool_stats`.
page_pool: PagePool,
/// `true` between a successful `start` and the next `close`.
is_alive: bool,
}
impl DynamicSession {
pub fn new(mut config: BrowserConfig) -> Result<Self> {
config.validate()?;
let max_pages = config.max_pages;
Ok(Self {
config,
playwright: None,
browser: None,
context: None,
page_pool: PagePool::new(max_pages),
is_alive: false,
})
}
pub async fn start(&mut self) -> Result<()> {
let pw = launch_playwright().await?;
let chromium = pw.chromium();
let launch_opts = build_launch_options(&self.config, false, &[]);
if let Some(ref cdp_url) = self.config.cdp_url {
let browser = chromium.connect_over_cdp(cdp_url, None).await?;
if !self.config.has_proxy_rotator() {
let ctx = browser.new_context().await?;
initialize_context(&ctx, &self.config).await?;
self.context = Some(ctx);
}
self.browser = Some(browser);
} else if self.config.has_proxy_rotator() {
let browser = chromium.launch_with_options(launch_opts).await?;
self.browser = Some(browser);
} else {
let browser = chromium.launch_with_options(launch_opts).await?;
let ctx = browser.new_context().await?;
initialize_context(&ctx, &self.config).await?;
self.context = Some(ctx);
self.browser = Some(browser);
}
self.playwright = Some(pw);
self.is_alive = true;
info!("DynamicSession started");
Ok(())
}
pub async fn fetch(&self, url: &str, params: Option<FetchParams>) -> Result<Response> {
if !self.is_alive {
return Err(BrowserError::Config("session not started".into()));
}
let resolved = params.unwrap_or_default().merge_with_config(&self.config);
let mut last_error = None;
for attempt in 0..self.config.retries {
match self.do_fetch(url, &resolved).await {
Ok(response) => return Ok(response),
Err(e) => {
if attempt < self.config.retries - 1 {
warn!(attempt = attempt + 1, error = %e, "fetch failed, retrying");
tokio::time::sleep(Duration::from_secs_f64(self.config.retry_delay_secs))
.await;
} else {
error!(attempts = self.config.retries, "all retries exhausted");
}
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(BrowserError::Other("unknown error".into())))
}
async fn do_fetch(&self, url: &str, params: &ResolvedFetchParams) -> Result<Response> {
let page = self.get_page().await?;
setup_page(
&page,
params.timeout_ms,
¶ms.extra_headers,
params.disable_resources,
¶ms.blocked_domains,
)
.await?;
if let Some(ref cb) = self.config.page_setup {
cb(page.clone()).await?;
}
let goto_opts = make_goto_options(params.timeout_ms, params.network_idle);
debug!(url = %url, "navigating");
let nav_response = page.goto(url, Some(goto_opts)).await?;
wait_for_stability(&page, params.load_dom, params.network_idle).await?;
if let Some(ref cb) = self.config.page_action {
cb(page.clone()).await?;
}
if let Some(ref selector) = params.wait_selector {
wait_for_selector(
&page,
selector,
params.wait_selector_state,
params.timeout_ms,
)
.await?;
}
if params.wait_ms > 0 {
tokio::time::sleep(Duration::from_millis(params.wait_ms)).await;
}
let response = response_factory::from_browser_page(
&page,
nav_response.as_ref(),
nav_response.as_ref(),
HashMap::new(),
Vec::new(),
)
.await?;
page.close().await?;
info!(status = response.status, url = url, "fetch complete");
Ok(response)
}
async fn get_page(&self) -> Result<playwright_rs::Page> {
if let Some(ref ctx) = self.context {
ctx.new_page().await.map_err(Into::into)
} else if let Some(ref browser) = self.browser {
let ctx = browser.new_context().await?;
ctx.new_page().await.map_err(Into::into)
} else {
Err(BrowserError::Config("no browser available".into()))
}
}
pub async fn close(&mut self) -> Result<()> {
if let Some(ctx) = self.context.take() {
let _ = ctx.close().await;
}
if let Some(browser) = self.browser.take() {
let _ = browser.close().await;
}
self.playwright = None;
self.is_alive = false;
info!("DynamicSession closed");
Ok(())
}
pub fn is_alive(&self) -> bool {
self.is_alive
}
pub fn pool_stats(&self) -> crate::page_pool::PoolStats {
self.page_pool.stats()
}
}
/// A browser session like `DynamicSession` that launches Chromium with extra
/// stealth arguments and can optionally attempt Cloudflare challenges.
pub struct StealthySession {
/// Validated stealth configuration; the common browser settings are under `base`.
config: StealthConfig,
/// Playwright handle; `None` until `start` succeeds and again after `close`.
playwright: Option<playwright_rs::Playwright>,
/// Launched or CDP-connected browser; `None` while the session is not started.
browser: Option<playwright_rs::Browser>,
/// Shared context created by `start` when no proxy rotator is configured;
/// stays `None` in proxy-rotator mode.
context: Option<playwright_rs::BrowserContext>,
/// Pool sized from `config.base.max_pages`; the visible code only reads it
/// via `pool_stats`.
page_pool: PagePool,
/// `true` between a successful `start` and the next `close`.
is_alive: bool,
}
impl StealthySession {
pub fn new(mut config: StealthConfig) -> Result<Self> {
config.validate()?;
let max_pages = config.base.max_pages;
Ok(Self {
config,
playwright: None,
browser: None,
context: None,
page_pool: PagePool::new(max_pages),
is_alive: false,
})
}
pub async fn start(&mut self) -> Result<()> {
let pw = launch_playwright().await?;
let chromium = pw.chromium();
let extra = self.config.extra_stealth_args();
let launch_opts = build_launch_options(&self.config.base, true, &extra);
if let Some(ref cdp_url) = self.config.base.cdp_url {
let browser = chromium.connect_over_cdp(cdp_url, None).await?;
if !self.config.base.has_proxy_rotator() {
let ctx = browser.new_context().await?;
initialize_context(&ctx, &self.config.base).await?;
self.context = Some(ctx);
}
self.browser = Some(browser);
} else if self.config.base.has_proxy_rotator() {
let browser = chromium.launch_with_options(launch_opts).await?;
self.browser = Some(browser);
} else {
let browser = chromium.launch_with_options(launch_opts).await?;
let ctx = browser.new_context().await?;
initialize_context(&ctx, &self.config.base).await?;
self.context = Some(ctx);
self.browser = Some(browser);
}
self.playwright = Some(pw);
self.is_alive = true;
info!("StealthySession started");
Ok(())
}
pub async fn fetch(&self, url: &str, params: Option<FetchParams>) -> Result<Response> {
if !self.is_alive {
return Err(BrowserError::Config("session not started".into()));
}
let mut resolved = params
.unwrap_or_default()
.merge_with_config(&self.config.base);
if self.config.solve_cloudflare {
resolved.solve_cloudflare = true;
}
let mut last_error = None;
for attempt in 0..self.config.base.retries {
match self.do_fetch(url, &resolved).await {
Ok(response) => return Ok(response),
Err(e) => {
if attempt < self.config.base.retries - 1 {
warn!(attempt = attempt + 1, error = %e, "stealth fetch failed, retrying");
tokio::time::sleep(Duration::from_secs_f64(
self.config.base.retry_delay_secs,
))
.await;
}
last_error = Some(e);
}
}
}
Err(last_error.unwrap_or(BrowserError::Other("unknown error".into())))
}
async fn do_fetch(&self, url: &str, params: &ResolvedFetchParams) -> Result<Response> {
let page = self.get_page().await?;
setup_page(
&page,
params.timeout_ms,
¶ms.extra_headers,
params.disable_resources,
¶ms.blocked_domains,
)
.await?;
if let Some(ref cb) = self.config.base.page_setup {
cb(page.clone()).await?;
}
let goto_opts = make_goto_options(params.timeout_ms, params.network_idle);
debug!(url = %url, "stealth navigating");
let nav_response = page.goto(url, Some(goto_opts)).await?;
wait_for_stability(&page, params.load_dom, params.network_idle).await?;
if params.solve_cloudflare {
self.cloudflare_solver(&page).await?;
}
if let Some(ref cb) = self.config.base.page_action {
cb(page.clone()).await?;
}
if let Some(ref selector) = params.wait_selector {
wait_for_selector(
&page,
selector,
params.wait_selector_state,
params.timeout_ms,
)
.await?;
}
if params.wait_ms > 0 {
tokio::time::sleep(Duration::from_millis(params.wait_ms)).await;
}
let response = response_factory::from_browser_page(
&page,
nav_response.as_ref(),
nav_response.as_ref(),
HashMap::new(),
Vec::new(),
)
.await?;
page.close().await?;
info!(
status = response.status,
url = url,
"stealth fetch complete"
);
Ok(response)
}
async fn get_page(&self) -> Result<playwright_rs::Page> {
if let Some(ref ctx) = self.context {
ctx.new_page().await.map_err(Into::into)
} else if let Some(ref browser) = self.browser {
let ctx = browser.new_context().await?;
ctx.new_page().await.map_err(Into::into)
} else {
Err(BrowserError::Config("no browser available".into()))
}
}
async fn cloudflare_solver(&self, page: &playwright_rs::Page) -> Result<()> {
let _ = page
.wait_for_load_state(Some(playwright_rs::WaitUntil::NetworkIdle))
.await;
tokio::time::sleep(Duration::from_secs(2)).await;
let content = page
.content()
.await
.map_err(|e| BrowserError::Navigation(format!("cloudflare solver: {e}")))?;
let Some(challenge) = detect_cloudflare_challenge(&content) else {
debug!("no Cloudflare challenge detected");
return Ok(());
};
info!(challenge = %challenge, "Cloudflare challenge detected");
match challenge.as_str() {
"non-interactive" => {
for _ in 0..30 {
tokio::time::sleep(Duration::from_secs(2)).await;
let title = page.title().await.unwrap_or_default();
if !title.contains("Just a moment") {
debug!("Cloudflare non-interactive challenge resolved");
return Ok(());
}
}
warn!("Cloudflare non-interactive challenge did not resolve");
}
"managed" | "interactive" | "embedded" => {
let selectors = [
"iframe[src*='challenges.cloudflare.com']",
"#turnstile-wrapper iframe",
".cf-turnstile iframe",
];
for _ in 0..10 {
for selector in &selectors {
let locator = page.locator(selector).await;
if let Ok(count) = locator.count().await {
if count > 0 {
debug!(selector, "found Cloudflare iframe, clicking");
let _ = locator.first().click(None).await;
tokio::time::sleep(Duration::from_secs(3)).await;
let new_content = page.content().await.unwrap_or_default();
if detect_cloudflare_challenge(&new_content).is_none() {
info!("Cloudflare challenge solved");
return Ok(());
}
}
}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
warn!("Cloudflare challenge could not be solved after retries");
}
_ => {}
}
Ok(())
}
pub async fn close(&mut self) -> Result<()> {
if let Some(ctx) = self.context.take() {
let _ = ctx.close().await;
}
if let Some(browser) = self.browser.take() {
let _ = browser.close().await;
}
self.playwright = None;
self.is_alive = false;
info!("StealthySession closed");
Ok(())
}
pub fn is_alive(&self) -> bool {
self.is_alive
}
pub fn pool_stats(&self) -> crate::page_pool::PoolStats {
self.page_pool.stats()
}
}
/// Classifies the Cloudflare challenge present in a page's HTML, if any.
///
/// Markers are checked in a fixed priority order and the first one found in
/// `content` wins: `non-interactive`, `managed`, `interactive`, then
/// `embedded` (a Turnstile script URL). Returns `None` when no marker occurs.
fn detect_cloudflare_challenge(content: &str) -> Option<String> {
    // (marker text searched for in the HTML, challenge kind reported)
    const MARKERS: [(&str, &str); 4] = [
        ("cType: 'non-interactive'", "non-interactive"),
        ("cType: 'managed'", "managed"),
        ("cType: 'interactive'", "interactive"),
        ("challenges.cloudflare.com/turnstile/v", "embedded"),
    ];

    MARKERS
        .iter()
        .find(|(marker, _)| content.contains(*marker))
        .map(|(_, kind)| (*kind).to_string())
}