// boundless_market/storage/providers/http.rs
// Copyright 2026 Boundless Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! HTTP/HTTPS download handler.
//!
//! This module provides download functionality for HTTP and HTTPS URLs with support for:
//! - Retry policies
//! - Size limits
//! - IPFS gateway fallback
use crate::storage::{config::StorageDownloaderConfig, StorageDownloader, StorageError};
use async_trait::async_trait;
use futures::StreamExt;
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use url::Url;

/// HTTP/HTTPS downloader with retry support.
///
/// This downloader handles `http://` and `https://` URLs. It supports:
/// - Configurable retry policies with exponential backoff
/// - Size limits to prevent downloading excessively large files
/// - Optional IPFS gateway fallback for URLs containing `/ipfs/`
#[derive(Clone, Debug)]
pub struct HttpDownloader {
    // Middleware-wrapped reqwest client; carries the retry middleware when configured.
    client: ClientWithMiddleware,
    // Fallback gateway for rewriting `/ipfs/` URLs; normalized to end in a trailing slash.
    ipfs_gateway: Option<Url>,
}

41impl Default for HttpDownloader {
42    fn default() -> Self {
43        let config = StorageDownloaderConfig::default();
44        Self::new(config.max_retries, config.ipfs_gateway)
45    }
46}
47
48impl HttpDownloader {
49    /// Creates a new HTTP downloader with optional retry configuration.
50    ///
51    /// # Panics
52    ///
53    /// Panics if `ipfs_gateway` uses a scheme other than `http` or `https`.
54    pub fn new(max_retries: Option<u8>, ipfs_gateway: Option<Url>) -> Self {
55        let mut builder = ClientBuilder::new(reqwest::Client::new());
56
57        if let Some(max_retries) = max_retries {
58            let retry_policy =
59                ExponentialBackoff::builder().build_with_max_retries(max_retries as u32);
60            builder = builder.with(RetryTransientMiddleware::new_with_policy(retry_policy));
61        }
62
63        Self { client: builder.build(), ipfs_gateway: ipfs_gateway.map(normalize_gateway_url) }
64    }
65
66    /// Attempts to rewrite an IPFS gateway URL to use the configured fallback gateway.
67    ///
68    /// IPFS gateway URLs follow the path-style format: `https://<gateway>/ipfs/<CID>[/path]`.
69    /// This method replaces the gateway while preserving the `/ipfs/...` path and any
70    /// base path configured in the fallback gateway.
71    ///
72    /// Returns `None` if the URL is not an IPFS gateway URL.
73    ///
74    /// References:
75    /// - <https://specs.ipfs.tech/http-gateways/path-gateway/>
76    /// - <https://docs.ipfs.tech/how-to/address-ipfs-on-web/>
77    fn rewrite_ipfs_url(&self, url: &Url) -> Option<Url> {
78        let gateway = self.ipfs_gateway.as_ref()?;
79        let ipfs_path = url.path().strip_prefix("/ipfs/")?;
80
81        let mut new_url = gateway.join(&format!("ipfs/{ipfs_path}")).ok()?;
82        new_url.set_query(url.query());
83
84        // Don't rewrite if the URL is unchanged (already using the configured gateway)
85        (new_url != *url).then_some(new_url)
86    }
87
88    /// Internal download implementation without IPFS fallback.
89    async fn download_impl(&self, url: Url, limit: usize) -> Result<Vec<u8>, StorageError> {
90        if !is_http_url(&url) {
91            return Err(StorageError::UnsupportedScheme(url.scheme().to_string()));
92        }
93
94        tracing::debug!(%url, "downloading from HTTP");
95
96        let resp = self.client.get(url.clone()).send().await.map_err(StorageError::http)?;
97
98        let status = resp.status();
99        if !status.is_success() {
100            return Err(StorageError::HttpStatus(status.as_u16()));
101        }
102
103        // Check content length if available for early rejection
104        let content_length = resp.content_length().unwrap_or(0) as usize;
105        if content_length > limit {
106            return Err(StorageError::SizeLimitExceeded { size: content_length, limit });
107        }
108
109        // Stream the response body with size checking
110        let mut buffer = Vec::with_capacity(content_length.min(limit));
111        let mut stream = resp.bytes_stream();
112
113        while let Some(chunk) = stream.next().await {
114            let chunk = chunk.map_err(StorageError::http)?;
115            if buffer.len() + chunk.len() > limit {
116                return Err(StorageError::SizeLimitExceeded {
117                    size: buffer.len() + chunk.len(),
118                    limit,
119                });
120            }
121            buffer.extend_from_slice(&chunk);
122        }
123
124        tracing::trace!(size = buffer.len(), %url, "downloaded from HTTP");
125
126        Ok(buffer)
127    }
128}
129
130fn is_http_url(url: &Url) -> bool {
131    matches!(url.scheme(), "http" | "https")
132}
133
134fn normalize_gateway_url(mut url: Url) -> Url {
135    assert!(
136        is_http_url(&url),
137        "IPFS gateway URL must use http or https scheme, got: {}",
138        url.scheme()
139    );
140
141    // Ensure trailing slash so Url::join() appends rather than replaces
142    if !url.path().ends_with('/') {
143        url.set_path(&format!("{}/", url.path()));
144    }
145    url
146}
147
148/// Returns `true` if the error is worth retrying with a fallback gateway.
149fn should_retry(err: &StorageError) -> bool {
150    match err {
151        // Retry on any HTTP-related error (connection, status, etc.)
152        StorageError::Http(_) | StorageError::HttpStatus(_) => true,
153        // Don't retry on size limit - the fallback would have the same content
154        StorageError::SizeLimitExceeded { .. } => false,
155        // Don't retry on other errors
156        _ => false,
157    }
158}
159
160#[async_trait]
161impl StorageDownloader for HttpDownloader {
162    async fn download_url_with_limit(
163        &self,
164        url: Url,
165        limit: usize,
166    ) -> Result<Vec<u8>, StorageError> {
167        match self.download_impl(url.clone(), limit).await {
168            Ok(data) => Ok(data),
169            Err(e) if should_retry(&e) => match self.rewrite_ipfs_url(&url) {
170                Some(gateway_url) => {
171                    tracing::debug!(
172                        original = %url,
173                        fallback = %gateway_url,
174                        error = %e,
175                        "Retrying download with IPFS gateway"
176                    );
177                    self.download_impl(gateway_url, limit).await
178                }
179                None => Err(e),
180            },
181            Err(e) => Err(e),
182        }
183    }
184
185    async fn download_url(&self, url: Url) -> Result<Vec<u8>, StorageError> {
186        // No size limit configured; download full content
187        self.download_url_with_limit(url, usize::MAX).await
188    }
189}
190
#[cfg(test)]
mod tests {
    use super::*;
    use httpmock::prelude::*;

    /// Spins up a mock server answering GET `path` with `body`, returning
    /// the server handle and a copy of the expected bytes.
    fn mock_server_with_body(path: &str, body: &[u8]) -> (MockServer, Vec<u8>) {
        let expected = body.to_vec();
        let server = MockServer::start();
        server.mock(|when, then| {
            when.method(GET).path(path);
            then.status(200).body(&expected);
        });
        (server, expected)
    }

    #[tokio::test]
    async fn download_success() {
        let (server, expected) = mock_server_with_body("/test", &[0x41; 4]);
        let target = Url::parse(&server.url("/test")).unwrap();

        let bytes = HttpDownloader::default().download_url(target).await.unwrap();
        assert_eq!(bytes, expected);
    }

    #[tokio::test]
    async fn download_rejects_oversized_response() {
        let (server, _) = mock_server_with_body("/test", &[0x41; 100]);
        let target = Url::parse(&server.url("/test")).unwrap();

        // 100-byte body against a 10-byte limit must be rejected.
        let outcome = HttpDownloader::default().download_url_with_limit(target, 10).await;
        assert!(matches!(outcome, Err(StorageError::SizeLimitExceeded { .. })));
    }

    #[tokio::test]
    async fn ipfs_gateway_rewrites_basic_path() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmTestHash", b"content");
        let fallback = Url::parse(&gateway.base_url()).unwrap();
        let downloader = HttpDownloader::new(None, Some(fallback));

        let target = Url::parse("http://other-gateway.invalid/ipfs/QmTestHash").unwrap();
        assert_eq!(downloader.download_url(target).await.unwrap(), b"content");
    }

    #[tokio::test]
    async fn ipfs_gateway_preserves_nested_path() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmHash/subdir/file.txt", b"nested");
        let fallback = Url::parse(&gateway.base_url()).unwrap();
        let downloader = HttpDownloader::new(None, Some(fallback));

        let target = Url::parse("http://other-gateway.invalid/ipfs/QmHash/subdir/file.txt").unwrap();
        assert_eq!(downloader.download_url(target).await.unwrap(), b"nested");
    }

    #[tokio::test]
    async fn ipfs_gateway_preserves_base_path() {
        let (gateway, _) = mock_server_with_body("/v1/ipfs/QmHash", b"with-base");
        let fallback = Url::parse(&format!("{}/v1/", gateway.base_url())).unwrap();
        let downloader = HttpDownloader::new(None, Some(fallback));

        let target = Url::parse("http://other-gateway.invalid/ipfs/QmHash").unwrap();
        assert_eq!(downloader.download_url(target).await.unwrap(), b"with-base");
    }

    #[tokio::test]
    async fn ipfs_fallback_triggers_on_failure() {
        let (gateway, _) = mock_server_with_body("/ipfs/QmHash", b"fallback");
        let fallback = Url::parse(&gateway.base_url()).unwrap();
        let downloader = HttpDownloader::new(None, Some(fallback));

        // The primary host does not resolve, so the gateway must serve the content.
        let target = Url::parse("http://unreachable.invalid/ipfs/QmHash").unwrap();
        assert_eq!(downloader.download_url(target).await.unwrap(), b"fallback");
    }

    #[tokio::test]
    async fn ipfs_fallback_avoids_infinite_loop() {
        // Gateway answers 404 for this CID.
        let gateway = MockServer::start();
        gateway.mock(|when, then| {
            when.method(GET).path("/ipfs/QmNotFound");
            then.status(404);
        });
        let gateway_url = Url::parse(&gateway.base_url()).unwrap();
        let downloader = HttpDownloader::new(None, Some(gateway_url.clone()));

        // The request already targets the configured gateway, so no rewrite
        // should happen and the 404 must surface directly.
        let target = gateway_url.join("ipfs/QmNotFound").unwrap();
        let outcome = downloader.download_url(target).await;

        assert!(matches!(outcome, Err(StorageError::HttpStatus(404))));
    }

    #[test]
    #[should_panic(expected = "IPFS gateway URL must use http or https scheme")]
    fn rejects_non_http_gateway_scheme() {
        let _ = HttpDownloader::new(None, Some(Url::parse("ftp://gateway.example.com").unwrap()));
    }
}