walker_common/fetcher/
mod.rs

1//! Fetching remote resources
2
3mod data;
4use backon::{ExponentialBuilder, Retryable};
5pub use data::*;
6
7use reqwest::{Client, ClientBuilder, IntoUrl, Method, Response};
8use std::fmt::Debug;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::time::Duration;
12use url::Url;
13
/// Fetch data using HTTP.
///
/// This is some functionality sitting on top of an HTTP client, allowing for additional options
/// like retries.
#[derive(Clone, Debug)]
pub struct Fetcher {
    /// The HTTP client used to build and send requests.
    client: Client,
    /// Upper bound handed to the retry backoff (`with_max_times`) when fetching.
    retries: usize,
}
23
/// Error when retrieving
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The request failed; wraps the underlying [`reqwest::Error`].
    ///
    /// Because of `#[from]`, a `reqwest::Error` converts into this variant, which is what allows
    /// the fetch functions to use `?` directly on `reqwest` results.
    #[error("Request error: {0}")]
    Request(#[from] reqwest::Error),
}
30
/// Options for the [`Fetcher`]
///
/// Start from [`FetcherOptions::new`] (or [`Default`]) and adjust the values using the
/// builder-style methods, e.g. `FetcherOptions::new().retries(3)`.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub struct FetcherOptions {
    /// Timeout configured on the HTTP client created by [`Fetcher::new`]. Defaults to 30 seconds.
    pub timeout: Duration,
    /// Upper bound for the retry backoff used when fetching. Defaults to 5.
    pub retries: usize,
}

impl FetcherOptions {
    /// Create a new instance, populated with the default values.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the timeout.
    ///
    /// This consumes the options and returns the updated value; dropping the return value
    /// discards the change, hence `#[must_use]`.
    #[must_use]
    pub fn timeout(mut self, timeout: impl Into<Duration>) -> Self {
        self.timeout = timeout.into();
        self
    }

    /// Set the number of retries.
    ///
    /// This consumes the options and returns the updated value; dropping the return value
    /// discards the change, hence `#[must_use]`.
    #[must_use]
    pub fn retries(mut self, retries: usize) -> Self {
        self.retries = retries;
        self
    }
}

impl Default for FetcherOptions {
    /// Defaults: a timeout of 30 seconds and 5 retries.
    fn default() -> Self {
        Self {
            timeout: Duration::from_secs(30),
            retries: 5,
        }
    }
}
66
67impl From<Client> for Fetcher {
68    fn from(client: Client) -> Self {
69        Self::with_client(client, FetcherOptions::default())
70    }
71}
72
73impl Fetcher {
74    /// Create a new downloader from options
75    pub async fn new(options: FetcherOptions) -> anyhow::Result<Self> {
76        let client = ClientBuilder::new().timeout(options.timeout);
77
78        Ok(Self::with_client(client.build()?, options))
79    }
80
81    /// Create a fetcher providing an existing client.
82    fn with_client(client: Client, options: FetcherOptions) -> Self {
83        Self {
84            client,
85            retries: options.retries,
86        }
87    }
88
89    async fn new_request(
90        &self,
91        method: Method,
92        url: Url,
93    ) -> Result<reqwest::RequestBuilder, reqwest::Error> {
94        Ok(self.client.request(method, url))
95    }
96
97    /// fetch data, using a GET request.
98    pub async fn fetch<D: Data>(&self, url: impl IntoUrl) -> Result<D, Error> {
99        log::debug!("Fetching: {}", url.as_str());
100        self.fetch_processed(url, TypedProcessor::<D>::new()).await
101    }
102
103    /// fetch data, using a GET request, processing the response data.
104    pub async fn fetch_processed<D: DataProcessor>(
105        &self,
106        url: impl IntoUrl,
107        processor: D,
108    ) -> Result<D::Type, Error> {
109        // if the URL building fails, there is no need to re-try, abort now.
110        let url = url.into_url()?;
111
112        let retries = self.retries;
113        let backoff = ExponentialBuilder::default();
114
115        (|| async {
116            match self.fetch_once(url.clone(), &processor).await {
117                Ok(result) => Ok(result),
118                Err(err) => {
119                    log::info!("Failed to retrieve: {err}");
120                    Err(err)
121                }
122            }
123        })
124        .retry(&backoff.with_max_times(retries))
125        .await
126    }
127
128    async fn fetch_once<D: DataProcessor>(
129        &self,
130        url: Url,
131        processor: &D,
132    ) -> Result<D::Type, Error> {
133        let response = self.new_request(Method::GET, url).await?.send().await?;
134
135        Ok(processor.process(response).await?)
136    }
137}
138
/// Processing data returned by a request.
///
/// Implementations take ownership of the response and turn it into a value of
/// [`DataProcessor::Type`].
pub trait DataProcessor {
    /// The value produced from a response.
    type Type: Sized;
    /// Consume the response and produce the result, or fail with a [`reqwest::Error`].
    fn process(
        &self,
        response: reqwest::Response,
    ) -> impl Future<Output = Result<Self::Type, reqwest::Error>>;
}
147
148struct TypedProcessor<D: Data> {
149    _marker: PhantomData<D>,
150}
151
152impl<D: Data> TypedProcessor<D> {
153    pub const fn new() -> Self {
154        Self {
155            _marker: PhantomData::<D>,
156        }
157    }
158}
159
160/// Extract response payload which implements [`Data`].
161impl<D: Data> DataProcessor for TypedProcessor<D> {
162    type Type = D;
163
164    async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
165        D::from_response(response).await
166    }
167}