1use std::time::{Duration, Instant};
4
5use futures::future::join_all;
6use schemars::JsonSchema;
7use serde::{Deserialize, Serialize};
8use void_crawl_core::{PooledTab, VoidCrawlError};
9
10use crate::{server::VoidCrawlServer, tools::wait};
11
/// Per-fetch time budget, in seconds, used when `FetchArgs::timeout_secs` is `None`.
pub const DEFAULT_TIMEOUT_SECS: u64 = 30;
13
// Arguments for a single page fetch; also the element type of
// `FetchManyArgs::requests`. Plain `//` comments (not `///`) keep the derived
// JsonSchema descriptions unchanged.
#[derive(Debug, Clone, Deserialize, JsonSchema, Default)]
pub struct FetchArgs {
    // Page to open; handed to `goto_and_wait_for_idle` by `fetch_on_tab`.
    pub url: String,
    // Optional post-navigation wait spec, forwarded as-is to
    // `wait::apply_post_navigate`; its accepted syntax is defined there.
    #[serde(default)]
    pub wait_for: Option<String>,
    // Optional JavaScript evaluated on the page via `evaluate_js`; the value it
    // produces is returned as `FetchResult::extracted`.
    #[serde(default)]
    pub extract: Option<String>,
    // Time budget in seconds used by `fetch_on_tab`; `None` means
    // `DEFAULT_TIMEOUT_SECS`.
    #[serde(default)]
    pub timeout_secs: Option<u64>,
}
30
31fn any_value_schema(_: &mut schemars::SchemaGenerator) -> schemars::Schema {
34 serde_json::Map::new().into()
35}
36
// Result of one fetch. It is returned by `run` and embedded in `FetchManyItem`.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchResult {
    // `url` as reported by the navigation response.
    pub url: String,
    // Status code from the navigation response, when one was reported
    // (presumably the HTTP status; confirm against void_crawl_core).
    pub status_code: Option<u16>,
    // Redirect flag as reported by the navigation response.
    pub redirected: bool,
    // Page HTML taken from the navigation response.
    pub html: String,
    // Page title; `None` when the page has none or reading it failed.
    pub title: Option<String>,
    // Value produced by the `extract` script, if one was supplied. The schema
    // comes from `any_value_schema`, i.e. any JSON value.
    #[schemars(schema_with = "any_value_schema")]
    pub extracted: Option<serde_json::Value>,
    // Milliseconds spent waiting to acquire a pooled tab. `fetch_on_tab`
    // leaves it at 0 and `run_timed` fills it in.
    pub waited_ms: u64,
}
51
// Arguments for `run_many`: a batch of independent fetches.
#[derive(Debug, Deserialize, JsonSchema, Default)]
pub struct FetchManyArgs {
    // All requests are started together (`join_all`) and compete for pool tabs.
    pub requests: Vec<FetchArgs>,
}
59
// Per-request outcome inside a `run_many` batch. There are two shapes:
// - `ok: true`: `result` is set.
// - `ok: false`: `error` holds the error's `Display` text.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyItem {
    pub ok: bool,
    pub result: Option<FetchResult>,
    pub error: Option<String>,
}
66
// Pool-contention summary attached to a `run_many` batch.
#[derive(Debug, Serialize, JsonSchema)]
pub struct PoolMeta {
    // Pool capacity: `browsers * tabs_per_browser` (saturating), or 0 when the
    // pool could not be obtained.
    pub max_tabs: usize,
    // Number of requests in the batch.
    pub submitted: usize,
    // Requests whose tab wait exceeded `QUEUE_WAIT_THRESHOLD_MS`.
    pub queued: usize,
    // Longest tab wait in the batch, in milliseconds.
    pub max_waited_ms: u64,
    // Human-readable batching hint. It is only set when at least one request
    // queued and `max_tabs` is non-zero.
    pub note: Option<String>,
}
86
// Response of `run_many`.
#[derive(Debug, Serialize, JsonSchema)]
pub struct FetchManyResult {
    // One entry per submitted request, in submission order (`join_all`
    // preserves input order).
    pub results: Vec<FetchManyItem>,
    // Contention summary for the whole batch.
    pub pool: PoolMeta,
}
92
/// Tab-acquisition waits strictly longer than this many milliseconds count as
/// "queued" in [`PoolMeta::queued`].
const QUEUE_WAIT_THRESHOLD_MS: u64 = 5;
95
96pub async fn run(server: &VoidCrawlServer, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
97 run_timed(server, args).await.1
98}
99
100async fn run_timed(
103 server: &VoidCrawlServer,
104 args: FetchArgs,
105) -> (u64, Result<FetchResult, VoidCrawlError>) {
106 let pool = match server.state().pool().await {
107 Ok(p) => p,
108 Err(e) => return (0, Err(e)),
109 };
110 let started = Instant::now();
113 let (tab, waited_ms) = match pool.acquire_timed().await {
114 Ok((tab, waited)) => (tab, waited),
115 Err(e) => {
116 let waited = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
117 return (waited, Err(e));
118 }
119 };
120 let mut result = fetch_on_tab(&tab, args).await;
121 pool.release(tab).await;
122 if let Ok(ref mut r) = result {
123 r.waited_ms = waited_ms;
124 }
125 (waited_ms, result)
126}
127
128pub async fn run_many(server: &VoidCrawlServer, args: FetchManyArgs) -> FetchManyResult {
129 let submitted = args.requests.len();
130 let max_tabs = server.state().pool().await.map_or(0, |p| {
131 let c = p.config();
132 c.browsers.saturating_mul(c.tabs_per_browser)
133 });
134
135 let futures = args.requests.into_iter().map(|req| run_timed(server, req));
136 let outcomes = join_all(futures).await;
137
138 let mut max_waited_ms = 0u64;
139 let mut queued = 0usize;
140 let results = outcomes
141 .into_iter()
142 .map(|(waited, r)| {
143 max_waited_ms = max_waited_ms.max(waited);
144 if waited > QUEUE_WAIT_THRESHOLD_MS {
145 queued += 1;
146 }
147 match r {
148 Ok(result) => FetchManyItem { ok: true, result: Some(result), error: None },
149 Err(e) => {
150 FetchManyItem { ok: false, result: None, error: Some(e.to_string()) }
151 }
152 }
153 })
154 .collect();
155
156 let note = (queued > 0 && max_tabs > 0).then(|| {
157 format!(
158 "{queued} of {submitted} requests queued behind the pool's {max_tabs}-tab limit \
159 (worst wait {max_waited_ms}ms). For full parallelism, submit at most {max_tabs} \
160 per batch, or raise TABS_PER_BROWSER / BROWSER_COUNT."
161 )
162 });
163
164 FetchManyResult { results, pool: PoolMeta { max_tabs, submitted, queued, max_waited_ms, note } }
165}
166
167async fn fetch_on_tab(tab: &PooledTab, args: FetchArgs) -> Result<FetchResult, VoidCrawlError> {
168 use tokio::time::{Instant, timeout};
169
170 let total_timeout = Duration::from_secs(args.timeout_secs.unwrap_or(DEFAULT_TIMEOUT_SECS));
171 let start = Instant::now();
172 let resp = tab.page.goto_and_wait_for_idle(&args.url, total_timeout).await?;
173 wait::apply_post_navigate(&tab.page, args.wait_for.as_deref(), total_timeout).await?;
174 let remaining = total_timeout.saturating_sub(start.elapsed());
175 let title = timeout(remaining, tab.page.title())
180 .await
181 .map_err(|_| VoidCrawlError::Timeout("title read exceeded timeout_secs".into()))?
182 .ok()
183 .flatten();
184 let extracted = match args.extract {
185 Some(js) => {
186 let value = timeout(remaining, tab.page.evaluate_js(&js)).await.map_err(|_| {
187 VoidCrawlError::Timeout(format!(
188 "extract evaluate_js exceeded {}s",
189 total_timeout.as_secs()
190 ))
191 })??;
192 Some(value)
193 }
194 None => None,
195 };
196 Ok(FetchResult {
197 url: resp.url,
198 status_code: resp.status_code,
199 redirected: resp.redirected,
200 html: resp.html,
201 title,
202 extracted,
203 waited_ms: 0,
205 })
206}