Skip to main content

walker_common/fetcher/
mod.rs

1//! Fetching remote resources
2
3mod data;
4use backon::{ExponentialBuilder, Retryable};
5pub use data::*;
6
7use crate::http::{calculate_retry_after_from_response_header, get_client_error};
8use reqwest::{Client, ClientBuilder, IntoUrl, Method, Response, StatusCode};
9use std::fmt::Debug;
10use std::future::Future;
11use std::marker::PhantomData;
12use std::time::Duration;
13use url::Url;
14
15/// Fetch data using HTTP.
16///
17/// This is some functionality sitting on top an HTTP client, allowing for additional options like
18/// retries.
19#[derive(Clone, Debug)]
20pub struct Fetcher {
21    client: Client,
22    retries: usize,
23    /// *default_retry_after* is used when a 429 response does not include a Retry-After header
24    default_retry_after: Duration,
25}
26
27/// Error when retrieving
28#[derive(Debug, thiserror::Error)]
29pub enum Error {
30    #[error("Request error: {0}")]
31    Request(#[from] reqwest::Error),
32    #[error("Rate limited (HTTP 429), retry after {0:?}")]
33    RateLimited(Duration),
34    #[error("Client error: {0}")]
35    ClientError(StatusCode),
36}
37
/// Options for the [`Fetcher`].
///
/// Start from [`FetcherOptions::new`] (or `Default::default()`) and adjust with the
/// builder-style setters.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FetcherOptions {
    /// Timeout configured on the HTTP client that [`Fetcher::new`] builds.
    timeout: Duration,
    /// Number of retries after a failed attempt.
    retries: usize,
    /// Delay used when a 429 response does not include a Retry-After header.
    default_retry_after: Duration,
    /// Upper bound accepted for `default_retry_after` by the setters.
    max_retry_after: Duration,
}
47
48impl FetcherOptions {
49    /// Create a new instance.
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// Set the timeout.
55    pub fn timeout(mut self, timeout: impl Into<Duration>) -> Self {
56        self.timeout = timeout.into();
57        self
58    }
59
60    /// Set the number of retries.
61    pub fn retries(mut self, retries: usize) -> Self {
62        self.retries = retries;
63        self
64    }
65
66    /// Set the default retry-after duration when a 429 response doesn't include a Retry-After header.
67    pub fn retry_after(mut self, duration: Duration) -> Self {
68        if duration > self.max_retry_after {
69            panic!("Default retry-after cannot be greater than max retry-after (300s)");
70        }
71        self.default_retry_after = duration;
72        self
73    }
74
75    /// Set the default retry-after duration when a 429 response doesn't include a Retry-After header
76    /// and checks the duration against the maximum retry-after.
77    pub fn retry_after_with_max(mut self, default: Duration, max: Duration) -> Self {
78        if default > max {
79            panic!("Default retry-after cannot be greater than max retry-after");
80        }
81        self.default_retry_after = default;
82        self.max_retry_after = max;
83        self
84    }
85}
86
87impl Default for FetcherOptions {
88    fn default() -> Self {
89        Self {
90            timeout: Duration::from_secs(30),
91            retries: 5,
92            default_retry_after: Duration::from_secs(10),
93            max_retry_after: Duration::from_mins(5),
94        }
95    }
96}
97
98impl From<Client> for Fetcher {
99    fn from(client: Client) -> Self {
100        Self::with_client(client, FetcherOptions::default())
101    }
102}
103
104impl Fetcher {
105    /// Create a new downloader from options
106    pub async fn new(options: FetcherOptions) -> anyhow::Result<Self> {
107        let client = ClientBuilder::new().timeout(options.timeout);
108
109        Ok(Self::with_client(client.build()?, options))
110    }
111
112    /// Create a fetcher providing an existing client.
113    fn with_client(client: Client, options: FetcherOptions) -> Self {
114        Self {
115            client,
116            retries: options.retries,
117            default_retry_after: options.default_retry_after,
118        }
119    }
120
121    async fn new_request(
122        &self,
123        method: Method,
124        url: Url,
125    ) -> Result<reqwest::RequestBuilder, reqwest::Error> {
126        Ok(self.client.request(method, url))
127    }
128
129    /// fetch data, using a GET request.
130    pub async fn fetch<D: Data>(&self, url: impl IntoUrl) -> Result<D, Error> {
131        log::debug!("Fetching: {}", url.as_str());
132        self.fetch_processed(url, TypedProcessor::<D>::new()).await
133    }
134
135    /// fetch data, using a GET request, processing the response data.
136    pub async fn fetch_processed<D: DataProcessor>(
137        &self,
138        url: impl IntoUrl,
139        processor: D,
140    ) -> Result<D::Type, Error> {
141        // if the URL building fails, there is no need to re-try, abort now.
142        let url = url.into_url()?;
143
144        let retries = self.retries;
145        let retry = ExponentialBuilder::default().with_max_times(retries);
146
147        (|| async { self.fetch_once(url.clone(), &processor).await })
148            .retry(retry)
149            .when(|e| !matches!(e, Error::ClientError(_)))
150            .adjust(|e, dur| {
151                if let Error::RateLimited(retry_after) = e {
152                    if let Some(dur_value) = dur
153                        && dur_value > *retry_after
154                    {
155                        return dur;
156                    }
157                    Some(*retry_after) // only use server-provided delay if it's longer
158                } else {
159                    dur // minimum delay as per backoff strategy
160                }
161            })
162            .await
163    }
164
165    async fn fetch_once<D: DataProcessor>(
166        &self,
167        url: Url,
168        processor: &D,
169    ) -> Result<D::Type, Error> {
170        let response = self.new_request(Method::GET, url).await?.send().await?;
171
172        log::debug!("Response Status: {}", response.status());
173
174        // Check for rate limiting
175        if let Some(retry_after) =
176            calculate_retry_after_from_response_header(&response, self.default_retry_after)
177        {
178            log::info!("Rate limited (429), retry after: {:?}", retry_after);
179            return Err(Error::RateLimited(retry_after));
180        }
181
182        // Now test if we can convert the (possibly failed) response to result data.
183        // This includes allowed for 404 becoming `None`.
184        match processor.process(response).await {
185            // Ok, return
186            Ok(data) => Ok(data),
187            // Error, extract client error
188            Err(err) => {
189                if let Some(status_code) = err.status().and_then(get_client_error) {
190                    log::debug!("Client error: {status_code}");
191                    Err(Error::ClientError(status_code))
192                } else {
193                    // or return other error as is
194                    Err(err.into())
195                }
196            }
197        }
198    }
199}
200
201/// Processing data returned by a request.
202pub trait DataProcessor {
203    type Type: Sized;
204    fn process(
205        &self,
206        response: reqwest::Response,
207    ) -> impl Future<Output = Result<Self::Type, reqwest::Error>>;
208}
209
/// A [`DataProcessor`] producing any type that implements [`Data`].
///
/// It stores nothing; `D` is only tracked at the type level. The `Data` bound lives on the
/// `impl` blocks that need it rather than on the struct definition.
struct TypedProcessor<D> {
    _marker: PhantomData<D>,
}
213
214impl<D: Data> TypedProcessor<D> {
215    pub const fn new() -> Self {
216        Self {
217            _marker: PhantomData::<D>,
218        }
219    }
220}
221
222/// Extract response payload which implements [`Data`].
223impl<D: Data> DataProcessor for TypedProcessor<D> {
224    type Type = D;
225
226    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
227        D::from_response(response).await
228    }
229}