stream_download/http/
mod.rs

1//! An HTTP implementation of the [`SourceStream`] trait.
2//!
3//! An implementation of the [Client] trait using [reqwest](https://docs.rs/reqwest/latest/reqwest)
4//! is provided if the `reqwest` feature is enabled. If you need to customize the client object, you
5//! can use [`HttpStream::new`](crate::http::HttpStream::new) to supply your own reqwest client.
6//! Keep in mind that reqwest recommends creating a single client and cloning it for each new
7//! connection.
8//!
9//! # Example
10//!
11//! ```no_run
12//! use std::error::Error;
13//! use std::result::Result;
14//!
15//! use stream_download::http::HttpStream;
16//! use stream_download::http::reqwest::Client;
17//! use stream_download::source::SourceStream;
18//!
19//! #[tokio::main]
20//! async fn main() -> Result<(), Box<dyn Error>> {
21//!     let stream = HttpStream::new(
22//!         Client::new(),
23//!         "https://some-cool-url.com/some-file.mp3".parse()?,
24//!     )
25//!     .await?;
26//!     let content_length = stream.content_length();
27//!     Ok(())
28//! }
29//! ```
30
31use std::error::Error;
32use std::fmt::{Debug, Display, Formatter};
33use std::io;
34use std::pin::Pin;
35use std::task::{self, Poll};
36use std::time::Instant;
37
38use bytes::Bytes;
39use educe::Educe;
40use futures_util::{Future, Stream};
41use mediatype::MediaTypeBuf;
42#[cfg(feature = "reqwest")]
43pub use reqwest;
44use tracing::{debug, instrument, warn};
45
46use crate::WrapIoResult;
47use crate::source::{DecodeError, SourceStream};
48
49#[cfg(feature = "reqwest")]
50pub mod reqwest_client;
51
52#[cfg(feature = "reqwest-middleware")]
53pub(crate) mod reqwest_middleware_client;
54
55/// Wrapper trait for an HTTP client that exposes only functionality necessary for retrieving the
56/// stream content. If the `reqwest` feature is enabled, this trait is implemented for
57/// [reqwest::Client](https://docs.rs/reqwest/latest/reqwest/struct.Client.html).
58/// This can be implemented for a custom HTTP client if desired.
59pub trait Client: Send + Sync + Unpin + 'static {
60    /// The HTTP URL of the remote resource.
61    type Url: Display + Send + Sync + Unpin;
62
63    /// The type that contains the HTTP response headers.
64    type Headers: ResponseHeaders;
65
66    /// The HTTP response object.
67    type Response: ClientResponse<Headers = Self::Headers>;
68
69    /// The error type returned by HTTP requests.
70    type Error: Error + Send + Sync;
71
72    /// Creates a new instance of the client.
73    fn create() -> Self;
74
75    /// Sends an HTTP GET request to the URL.
76    fn get(
77        &self,
78        url: &Self::Url,
79    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send;
80
81    /// Sends an HTTP GET request to the URL utilizing the `Range` header to request a specific part
82    /// of the stream.
83    ///
84    /// The end value should be inclusive, per the HTTP spec.
85    fn get_range(
86        &self,
87        url: &Self::Url,
88        start: u64,
89        end: Option<u64>,
90    ) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send;
91}
92
/// The `<type>/<subtype>` pair carried by the content type HTTP response header.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContentType {
    /// Top-level content type, e.g. application, audio, video.
    pub r#type: String,
    /// Specific subtype, e.g. mpeg, mp4, ogg.
    pub subtype: String,
}
101
/// Read-only access to individual HTTP response headers.
pub trait ResponseHeaders: Send + Sync + Unpin {
    /// Looks up the header called `name` in the response.
    ///
    /// Returns `None` when the header is absent or its value can't be decoded as a string.
    fn header(&self, name: &str) -> Option<&str>;
}
108
109/// A wrapper trait for an HTTP response that exposes only functionality necessary for retrieving
110/// the stream content. If the `reqwest` feature is enabled,
111/// this trait is implemented for
112/// [reqwest::Response](https://docs.rs/reqwest/latest/reqwest/struct.Response.html).
113/// This can be implemented for a custom HTTP response if desired.
114pub trait ClientResponse: Send + Sync + Sized {
115    /// Error type returned by the underlying response.
116    type ResponseError: DecodeError + Send;
117    /// Error type returned by the stream.
118    type StreamError: Error + Send + Sync;
119    /// Object containing HTTP response headers.
120    type Headers: ResponseHeaders;
121
122    /// The size of the remote resource in bytes.
123    /// The result should be `None` if the stream is infinite or doesn't have a known length.
124    fn content_length(&self) -> Option<u64>;
125
126    /// The content-type response header.
127    /// This should be in the standard format of `<type>/<subtype>`.
128    fn content_type(&self) -> Option<&str>;
129
130    /// Object containing HTTP response headers.
131    fn headers(&self) -> Self::Headers;
132
133    /// Turns the response into an error if the response was not successful.
134    fn into_result(self) -> Result<Self, Self::ResponseError>;
135
136    /// Converts the response into a byte stream
137    fn stream(
138        self,
139    ) -> Box<dyn Stream<Item = Result<Bytes, Self::StreamError>> + Unpin + Send + Sync>;
140}
141
// Debug-formatting helper for fields that only implement `Display`: the value is written using
// its `Display` representation with a plain `{}` format spec.
fn fmt<T>(val: &T, fmt: &mut Formatter<'_>) -> Result<(), std::fmt::Error>
where
    T: Display,
{
    fmt.write_fmt(format_args!("{}", val))
}
148
149/// Error returned from an HTTP stream.
150#[derive(thiserror::Error, Educe)]
151#[educe(Debug)]
152pub enum HttpStreamError<C: Client> {
153    /// Failed to fetch.
154    #[error("Failed to fetch: {0}")]
155    FetchFailure(C::Error),
156    /// Failed to get response.
157    #[error("Failed to get response: {0}")]
158    ResponseFailure(<<C as Client>::Response as ClientResponse>::ResponseError),
159}
160
161impl<C: Client> DecodeError for HttpStreamError<C> {
162    async fn decode_error(self) -> String {
163        match self {
164            Self::ResponseFailure(e) => e.decode_error().await,
165            this @ Self::FetchFailure(_) => this.to_string(),
166        }
167    }
168}
169
170/// An HTTP implementation of the [`SourceStream`] trait.
171#[derive(Educe)]
172#[educe(Debug)]
173pub struct HttpStream<C: Client> {
174    #[educe(Debug = false)]
175    stream: Box<
176        dyn Stream<Item = Result<Bytes, <<C as Client>::Response as ClientResponse>::StreamError>>
177            + Unpin
178            + Send
179            + Sync,
180    >,
181    client: C,
182    content_length: Option<u64>,
183    content_type: Option<ContentType>,
184    #[educe(Debug(method = "fmt"))]
185    url: C::Url,
186    #[educe(Debug = false)]
187    headers: C::Headers,
188}
189
190impl<C: Client> HttpStream<C> {
191    /// Creates a new [`HttpStream`] from a [`Client`].
192    #[instrument(skip(client, url), fields(url = url.to_string()))]
193    pub async fn new(
194        client: C,
195        url: <Self as SourceStream>::Params,
196    ) -> Result<Self, HttpStreamError<C>> {
197        debug!("requesting stream content");
198        let request_start = Instant::now();
199
200        let response = client
201            .get(&url)
202            .await
203            .map_err(HttpStreamError::FetchFailure)?;
204        debug!(
205            duration = format!("{:?}", request_start.elapsed()),
206            "request finished"
207        );
208
209        let response = response
210            .into_result()
211            .map_err(HttpStreamError::ResponseFailure)?;
212        let content_length = response.content_length().map_or_else(
213            || {
214                warn!("content length header missing");
215                None
216            },
217            |content_length| {
218                debug!(content_length, "received content length");
219                Some(content_length)
220            },
221        );
222
223        let content_type = response.content_type().map_or_else(
224            || {
225                warn!("content type header missing");
226                None
227            },
228            |content_type| {
229                debug!(content_type, "received content type");
230                match content_type.parse::<MediaTypeBuf>() {
231                    Ok(content_type) => Some(ContentType {
232                        r#type: content_type.ty().to_string(),
233                        subtype: content_type.subty().to_string(),
234                    }),
235                    Err(e) => {
236                        warn!("error parsing content type: {e:?}");
237                        None
238                    }
239                }
240            },
241        );
242
243        let headers = response.headers();
244        let stream = response.stream();
245        Ok(Self {
246            stream: Box::new(stream),
247            client,
248            content_length,
249            content_type,
250            headers,
251            url,
252        })
253    }
254
255    /// The [`ContentType`] of the response stream.
256    pub fn content_type(&self) -> &Option<ContentType> {
257        &self.content_type
258    }
259
260    /// Get a specific header from the response.
261    /// If the value is not present or it can't be decoded as a string, `None` is returned.
262    pub fn header(&self, name: &str) -> Option<&str> {
263        self.headers.header(name)
264    }
265
266    /// Object containing HTTP response headers.
267    pub fn headers(&self) -> &C::Headers {
268        &self.headers
269    }
270
271    fn supports_range_request(&self) -> bool {
272        match self.header("Accept-Ranges") {
273            Some(val) => val != "none",
274            None => false,
275        }
276    }
277}
278
279impl<C: Client> Stream for HttpStream<C> {
280    type Item = Result<Bytes, <<C as Client>::Response as ClientResponse>::StreamError>;
281
282    fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
283        Pin::new(&mut self.stream).poll_next(cx)
284    }
285}
286
287impl<C: Client> SourceStream for HttpStream<C> {
288    type Params = C::Url;
289    type StreamCreationError = HttpStreamError<C>;
290
291    async fn create(params: Self::Params) -> Result<Self, Self::StreamCreationError> {
292        Self::new(C::create(), params).await
293    }
294
295    fn content_length(&self) -> Option<u64> {
296        self.content_length
297    }
298
299    #[instrument(skip(self))]
300    async fn seek_range(&mut self, start: u64, end: Option<u64>) -> io::Result<()> {
301        if Some(start) == self.content_length {
302            debug!(
303                "attempting to seek where start is the length of the stream, returning empty \
304                 stream"
305            );
306            self.stream = Box::new(futures_util::stream::empty());
307            return Ok(());
308        }
309
310        if !self.supports_range_request() {
311            warn!("Accept-Ranges header not present. Attempting seek anyway.");
312        }
313
314        debug!("sending HTTP range request");
315        let request_start = Instant::now();
316        let response = self
317            .client
318            // seek_range provides an exclusive end value, but we need it to be inclusive here
319            .get_range(&self.url, start, end.map(|e| e - 1))
320            .await
321            .map_err(|e| io::Error::other(e.to_string()))
322            .wrap_err(&format!("error sending HTTP range request to {}", self.url))?;
323        debug!(
324            duration = format!("{:?}", request_start.elapsed()),
325            "HTTP request finished"
326        );
327
328        let response = match response.into_result() {
329            Ok(response) => Ok(response),
330            Err(e) => {
331                let error = e.decode_error().await;
332                Err(io::Error::other(error)).wrap_err(&format!(
333                    "error getting HTTP range response from {}",
334                    self.url
335                ))
336            }
337        }?;
338        self.stream = Box::new(response.stream());
339        debug!("done seeking");
340        Ok(())
341    }
342
343    async fn reconnect(&mut self, current_position: u64) -> Result<(), io::Error> {
344        if self.supports_range_request() {
345            self.seek_range(current_position, None).await
346        } else {
347            let response = self
348                .client
349                .get(&self.url)
350                .await
351                .map_err(|e| io::Error::other(e.to_string()))
352                .wrap_err(&format!("error sending HTTP request to {}", self.url))?;
353            self.stream = Box::new(response.stream());
354            Ok(())
355        }
356    }
357
358    fn supports_seek(&self) -> bool {
359        true
360    }
361}
362
/// Name of the HTTP request header used to ask for a byte range.
pub const RANGE_HEADER_KEY: &str = "Range";
365
/// Builds the value of a `Range` header that requests bytes.
///
/// `end` is written into the header unchanged; when it is `None` the range is left open-ended.
///
/// ex: `bytes=200-400`, or `bytes=200-` without an end
pub fn format_range_header_bytes(start: u64, end: Option<u64>) -> String {
    match end {
        Some(last) => format!("bytes={start}-{last}"),
        None => format!("bytes={start}-"),
    }
}