1use std::{pin::Pin, str::FromStr, sync::Arc};
6
7use futures_lite::{Stream, StreamExt};
8use reqwest::{
9 header::{HeaderMap, HeaderValue},
10 Method, StatusCode, Url,
11};
12
13use self::http_adapter::Opts;
14use super::*;
15
16#[derive(Debug)]
18pub struct HttpAdapter {
19 opts: Arc<http_adapter::Opts>,
20 size: Option<u64>,
21}
22
23impl HttpAdapter {
24 pub fn new(url: Url) -> Self {
26 Self::with_opts(Arc::new(Opts {
27 url,
28 client: reqwest::Client::new(),
29 headers: None,
30 }))
31 }
32
33 pub fn with_opts(opts: Arc<http_adapter::Opts>) -> Self {
35 Self { opts, size: None }
36 }
37
38 pub fn client(&self) -> &reqwest::Client {
40 &self.opts.client
41 }
42
43 pub fn url(&self) -> &Url {
45 &self.opts.url
46 }
47
48 async fn head_request(&self) -> Result<reqwest::Response, reqwest::Error> {
49 let mut req_builder = self.client().request(Method::HEAD, self.url().clone());
50 if let Some(headers) = self.opts.headers.as_ref() {
51 for (k, v) in headers.iter() {
52 req_builder = req_builder.header(k, v);
53 }
54 }
55 let req = req_builder.build()?;
56 let res = self.client().execute(req).await?;
57 Ok(res)
58 }
59
60 async fn range_request(
61 &self,
62 from: u64,
63 to: Option<u64>,
64 ) -> Result<reqwest::Response, reqwest::Error> {
65 let to = to.and_then(|x| x.checked_add(1));
67 let range = match to {
68 Some(to) => format!("bytes={from}-{to}"),
69 None => format!("bytes={from}-"),
70 };
71 let mut req_builder = self.client().request(Method::GET, self.url().clone());
72 if let Some(headers) = self.opts.headers.as_ref() {
73 for (k, v) in headers.iter() {
74 req_builder = req_builder.header(k, v);
75 }
76 }
77 req_builder = req_builder.header("range", range);
78
79 let req = req_builder.build()?;
80 let res = self.client().execute(req).await?;
81 Ok(res)
82 }
83
84 async fn get_stream_at(
85 &self,
86 offset: u64,
87 len: usize,
88 ) -> io::Result<Pin<Box<dyn Stream<Item = io::Result<Bytes>>>>> {
89 if let Some(size) = self.size {
90 if offset >= size {
91 return Ok(Box::pin(futures_lite::stream::empty()));
92 }
93 }
94 let from = offset;
95 let to = offset.checked_add(len as u64);
96 let from = self.size.map(|size| from.min(size)).unwrap_or(from);
98 let to = self
99 .size
100 .map(|size| to.map(|to| to.min(size)))
101 .unwrap_or(to);
102 let res = self.range_request(from, to).await.map_err(make_io_error)?;
103 if res.status().is_success() {
104 Ok(Box::pin(
105 res.bytes_stream().map(|r| r.map_err(make_io_error)),
106 ))
107 } else if res.status() == StatusCode::RANGE_NOT_SATISFIABLE {
108 Ok(Box::pin(futures_lite::stream::empty()))
111 } else {
112 Err(make_io_error(format!("http error {}", res.status())))
113 }
114 }
115}
116
117pub mod http_adapter {
119 use super::*;
120
121 #[derive(Debug, Clone)]
123 pub struct Opts {
124 pub url: Url,
126 pub headers: Option<HeaderMap<HeaderValue>>,
128 pub client: reqwest::Client,
131 }
132
133 impl AsyncSliceReader for HttpAdapter {
134 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
135 let mut stream = self.get_stream_at(offset, len).await?;
136 let mut res = BytesMut::with_capacity(len.min(1024));
137 while let Some(chunk) = stream.next().await {
138 let chunk = chunk?;
139 res.extend_from_slice(&chunk);
140 if BytesMut::len(&res) >= len {
141 break;
142 }
143 }
144 res.truncate(len);
146 Ok(res.freeze())
147 }
148
149 async fn size(&mut self) -> io::Result<u64> {
150 let io_err = |text: &str| io::Error::other(text);
151 let head_response = self
152 .head_request()
153 .await
154 .map_err(|_| io_err("head request failed"))?;
155 if !head_response.status().is_success() {
156 return Err(io_err("head request failed"));
157 }
158 let size = head_response
159 .headers()
160 .get("content-length")
161 .ok_or_else(|| io_err("content-length header missing"))?;
162 let text = size
163 .to_str()
164 .map_err(|_| io_err("content-length malformed"))?;
165 let size = u64::from_str(text).map_err(|_| io_err("content-length malformed"))?;
166 self.size = Some(size);
167 Ok(size)
168 }
169 }
170}