Skip to main content

http_ferry/
http.rs

1//! HTTP layer for the transfer engine: a resumable range fetcher with
2//! request-level retry. Auth (and any other per-request customization) is
3//! injected by the caller via a closure, so this layer carries no knowledge
4//! of how a particular service authenticates.
5
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
10use reqwest::StatusCode;
11use url::Url;
12
13use crate::Error;
14
15type Customize = dyn Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync;
16
17/// HTTP client for resumable range fetches. The `customize` closure is the
18/// auth seam: it is applied to every request, so callers inject basic auth, a
19/// bearer token, signed headers, or nothing — the engine stays auth-agnostic.
20pub struct Downloader {
21    client: reqwest::Client,
22    max_attempts: u32,
23    backoff: Duration,
24    customize: Arc<Customize>,
25}
26
27impl Downloader {
28    pub fn new(
29        client: reqwest::Client,
30        max_attempts: u32,
31        backoff: Duration,
32        customize: impl Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync + 'static,
33    ) -> Self {
34        Self {
35            client,
36            max_attempts: max_attempts.max(1),
37            backoff,
38            customize: Arc::new(customize),
39        }
40    }
41
42    pub(crate) fn backoff(&self) -> Duration {
43        self.backoff
44    }
45
46    pub(crate) fn max_attempts(&self) -> u32 {
47        self.max_attempts
48    }
49
50    pub(crate) async fn get_response_range(
51        &self,
52        url: Url,
53        from: u64,
54    ) -> Result<reqwest::Response, Error> {
55        let range = if from > 0 { Some(from) } else { None };
56        self.retry(|| self.attempt(&url, range)).await
57    }
58
59    async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, Error>
60    where
61        F: FnMut() -> Fut,
62        Fut: Future<Output = Result<T, Error>>,
63    {
64        let mut delay = self.backoff;
65        let mut attempts_left = self.max_attempts;
66
67        loop {
68            attempts_left -= 1;
69            let result = op().await;
70
71            if attempts_left == 0 {
72                return result;
73            }
74            match result {
75                Ok(v) => return Ok(v),
76                Err(e) if !is_retryable(&e) => return Err(e),
77                Err(_) => {
78                    tokio::time::sleep(delay).await;
79                    delay = delay.saturating_mul(2);
80                }
81            }
82        }
83    }
84
85    async fn attempt(
86        &self,
87        url: &Url,
88        range_from: Option<u64>,
89    ) -> Result<reqwest::Response, Error> {
90        let mut req = self.client.get(url.clone());
91        if let Some(from) = range_from {
92            req = req.header(reqwest::header::RANGE, format!("bytes={}-", from));
93        }
94        req = (self.customize)(req);
95        let resp = req.send().await?;
96        let status = resp.status();
97        if status == StatusCode::NOT_FOUND {
98            return Err(Error::NotFound(url.to_string()));
99        }
100        if !status.is_success() {
101            return Err(Error::Status(status));
102        }
103        Ok(resp)
104    }
105}
106
107pub(crate) fn is_retryable(err: &Error) -> bool {
108    match err {
109        Error::Request(e) => e.is_timeout() || e.is_connect(),
110        Error::Status(s) => s.is_server_error() || *s == StatusCode::TOO_MANY_REQUESTS,
111        _ => false,
112    }
113}