boundless_market/storage/providers/
http.rs1use crate::storage::{config::StorageDownloaderConfig, StorageDownloader, StorageError};
23use async_trait::async_trait;
24use futures::StreamExt;
25use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
26use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
27use url::Url;
28
29#[derive(Clone, Debug)]
36pub struct HttpDownloader {
37 client: ClientWithMiddleware,
38 ipfs_gateway: Option<Url>,
39}
40
41impl Default for HttpDownloader {
42 fn default() -> Self {
43 let config = StorageDownloaderConfig::default();
44 Self::new(config.max_retries, config.ipfs_gateway)
45 }
46}
47
48impl HttpDownloader {
49 pub fn new(max_retries: Option<u8>, ipfs_gateway: Option<Url>) -> Self {
55 let mut builder = ClientBuilder::new(reqwest::Client::new());
56
57 if let Some(max_retries) = max_retries {
58 let retry_policy =
59 ExponentialBackoff::builder().build_with_max_retries(max_retries as u32);
60 builder = builder.with(RetryTransientMiddleware::new_with_policy(retry_policy));
61 }
62
63 Self { client: builder.build(), ipfs_gateway: ipfs_gateway.map(normalize_gateway_url) }
64 }
65
66 fn rewrite_ipfs_url(&self, url: &Url) -> Option<Url> {
78 let gateway = self.ipfs_gateway.as_ref()?;
79 let ipfs_path = url.path().strip_prefix("/ipfs/")?;
80
81 let mut new_url = gateway.join(&format!("ipfs/{ipfs_path}")).ok()?;
82 new_url.set_query(url.query());
83
84 (new_url != *url).then_some(new_url)
86 }
87
88 async fn download_impl(&self, url: Url, limit: usize) -> Result<Vec<u8>, StorageError> {
90 if !is_http_url(&url) {
91 return Err(StorageError::UnsupportedScheme(url.scheme().to_string()));
92 }
93
94 tracing::debug!(%url, "downloading from HTTP");
95
96 let resp = self.client.get(url.clone()).send().await.map_err(StorageError::http)?;
97
98 let status = resp.status();
99 if !status.is_success() {
100 return Err(StorageError::HttpStatus(status.as_u16()));
101 }
102
103 let content_length = resp.content_length().unwrap_or(0) as usize;
105 if content_length > limit {
106 return Err(StorageError::SizeLimitExceeded { size: content_length, limit });
107 }
108
109 let mut buffer = Vec::with_capacity(content_length.min(limit));
111 let mut stream = resp.bytes_stream();
112
113 while let Some(chunk) = stream.next().await {
114 let chunk = chunk.map_err(StorageError::http)?;
115 if buffer.len() + chunk.len() > limit {
116 return Err(StorageError::SizeLimitExceeded {
117 size: buffer.len() + chunk.len(),
118 limit,
119 });
120 }
121 buffer.extend_from_slice(&chunk);
122 }
123
124 tracing::trace!(size = buffer.len(), %url, "downloaded from HTTP");
125
126 Ok(buffer)
127 }
128}
129
130fn is_http_url(url: &Url) -> bool {
131 matches!(url.scheme(), "http" | "https")
132}
133
134fn normalize_gateway_url(mut url: Url) -> Url {
135 assert!(
136 is_http_url(&url),
137 "IPFS gateway URL must use http or https scheme, got: {}",
138 url.scheme()
139 );
140
141 if !url.path().ends_with('/') {
143 url.set_path(&format!("{}/", url.path()));
144 }
145 url
146}
147
148fn should_retry(err: &StorageError) -> bool {
150 match err {
151 StorageError::Http(_) | StorageError::HttpStatus(_) => true,
153 StorageError::SizeLimitExceeded { .. } => false,
155 _ => false,
157 }
158}
159
160#[async_trait]
161impl StorageDownloader for HttpDownloader {
162 async fn download_url_with_limit(
163 &self,
164 url: Url,
165 limit: usize,
166 ) -> Result<Vec<u8>, StorageError> {
167 match self.download_impl(url.clone(), limit).await {
168 Ok(data) => Ok(data),
169 Err(e) if should_retry(&e) => match self.rewrite_ipfs_url(&url) {
170 Some(gateway_url) => {
171 tracing::debug!(
172 original = %url,
173 fallback = %gateway_url,
174 error = %e,
175 "Retrying download with IPFS gateway"
176 );
177 self.download_impl(gateway_url, limit).await
178 }
179 None => Err(e),
180 },
181 Err(e) => Err(e),
182 }
183 }
184
185 async fn download_url(&self, url: Url) -> Result<Vec<u8>, StorageError> {
186 self.download_url_with_limit(url, usize::MAX).await
188 }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// Starts a mock server answering `GET path` with status 200 and `body`.
    ///
    /// Returns the server together with an owned copy of the body.
    fn mock_server_with_body(path: &str, body: &[u8]) -> (MockServer, Vec<u8>) {
        let payload = body.to_vec();
        let server = MockServer::start();
        server.mock(|when, then| {
            when.method(GET).path(path);
            then.status(200).body(&payload);
        });
        (server, payload)
    }

    /// Parses `raw` into a [`Url`], panicking on invalid input.
    fn parse_url(raw: &str) -> Url {
        Url::parse(raw).unwrap()
    }

    /// Builds a downloader without retry middleware that uses `gateway` as IPFS gateway.
    fn downloader_with_gateway(gateway: Url) -> HttpDownloader {
        HttpDownloader::new(None, Some(gateway))
    }

    #[tokio::test]
    async fn download_success() {
        let (server, expected) = mock_server_with_body("/test", &[0x41; 4]);
        let target = parse_url(&server.url("/test"));

        let downloader = HttpDownloader::default();
        let data = downloader.download_url(target).await.unwrap();
        assert_eq!(data, expected);
    }

    #[tokio::test]
    async fn download_rejects_oversized_response() {
        let (server, _) = mock_server_with_body("/test", &[0x41; 100]);
        let target = parse_url(&server.url("/test"));

        let downloader = HttpDownloader::default();
        let result = downloader.download_url_with_limit(target, 10).await;
        assert!(matches!(result, Err(StorageError::SizeLimitExceeded { .. })));
    }

    #[tokio::test]
    async fn ipfs_gateway_rewrites_basic_path() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmTestHash", b"content");
        let downloader = downloader_with_gateway(parse_url(&gateway.base_url()));

        let target = parse_url("http://other-gateway.invalid/ipfs/QmTestHash");
        let data = downloader.download_url(target).await.unwrap();
        assert_eq!(data, b"content");
    }

    #[tokio::test]
    async fn ipfs_gateway_preserves_nested_path() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmHash/subdir/file.txt", b"nested");
        let downloader = downloader_with_gateway(parse_url(&gateway.base_url()));

        let target = parse_url("http://other-gateway.invalid/ipfs/QmHash/subdir/file.txt");
        let data = downloader.download_url(target).await.unwrap();
        assert_eq!(data, b"nested");
    }

    #[tokio::test]
    async fn ipfs_gateway_preserves_base_path() {
        let (gateway, _) = mock_server_with_body("/v1/ipfs/QmHash", b"with-base");
        let prefixed_gateway = parse_url(&format!("{}/v1/", gateway.base_url()));
        let downloader = downloader_with_gateway(prefixed_gateway);

        let target = parse_url("http://other-gateway.invalid/ipfs/QmHash");
        let data = downloader.download_url(target).await.unwrap();
        assert_eq!(data, b"with-base");
    }

    #[tokio::test]
    async fn ipfs_fallback_triggers_on_failure() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmHash", b"fallback");
        let downloader = downloader_with_gateway(parse_url(&gateway.base_url()));

        // The primary host cannot be reached, so the gateway has to serve the content.
        let target = parse_url("http://unreachable.invalid/ipfs/QmHash");
        let data = downloader.download_url(target).await.unwrap();
        assert_eq!(data, b"fallback");
    }

    #[tokio::test]
    async fn ipfs_fallback_avoids_infinite_loop() {
        let gateway = MockServer::start();
        gateway.mock(|when, then| {
            when.method(GET).path("/ipfs/QmNotFound");
            then.status(404);
        });

        // The requested URL already points at the configured gateway, so rewriting it
        // yields the same URL and the original 404 must be reported.
        let gateway_base = parse_url(&gateway.base_url());
        let target = gateway_base.join("ipfs/QmNotFound").unwrap();
        let downloader = downloader_with_gateway(gateway_base);

        let result = downloader.download_url(target).await;

        assert!(matches!(result, Err(StorageError::HttpStatus(404))));
    }

    #[test]
    #[should_panic(expected = "IPFS gateway URL must use http or https scheme")]
    fn rejects_non_http_gateway_scheme() {
        let _ = downloader_with_gateway(parse_url("ftp://gateway.example.com"));
    }
}