// http_ferry/http.rs

//! HTTP layer for the transfer engine: a resumable range fetcher with
//! request-level retry. Auth (and any other per-request customization) is
//! injected by the caller via a closure, so this layer carries no knowledge
//! of how a particular service authenticates.
5
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
10use reqwest::StatusCode;
11use url::Url;
12
13use crate::Error;
14
/// Signature of the per-request customization hook: it receives the request
/// builder and returns the builder to send. The `Send + Sync` bounds are part
/// of the trait object type; `Downloader` stores the hook behind an `Arc`.
type Customize = dyn Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync;
16
/// HTTP client for resumable range fetches. The `customize` closure is the
/// auth seam: it is applied to every request, so callers inject basic auth, a
/// bearer token, signed headers, or nothing — the engine stays auth-agnostic.
pub struct Downloader {
    // Underlying reqwest client every GET is issued through.
    client: reqwest::Client,
    // Total number of tries per request, counting the first one.
    max_attempts: u32,
    // Delay slept before the first retry; doubled after each further retry.
    backoff: Duration,
    // Hook applied to each request builder right before it is sent.
    customize: Arc<Customize>,
}
26
/// Builder for [`Downloader`]. Settings are applied to the wrapped
/// `Downloader`, which `build` returns.
///
/// Created by [`Downloader::builder`], which seeds the wrapped value with
/// its defaults before any setter is called.
pub struct DownloaderBuilder(Downloader);
30
31impl Downloader {
32    pub fn builder(client: reqwest::Client) -> DownloaderBuilder {
33        DownloaderBuilder(Downloader {
34            client,
35            max_attempts: 3,
36            backoff: Duration::from_millis(250),
37            customize: Arc::new(identity),
38        })
39    }
40
41    pub(crate) fn backoff(&self) -> Duration {
42        self.backoff
43    }
44
45    pub(crate) fn max_attempts(&self) -> u32 {
46        self.max_attempts
47    }
48
49    pub(crate) async fn get_response_range(
50        &self,
51        url: Url,
52        from: u64,
53    ) -> Result<reqwest::Response, Error> {
54        let range = if from > 0 { Some(from) } else { None };
55        self.retry(|| self.attempt(&url, range)).await
56    }
57
58    async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, Error>
59    where
60        F: FnMut() -> Fut,
61        Fut: Future<Output = Result<T, Error>>,
62    {
63        let mut delay = self.backoff;
64        let mut attempts_left = self.max_attempts;
65
66        loop {
67            attempts_left -= 1;
68            let result = op().await;
69
70            if attempts_left == 0 {
71                return result;
72            }
73            match result {
74                Ok(v) => return Ok(v),
75                Err(e) if !is_retryable(&e) => return Err(e),
76                Err(_) => {
77                    tokio::time::sleep(delay).await;
78                    delay = delay.saturating_mul(2);
79                }
80            }
81        }
82    }
83
84    async fn attempt(
85        &self,
86        url: &Url,
87        range_from: Option<u64>,
88    ) -> Result<reqwest::Response, Error> {
89        let mut req = self.client.get(url.clone());
90        if let Some(from) = range_from {
91            req = req.header(reqwest::header::RANGE, format!("bytes={}-", from));
92        }
93        req = (self.customize)(req);
94        let resp = req.send().await?;
95        let status = resp.status();
96        if status == StatusCode::NOT_FOUND {
97            return Err(Error::NotFound(url.to_string()));
98        }
99        if !status.is_success() {
100            return Err(Error::Status(status));
101        }
102        Ok(resp)
103    }
104}
105
106impl DownloaderBuilder {
107    pub fn max_attempts(mut self, max_attempts: u32) -> Self {
108        self.0.max_attempts = max_attempts.max(1);
109        self
110    }
111
112    pub fn backoff(mut self, backoff: Duration) -> Self {
113        self.0.backoff = backoff;
114        self
115    }
116
117    pub fn customize_request(
118        mut self,
119        customize: impl Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync + 'static,
120    ) -> Self {
121        self.0.customize = Arc::new(customize);
122        self
123    }
124
125    pub fn build(self) -> Downloader {
126        self.0
127    }
128}
129
130fn identity(req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
131    req
132}
133
134pub(crate) fn is_retryable(err: &Error) -> bool {
135    match err {
136        Error::Request(e) => e.is_timeout() || e.is_connect(),
137        Error::Status(s) => s.is_server_error() || *s == StatusCode::TOO_MANY_REQUESTS,
138        _ => false,
139    }
140}