1use std::time::{Duration, Instant};
4
5use futures::future::join_all;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8use void_crawl_core::{AntibotVerdict, PooledTab, VoidCrawlError};
9
10use crate::{server::VoidCrawlServer, tools::wait};
11
/// Time budget, in seconds, applied to a fetch when `FetchArgs::timeout_secs` is not set.
pub const DEFAULT_TIMEOUT_SECS: u64 = 30;
13
14#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
15pub struct FetchArgs {
16 pub url: String,
18 #[serde(default)]
21 pub wait_for: Option<String>,
22 #[serde(default)]
25 pub extract: Option<String>,
26 #[serde(default)]
28 pub timeout_secs: Option<u64>,
29}
30
31fn any_value_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
34 serde_json::Map::new().into()
35}
36
37#[derive(Debug, Serialize, JsonSchema)]
43pub struct AntibotInfo {
44 pub vendors: Vec<String>,
46 pub challenged: bool,
49 pub challenge_vendor: Option<String>,
51 pub corpus_version: String,
53 pub evidence: String,
55}
56
57impl From<void_crawl_core::AntibotVerdict> for AntibotInfo {
58 fn from(v: void_crawl_core::AntibotVerdict) -> Self {
59 let evidence = match v.evidence {
60 void_crawl_core::AntibotEvidence::None => "none",
61 void_crawl_core::AntibotEvidence::Headers => "headers",
62 void_crawl_core::AntibotEvidence::Body => "body",
63 };
64 Self {
65 vendors: v.vendors,
66 challenged: v.challenged,
67 challenge_vendor: v.challenge_vendor,
68 corpus_version: v.corpus_version.to_string(),
69 evidence: evidence.to_string(),
70 }
71 }
72}
73
74#[derive(Debug, Serialize, JsonSchema)]
75pub struct FetchResult {
76 pub url: String,
77 pub status_code: Option<u16>,
78 pub redirected: bool,
79 pub html: String,
80 pub title: Option<String>,
81 #[schemars(schema_with = "any_value_schema")]
82 pub extracted: Option<serde_json::Value>,
83 pub antibot: Option<AntibotInfo>,
86 pub waited_ms: u64,
90}
91
92#[derive(Debug, Deserialize, JsonSchema, Default)]
93pub struct FetchManyArgs {
94 pub requests: Vec<FetchArgs>,
98}
99
100#[derive(Debug, Serialize, JsonSchema)]
101pub struct FetchManyItem {
102 pub ok: bool,
103 pub result: Option<FetchResult>,
104 pub error: Option<String>,
105}
106
107#[derive(Debug, Serialize, JsonSchema)]
111pub struct PoolMeta {
112 pub max_tabs: usize,
114 pub submitted: usize,
116 pub queued: usize,
119 pub max_waited_ms: u64,
121 pub note: Option<String>,
125}
126
127#[derive(Debug, Serialize, JsonSchema)]
128pub struct FetchManyResult {
129 pub results: Vec<FetchManyItem>,
130 pub pool: PoolMeta,
131}
132
/// A request whose wait for a pooled tab exceeds this many milliseconds is counted as
/// "queued" in the batch statistics.
const QUEUE_WAIT_THRESHOLD_MS: u64 = 5;
135
136pub async fn run(server: &VoidCrawlServer, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
137 run_timed(server, args).await.1
138}
139
140async fn run_timed(
143 server: &VoidCrawlServer,
144 args: FetchArgs,
145) -> (u64, Result<FetchResult, VoidCrawlError>) {
146 let pool = match server.state().pool().await {
147 Ok(p) => p,
148 Err(e) => return (0, Err(e)),
149 };
150 let started = Instant::now();
153 let (tab, waited_ms) = match pool.acquire_timed().await {
154 Ok((tab, waited)) => (tab, waited),
155 Err(e) => {
156 let waited = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
157 return (waited, Err(e));
158 }
159 };
160 let mut result = fetch_on_tab(&tab, args).await;
161 pool.release(tab).await;
162 if let Ok(ref mut r) = result {
163 r.waited_ms = waited_ms;
164 }
165 (waited_ms, result)
166}
167
168pub async fn run_many(server: &VoidCrawlServer, args: FetchManyArgs) -> FetchManyResult {
169 let submitted = args.requests.len();
170 let max_tabs = server.state().pool().await.map_or(0, |p| {
171 let c = p.config();
172 c.browsers.saturating_mul(c.tabs_per_browser)
173 });
174
175 let futures = args.requests.into_iter().map(|req| run_timed(server, req));
176 let outcomes = join_all(futures).await;
177
178 let mut max_waited_ms = 0u64;
179 let mut queued = 0usize;
180 let results = outcomes
181 .into_iter()
182 .map(|(waited, r)| {
183 max_waited_ms = max_waited_ms.max(waited);
184 if waited > QUEUE_WAIT_THRESHOLD_MS {
185 queued += 1;
186 }
187 match r {
188 Ok(result) => FetchManyItem { ok: true, result: Some(result), error: None },
189 Err(e) => {
190 FetchManyItem { ok: false, result: None, error: Some(e.to_string()) }
191 }
192 }
193 })
194 .collect();
195
196 let note = (queued > 0 && max_tabs > 0).then(|| {
197 format!(
198 "{queued} of {submitted} requests queued behind the pool's {max_tabs}-tab limit \
199 (worst wait {max_waited_ms}ms). For full parallelism, submit at most {max_tabs} \
200 per batch, or raise TABS_PER_BROWSER / BROWSER_COUNT."
201 )
202 });
203
204 FetchManyResult { results, pool: PoolMeta { max_tabs, submitted, queued, max_waited_ms, note } }
205}
206
207async fn fetch_on_tab(tab: &PooledTab, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
208 use tokio::time::{Instant, timeout};
209
210 let total_timeout = Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
211 let start = Instant::now();
212 let resp = tab.page.goto_and_wait_for_idle(&args.url, total_timeout).await?;
213 wait::apply_post_navigate(&tab.page, args.wait_for.as_deref(), total_timeout).await?;
214 let remaining = total_timeout.saturating_sub(start.elapsed());
215 let title = timeout(remaining, tab.page.title())
220 .await
221 .map_err(|_| VoidCrawlError::Timeout("title read exceeded timeout_secs".into()))?
222 .ok()
223 .flatten();
224 let extracted = match args.extract {
225 Some(js) => {
226 let value = timeout(remaining, tab.page.evaluate_js(&js)).await.map_err(|_| {
227 VoidCrawlError::Timeout(format!(
228 "extract evaluate_js exceeded {}s",
229 total_timeout.as_secs()
230 ))
231 })??;
232 Some(value)
233 }
234 None => None,
235 };
236 let antibot = resp.antibot.filter(AntibotVerdict::detected).map(AntibotInfo::from);
239 Ok(FetchResult {
240 url: resp.url,
241 status_code: resp.status_code,
242 redirected: resp.redirected,
243 html: resp.html,
244 title,
245 extracted,
246 antibot,
247 waited_ms: 0,
249 })
250}