use std::time::{Duration, Instant};
use futures::future::join_all;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use void_crawl_core::{AntibotVerdict, PooledTab, VoidCrawlError};
use crate::{server::VoidCrawlServer, tools::wait};
/// Timeout, in seconds, used for a fetch when `FetchArgs::timeout_secs` is not set.
pub const DEFAULT_TIMEOUT_SECS: u64 = 30;
// Arguments for a single page fetch.
//
// NOTE: plain `//` comments are used on purpose. This type derives `JsonSchema`, and
// `///` doc comments would be picked up by the derive as schema descriptions, which
// would change the generated schema.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
pub struct FetchArgs {
// URL the tab is navigated to.
pub url: String,
// Optional post-navigation wait specification, forwarded to
// `wait::apply_post_navigate` after the page has loaded.
#[serde(default)]
pub wait_for: Option<String>,
// Optional JavaScript source evaluated on the page; its value is returned as
// `FetchResult::extracted`.
#[serde(default)]
pub extract: Option<String>,
// Timeout in seconds; `DEFAULT_TIMEOUT_SECS` is used when omitted.
#[serde(default)]
pub timeout_secs: Option<u64>,
}
/// Schema override for fields that may hold arbitrary JSON.
///
/// Produces a schema built from an empty JSON object, i.e. one that places no
/// constraints on the value. The generator argument is unused.
fn any_value_schema(_generator: &mut schemars::SchemaGenerator) -> schemars::Schema {
    let unconstrained = serde_json::Map::new();
    schemars::Schema::from(unconstrained)
}
// Serializable summary of an antibot verdict, built from
// `void_crawl_core::AntibotVerdict` via the `From` impl below.
#[derive(Debug, Serialize, JsonSchema)]
pub struct AntibotInfo {
// Vendor names copied from the verdict.
pub vendors: Vec<String>,
// Copied from the verdict's `challenged` flag.
pub challenged: bool,
// Copied from the verdict's `challenge_vendor`, when present.
pub challenge_vendor: Option<String>,
// String form of the verdict's `corpus_version`.
pub corpus_version: String,
// Evidence kind as text: one of "none", "headers" or "body".
pub evidence: String,
}
impl From<void_crawl_core::AntibotVerdict> for AntibotInfo {
    /// Flattens a core verdict into the serializable [`AntibotInfo`] shape.
    ///
    /// The evidence enum is rendered as its lowercase name and the corpus version is
    /// stringified; the remaining fields are moved over unchanged.
    fn from(verdict: void_crawl_core::AntibotVerdict) -> Self {
        let evidence_label: &str = match verdict.evidence {
            void_crawl_core::AntibotEvidence::None => "none",
            void_crawl_core::AntibotEvidence::Headers => "headers",
            void_crawl_core::AntibotEvidence::Body => "body",
        };
        let corpus_version = verdict.corpus_version.to_string();

        Self {
            vendors: verdict.vendors,
            challenged: verdict.challenged,
            challenge_vendor: verdict.challenge_vendor,
            corpus_version,
            evidence: String::from(evidence_label),
        }
    }
}
// Outcome of a successful single-page fetch.
//
// NOTE: plain `//` comments are used on purpose. This type derives `JsonSchema`, and
// `///` doc comments would be picked up by the derive as schema descriptions.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchResult {
// URL reported by the navigation response.
pub url: String,
// Status code reported by the navigation response, if any.
pub status_code: Option<u16>,
// Redirect flag reported by the navigation response.
pub redirected: bool,
// HTML reported by the navigation response.
pub html: String,
// Page title; `None` when the page has no title or the title read failed.
pub title: Option<String>,
// Value produced by evaluating `FetchArgs::extract`; `None` when no script was given.
// The schema is overridden because the value can be any JSON.
#[schemars(schema_with = "any_value_schema")]
pub extracted: Option<serde_json::Value>,
// Antibot details; only populated when the response carried a verdict for which
// `AntibotVerdict::detected` holds.
pub antibot: Option<AntibotInfo>,
// Milliseconds spent waiting to acquire a pooled tab (filled in by `run_timed`).
pub waited_ms: u64,
}
// Arguments for a batch fetch.
#[derive(Debug, Deserialize, JsonSchema, Default)]
pub struct FetchManyArgs {
// Individual fetch requests; `run_many` drives all of them together via `join_all`.
pub requests: Vec<FetchArgs>,
}
// Per-request outcome inside a batch: either a result or an error message.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyItem {
// `true` when the fetch succeeded (`result` is set), `false` otherwise (`error` is set).
pub ok: bool,
// The fetch result on success; `None` on failure.
pub result: Option<FetchResult>,
// The error rendered as a string on failure; `None` on success.
pub error: Option<String>,
}
// Pool-level statistics reported alongside a batch fetch.
#[derive(Debug, Serialize, JsonSchema)]
pub struct PoolMeta {
// `browsers * tabs_per_browser` from the pool config (saturating); 0 when the pool
// could not be obtained.
pub max_tabs: usize,
// Number of requests in the batch.
pub submitted: usize,
// Number of requests whose tab-acquire wait exceeded `QUEUE_WAIT_THRESHOLD_MS`.
pub queued: usize,
// Longest tab-acquire wait observed across the batch, in milliseconds.
pub max_waited_ms: u64,
// Human-readable hint; only set when at least one request queued and `max_tabs > 0`.
pub note: Option<String>,
}
// Result of a batch fetch: one item per request plus pool statistics.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyResult {
// One entry per submitted request.
pub results: Vec<FetchManyItem>,
// Pool capacity and queueing statistics for the batch.
pub pool: PoolMeta,
}
/// Tab-acquire wait, in milliseconds, above which `run_many` counts a request as queued.
const QUEUE_WAIT_THRESHOLD_MS: u64 = 5;
/// Fetches a single page and returns its result.
///
/// Delegates to `run_timed` and discards the pool wait it reports separately; on
/// success the same value is still available as `FetchResult::waited_ms`.
///
/// # Errors
/// Returns whatever error `run_timed` produced.
pub async fn run(server: &VoidCrawlServer, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
    let (_waited_ms, outcome) = run_timed(server, args).await;
    outcome
}
/// Runs one fetch on a pooled tab and reports how long the tab acquisition waited.
///
/// Returns `(waited_ms, outcome)`:
/// * if the pool itself cannot be obtained, the wait is reported as 0;
/// * if acquiring a tab fails, the wait is the time elapsed since the acquire started;
/// * otherwise the wait is the value reported by `acquire_timed`, and on success it is
///   also stored in `FetchResult::waited_ms`.
///
/// The tab is released back to the pool after the fetch, whether it succeeded or not.
async fn run_timed(
    server: &VoidCrawlServer,
    args: FetchArgs,
) -> (u64, Result<FetchResult, VoidCrawlError>) {
    let pool = match server.state().pool().await {
        Ok(pool) => pool,
        Err(err) => return (0, Err(err)),
    };

    let acquire_started = Instant::now();
    let acquired = pool.acquire_timed().await;
    let (tab, waited_ms) = match acquired {
        Ok((tab, waited)) => (tab, waited),
        Err(err) => {
            // No wait figure from the pool on failure, so measure it ourselves.
            let elapsed_ms = acquire_started.elapsed().as_millis();
            return (u64::try_from(elapsed_ms).unwrap_or(u64::MAX), Err(err));
        }
    };

    let outcome = fetch_on_tab(&tab, args).await;
    // Hand the tab back before inspecting the outcome so it is returned on errors too.
    pool.release(tab).await;

    let outcome = outcome.map(|mut fetched| {
        fetched.waited_ms = waited_ms;
        fetched
    });
    (waited_ms, outcome)
}
pub async fn run_many(server: &VoidCrawlServer, args: FetchManyArgs) -> FetchManyResult {
let submitted = args.requests.len();
let max_tabs = server.state().pool().await.map_or(0, |p| {
let c = p.config();
c.browsers.saturating_mul(c.tabs_per_browser)
});
let futures = args.requests.into_iter().map(|req| run_timed(server, req));
let outcomes = join_all(futures).await;
let mut max_waited_ms = 0u64;
let mut queued = 0usize;
let results = outcomes
.into_iter()
.map(|(waited, r)| {
max_waited_ms = max_waited_ms.max(waited);
if waited > QUEUE_WAIT_THRESHOLD_MS {
queued += 1;
}
match r {
Ok(result) => FetchManyItem { ok: true, result: Some(result), error: None },
Err(e) => {
FetchManyItem { ok: false, result: None, error: Some(e.to_string()) }
}
}
})
.collect();
let note = (queued > 0 && max_tabs > 0).then(|| {
format!(
"{queued} of {submitted} requests queued behind the pool's {max_tabs}-tab limit \
(worst wait {max_waited_ms}ms). For full parallelism, submit at most {max_tabs} \
per batch, or raise TABS_PER_BROWSER / BROWSER_COUNT."
)
});
FetchManyResult { results, pool: PoolMeta { max_tabs, submitted, queued, max_waited_ms, note } }
}
/// Navigates `tab` to `args.url` and assembles a [`FetchResult`] from the loaded page.
///
/// Steps, in order: `goto_and_wait_for_idle`, the optional `wait_for` post-navigation
/// wait, a title read, and evaluation of the optional `extract` script.
///
/// `args.timeout_secs` (default [`DEFAULT_TIMEOUT_SECS`]) is the time budget. The title
/// read and the extract step are each bounded by whatever is left of that budget at the
/// moment they start, so time spent on the title is not granted again to the script.
///
/// `waited_ms` is always 0 here; the caller fills it in.
///
/// # Errors
/// * Any error from navigation or the post-navigation wait is propagated.
/// * `VoidCrawlError::Timeout` if the title read or the extract evaluation runs past the
///   remaining budget.
/// * Any error returned by `evaluate_js` itself.
///
/// A failed title read (as opposed to a timed-out one) is not an error: the title is
/// simply reported as `None`.
async fn fetch_on_tab(tab: &PooledTab, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
    use tokio::time::{Instant, timeout};

    let total_timeout = Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
    let start = Instant::now();
    // Evaluated right before each bounded step so already-spent time is never reused.
    // Saturating: once the budget is gone this yields a zero duration instead of panicking.
    let remaining = || total_timeout.saturating_sub(start.elapsed());

    let resp = tab.page.goto_and_wait_for_idle(&args.url, total_timeout).await?;
    // NOTE(review): this step is handed the full budget rather than what is left after
    // navigation — confirm whether that is intended before tightening it.
    wait::apply_post_navigate(&tab.page, args.wait_for.as_deref(), total_timeout).await?;

    // Best effort: a timeout is fatal, but a page-level error just means "no title".
    let title = timeout(remaining(), tab.page.title())
        .await
        .map_err(|_| VoidCrawlError::Timeout("title read exceeded timeout_secs".into()))?
        .ok()
        .flatten();

    let extracted = match args.extract {
        Some(js) => {
            // Outer `?` handles the timeout, inner `?` the evaluation error.
            let value = timeout(remaining(), tab.page.evaluate_js(&js)).await.map_err(|_| {
                VoidCrawlError::Timeout(format!(
                    "extract evaluate_js exceeded {}s",
                    total_timeout.as_secs()
                ))
            })??;
            Some(value)
        }
        None => None,
    };

    // Only surface antibot details when something was actually detected.
    let antibot = resp.antibot.filter(AntibotVerdict::detected).map(AntibotInfo::from);

    Ok(FetchResult {
        url: resp.url,
        status_code: resp.status_code,
        redirected: resp.redirected,
        html: resp.html,
        title,
        extracted,
        antibot,
        waited_ms: 0,
    })
}