iroh_io/
http.rs

//! An [AsyncSliceReader] implementation for HTTP resources, using range requests.
//!
//! Uses the [reqwest](https://docs.rs/reqwest) crate. Somewhat inspired by
//! <https://github.com/fasterthanlime/ubio/blob/main/src/http/mod.rs>
5use std::{pin::Pin, str::FromStr, sync::Arc};
6
7use futures_lite::{Stream, StreamExt};
8use reqwest::{
9    header::{HeaderMap, HeaderValue},
10    Method, StatusCode, Url,
11};
12
13use self::http_adapter::Opts;
14use super::*;
15
16/// A struct that implements [AsyncSliceReader] using HTTP range requests
17#[derive(Debug)]
18pub struct HttpAdapter {
19    opts: Arc<http_adapter::Opts>,
20    size: Option<u64>,
21}
22
23impl HttpAdapter {
24    /// Creates a new [`HttpAdapter`] from a URL
25    pub fn new(url: Url) -> Self {
26        Self::with_opts(Arc::new(Opts {
27            url,
28            client: reqwest::Client::new(),
29            headers: None,
30        }))
31    }
32
33    /// Creates a new [`HttpAdapter`] from a URL and options
34    pub fn with_opts(opts: Arc<http_adapter::Opts>) -> Self {
35        Self { opts, size: None }
36    }
37
38    /// Returns the client used for requests
39    pub fn client(&self) -> &reqwest::Client {
40        &self.opts.client
41    }
42
43    /// Returns the URL of the resource
44    pub fn url(&self) -> &Url {
45        &self.opts.url
46    }
47
48    async fn head_request(&self) -> Result<reqwest::Response, reqwest::Error> {
49        let mut req_builder = self.client().request(Method::HEAD, self.url().clone());
50        if let Some(headers) = self.opts.headers.as_ref() {
51            for (k, v) in headers.iter() {
52                req_builder = req_builder.header(k, v);
53            }
54        }
55        let req = req_builder.build()?;
56        let res = self.client().execute(req).await?;
57        Ok(res)
58    }
59
60    async fn range_request(
61        &self,
62        from: u64,
63        to: Option<u64>,
64    ) -> Result<reqwest::Response, reqwest::Error> {
65        // to is inclusive, commented out because warp is non spec compliant
66        let to = to.and_then(|x| x.checked_add(1));
67        let range = match to {
68            Some(to) => format!("bytes={from}-{to}"),
69            None => format!("bytes={from}-"),
70        };
71        let mut req_builder = self.client().request(Method::GET, self.url().clone());
72        if let Some(headers) = self.opts.headers.as_ref() {
73            for (k, v) in headers.iter() {
74                req_builder = req_builder.header(k, v);
75            }
76        }
77        req_builder = req_builder.header("range", range);
78
79        let req = req_builder.build()?;
80        let res = self.client().execute(req).await?;
81        Ok(res)
82    }
83
84    async fn get_stream_at(
85        &self,
86        offset: u64,
87        len: usize,
88    ) -> io::Result<Pin<Box<dyn Stream<Item = io::Result<Bytes>>>>> {
89        if let Some(size) = self.size {
90            if offset >= size {
91                return Ok(Box::pin(futures_lite::stream::empty()));
92            }
93        }
94        let from = offset;
95        let to = offset.checked_add(len as u64);
96        // if we have a size, clamp the range
97        let from = self.size.map(|size| from.min(size)).unwrap_or(from);
98        let to = self
99            .size
100            .map(|size| to.map(|to| to.min(size)))
101            .unwrap_or(to);
102        let res = self.range_request(from, to).await.map_err(make_io_error)?;
103        if res.status().is_success() {
104            Ok(Box::pin(
105                res.bytes_stream().map(|r| r.map_err(make_io_error)),
106            ))
107        } else if res.status() == StatusCode::RANGE_NOT_SATISFIABLE {
108            // https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/416
109            // we requested a range that is out of bounds, just return nothing
110            Ok(Box::pin(futures_lite::stream::empty()))
111        } else {
112            Err(make_io_error(format!("http error {}", res.status())))
113        }
114    }
115}
116
117/// Support for the [HttpAdapter]
118pub mod http_adapter {
119    use super::*;
120
121    /// Options for [HttpAdapter]
122    #[derive(Debug, Clone)]
123    pub struct Opts {
124        /// The URL of the resource
125        pub url: Url,
126        /// Additional headers to send with the requests
127        pub headers: Option<HeaderMap<HeaderValue>>,
128        /// The client to use for requests. If not set, a new client will be created
129        /// for each reader, which is wasteful.
130        pub client: reqwest::Client,
131    }
132
133    impl AsyncSliceReader for HttpAdapter {
134        async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
135            let mut stream = self.get_stream_at(offset, len).await?;
136            let mut res = BytesMut::with_capacity(len.min(1024));
137            while let Some(chunk) = stream.next().await {
138                let chunk = chunk?;
139                res.extend_from_slice(&chunk);
140                if BytesMut::len(&res) >= len {
141                    break;
142                }
143            }
144            // we do not want to rely on the server sending the exact amount of bytes
145            res.truncate(len);
146            Ok(res.freeze())
147        }
148
149        async fn size(&mut self) -> io::Result<u64> {
150            let io_err = |text: &str| io::Error::other(text);
151            let head_response = self
152                .head_request()
153                .await
154                .map_err(|_| io_err("head request failed"))?;
155            if !head_response.status().is_success() {
156                return Err(io_err("head request failed"));
157            }
158            let size = head_response
159                .headers()
160                .get("content-length")
161                .ok_or_else(|| io_err("content-length header missing"))?;
162            let text = size
163                .to_str()
164                .map_err(|_| io_err("content-length malformed"))?;
165            let size = u64::from_str(text).map_err(|_| io_err("content-length malformed"))?;
166            self.size = Some(size);
167            Ok(size)
168        }
169    }
170}