1use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
10use reqwest::StatusCode;
11use url::Url;
12
13use crate::Error;
14
15type Customize = dyn Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync;
16
17pub struct Downloader {
21 client: reqwest::Client,
22 max_attempts: u32,
23 backoff: Duration,
24 customize: Arc<Customize>,
25}
26
27pub struct DownloaderBuilder(Downloader);
30
31impl Downloader {
32 pub fn builder(client: reqwest::Client) -> DownloaderBuilder {
33 DownloaderBuilder(Downloader {
34 client,
35 max_attempts: 3,
36 backoff: Duration::from_millis(250),
37 customize: Arc::new(identity),
38 })
39 }
40
41 pub(crate) fn backoff(&self) -> Duration {
42 self.backoff
43 }
44
45 pub(crate) fn max_attempts(&self) -> u32 {
46 self.max_attempts
47 }
48
49 pub(crate) async fn get_response_range(
50 &self,
51 url: Url,
52 from: u64,
53 ) -> Result<reqwest::Response, Error> {
54 let range = if from > 0 { Some(from) } else { None };
55 self.retry(|| self.attempt(&url, range)).await
56 }
57
58 async fn retry<F, Fut, T>(&self, mut op: F) -> Result<T, Error>
59 where
60 F: FnMut() -> Fut,
61 Fut: Future<Output = Result<T, Error>>,
62 {
63 let mut delay = self.backoff;
64 let mut attempts_left = self.max_attempts;
65
66 loop {
67 attempts_left -= 1;
68 let result = op().await;
69
70 if attempts_left == 0 {
71 return result;
72 }
73 match result {
74 Ok(v) => return Ok(v),
75 Err(e) if !is_retryable(&e) => return Err(e),
76 Err(_) => {
77 tokio::time::sleep(delay).await;
78 delay = delay.saturating_mul(2);
79 }
80 }
81 }
82 }
83
84 async fn attempt(
85 &self,
86 url: &Url,
87 range_from: Option<u64>,
88 ) -> Result<reqwest::Response, Error> {
89 let mut req = self.client.get(url.clone());
90 if let Some(from) = range_from {
91 req = req.header(reqwest::header::RANGE, format!("bytes={}-", from));
92 }
93 req = (self.customize)(req);
94 let resp = req.send().await?;
95 let status = resp.status();
96 if status == StatusCode::NOT_FOUND {
97 return Err(Error::NotFound(url.to_string()));
98 }
99 if !status.is_success() {
100 return Err(Error::Status(status));
101 }
102 Ok(resp)
103 }
104}
105
106impl DownloaderBuilder {
107 pub fn max_attempts(mut self, max_attempts: u32) -> Self {
108 self.0.max_attempts = max_attempts.max(1);
109 self
110 }
111
112 pub fn backoff(mut self, backoff: Duration) -> Self {
113 self.0.backoff = backoff;
114 self
115 }
116
117 pub fn customize_request(
118 mut self,
119 customize: impl Fn(reqwest::RequestBuilder) -> reqwest::RequestBuilder + Send + Sync + 'static,
120 ) -> Self {
121 self.0.customize = Arc::new(customize);
122 self
123 }
124
125 pub fn build(self) -> Downloader {
126 self.0
127 }
128}
129
130fn identity(req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
131 req
132}
133
134pub(crate) fn is_retryable(err: &Error) -> bool {
135 match err {
136 Error::Request(e) => e.is_timeout() || e.is_connect(),
137 Error::Status(s) => s.is_server_error() || *s == StatusCode::TOO_MANY_REQUESTS,
138 _ => false,
139 }
140}