Skip to main content

aft/
url_fetch.rs

1use std::error::Error;
2use std::fmt;
3use std::fs;
4use std::io::{self, Read, Write};
5use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs};
6use std::path::{Path, PathBuf};
7use std::sync::{mpsc, Arc};
8use std::thread;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11use htmd::{
12    element_handler::{HandlerResult, Handlers},
13    Element, HtmlToMarkdown,
14};
15use reqwest::blocking::{Client, Response as HttpResponse};
16use reqwest::header::{ACCEPT, CONTENT_TYPE, LOCATION, USER_AGENT};
17use reqwest::redirect::Policy;
18use serde::{Deserialize, Serialize};
19use sha2::{Digest, Sha256};
20use url::Url;
21
/// Upper bound on a response body, checked against both the advertised
/// Content-Length and the running total while the body is streamed.
const MAX_RESPONSE_BYTES: u64 = 10 * 1024 * 1024;
/// Maximum age of a cache entry, in milliseconds (24 hours).
const CACHE_TTL_MS: u64 = 24 * 60 * 60 * 1000;
/// Connect timeout handed to the reqwest client builder.
const CONNECT_TIMEOUT: Duration = Duration::from_millis(30_000);
/// Longest gap tolerated between two body chunks before the read is
/// reported as stalled.
const BODY_CHUNK_TIMEOUT: Duration = Duration::from_millis(15_000);
/// Number of redirects followed (manually) before giving up.
const MAX_REDIRECTS: usize = 5;

/// Retry budget for transient connect/transport failures only. Agents
/// shouldn't have to retry manually for a single TCP/TLS hiccup. We cap
/// at 2 retries (= 3 total attempts) with short fixed backoffs (see
/// `TRANSIENT_RETRY_BACKOFFS_MS`) so a genuinely-broken host fails fast
/// instead of dragging the foreground fetch out to many seconds.
///
/// We deliberately do NOT retry on:
///   - HTTP error status (4xx/5xx) — the server actually answered
///   - Redirect errors / SSRF rejections — those are deterministic
///   - Body read stalls — already handled by BODY_CHUNK_TIMEOUT
const TRANSIENT_RETRY_ATTEMPTS: usize = 2;
/// Sleep, in milliseconds, before retry number `i + 1`; one entry per retry.
const TRANSIENT_RETRY_BACKOFFS_MS: [u64; TRANSIENT_RETRY_ATTEMPTS] = [200, 600];
/// Value of the `Accept` header sent with every request.
const ACCEPT_HEADER: &str = "application/vnd.github.raw, text/markdown, text/x-markdown, text/html;q=0.9, application/json;q=0.8, text/plain;q=0.5";
/// Value of the `User-Agent` header sent with every request.
const USER_AGENT_VALUE: &str = "aft-opencode-plugin";
/// Content type recorded in the cache metadata when an HTML body has been
/// converted to Markdown before being stored.
const CONVERTED_MARKDOWN_CONTENT_TYPE: &str = "text/markdown; charset=utf-8";
43
/// Caller-supplied knobs for `fetch_url_to_cache`.
///
/// `Default` yields the production configuration: private hosts are
/// rejected and every test hook is empty.
#[derive(Clone, Default)]
pub struct UrlFetchOptions {
    /// When `true`, `validate_public_url` only checks the URL scheme and
    /// skips the private/reserved-address rejection entirely.
    pub allow_private: bool,
    /// Test hook: treat a hostname as resolving to these IPs during SSRF validation.
    /// Production callers leave this empty and use `std::net::ToSocketAddrs`.
    #[doc(hidden)]
    pub public_host_overrides: Vec<(String, Vec<IpAddr>)>,
    /// Test hook: force reqwest to connect a hostname to a local mock server while
    /// SSRF validation still sees `public_host_overrides` above.
    #[doc(hidden)]
    pub connect_overrides: Vec<(String, SocketAddr)>,
    /// Test hook: observes the temp path immediately before the atomic rename.
    /// Called with `(temp_path, final_path)`.
    #[doc(hidden)]
    pub atomic_write_observer: Option<Arc<dyn Fn(&Path, &Path) + Send + Sync>>,
}
59
/// Error type returned by every fallible function in this module.
///
/// It carries a single human-readable message; `Display` prints that
/// message verbatim.
#[derive(Debug, Clone)]
pub struct UrlFetchError {
    message: String,
}

impl UrlFetchError {
    /// Build an error from anything convertible into a `String`.
    fn new(message: impl Into<String>) -> Self {
        let message = message.into();
        Self { message }
    }
}

impl fmt::Display for UrlFetchError {
    /// Writes the stored message as-is (formatter width/padding is ignored).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { message } = self;
        f.write_str(message.as_str())
    }
}

impl std::error::Error for UrlFetchError {}
80
81#[derive(Debug, Serialize, Deserialize)]
82struct CacheMeta {
83    url: String,
84    #[serde(rename = "contentType")]
85    content_type: String,
86    extension: String,
87    #[serde(rename = "fetchedAt")]
88    fetched_at: u64,
89}
90
/// Returns `true` when `value` starts with an `http://` or `https://` scheme.
///
/// URL schemes are case-insensitive, so `HTTPS://example.com` is accepted
/// just like its lowercase form. Only the prefix is inspected; the rest of
/// the string is not validated.
pub fn is_http_url(value: &str) -> bool {
    ["http://", "https://"].iter().any(|prefix| {
        // `get` yields `None` when the input is too short or the cut would
        // split a multi-byte character, so this never panics.
        value
            .get(..prefix.len())
            .is_some_and(|head| head.eq_ignore_ascii_case(prefix))
    })
}
94
/// Fetch `url` and store its body in the URL cache under `storage_dir`,
/// returning the path of the cached content file.
///
/// A fresh cache entry is returned without touching the network. Otherwise
/// the URL is fetched (redirects are followed manually and re-validated),
/// HTML bodies are converted to Markdown, and the body plus a metadata
/// sidecar are written atomically.
///
/// # Errors
///
/// Returns `UrlFetchError` when the URL is invalid or rejected by
/// `validate_public_url`, the cache directory cannot be created, the request
/// fails or answers with a non-success status, the content type is
/// unsupported, the body exceeds `MAX_RESPONSE_BYTES` or stalls, HTML
/// conversion fails, or a cache file cannot be written.
pub fn fetch_url_to_cache(
    url: &str,
    storage_dir: &Path,
    options: UrlFetchOptions,
) -> Result<PathBuf, UrlFetchError> {
    let parsed = Url::parse(url).map_err(|_| UrlFetchError::new(format!("Invalid URL: {url}")))?;
    // Validation runs before the cache lookup, so a blocked URL is rejected
    // even when a cached copy exists.
    validate_public_url(&parsed, &options)?;

    let dir = cache_dir(storage_dir);
    fs::create_dir_all(&dir).map_err(|error| {
        UrlFetchError::new(format!(
            "Failed to create URL cache directory {}: {error}",
            dir.display()
        ))
    })?;

    // The cache key is derived from the caller's string, not the parsed URL.
    let hash = hash_url(url);
    let meta_file = meta_path(storage_dir, &hash);
    if let Some(cached) = fresh_cached_path(storage_dir, &hash, &meta_file)? {
        return Ok(cached);
    }

    let response = fetch_with_redirects(&parsed, url, &options)?;
    if !response.status().is_success() {
        return Err(UrlFetchError::new(format!(
            "HTTP {} {} fetching {url}",
            response.status().as_u16(),
            response.status().canonical_reason().unwrap_or("")
        )));
    }

    // A missing or non-string Content-Type header is treated as text/plain.
    let content_type = response
        .headers()
        .get(CONTENT_TYPE)
        .and_then(|value| value.to_str().ok())
        .unwrap_or("text/plain")
        .to_string();
    let extension = resolve_extension(&content_type).ok_or_else(|| {
        UrlFetchError::new(format!(
            "Unsupported content type '{content_type}' for {url}. Supported: text/html, text/markdown, application/json, text/plain"
        ))
    })?;

    // Cheap early rejection when the server advertises an oversized body;
    // the streaming read below enforces the same limit.
    if let Some(length) = response.content_length() {
        if length > MAX_RESPONSE_BYTES {
            return Err(UrlFetchError::new(format!(
                "Response too large: {length} bytes (max {MAX_RESPONSE_BYTES})"
            )));
        }
    }

    let body = read_response_body(response, url)?;
    // HTML is never stored as-is: it is converted and cached as Markdown.
    let (body, content_type, extension) = if extension == ".html" {
        (
            convert_html_body_to_markdown(&body, url)?,
            CONVERTED_MARKDOWN_CONTENT_TYPE.to_string(),
            ".md",
        )
    } else {
        (body, content_type, extension)
    };

    // Content is written before the metadata that points at it.
    let content_file = content_path(storage_dir, &hash, extension);
    atomic_write(&content_file, &body, &options)?;

    let meta = CacheMeta {
        url: url.to_string(),
        content_type,
        extension: extension.to_string(),
        fetched_at: now_ms(),
    };
    let meta_bytes = serde_json::to_vec(&meta).map_err(|error| {
        UrlFetchError::new(format!("Failed to encode URL cache metadata: {error}"))
    })?;
    atomic_write(&meta_file, &meta_bytes, &options)?;

    Ok(content_file)
}
173
174pub fn cleanup_url_cache(storage_dir: &Path) -> Result<usize, UrlFetchError> {
175    let dir = cache_dir(storage_dir);
176    if !dir.exists() {
177        return Ok(0);
178    }
179
180    let entries = fs::read_dir(&dir).map_err(|error| {
181        UrlFetchError::new(format!(
182            "URL cache cleanup failed reading {}: {error}",
183            dir.display()
184        ))
185    })?;
186    let mut removed = 0usize;
187    let now = now_ms();
188
189    for entry in entries.flatten() {
190        let path = entry.path();
191        let Some(name) = path.file_name().and_then(|name| name.to_str()) else {
192            continue;
193        };
194        if !name.ends_with(".meta.json") {
195            continue;
196        }
197
198        let meta = fs::read_to_string(&path)
199            .ok()
200            .and_then(|content| serde_json::from_str::<CacheMeta>(&content).ok());
201        let Some(meta) = meta else {
202            if fs::remove_file(&path).is_ok() {
203                removed += 1;
204            }
205            continue;
206        };
207
208        if now.saturating_sub(meta.fetched_at) <= CACHE_TTL_MS {
209            continue;
210        }
211
212        let hash = name.trim_end_matches(".meta.json");
213        let content = content_path(storage_dir, hash, &meta.extension);
214        let _ = fs::remove_file(content);
215        if fs::remove_file(&path).is_ok() {
216            removed += 1;
217        }
218    }
219
220    Ok(removed)
221}
222
/// Test helper: path at which the cached body for `url` with the given
/// `extension` would be stored under `storage_dir`.
#[doc(hidden)]
pub fn cache_content_path_for_url(storage_dir: &Path, url: &str, extension: &str) -> PathBuf {
    content_path(storage_dir, &hash_url(url), extension)
}
227
/// Test helper: path of the metadata sidecar for `url` under `storage_dir`.
#[doc(hidden)]
pub fn cache_meta_path_for_url(storage_dir: &Path, url: &str) -> PathBuf {
    meta_path(storage_dir, &hash_url(url))
}
232
/// Test helper: exposes the module-private `is_private_ip` classifier.
#[doc(hidden)]
pub fn is_private_ip_for_test(ip: IpAddr) -> bool {
    is_private_ip(ip)
}
237
/// Directory holding all URL cache files: `<storage_dir>/url_cache`.
fn cache_dir(storage_dir: &Path) -> PathBuf {
    let mut dir = storage_dir.to_path_buf();
    dir.push("url_cache");
    dir
}
241
242fn hash_url(url: &str) -> String {
243    let digest = Sha256::digest(url.as_bytes());
244    format!("{digest:x}").chars().take(16).collect()
245}
246
247fn meta_path(storage_dir: &Path, hash: &str) -> PathBuf {
248    cache_dir(storage_dir).join(format!("{hash}.meta.json"))
249}
250
251fn content_path(storage_dir: &Path, hash: &str, extension: &str) -> PathBuf {
252    cache_dir(storage_dir).join(format!("{hash}{extension}"))
253}
254
255fn fresh_cached_path(
256    storage_dir: &Path,
257    hash: &str,
258    meta_file: &Path,
259) -> Result<Option<PathBuf>, UrlFetchError> {
260    if !meta_file.exists() {
261        return Ok(None);
262    }
263
264    let meta = match fs::read_to_string(meta_file)
265        .ok()
266        .and_then(|content| serde_json::from_str::<CacheMeta>(&content).ok())
267    {
268        Some(meta) => meta,
269        None => return Ok(None),
270    };
271    let age = now_ms().saturating_sub(meta.fetched_at);
272    if meta.extension == ".html" {
273        return Ok(None);
274    }
275
276    let cached = content_path(storage_dir, hash, &meta.extension);
277    if age < CACHE_TTL_MS && cached.exists() {
278        return Ok(Some(cached));
279    }
280    Ok(None)
281}
282
283fn fetch_with_redirects(
284    start_url: &Url,
285    original_url: &str,
286    options: &UrlFetchOptions,
287) -> Result<HttpResponse, UrlFetchError> {
288    let client = build_client(options)?;
289    let mut current_url = start_url.clone();
290
291    for redirect_count in 0..=MAX_REDIRECTS {
292        validate_public_url(&current_url, options)?;
293        let response = send_with_transient_retries(&client, &current_url)?;
294
295        if !response.status().is_redirection() {
296            return Ok(response);
297        }
298        if redirect_count == MAX_REDIRECTS {
299            return Err(UrlFetchError::new(format!(
300                "Too many redirects fetching {original_url}"
301            )));
302        }
303
304        let location = response
305            .headers()
306            .get(LOCATION)
307            .and_then(|value| value.to_str().ok())
308            .ok_or_else(|| {
309                UrlFetchError::new(format!(
310                    "Redirect from {} missing Location header",
311                    current_url.as_str()
312                ))
313            })?;
314        current_url = current_url.join(location).map_err(|error| {
315            UrlFetchError::new(format!(
316                "Invalid redirect Location '{location}' from {}: {error}",
317                current_url.as_str()
318            ))
319        })?;
320    }
321
322    Err(UrlFetchError::new(format!(
323        "Too many redirects fetching {original_url}"
324    )))
325}
326
327/// Issue a single GET with the configured User-Agent + Accept headers and
328/// transparently retry only on transient connect/transport failures.
329///
330/// Returns the response (including 4xx/5xx — caller decides how to treat
331/// those). On a non-transient reqwest error (e.g. an HTTP-shaped reply that
332/// reqwest still surfaces as Err, or a TLS handshake fault that doesn't read
333/// as `is_connect`), the original error is returned immediately so the user
334/// sees the real failure without an artificial 800ms-plus delay.
335fn send_with_transient_retries(
336    client: &Client,
337    target: &Url,
338) -> Result<HttpResponse, UrlFetchError> {
339    let mut last_error: Option<reqwest::Error> = None;
340    for attempt in 0..=TRANSIENT_RETRY_ATTEMPTS {
341        let result = client
342            .get(target.clone())
343            .header(USER_AGENT, USER_AGENT_VALUE)
344            .header(ACCEPT, ACCEPT_HEADER)
345            .send();
346        match result {
347            Ok(response) => return Ok(response),
348            Err(error) => {
349                if attempt < TRANSIENT_RETRY_ATTEMPTS && is_transient_reqwest_error(&error) {
350                    thread::sleep(Duration::from_millis(TRANSIENT_RETRY_BACKOFFS_MS[attempt]));
351                    last_error = Some(error);
352                    continue;
353                }
354                return Err(UrlFetchError::new(format!(
355                    "Failed to fetch {}: {}",
356                    target.as_str(),
357                    reqwest_error_detail(&error)
358                )));
359            }
360        }
361    }
362    // Loop fell through after the last allowed retry exhausted — surface the
363    // most recent transient error rather than swallowing it.
364    Err(UrlFetchError::new(format!(
365        "Failed to fetch {} after {} retries: {}",
366        target.as_str(),
367        TRANSIENT_RETRY_ATTEMPTS,
368        last_error
369            .as_ref()
370            .map(reqwest_error_detail)
371            .unwrap_or_else(|| "unknown transient error".to_string())
372    )))
373}
374
375/// Classify a reqwest error as transient (worth a quick retry) vs terminal.
376///
377/// Transient: TCP connect failures, request-build/send TCP-level failures
378/// that don't carry status, and timeouts. These typically clear on a single
379/// retry — agents shouldn't have to ask twice for a momentary blip.
380///
381/// Terminal: anything where reqwest got far enough to decode an HTTP-shaped
382/// reply (`is_status()`, `is_body()`, `is_decode()`). Retrying those would
383/// just hammer a server that already answered.
384fn is_transient_reqwest_error(error: &reqwest::Error) -> bool {
385    error.is_connect() || error.is_timeout() || error.is_request()
386}
387
388fn build_client(options: &UrlFetchOptions) -> Result<Client, UrlFetchError> {
389    let mut builder = Client::builder()
390        .redirect(Policy::none())
391        .connect_timeout(CONNECT_TIMEOUT);
392
393    for (host, address) in &options.connect_overrides {
394        builder = builder.resolve(host, *address);
395    }
396
397    builder
398        .build()
399        .map_err(|error| UrlFetchError::new(format!("Failed to build URL fetch client: {error}")))
400}
401
402fn validate_public_url(url: &Url, options: &UrlFetchOptions) -> Result<(), UrlFetchError> {
403    if url.scheme() != "http" && url.scheme() != "https" {
404        return Err(UrlFetchError::new(format!(
405            "Only http:// and https:// URLs are supported, got: {}:",
406            url.scheme()
407        )));
408    }
409    if options.allow_private {
410        return Ok(());
411    }
412
413    let host = url
414        .host_str()
415        .ok_or_else(|| UrlFetchError::new(format!("URL missing host: {url}")))?;
416    let host_for_parse = host
417        .trim_matches(['[', ']'])
418        .split('%')
419        .next()
420        .unwrap_or(host);
421
422    if let Ok(ip) = host_for_parse.parse::<IpAddr>() {
423        reject_private_ip(host, ip)?;
424        return Ok(());
425    }
426    if host_for_parse.contains(':') {
427        return Err(UrlFetchError::new(format!(
428            "Blocked private URL host {host} ({host_for_parse})"
429        )));
430    }
431
432    let addresses = resolve_host_ips(host_for_parse, url.port_or_known_default(), options)?;
433    if addresses.is_empty() {
434        return Err(UrlFetchError::new(format!(
435            "Failed to resolve URL host {host}"
436        )));
437    }
438    for ip in addresses {
439        reject_private_ip(host, ip)?;
440    }
441
442    // We validate all resolved addresses before issuing the request. Reqwest's
443    // default resolver runs again during TCP connect, leaving the same small
444    // DNS-rebinding window the old Bun fallback accepted. A custom per-request
445    // resolver hook would close that window but adds complexity for marginal
446    // value in this opt-in agent-tooling surface.
447    Ok(())
448}
449
450fn resolve_host_ips(
451    host: &str,
452    port: Option<u16>,
453    options: &UrlFetchOptions,
454) -> Result<Vec<IpAddr>, UrlFetchError> {
455    if let Some((_, ips)) = options
456        .public_host_overrides
457        .iter()
458        .find(|(override_host, _)| override_host == host)
459    {
460        return Ok(ips.clone());
461    }
462
463    let port = port.unwrap_or(80);
464    let addrs = (host, port).to_socket_addrs().map_err(|error| {
465        UrlFetchError::new(format!("Failed to resolve URL host {host}: {error}"))
466    })?;
467    Ok(addrs.map(|addr| addr.ip()).collect())
468}
469
470fn reject_private_ip(host: &str, ip: IpAddr) -> Result<(), UrlFetchError> {
471    if is_private_ip(ip) {
472        return Err(UrlFetchError::new(format!(
473            "Blocked private URL host {host} ({ip})"
474        )));
475    }
476    Ok(())
477}
478
/// True for any private/link-local/CGNAT/benchmark/multicast/reserved/loopback
/// address (the full set this module refuses to fetch). Exposed so the semantic
/// embedding SSRF guard shares one authoritative range list instead of keeping a
/// drifting copy. Note: this INCLUDES loopback — callers that intentionally
/// allow loopback (e.g. a local Ollama endpoint) must exclude it themselves.
///
/// This is a thin public alias for the module-private `is_private_ip`.
pub fn is_private_or_reserved_ip(ip: IpAddr) -> bool {
    is_private_ip(ip)
}
487
/// Classify `ip` as private/reserved (i.e. not fetchable) by dispatching on
/// the address family.
fn is_private_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(ipv4) => is_private_ipv4(ipv4),
        IpAddr::V6(ipv6) => is_private_ipv6(ipv6),
    }
}

/// True when `ip` falls in an IPv4 range this module refuses to fetch:
/// 0.0.0.0/8, 10/8, 127/8, 172.16/12, 192.168/16, 169.254/16, 100.64/10,
/// 198.18/15, and everything from 224.0.0.0 upward.
fn is_private_ipv4(ip: Ipv4Addr) -> bool {
    let [a, b, _, _] = ip.octets();
    a == 0
        || a == 10
        || a == 127
        || (a == 172 && (16..=31).contains(&b))
        || (a == 192 && b == 168)
        || (a == 169 && b == 254)
        // RFC 6598 Shared Address Space (CGNAT): 100.64.0.0/10. Not globally
        // routable; used for provider/VPC-internal endpoints — must not be
        // reachable via SSRF.
        || (a == 100 && (64..=127).contains(&b))
        // RFC 2544 benchmark subnet: 198.18.0.0/15. Reserved, non-routable.
        || (a == 198 && (18..=19).contains(&b))
        || a >= 224
}

/// True when `ip` is an IPv6 address this module refuses to fetch.
///
/// Addresses that embed an IPv4 address are judged by that embedded address,
/// so an IPv6 spelling cannot smuggle a private IPv4 target past the guard:
///   - `::/96` (IPv4-compatible; also covers `::` and `::1`)
///   - `::ffff:0:0/96` (IPv4-mapped)
///   - `64:ff9b::/96` (NAT64 well-known prefix, RFC 6052)
///   - `2002::/16` (6to4, RFC 3056 — IPv4 lives in segments 1 and 2)
///
/// Native ranges rejected: link-local `fe80::/10`, unique-local `fc00::/7`,
/// and multicast `ff00::/8`.
fn is_private_ipv6(ip: Ipv6Addr) -> bool {
    let segments = ip.segments();
    let embedded_v4 = |high: u16, low: u16| {
        let [a, b] = high.to_be_bytes();
        let [c, d] = low.to_be_bytes();
        Ipv4Addr::new(a, b, c, d)
    };

    let first_five_zero = segments[..5].iter().all(|segment| *segment == 0);
    if first_five_zero && (segments[5] == 0 || segments[5] == 0xffff) {
        return is_private_ipv4(embedded_v4(segments[6], segments[7]));
    }

    let is_nat64 = segments[0] == 0x0064
        && segments[1] == 0xff9b
        && segments[2..6].iter().all(|segment| *segment == 0);
    if is_nat64 {
        return is_private_ipv4(embedded_v4(segments[6], segments[7]));
    }

    if segments[0] == 0x2002 {
        return is_private_ipv4(embedded_v4(segments[1], segments[2]));
    }

    let first = segments[0];
    (0xfe80..=0xfebf).contains(&first) || (0xfc00..=0xfdff).contains(&first) || first >= 0xff00
}
529
/// Map a `Content-Type` header value to the cache file extension.
///
/// Only the media type before the first `;` or `,` is considered, compared
/// case-insensitively. HTML-like types map to `.html`, Markdown-like types
/// and `text/plain` to `.md`, JSON (including any `+json` suffix) to
/// `.json`; anything else is unsupported (`None`).
fn resolve_extension(content_type: &str) -> Option<&'static str> {
    const HTML_TYPES: [&str; 4] = [
        "text/html",
        "application/xhtml+xml",
        "application/vnd.github.html",
        "application/vnd.github+html",
    ];
    const MARKDOWN_TYPES: [&str; 7] = [
        "text/markdown",
        "text/x-markdown",
        "application/markdown",
        "application/vnd.github.raw",
        "application/vnd.github+raw",
        "application/vnd.github.v3.raw",
        "text/plain",
    ];

    let head = content_type
        .split([';', ','])
        .next()
        .unwrap_or("")
        .trim()
        .to_ascii_lowercase();
    let media_type = head.as_str();

    if HTML_TYPES.contains(&media_type) {
        Some(".html")
    } else if MARKDOWN_TYPES.contains(&media_type) {
        Some(".md")
    } else if media_type == "application/json" || media_type.ends_with("+json") {
        Some(".json")
    } else {
        None
    }
}
558
559fn convert_html_body_to_markdown(body: &[u8], url: &str) -> Result<Vec<u8>, UrlFetchError> {
560    let html = String::from_utf8_lossy(body);
561    let mut markdown = html_to_markdown_converter()
562        .convert(&html)
563        .map_err(|error| {
564            UrlFetchError::new(format!(
565                "Failed to convert HTML from {url} to Markdown: {error}"
566            ))
567        })?;
568    if !markdown.ends_with('\n') {
569        markdown.push('\n');
570    }
571    Ok(markdown.into_bytes())
572}
573
/// Build the `htmd` converter used for cached HTML pages.
///
/// Configuration visible here:
///   - a fixed list of tags is passed to `skip_tags`
///   - `<a>` elements recognised by `is_permalink_anchor` get a handler
///     result of `None`; other anchors use the fallback handler
///   - `<header>` elements recognised by `should_skip_header` get `None`;
///     other headers use the fallback handler
///   - `<span class="token-line">` is rendered as its children's content
///     followed by a newline; other spans use the fallback handler
///
/// NOTE(review): a handler returning `None` presumably makes htmd emit
/// nothing for that element — confirm against the htmd documentation.
fn html_to_markdown_converter() -> HtmlToMarkdown {
    HtmlToMarkdown::builder()
        .skip_tags(vec![
            "head", "script", "style", "nav", "footer", "aside", "noscript",
        ])
        .add_handler(
            vec!["a"],
            |handlers: &dyn Handlers, element: Element| -> Option<HandlerResult> {
                // Heading permalink anchors carry no content worth keeping.
                if is_permalink_anchor(&element) {
                    None
                } else {
                    handlers.fallback(element)
                }
            },
        )
        .add_handler(
            vec!["header"],
            |handlers: &dyn Handlers, element: Element| -> Option<HandlerResult> {
                // Only site-chrome headers are dropped; others are kept.
                if should_skip_header(&element) {
                    None
                } else {
                    handlers.fallback(element)
                }
            },
        )
        .add_handler(
            vec!["span"],
            |handlers: &dyn Handlers, element: Element| -> Option<HandlerResult> {
                // Each `token-line` span becomes its own output line.
                if element_has_class_token(&element, "token-line") {
                    let mut content = handlers.walk_children(element.node).content;
                    content.push('\n');
                    Some(content.into())
                } else {
                    handlers.fallback(element)
                }
            },
        )
        .build()
}
613
614fn is_permalink_anchor(element: &Element<'_>) -> bool {
615    element_has_class_token(element, "hash-link")
616        || element_attr_value(element, "aria-label")
617            .is_some_and(|value| value.to_ascii_lowercase().starts_with("direct link to"))
618}
619
620fn should_skip_header(element: &Element<'_>) -> bool {
621    element_has_class_token(element, "navbar")
622        || element_has_class_token(element, "site-header")
623        || element_has_class_token(element, "site-nav")
624        || element_has_class_token(element, "topbar")
625        || element_attr_value(element, "role")
626            .is_some_and(|value| value.eq_ignore_ascii_case("banner"))
627        || element_attr_value(element, "id").is_some_and(|value| {
628            let value = value.to_ascii_lowercase();
629            value.contains("navbar") || value.contains("site-header") || value.contains("site-nav")
630        })
631}
632
633fn element_has_class_token(element: &Element<'_>, token: &str) -> bool {
634    element_attr_value(element, "class")
635        .is_some_and(|value| value.split_ascii_whitespace().any(|class| class == token))
636}
637
638fn element_attr_value<'a>(element: &'a Element<'_>, name: &str) -> Option<&'a str> {
639    element
640        .attrs
641        .iter()
642        .find(|attr| attr.name.local.as_ref() == name)
643        .map(|attr| attr.value.as_ref())
644}
645
/// Message sent from the body-reader thread to `read_response_body`.
enum BodyReadEvent {
    /// A non-empty slice of body bytes, in arrival order.
    Chunk(Vec<u8>),
    /// The reader hit end-of-body (a read returned 0 bytes).
    Done,
    /// A read failed; carries the error kind and its display text.
    Error(io::ErrorKind, String),
}
651
/// Read the whole response body, enforcing a size cap and a per-chunk stall
/// timeout.
///
/// The blocking read runs on a spawned thread that forwards 16 KiB-or-smaller
/// chunks over a channel; this function waits at most `BODY_CHUNK_TIMEOUT`
/// for each event. `url` is only used in error messages.
///
/// # Errors
///
/// Fails when the accumulated size exceeds `MAX_RESPONSE_BYTES`, when no
/// event arrives within `BODY_CHUNK_TIMEOUT` (or the read itself reports
/// `TimedOut`/`WouldBlock`), when the read fails for another reason, or when
/// the reader thread ends without reporting completion.
fn read_response_body(mut response: HttpResponse, url: &str) -> Result<Vec<u8>, UrlFetchError> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut buffer = [0u8; 16 * 1024];
        loop {
            match response.read(&mut buffer) {
                Ok(0) => {
                    let _ = tx.send(BodyReadEvent::Done);
                    break;
                }
                Ok(n) => {
                    // A failed send means the receiver gave up (returned
                    // early), so stop reading.
                    if tx.send(BodyReadEvent::Chunk(buffer[..n].to_vec())).is_err() {
                        break;
                    }
                }
                Err(error) => {
                    let kind = error.kind();
                    let message = error.to_string();
                    let _ = tx.send(BodyReadEvent::Error(kind, message));
                    break;
                }
            }
        }
    });

    let mut chunks = Vec::new();
    let mut total = 0u64;
    loop {
        match rx.recv_timeout(BODY_CHUNK_TIMEOUT) {
            Ok(BodyReadEvent::Chunk(chunk)) => {
                // Enforced here as well because Content-Length may be absent
                // or understated.
                total += chunk.len() as u64;
                if total > MAX_RESPONSE_BYTES {
                    return Err(UrlFetchError::new(format!(
                        "Response exceeded {MAX_RESPONSE_BYTES} bytes, aborted"
                    )));
                }
                chunks.extend_from_slice(&chunk);
            }
            Ok(BodyReadEvent::Done) => return Ok(chunks),
            // Timeout-like read errors are reported the same way as a
            // channel-level stall.
            Ok(BodyReadEvent::Error(kind, _message)) if is_body_stall_kind(kind) => {
                return Err(body_stall_error(url));
            }
            Ok(BodyReadEvent::Error(_, message)) => {
                return Err(UrlFetchError::new(format!(
                    "Failed to read response body for {url}: {message}"
                )));
            }
            Err(mpsc::RecvTimeoutError::Timeout) => return Err(body_stall_error(url)),
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                return Err(UrlFetchError::new(format!(
                    "Failed to read response body for {url}: body reader stopped unexpectedly"
                )));
            }
        }
    }
}
708
709fn body_stall_error(url: &str) -> UrlFetchError {
710    UrlFetchError::new(format!(
711        "Body read stalled (no data for {}ms) fetching {url}",
712        BODY_CHUNK_TIMEOUT.as_millis()
713    ))
714}
715
/// True for the I/O error kinds treated as a body stall: `TimedOut` and
/// `WouldBlock`.
fn is_body_stall_kind(kind: io::ErrorKind) -> bool {
    kind == io::ErrorKind::TimedOut || kind == io::ErrorKind::WouldBlock
}
719
720fn atomic_write(
721    final_path: &Path,
722    bytes: &[u8],
723    options: &UrlFetchOptions,
724) -> Result<(), UrlFetchError> {
725    let parent = final_path.parent().unwrap_or_else(|| Path::new("."));
726    fs::create_dir_all(parent).map_err(|error| {
727        UrlFetchError::new(format!(
728            "Failed to create URL cache parent {}: {error}",
729            parent.display()
730        ))
731    })?;
732
733    let file_name = final_path
734        .file_name()
735        .and_then(|name| name.to_str())
736        .ok_or_else(|| {
737            UrlFetchError::new(format!("Invalid cache path: {}", final_path.display()))
738        })?;
739    let tmp_path = final_path.with_file_name(format!(
740        "{file_name}.tmp-{}-{}",
741        std::process::id(),
742        random_nonce()
743    ));
744
745    let write_result = (|| -> io::Result<()> {
746        let mut file = fs::File::create(&tmp_path)?;
747        file.write_all(bytes)?;
748        file.flush()?;
749        Ok(())
750    })();
751    if let Err(error) = write_result {
752        let _ = fs::remove_file(&tmp_path);
753        return Err(UrlFetchError::new(format!(
754            "Failed to write URL cache temp file {}: {error}",
755            tmp_path.display()
756        )));
757    }
758
759    if let Some(observer) = &options.atomic_write_observer {
760        observer(&tmp_path, final_path);
761    }
762
763    fs::rename(&tmp_path, final_path).map_err(|error| {
764        let _ = fs::remove_file(&tmp_path);
765        UrlFetchError::new(format!(
766            "Failed to finalize URL cache file {}: {error}",
767            final_path.display()
768        ))
769    })
770}
771
772fn random_nonce() -> String {
773    let mut bytes = [0u8; 8];
774    if getrandom::fill(&mut bytes).is_err() {
775        let fallback = now_ms() ^ u64::from(std::process::id());
776        bytes = fallback.to_le_bytes();
777    }
778    let mut out = String::with_capacity(bytes.len() * 2);
779    for byte in bytes {
780        use std::fmt::Write as _;
781        let _ = write!(out, "{byte:02x}");
782    }
783    out
784}
785
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0; a value too large for `u64`
/// saturates at `u64::MAX`.
fn now_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
794
795fn reqwest_error_detail(error: &reqwest::Error) -> String {
796    if error.is_timeout() {
797        return format!("timeout: {error}");
798    }
799    if let Some(source) = error.source() {
800        return format!("{source}");
801    }
802    error.to_string()
803}