Skip to main content

voidcrawl_mcp/tools/
fetch.rs

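//! Stateless `fetch` / `fetch_many` tools over the shared `BrowserPool`: each
//! request borrows one pooled tab for the duration of its fetch and hands it back.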
2
3use std::time::{Duration, Instant};
4
5use futures::future::join_all;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8use void_crawl_core::{AntibotVerdict, PooledTab, VoidCrawlError};
9
10use crate::{server::VoidCrawlServer, tools::wait};
11
/// Per-fetch time budget, in seconds, applied when the caller leaves
/// `FetchArgs::timeout_secs` unset.
pub const DEFAULT_TIMEOUT_SECS: u64 = 30;
13
/// Arguments for one stateless fetch; also the element type of
/// `FetchManyArgs::requests`.
// Field declaration order is the deserialized/JSON-schema property order.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
pub struct FetchArgs {
    /// Absolute URL to load (required).
    pub url:          String,
    /// Optional wait strategy: "networkidle" (default) or "selector:<css>".
    /// Both are event-driven — no polling, no sleeps.
    #[serde(default)]
    pub wait_for:     Option<String>,
    /// Optional JavaScript expression evaluated after the wait. Its
    /// return value is serialized into `extracted`. Evaluation is bounded
    /// by `timeout_secs`.
    #[serde(default)]
    pub extract:      Option<String>,
    /// Timeout in seconds covering navigation, the wait strategy and the
    /// title/`extract` evaluation (default 30).
    #[serde(default)]
    pub timeout_secs: Option<u64>,
}
30
31/// JSON Schema helper: emit `{}` (any-value) instead of `true`.
32/// Claude Code's validator rejects boolean schemas in outputSchema.properties.
33fn any_value_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
34    serde_json::Map::new().into()
35}
36
/// Anti-bot / CDN vendor fingerprint of the fetched response, surfaced so an
/// agent can route deterministically (rotate proxy/profile, go headful) instead
/// of retrying blind. The raw headers it was derived from are available on the
/// Python `PageResponse`; only the actionable verdict is surfaced here to keep
/// the agent response small. See the `antibot` module of the core crate
/// (`void_crawl_core`).
// Wire shape of a core `AntibotVerdict` (built via `From`): the evidence enum is
// flattened to a lowercase tag and the corpus version to a string. Field order
// is the serialized/schema property order.
#[derive(Debug, Serialize, JsonSchema)]
pub struct AntibotInfo {
    /// Canonical vendor tags detected (e.g. `cloudflare`, `datadome`), sorted.
    pub vendors:          Vec<String>,
    /// `true` when an active wall/challenge fired (rotate), vs. mere CDN
    /// presence (no action needed).
    pub challenged:       bool,
    /// Vendor whose challenge fired, when `challenged`.
    pub challenge_vendor: Option<String>,
    /// Signature corpus version the verdict was produced against.
    pub corpus_version:   String,
    /// Which tier matched: `none` / `headers` / `body`.
    pub evidence:         String,
}
56
57impl From<void_crawl_core::AntibotVerdict> for AntibotInfo {
58    fn from(v: void_crawl_core::AntibotVerdict) -> Self {
59        let evidence = match v.evidence {
60            void_crawl_core::AntibotEvidence::None => "none",
61            void_crawl_core::AntibotEvidence::Headers => "headers",
62            void_crawl_core::AntibotEvidence::Body => "body",
63        };
64        Self {
65            vendors:          v.vendors,
66            challenged:       v.challenged,
67            challenge_vendor: v.challenge_vendor,
68            corpus_version:   v.corpus_version.to_string(),
69            evidence:         evidence.to_string(),
70        }
71    }
72}
73
/// Result of one successful fetch.
// Field declaration order is the serialized/JSON-schema property order.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchResult {
    /// URL reported by the navigation response.
    pub url:         String,
    /// HTTP status of the navigation response, when one was reported.
    pub status_code: Option<u16>,
    /// Whether the navigation response reports a redirect.
    pub redirected:  bool,
    /// Page HTML as returned by the navigation step.
    // NOTE(review): in `fetch_on_tab` this comes from `goto_and_wait_for_idle`,
    // i.e. before `apply_post_navigate` runs, so a `selector:` wait does not
    // affect it — confirm that is intended.
    pub html:        String,
    /// Document title; `null` when it could not be read.
    pub title:       Option<String>,
    /// Value returned by the `extract` expression; `null` when no expression
    /// was supplied.
    #[schemars(schema_with = "any_value_schema")]
    pub extracted:   Option<serde_json::Value>,
    /// Anti-bot / CDN vendor fingerprint, or `null` when no vendor was detected
    /// (or no network response was captured).
    pub antibot:     Option<AntibotInfo>,
    /// Milliseconds this request spent queued for a free pool tab before work
    /// began. ~0 means a tab was free immediately; a larger value means the
    /// pool was saturated and this request waited behind other in-flight work.
    pub waited_ms:   u64,
}
91
/// Arguments for `fetch_many`: a batch of independent fetches.
#[derive(Debug, Deserialize, JsonSchema, Default)]
pub struct FetchManyArgs {
    /// List of fetch requests to run concurrently. The server's pool
    /// semaphore caps in-flight work — passing more URLs than the
    /// pool can serve simply queues the rest. Results are returned in
    /// the same order as this list.
    pub requests: Vec<FetchArgs>,
}
99
/// Outcome of one request within a `fetch_many` batch. Exactly one of
/// `result` / `error` is set, as indicated by `ok`.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyItem {
    /// `true` when the fetch succeeded and `result` is set.
    pub ok:     bool,
    /// The fetch result; present only when `ok` is `true`.
    pub result: Option<FetchResult>,
    /// Error message; present only when `ok` is `false`.
    pub error:  Option<String>,
}
106
/// Batch-level concurrency summary so an agent driving `fetch_many` can see
/// whether it oversubscribed the pool and adjust — without a separate
/// `pool_status` round-trip.
// Field declaration order is the serialized/JSON-schema property order.
#[derive(Debug, Serialize, JsonSchema)]
pub struct PoolMeta {
    /// Server concurrency ceiling: `browsers × tabs_per_browser`. `0` when the
    /// pool could not be obtained.
    pub max_tabs:      usize,
    /// Requests submitted in this batch.
    pub submitted:     usize,
    /// How many of them had to queue for a tab (waited more than 5 ms for a
    /// permit). `0` means everything ran fully in parallel.
    pub queued:        usize,
    /// Worst per-request queue wait observed in the batch, milliseconds. For a
    /// request whose tab acquire failed, its wait is the wall-clock time spent
    /// in the failed acquire.
    pub max_waited_ms: u64,
    /// Present only when the batch oversubscribed the pool — a plain-language
    /// hint the agent can act on (cap the next batch at `max_tabs`, or raise
    /// the pool size).
    pub note:          Option<String>,
}
126
/// Response of `fetch_many`: per-request outcomes plus a pool summary.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyResult {
    /// One entry per submitted request, in submission order.
    pub results: Vec<FetchManyItem>,
    /// Batch-level concurrency summary.
    pub pool:    PoolMeta,
}
132
/// Jitter allowance, in milliseconds: a request whose pool wait does not exceed
/// this is treated as having started immediately and is not counted as queued.
const QUEUE_WAIT_THRESHOLD_MS: u64 = 5;
135
136pub async fn run(server: &VoidCrawlServer, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
137    run_timed(server, args).await.1
138}
139
140/// One fetch, returning the pool queue-wait alongside the result so callers
141/// can report concurrency even when the request itself failed.
142async fn run_timed(
143    server: &VoidCrawlServer,
144    args: FetchArgs,
145) -> (u64, Result<FetchResult, VoidCrawlError>) {
146    let pool = match server.state().pool().await {
147        Ok(p) => p,
148        Err(e) => return (0, Err(e)),
149    };
150    // On the error path (e.g. acquire timeout) the precise semaphore-only wait
151    // isn't returned, so fall back to wall-clock around the acquire.
152    let started = Instant::now();
153    let (tab, waited_ms) = match pool.acquire_timed().await {
154        Ok((tab, waited)) => (tab, waited),
155        Err(e) => {
156            let waited = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
157            return (waited, Err(e));
158        }
159    };
160    let mut result = fetch_on_tab(&tab, args).await;
161    pool.release(tab).await;
162    if let Ok(ref mut r) = result {
163        r.waited_ms = waited_ms;
164    }
165    (waited_ms, result)
166}
167
168pub async fn run_many(server: &VoidCrawlServer, args: FetchManyArgs) -> FetchManyResult {
169    let submitted = args.requests.len();
170    let max_tabs = server.state().pool().await.map_or(0, |p| {
171        let c = p.config();
172        c.browsers.saturating_mul(c.tabs_per_browser)
173    });
174
175    let futures = args.requests.into_iter().map(|req| run_timed(server, req));
176    let outcomes = join_all(futures).await;
177
178    let mut max_waited_ms = 0u64;
179    let mut queued = 0usize;
180    let results = outcomes
181        .into_iter()
182        .map(|(waited, r)| {
183            max_waited_ms = max_waited_ms.max(waited);
184            if waited > QUEUE_WAIT_THRESHOLD_MS {
185                queued += 1;
186            }
187            match r {
188                Ok(result) => FetchManyItem { ok: true, result: Some(result), error: None },
189                Err(e) => {
190                    FetchManyItem { ok: false, result: None, error: Some(e.to_string()) }
191                }
192            }
193        })
194        .collect();
195
196    let note = (queued > 0 && max_tabs > 0).then(|| {
197        format!(
198            "{queued} of {submitted} requests queued behind the pool's {max_tabs}-tab limit \
199             (worst wait {max_waited_ms}ms). For full parallelism, submit at most {max_tabs} \
200             per batch, or raise TABS_PER_BROWSER / BROWSER_COUNT."
201        )
202    });
203
204    FetchManyResult { results, pool: PoolMeta { max_tabs, submitted, queued, max_waited_ms, note } }
205}
206
207async fn fetch_on_tab(tab: &PooledTab, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
208    use tokio::time::{Instant, timeout};
209
210    let total_timeout = Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
211    let start = Instant::now();
212    let resp = tab.page.goto_and_wait_for_idle(&args.url, total_timeout).await?;
213    wait::apply_post_navigate(&tab.page, args.wait_for.as_deref(), total_timeout).await?;
214    let remaining = total_timeout.saturating_sub(start.elapsed());
215    // Cap title + extract JS at the remaining budget so a runaway
216    // user-supplied `extract` (e.g. `while(1){}`) can't pin a pool tab
217    // indefinitely — a hung script would otherwise survive every
218    // `release`/`acquire` cycle and eventually drain the pool.
219    let title = timeout(remaining, tab.page.title())
220        .await
221        .map_err(|_| VoidCrawlError::Timeout("title read exceeded timeout_secs".into()))?
222        .ok()
223        .flatten();
224    let extracted = match args.extract {
225        Some(js) => {
226            let value = timeout(remaining, tab.page.evaluate_js(&js)).await.map_err(|_| {
227                VoidCrawlError::Timeout(format!(
228                    "extract evaluate_js exceeded {}s",
229                    total_timeout.as_secs()
230                ))
231            })??;
232            Some(value)
233        }
234        None => None,
235    };
236    // Surface the verdict only when a vendor was actually detected — keeps the
237    // common (un-walled) response free of empty `antibot` noise.
238    let antibot = resp.antibot.filter(AntibotVerdict::detected).map(AntibotInfo::from);
239    Ok(FetchResult {
240        url: resp.url,
241        status_code: resp.status_code,
242        redirected: resp.redirected,
243        html: resp.html,
244        title,
245        extracted,
246        antibot,
247        // Overwritten by `run_timed` with the real pool queue-wait.
248        waited_ms: 0,
249    })
250}