Skip to main content

voidcrawl_mcp/tools/
fetch.rs

1//! Stateless fetch over the shared `BrowserPool`.
2
3use std::time::{Duration, Instant};
4
5use futures::future::join_all;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8use void_crawl_core::{PooledTab, VoidCrawlError};
9
10use crate::{server::VoidCrawlServer, tools::wait};
11
/// Fallback for [`FetchArgs::timeout_secs`], in seconds: `fetch_on_tab` uses
/// this value whenever a request leaves `timeout_secs` unset.
pub const DEFAULT_TIMEOUT_SECS: u64 = 30;
13
// Arguments for a single fetch; `FetchManyArgs::requests` carries a list of
// these. Only `url` has no `#[serde(default)]`; the three optional fields are
// `Option`s that may be omitted from the input.
// NOTE(review): schemars' derive is expected to copy `///` text into the
// generated schema's `description`, so the field docs below are probably
// client-visible — confirm before rewording them.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
pub struct FetchArgs {
    /// Absolute URL to load.
    pub url:          String,
    /// Optional wait strategy: "networkidle" (default) or "selector:<css>".
    /// Both are event-driven — no polling, no sleeps.
    // Passed unparsed (as `Option<&str>`) to `wait::apply_post_navigate`.
    #[serde(default)]
    pub wait_for:     Option<String>,
    /// Optional JavaScript expression evaluated after the wait. Its
    /// return value is serialized into `extracted`.
    // Evaluated via `evaluate_js` under a timeout; skipped entirely when `None`.
    #[serde(default)]
    pub extract:      Option<String>,
    /// Navigation + wait timeout in seconds (default 30).
    // `None` falls back to `DEFAULT_TIMEOUT_SECS` inside `fetch_on_tab`.
    #[serde(default)]
    pub timeout_secs: Option<u64>,
}
30
31/// JSON Schema helper: emit `{}` (any-value) instead of `true`.
32/// Claude Code's validator rejects boolean schemas in outputSchema.properties.
33fn any_value_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
34    serde_json::Map::new().into()
35}
36
// Outcome of one successful fetch, assembled by `fetch_on_tab`.
// `url`, `status_code`, `redirected` and `html` are copied straight from the
// navigation response returned by `goto_and_wait_for_idle`.
// NOTE(review): schemars' derive is expected to surface `///` text in the
// generated schema, so new notes here are plain `//` comments — confirm.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchResult {
    pub url:         String,
    pub status_code: Option<u16>,
    pub redirected:  bool,
    pub html:        String,
    // `None` when the page reports no title or when reading it failed (the
    // read error is discarded); a title read that times out fails the fetch.
    pub title:       Option<String>,
    // Value produced by the request's `extract` expression; `None` when no
    // expression was supplied.
    #[schemars(schema_with = "any_value_schema")]
    pub extracted:   Option<serde_json::Value>,
    /// Milliseconds this request spent queued for a free pool tab before work
    /// began. ~0 means a tab was free immediately; a larger value means the
    /// pool was saturated and this request waited behind other in-flight work.
    // Initialised to 0 by `fetch_on_tab`, then overwritten by `run_timed`.
    pub waited_ms:   u64,
}
51
// Input for `run_many`: a batch of independent fetch requests. `run_many`
// turns each entry into its own `run_timed` future and awaits them together
// with `join_all`.
#[derive(Debug, Deserialize, JsonSchema, Default)]
pub struct FetchManyArgs {
    /// List of fetch requests to run concurrently. The server's pool
    /// semaphore caps in-flight work — passing more URLs than the
    /// pool can serve simply queues the rest.
    pub requests: Vec<FetchArgs>,
}
59
// Per-request slot in a `fetch_many` response. `run_many` always fills exactly
// one of `result` / `error`, so one failed request does not fail the batch.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyItem {
    // `true` when the fetch succeeded and `result` is populated.
    pub ok:     bool,
    // `Some` only when `ok` is `true`.
    pub result: Option<FetchResult>,
    // `Some` only when `ok` is `false`; holds the error's `to_string()` text.
    pub error:  Option<String>,
}
66
/// Batch-level concurrency summary so an agent driving `fetch_many` can see
/// whether it oversubscribed the pool and adjust — without a separate
/// `pool_status` round-trip.
// Built once per batch by `run_many` from the per-request queue waits, which
// include the waits of requests that ultimately failed.
#[derive(Debug, Serialize, JsonSchema)]
pub struct PoolMeta {
    /// Server concurrency ceiling: `browsers × tabs_per_browser`.
    // `run_many` reports 0 here when the pool could not be obtained; the
    // product itself uses `saturating_mul`.
    pub max_tabs:      usize,
    /// Requests submitted in this batch.
    pub submitted:     usize,
    /// How many of them had to queue for a tab (waited measurably for a
    /// permit). `0` means everything ran fully in parallel.
    // "Measurably" means strictly more than `QUEUE_WAIT_THRESHOLD_MS`.
    pub queued:        usize,
    /// Worst per-request queue wait observed in the batch, milliseconds.
    pub max_waited_ms: u64,
    /// Present only when the batch oversubscribed the pool — a plain-language
    /// hint the agent can act on (cap the next batch at `max_tabs`, or raise
    /// the pool size).
    // Concretely: set only when `queued > 0` and `max_tabs > 0`.
    pub note:          Option<String>,
}
86
// Response of `run_many`: per-request outcomes plus one batch-wide summary.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyResult {
    // One item per submitted request, collected from the `join_all` output
    // (which yields results in the order the futures were supplied).
    pub results: Vec<FetchManyItem>,
    // Concurrency summary for the whole batch.
    pub pool:    PoolMeta,
}
92
/// Queue waits at or below this (scheduler jitter) don't count as "queued".
///
/// In milliseconds. `run_many` counts a request towards `PoolMeta::queued`
/// only when its wait is strictly greater than this value.
const QUEUE_WAIT_THRESHOLD_MS: u64 = 5;
95
96pub async fn run(server: &VoidCrawlServer, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
97    run_timed(server, args).await.1
98}
99
100/// One fetch, returning the pool queue-wait alongside the result so callers
101/// can report concurrency even when the request itself failed.
102async fn run_timed(
103    server: &VoidCrawlServer,
104    args: FetchArgs,
105) -> (u64, Result<FetchResult, VoidCrawlError>) {
106    let pool = match server.state().pool().await {
107        Ok(p) => p,
108        Err(e) => return (0, Err(e)),
109    };
110    // On the error path (e.g. acquire timeout) the precise semaphore-only wait
111    // isn't returned, so fall back to wall-clock around the acquire.
112    let started = Instant::now();
113    let (tab, waited_ms) = match pool.acquire_timed().await {
114        Ok((tab, waited)) => (tab, waited),
115        Err(e) => {
116            let waited = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
117            return (waited, Err(e));
118        }
119    };
120    let mut result = fetch_on_tab(&tab, args).await;
121    pool.release(tab).await;
122    if let Ok(ref mut r) = result {
123        r.waited_ms = waited_ms;
124    }
125    (waited_ms, result)
126}
127
128pub async fn run_many(server: &VoidCrawlServer, args: FetchManyArgs) -> FetchManyResult {
129    let submitted = args.requests.len();
130    let max_tabs = server.state().pool().await.map_or(0, |p| {
131        let c = p.config();
132        c.browsers.saturating_mul(c.tabs_per_browser)
133    });
134
135    let futures = args.requests.into_iter().map(|req| run_timed(server, req));
136    let outcomes = join_all(futures).await;
137
138    let mut max_waited_ms = 0u64;
139    let mut queued = 0usize;
140    let results = outcomes
141        .into_iter()
142        .map(|(waited, r)| {
143            max_waited_ms = max_waited_ms.max(waited);
144            if waited > QUEUE_WAIT_THRESHOLD_MS {
145                queued += 1;
146            }
147            match r {
148                Ok(result) => FetchManyItem { ok: true, result: Some(result), error: None },
149                Err(e) => {
150                    FetchManyItem { ok: false, result: None, error: Some(e.to_string()) }
151                }
152            }
153        })
154        .collect();
155
156    let note = (queued > 0 && max_tabs > 0).then(|| {
157        format!(
158            "{queued} of {submitted} requests queued behind the pool's {max_tabs}-tab limit \
159             (worst wait {max_waited_ms}ms). For full parallelism, submit at most {max_tabs} \
160             per batch, or raise TABS_PER_BROWSER / BROWSER_COUNT."
161        )
162    });
163
164    FetchManyResult { results, pool: PoolMeta { max_tabs, submitted, queued, max_waited_ms, note } }
165}
166
167async fn fetch_on_tab(tab: &PooledTab, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
168    use tokio::time::{Instant, timeout};
169
170    let total_timeout = Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
171    let start = Instant::now();
172    let resp = tab.page.goto_and_wait_for_idle(&args.url, total_timeout).await?;
173    wait::apply_post_navigate(&tab.page, args.wait_for.as_deref(), total_timeout).await?;
174    let remaining = total_timeout.saturating_sub(start.elapsed());
175    // Cap title + extract JS at the remaining budget so a runaway
176    // user-supplied `extract` (e.g. `while(1){}`) can't pin a pool tab
177    // indefinitely — a hung script would otherwise survive every
178    // `release`/`acquire` cycle and eventually drain the pool.
179    let title = timeout(remaining, tab.page.title())
180        .await
181        .map_err(|_| VoidCrawlError::Timeout("title read exceeded timeout_secs".into()))?
182        .ok()
183        .flatten();
184    let extracted = match args.extract {
185        Some(js) => {
186            let value = timeout(remaining, tab.page.evaluate_js(&js)).await.map_err(|_| {
187                VoidCrawlError::Timeout(format!(
188                    "extract evaluate_js exceeded {}s",
189                    total_timeout.as_secs()
190                ))
191            })??;
192            Some(value)
193        }
194        None => None,
195    };
196    Ok(FetchResult {
197        url: resp.url,
198        status_code: resp.status_code,
199        redirected: resp.redirected,
200        html: resp.html,
201        title,
202        extracted,
203        // Overwritten by `run_timed` with the real pool queue-wait.
204        waited_ms: 0,
205    })
206}