1use std::error::Error;
32use std::fmt::{Debug, Display, Formatter};
33use std::io;
34use std::pin::Pin;
35use std::task::{self, Poll};
36use std::time::Instant;
37
38use bytes::Bytes;
39use educe::Educe;
40use futures_util::{Future, Stream};
41use mediatype::MediaTypeBuf;
42#[cfg(feature = "reqwest")]
43pub use reqwest;
44use tracing::{debug, instrument, warn};
45
46use crate::WrapIoResult;
47use crate::source::{DecodeError, SourceStream};
48
49#[cfg(feature = "reqwest")]
50pub mod reqwest_client;
51
52#[cfg(feature = "reqwest-middleware")]
53pub(crate) mod reqwest_middleware_client;
54
/// Abstraction over the HTTP client that [`HttpStream`] uses to issue requests.
pub trait Client: Send + Sync + Unpin + 'static {
    /// URL type accepted by this client. It must implement [`Display`] because the URL is
    /// rendered into tracing fields and error messages.
    type Url: Display + Send + Sync + Unpin;

    /// Header collection exposed by this client's responses.
    type Headers: ResponseHeaders;

    /// Response type produced by [`Client::get`] and [`Client::get_range`].
    type Response: ClientResponse<Headers = Self::Headers>;

    /// Error produced when sending a request fails.
    type Error: Error + Send + Sync;

    /// Creates a new client instance. [`HttpStream`] calls this when it is built through
    /// `SourceStream::create`, where no client is supplied by the caller.
    fn create() -> Self;

    /// Requests the resource at `url`.
    fn get(
        &self,
        url: &Self::Url,
    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send;

    /// Requests part of the resource at `url`, beginning at byte offset `start`.
    ///
    /// `end` is optional; `None` leaves the range open-ended.
    /// NOTE(review): `end` is presumably the inclusive last byte position, since
    /// `HttpStream::seek_range` subtracts one from its own end before calling this method and
    /// [`format_range_header_bytes`] renders the pair as `bytes={start}-{end}` — confirm against
    /// the concrete client implementations.
    fn get_range(
        &self,
        url: &Self::Url,
        start: u64,
        end: Option<u64>,
    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send;
}
92
/// The type and subtype of a parsed content type value.
///
/// [`HttpStream::new`] fills this in from the response's content type when that value parses
/// as a media type.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ContentType {
    /// The top-level type component of the media type.
    pub r#type: String,
    /// The subtype component of the media type.
    pub subtype: String,
}
101
/// Read-only access to the headers of an HTTP response.
pub trait ResponseHeaders: Send + Sync + Unpin {
    /// Looks up the header called `name` and returns its value as a string slice.
    ///
    /// NOTE(review): `None` presumably means the header is absent (or not representable as a
    /// string) — confirm against the concrete implementations.
    fn header(&self, name: &str) -> Option<&str>;
}
108
/// A response returned by a [`Client`].
pub trait ClientResponse: Send + Sync + Sized {
    /// Error produced by [`ClientResponse::into_result`] when the response is rejected. It
    /// implements [`DecodeError`] so it can be turned into a message string.
    type ResponseError: DecodeError + Send;
    /// Error yielded by the items of the stream returned from [`ClientResponse::stream`].
    type StreamError: Error + Send + Sync;
    /// Header collection returned by [`ClientResponse::headers`].
    type Headers: ResponseHeaders;

    /// Returns the content length of the response, if known. [`HttpStream::new`] treats `None`
    /// as a missing content length header and logs a warning.
    fn content_length(&self) -> Option<u64>;

    /// Returns the raw content type of the response, if any. [`HttpStream::new`] attempts to
    /// parse this value as a media type.
    fn content_type(&self) -> Option<&str>;

    /// Returns the headers of the response.
    fn headers(&self) -> Self::Headers;

    /// Converts the response into a `Result`. [`HttpStream`] propagates the `Err` case as a
    /// failed request instead of reading the body.
    fn into_result(self) -> Result<Self, Self::ResponseError>;

    /// Consumes the response and returns its body as a boxed stream of byte chunks.
    fn stream(
        self,
    ) -> Box<dyn Stream<Item = Result<Bytes, Self::StreamError>> + Unpin + Send + Sync>;
}
141
/// Writes `val` to `fmt` using its [`Display`] implementation.
///
/// Used as a custom `Debug` formatting method for fields whose type is only known to implement
/// [`Display`]. The value is rendered with a default format spec, so any flags set on the
/// incoming formatter (padding, alternate mode, ...) are not forwarded to it.
fn fmt<T: Display>(val: &T, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
    fmt.write_fmt(format_args!("{val}"))
}
148
/// Error returned when creating an [`HttpStream`] fails.
#[derive(thiserror::Error, Educe)]
#[educe(Debug)]
pub enum HttpStreamError<C: Client> {
    /// The client failed to send the request or obtain a response.
    #[error("Failed to fetch: {0}")]
    FetchFailure(C::Error),
    /// A response was received but rejected by [`ClientResponse::into_result`].
    #[error("Failed to get response: {0}")]
    ResponseFailure(<<C as Client>::Response as ClientResponse>::ResponseError),
}
160
161impl<C: Client> DecodeError for HttpStreamError<C> {
162 async fn decode_error(self) -> String {
163 match self {
164 Self::ResponseFailure(e) => e.decode_error().await,
165 this @ Self::FetchFailure(_) => this.to_string(),
166 }
167 }
168}
169
/// A stream of bytes backed by an HTTP response body.
///
/// Besides the body stream it keeps the client and URL, so further requests can be issued
/// later, and the metadata captured from the initial response.
#[derive(Educe)]
#[educe(Debug)]
pub struct HttpStream<C: Client> {
    // Body of the most recent response; excluded from the `Debug` output.
    #[educe(Debug = false)]
    stream: Box<
        dyn Stream<Item = Result<Bytes, <<C as Client>::Response as ClientResponse>::StreamError>>
            + Unpin
            + Send
            + Sync,
    >,
    // Client used for the initial request and for any later range/reconnect requests.
    client: C,
    // Content length reported by the initial response, if any.
    content_length: Option<u64>,
    // Content type parsed from the initial response, if present and valid.
    content_type: Option<ContentType>,
    // Requested URL; `Debug` prints it through its `Display` implementation.
    #[educe(Debug(method = "fmt"))]
    url: C::Url,
    // Headers of the initial response; excluded from the `Debug` output.
    #[educe(Debug = false)]
    headers: C::Headers,
}
189
190impl<C: Client> HttpStream<C> {
191 #[instrument(skip(client, url), fields(url = url.to_string()))]
193 pub async fn new(
194 client: C,
195 url: <Self as SourceStream>::Params,
196 ) -> Result<Self, HttpStreamError<C>> {
197 debug!("requesting stream content");
198 let request_start = Instant::now();
199
200 let response = client
201 .get(&url)
202 .await
203 .map_err(HttpStreamError::FetchFailure)?;
204 debug!(
205 duration = format!("{:?}", request_start.elapsed()),
206 "request finished"
207 );
208
209 let response = response
210 .into_result()
211 .map_err(HttpStreamError::ResponseFailure)?;
212 let content_length = response.content_length().map_or_else(
213 || {
214 warn!("content length header missing");
215 None
216 },
217 |content_length| {
218 debug!(content_length, "received content length");
219 Some(content_length)
220 },
221 );
222
223 let content_type = response.content_type().map_or_else(
224 || {
225 warn!("content type header missing");
226 None
227 },
228 |content_type| {
229 debug!(content_type, "received content type");
230 match content_type.parse::<MediaTypeBuf>() {
231 Ok(content_type) => Some(ContentType {
232 r#type: content_type.ty().to_string(),
233 subtype: content_type.subty().to_string(),
234 }),
235 Err(e) => {
236 warn!("error parsing content type: {e:?}");
237 None
238 }
239 }
240 },
241 );
242
243 let headers = response.headers();
244 let stream = response.stream();
245 Ok(Self {
246 stream: Box::new(stream),
247 client,
248 content_length,
249 content_type,
250 headers,
251 url,
252 })
253 }
254
255 pub fn content_type(&self) -> &Option<ContentType> {
257 &self.content_type
258 }
259
260 pub fn header(&self, name: &str) -> Option<&str> {
263 self.headers.header(name)
264 }
265
266 pub fn headers(&self) -> &C::Headers {
268 &self.headers
269 }
270
271 fn supports_range_request(&self) -> bool {
272 match self.header("Accept-Ranges") {
273 Some(val) => val != "none",
274 None => false,
275 }
276 }
277}
278
279impl<C: Client> Stream for HttpStream<C> {
280 type Item = Result<Bytes, <<C as Client>::Response as ClientResponse>::StreamError>;
281
282 fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
283 Pin::new(&mut self.stream).poll_next(cx)
284 }
285}
286
287impl<C: Client> SourceStream for HttpStream<C> {
288 type Params = C::Url;
289 type StreamCreationError = HttpStreamError<C>;
290
291 async fn create(params: Self::Params) -> Result<Self, Self::StreamCreationError> {
292 Self::new(C::create(), params).await
293 }
294
295 fn content_length(&self) -> Option<u64> {
296 self.content_length
297 }
298
299 #[instrument(skip(self))]
300 async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
301 if Some(start) == self.content_length {
302 debug!(
303 "attempting to seek where start is the length of the stream, returning empty \
304 stream"
305 );
306 self.stream = Box::new(futures_util::stream::empty());
307 return Ok(());
308 }
309
310 if !self.supports_range_request() {
311 warn!("Accept-Ranges header not present. Attempting seek anyway.");
312 }
313
314 debug!("sending HTTP range request");
315 let request_start = Instant::now();
316 let response = self
317 .client
318 .get_range(&self.url, start, end.map(|e| e - 1))
320 .await
321 .map_err(|e| io::Error::other(e.to_string()))
322 .wrap_err(&format!("error sending HTTP range request to {}", self.url))?;
323 debug!(
324 duration = format!("{:?}", request_start.elapsed()),
325 "HTTP request finished"
326 );
327
328 let response = match response.into_result() {
329 Ok(response) => Ok(response),
330 Err(e) => {
331 let error = e.decode_error().await;
332 Err(io::Error::other(error)).wrap_err(&format!(
333 "error getting HTTP range response from {}",
334 self.url
335 ))
336 }
337 }?;
338 self.stream = Box::new(response.stream());
339 debug!("done seeking");
340 Ok(())
341 }
342
343 async fn reconnect(&mut self, current_position: u64) -> Result<(), io::Error> {
344 if self.supports_range_request() {
345 self.seek_range(current_position, None).await
346 } else {
347 let response = self
348 .client
349 .get(&self.url)
350 .await
351 .map_err(|e| io::Error::other(e.to_string()))
352 .wrap_err(&format!("error sending HTTP request to {}", self.url))?;
353 self.stream = Box::new(response.stream());
354 Ok(())
355 }
356 }
357
358 fn supports_seek(&self) -> bool {
359 true
360 }
361}
362
/// Name of the HTTP `Range` request header.
pub const RANGE_HEADER_KEY: &str = "Range";
365
/// Formats a byte range as the value of a `Range` header.
///
/// Produces `bytes={start}-{end}` when `end` is given and the open-ended form
/// `bytes={start}-` when it is `None`. The values are written out verbatim; no validation or
/// adjustment is applied.
pub fn format_range_header_bytes(start: u64, end: Option<u64>) -> String {
    match end {
        Some(last) => format!("bytes={start}-{last}"),
        None => format!("bytes={start}-"),
    }
}