1use reqwest::Client as AsyncClient;
24use reqwest::StatusCode;
25use reqwest::blocking::Client;
26use reqwest::header::CONTENT_TYPE;
27use roas::loader::{AsyncResourceFetcher, FetchFuture, LoaderError, ResourceFetcher};
28#[cfg(feature = "yaml")]
29use serde::de::Error as _;
30use serde_json::Value;
31use std::time::Duration;
32use url::Url;
33
/// Timeout handed to the `reqwest` client builders by the `try_new`
/// constructors of both fetcher flavours.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
35
/// HTTP(S) resource fetcher, generic over the client type `C` it wraps.
///
/// The blocking and async flavours are exposed through the `HttpFetcher` and
/// `AsyncHttpFetcher` type aliases.
#[derive(Debug, Clone)]
pub struct Fetcher<C> {
    /// Client used to issue every request made by this fetcher.
    client: C,
}
50
51pub type HttpFetcher = Fetcher<Client>;
54
55pub type AsyncHttpFetcher = Fetcher<AsyncClient>;
59
60impl Fetcher<Client> {
61 pub fn new() -> Self {
67 Self::try_new().expect("default reqwest::blocking::Client must build")
68 }
69
70 pub fn try_new() -> Result<Self, reqwest::Error> {
73 Ok(Self::with_client(
74 Client::builder().timeout(DEFAULT_TIMEOUT).build()?,
75 ))
76 }
77
78 pub fn with_client(client: Client) -> Self {
82 Self { client }
83 }
84}
85
86impl Default for Fetcher<Client> {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl Fetcher<AsyncClient> {
93 pub fn new() -> Self {
99 Self::try_new().expect("default reqwest::Client must build")
100 }
101
102 pub fn try_new() -> Result<Self, reqwest::Error> {
105 Ok(Self::with_client(
106 AsyncClient::builder().timeout(DEFAULT_TIMEOUT).build()?,
107 ))
108 }
109
110 pub fn with_client(client: AsyncClient) -> Self {
113 Self { client }
114 }
115}
116
117impl Default for Fetcher<AsyncClient> {
118 fn default() -> Self {
119 Self::new()
120 }
121}
122
123impl ResourceFetcher for Fetcher<Client> {
124 fn fetch(&mut self, uri: &Url) -> Result<Value, LoaderError> {
125 check_scheme(uri)?;
126 let response = self.client.get(uri.as_str()).send().map_err(|source| {
127 fetch_error(uri.as_str().to_string(), HttpFetchError::Request { source })
128 })?;
129
130 let status = response.status();
131 if !status.is_success() {
132 return Err(fetch_error(
133 uri.as_str().to_string(),
134 HttpFetchError::Status { status },
135 ));
136 }
137
138 let content_type = response
139 .headers()
140 .get(CONTENT_TYPE)
141 .and_then(|v| v.to_str().ok())
142 .map(|s| s.to_string());
143
144 let bytes = response.bytes().map_err(|source| {
145 fetch_error(uri.as_str().to_string(), HttpFetchError::Body { source })
146 })?;
147
148 parse_body(uri, content_type.as_deref(), &bytes)
149 }
150}
151
152impl AsyncResourceFetcher for Fetcher<AsyncClient> {
153 fn fetch<'a>(&'a mut self, uri: &'a Url) -> FetchFuture<'a> {
154 let client = self.client.clone();
155 Box::pin(async move {
156 check_scheme(uri)?;
157 let response = client.get(uri.as_str()).send().await.map_err(|source| {
158 fetch_error(uri.as_str().to_string(), HttpFetchError::Request { source })
159 })?;
160
161 let status = response.status();
162 if !status.is_success() {
163 return Err(fetch_error(
164 uri.as_str().to_string(),
165 HttpFetchError::Status { status },
166 ));
167 }
168
169 let content_type = response
170 .headers()
171 .get(CONTENT_TYPE)
172 .and_then(|v| v.to_str().ok())
173 .map(|s| s.to_string());
174
175 let bytes = response.bytes().await.map_err(|source| {
176 fetch_error(uri.as_str().to_string(), HttpFetchError::Body { source })
177 })?;
178
179 parse_body(uri, content_type.as_deref(), &bytes)
180 })
181 }
182}
183
184fn check_scheme(uri: &Url) -> Result<(), LoaderError> {
185 match uri.scheme() {
186 "http" | "https" => Ok(()),
187 _ => Err(LoaderError::UnsupportedFetcherUri(uri.as_str().to_string())),
188 }
189}
190
191fn parse_body(uri: &Url, content_type: Option<&str>, bytes: &[u8]) -> Result<Value, LoaderError> {
192 if is_yaml(content_type, uri) {
193 parse_yaml(uri, bytes)
194 } else {
195 serde_json::from_slice(bytes).map_err(|source| LoaderError::Parse {
196 uri: uri.as_str().to_string(),
197 source,
198 })
199 }
200}
201
202#[allow(unused_variables)]
210fn is_yaml(content_type: Option<&str>, uri: &Url) -> bool {
211 #[cfg(feature = "yaml")]
212 {
213 if let Some(ct) = content_type {
214 let mime = ct
215 .split(';')
216 .next()
217 .unwrap_or("")
218 .trim()
219 .to_ascii_lowercase();
220 if mime.contains("yaml") {
221 return true;
222 }
223 if !mime.is_empty() && mime != "application/octet-stream" {
224 return false;
225 }
226 }
227 let path = uri.path().to_ascii_lowercase();
228 path.ends_with(".yaml") || path.ends_with(".yml")
229 }
230 #[cfg(not(feature = "yaml"))]
231 {
232 false
233 }
234}
235
236#[cfg(feature = "yaml")]
237fn parse_yaml(uri: &Url, bytes: &[u8]) -> Result<Value, LoaderError> {
238 serde_yaml_ng::from_slice(bytes).map_err(|yaml_err| LoaderError::Parse {
239 uri: uri.as_str().to_string(),
240 source: serde_json::Error::custom(yaml_err.to_string()),
241 })
242}
243
244#[cfg(not(feature = "yaml"))]
245#[allow(dead_code)]
246fn parse_yaml(_uri: &Url, _bytes: &[u8]) -> Result<Value, LoaderError> {
247 unreachable!("parse_yaml is only reached when the `yaml` feature is enabled")
248}
249
250#[derive(Debug, thiserror::Error)]
257#[non_exhaustive]
258pub enum HttpFetchError {
259 #[error("HTTP request failed")]
262 Request {
263 #[source]
264 source: reqwest::Error,
265 },
266
267 #[error("non-success HTTP response: {status}")]
269 Status { status: StatusCode },
270
271 #[error("failed to read response body")]
273 Body {
274 #[source]
275 source: reqwest::Error,
276 },
277}
278
279fn fetch_error(uri: String, source: HttpFetchError) -> LoaderError {
280 LoaderError::Fetch {
281 uri,
282 source: Box::new(source),
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Both fetcher flavours can be built through `Default` and `new`.
    #[test]
    fn http_fetcher_default_constructs() {
        drop(HttpFetcher::default());
        drop(HttpFetcher::new());
        drop(AsyncHttpFetcher::default());
        drop(AsyncHttpFetcher::new());
    }

    /// The fallible constructors succeed with the default configuration.
    #[test]
    fn http_fetcher_try_new_succeeds_for_default_config() {
        HttpFetcher::try_new().expect("blocking client must build");
        AsyncHttpFetcher::try_new().expect("async client must build");
    }

    /// Compile-time style check that both fetcher flavours implement `Clone`.
    #[test]
    fn http_fetcher_is_clone_and_shares_pool() {
        let blocking = HttpFetcher::new();
        let _blocking_twin = blocking.clone();
        let non_blocking = AsyncHttpFetcher::new();
        let _non_blocking_twin = non_blocking.clone();
    }

    /// `fetch_error` keeps the URI and lets callers downcast the boxed cause.
    #[test]
    fn fetch_error_helper_boxes_into_loader_error_fetch() {
        let err = fetch_error(
            String::from("https://example.test/x.json"),
            HttpFetchError::Status {
                status: StatusCode::NOT_FOUND,
            },
        );
        match err {
            LoaderError::Fetch { uri, source } => {
                assert_eq!(uri, "https://example.test/x.json");
                let cause = source
                    .downcast_ref::<HttpFetchError>()
                    .expect("source must downcast to HttpFetchError");
                assert!(matches!(
                    cause,
                    HttpFetchError::Status {
                        status: StatusCode::NOT_FOUND
                    }
                ));
            }
            other => panic!("expected LoaderError::Fetch, got {other:?}"),
        }
    }

    /// Plain and TLS HTTP URIs both pass the scheme check.
    #[test]
    fn check_scheme_accepts_http_and_https() {
        for raw in ["http://example.test/x.json", "https://example.test/x.json"] {
            let uri = Url::parse(raw).unwrap();
            check_scheme(&uri).unwrap();
        }
    }

    /// A `file://` URI is rejected, and the error carries the offending URI.
    #[test]
    fn check_scheme_rejects_file_uri_with_unsupported_fetcher_uri() {
        let file_uri = Url::parse("file:///tmp/x.json").unwrap();
        let err = check_scheme(&file_uri).expect_err("file:// must be rejected");
        assert!(matches!(
            err,
            LoaderError::UnsupportedFetcherUri(rejected) if rejected.starts_with("file://")
        ));
    }
}