walker_common/fetcher/
mod.rs1mod data;
4use backon::{ExponentialBuilder, Retryable};
5pub use data::*;
6
7use crate::http::{calculate_retry_after_from_response_header, get_client_error};
8use reqwest::{Client, ClientBuilder, IntoUrl, Method, Response, StatusCode};
9use std::fmt::Debug;
10use std::future::Future;
11use std::marker::PhantomData;
12use std::time::Duration;
13use url::Url;
14
15#[derive(Clone, Debug)]
20pub struct Fetcher {
21 client: Client,
22 retries: usize,
23 default_retry_after: Duration,
25}
26
27#[derive(Debug, thiserror::Error)]
29pub enum Error {
30 #[error("Request error: {0}")]
31 Request(#[from] reqwest::Error),
32 #[error("Rate limited (HTTP 429), retry after {0:?}")]
33 RateLimited(Duration),
34 #[error("Client error: {0}")]
35 ClientError(StatusCode),
36}
37
38#[non_exhaustive]
40#[derive(Clone, Debug)]
41pub struct FetcherOptions {
42 timeout: Duration,
43 retries: usize,
44 default_retry_after: Duration,
45 max_retry_after: Duration,
46}
47
48impl FetcherOptions {
49 pub fn new() -> Self {
51 Self::default()
52 }
53
54 pub fn timeout(mut self, timeout: impl Into<Duration>) -> Self {
56 self.timeout = timeout.into();
57 self
58 }
59
60 pub fn retries(mut self, retries: usize) -> Self {
62 self.retries = retries;
63 self
64 }
65
66 pub fn retry_after(mut self, duration: Duration) -> Self {
68 if duration > self.max_retry_after {
69 panic!("Default retry-after cannot be greater than max retry-after (300s)");
70 }
71 self.default_retry_after = duration;
72 self
73 }
74
75 pub fn retry_after_with_max(mut self, default: Duration, max: Duration) -> Self {
78 if default > max {
79 panic!("Default retry-after cannot be greater than max retry-after");
80 }
81 self.default_retry_after = default;
82 self.max_retry_after = max;
83 self
84 }
85}
86
87impl Default for FetcherOptions {
88 fn default() -> Self {
89 Self {
90 timeout: Duration::from_secs(30),
91 retries: 5,
92 default_retry_after: Duration::from_secs(10),
93 max_retry_after: Duration::from_mins(5),
94 }
95 }
96}
97
98impl From<Client> for Fetcher {
99 fn from(client: Client) -> Self {
100 Self::with_client(client, FetcherOptions::default())
101 }
102}
103
104impl Fetcher {
105 pub async fn new(options: FetcherOptions) -> anyhow::Result<Self> {
107 let client = ClientBuilder::new().timeout(options.timeout);
108
109 Ok(Self::with_client(client.build()?, options))
110 }
111
112 fn with_client(client: Client, options: FetcherOptions) -> Self {
114 Self {
115 client,
116 retries: options.retries,
117 default_retry_after: options.default_retry_after,
118 }
119 }
120
121 async fn new_request(
122 &self,
123 method: Method,
124 url: Url,
125 ) -> Result<reqwest::RequestBuilder, reqwest::Error> {
126 Ok(self.client.request(method, url))
127 }
128
129 pub async fn fetch<D: Data>(&self, url: impl IntoUrl) -> Result<D, Error> {
131 log::debug!("Fetching: {}", url.as_str());
132 self.fetch_processed(url, TypedProcessor::<D>::new()).await
133 }
134
135 pub async fn fetch_processed<D: DataProcessor>(
137 &self,
138 url: impl IntoUrl,
139 processor: D,
140 ) -> Result<D::Type, Error> {
141 let url = url.into_url()?;
143
144 let retries = self.retries;
145 let retry = ExponentialBuilder::default().with_max_times(retries);
146
147 (|| async { self.fetch_once(url.clone(), &processor).await })
148 .retry(retry)
149 .when(|e| !matches!(e, Error::ClientError(_)))
150 .adjust(|e, dur| {
151 if let Error::RateLimited(retry_after) = e {
152 if let Some(dur_value) = dur
153 && dur_value > *retry_after
154 {
155 return dur;
156 }
157 Some(*retry_after) } else {
159 dur }
161 })
162 .await
163 }
164
165 async fn fetch_once<D: DataProcessor>(
166 &self,
167 url: Url,
168 processor: &D,
169 ) -> Result<D::Type, Error> {
170 let response = self.new_request(Method::GET, url).await?.send().await?;
171
172 log::debug!("Response Status: {}", response.status());
173
174 if let Some(retry_after) =
176 calculate_retry_after_from_response_header(&response, self.default_retry_after)
177 {
178 log::info!("Rate limited (429), retry after: {:?}", retry_after);
179 return Err(Error::RateLimited(retry_after));
180 }
181
182 match processor.process(response).await {
185 Ok(data) => Ok(data),
187 Err(err) => {
189 if let Some(status_code) = err.status().and_then(get_client_error) {
190 log::debug!("Client error: {status_code}");
191 Err(Error::ClientError(status_code))
192 } else {
193 Err(err.into())
195 }
196 }
197 }
198 }
199}
200
201pub trait DataProcessor {
203 type Type: Sized;
204 fn process(
205 &self,
206 response: reqwest::Response,
207 ) -> impl Future<Output = Result<Self::Type, reqwest::Error>>;
208}
209
210struct TypedProcessor<D: Data> {
211 _marker: PhantomData<D>,
212}
213
214impl<D: Data> TypedProcessor<D> {
215 pub const fn new() -> Self {
216 Self {
217 _marker: PhantomData::<D>,
218 }
219 }
220}
221
222impl<D: Data> DataProcessor for TypedProcessor<D> {
224 type Type = D;
225
226 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
227 D::from_response(response).await
228 }
229}