walker_common/fetcher/
mod.rs1mod data;
4use backon::{ExponentialBuilder, Retryable};
5pub use data::*;
6
7use reqwest::{Client, ClientBuilder, IntoUrl, Method, Response};
8use std::fmt::Debug;
9use std::future::Future;
10use std::marker::PhantomData;
11use std::time::Duration;
12use url::Url;
13
14#[derive(Clone, Debug)]
19pub struct Fetcher {
20 client: Client,
21 retries: usize,
22}
23
24#[derive(Debug, thiserror::Error)]
26pub enum Error {
27 #[error("Request error: {0}")]
28 Request(#[from] reqwest::Error),
29}
30
/// Options for building a [`Fetcher`].
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot use a struct
/// literal; start from [`FetcherOptions::new`] (or `Default`) and adjust the
/// values with the builder-style methods.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct FetcherOptions {
    /// Timeout applied to the HTTP client created by [`Fetcher::new`].
    pub timeout: Duration,
    /// Retry limit passed to the backoff policy used when fetching.
    pub retries: usize,
}
38
39impl FetcherOptions {
40 pub fn new() -> Self {
42 Self::default()
43 }
44
45 pub fn timeout(mut self, timeout: impl Into<Duration>) -> Self {
47 self.timeout = timeout.into();
48 self
49 }
50
51 pub fn retries(mut self, retries: usize) -> Self {
53 self.retries = retries;
54 self
55 }
56}
57
58impl Default for FetcherOptions {
59 fn default() -> Self {
60 Self {
61 timeout: Duration::from_secs(30),
62 retries: 5,
63 }
64 }
65}
66
67impl From<Client> for Fetcher {
68 fn from(client: Client) -> Self {
69 Self::with_client(client, FetcherOptions::default())
70 }
71}
72
73impl Fetcher {
74 pub async fn new(options: FetcherOptions) -> anyhow::Result<Self> {
76 let client = ClientBuilder::new().timeout(options.timeout);
77
78 Ok(Self::with_client(client.build()?, options))
79 }
80
81 fn with_client(client: Client, options: FetcherOptions) -> Self {
83 Self {
84 client,
85 retries: options.retries,
86 }
87 }
88
89 async fn new_request(
90 &self,
91 method: Method,
92 url: Url,
93 ) -> Result<reqwest::RequestBuilder, reqwest::Error> {
94 Ok(self.client.request(method, url))
95 }
96
97 pub async fn fetch<D: Data>(&self, url: impl IntoUrl) -> Result<D, Error> {
99 log::debug!("Fetching: {}", url.as_str());
100 self.fetch_processed(url, TypedProcessor::<D>::new()).await
101 }
102
103 pub async fn fetch_processed<D: DataProcessor>(
105 &self,
106 url: impl IntoUrl,
107 processor: D,
108 ) -> Result<D::Type, Error> {
109 let url = url.into_url()?;
111
112 let retries = self.retries;
113 let backoff = ExponentialBuilder::default();
114
115 (|| async {
116 match self.fetch_once(url.clone(), &processor).await {
117 Ok(result) => Ok(result),
118 Err(err) => {
119 log::info!("Failed to retrieve: {err}");
120 Err(err)
121 }
122 }
123 })
124 .retry(&backoff.with_max_times(retries))
125 .await
126 }
127
128 async fn fetch_once<D: DataProcessor>(
129 &self,
130 url: Url,
131 processor: &D,
132 ) -> Result<D::Type, Error> {
133 let response = self.new_request(Method::GET, url).await?.send().await?;
134
135 Ok(processor.process(response).await?)
136 }
137}
138
139pub trait DataProcessor {
141 type Type: Sized;
142 fn process(
143 &self,
144 response: reqwest::Response,
145 ) -> impl Future<Output = Result<Self::Type, reqwest::Error>>;
146}
147
148struct TypedProcessor<D: Data> {
149 _marker: PhantomData<D>,
150}
151
152impl<D: Data> TypedProcessor<D> {
153 pub const fn new() -> Self {
154 Self {
155 _marker: PhantomData::<D>,
156 }
157 }
158}
159
160impl<D: Data> DataProcessor for TypedProcessor<D> {
162 type Type = D;
163
164 async fn process(&self, response: Response) -> Result<Self::Type, reqwest::Error> {
165 D::from_response(response).await
166 }
167}