stream_download/http/
reqwest_client.rs

1//! Adapters for using [`reqwest`] with `stream-download`
2
3use std::str::FromStr;
4use std::sync::LazyLock;
5
6use bytes::Bytes;
7use futures_util::Stream;
8use reqwest::header::{self, AsHeaderName, HeaderMap};
9use tracing::warn;
10
11use super::{DecodeError, RANGE_HEADER_KEY, format_range_header_bytes};
12use crate::http::{Client, ClientResponse, ResponseHeaders};
13
14impl ResponseHeaders for HeaderMap {
15    fn header(&self, name: &str) -> Option<&str> {
16        get_header_str(self, name)
17    }
18}
19
20fn get_header_str<K: AsHeaderName>(headers: &HeaderMap, key: K) -> Option<&str> {
21    headers.get(key).and_then(|val| {
22        val.to_str()
23            .inspect_err(|e| warn!("error converting header value: {e:?}"))
24            .ok()
25    })
26}
27
28/// Error returned when making an HTTP call
29#[derive(thiserror::Error, Debug)]
30#[error("Failed to fetch: {source}")]
31pub struct FetchError {
32    #[source]
33    source: reqwest::Error,
34    response: reqwest::Response,
35}
36
37impl FetchError {
38    /// Error source.
39    pub fn source(&self) -> &reqwest::Error {
40        &self.source
41    }
42
43    /// Http response.
44    pub fn response(&self) -> &reqwest::Response {
45        &self.response
46    }
47}
48
49impl DecodeError for FetchError {
50    async fn decode_error(self) -> String {
51        match self.response.text().await {
52            Ok(text) => format!("{}: {text}", self.source),
53            Err(e) => format!("{}. Error decoding response: {e}", self.source),
54        }
55    }
56}
57
58impl ClientResponse for reqwest::Response {
59    type ResponseError = FetchError;
60    type StreamError = reqwest::Error;
61    type Headers = HeaderMap;
62
63    fn content_length(&self) -> Option<u64> {
64        get_header_str(self.headers(), header::CONTENT_LENGTH).and_then(|content_length| {
65            u64::from_str(content_length)
66                .inspect_err(|e| warn!("invalid content length value: {e:?}"))
67                .ok()
68        })
69    }
70
71    fn content_type(&self) -> Option<&str> {
72        get_header_str(self.headers(), header::CONTENT_TYPE)
73    }
74
75    fn headers(&self) -> Self::Headers {
76        self.headers().clone()
77    }
78
79    fn into_result(self) -> Result<Self, Self::ResponseError> {
80        if let Err(error) = self.error_for_status_ref() {
81            Err(FetchError {
82                source: error,
83                response: self,
84            })
85        } else {
86            Ok(self)
87        }
88    }
89
90    fn stream(
91        self,
92    ) -> Box<dyn Stream<Item = Result<Bytes, Self::StreamError>> + Unpin + Send + Sync> {
93        Box::new(self.bytes_stream())
94    }
95}
96
97// per reqwest's docs, it's advisable to create a single client and reuse it
98static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
99
100impl Client for reqwest::Client {
101    type Url = reqwest::Url;
102    type Response = reqwest::Response;
103    type Error = reqwest::Error;
104    type Headers = HeaderMap;
105
106    fn create() -> Self {
107        CLIENT.clone()
108    }
109
110    async fn get(&self, url: &Self::Url) -> Result<Self::Response, Self::Error> {
111        self.get(url.clone()).send().await
112    }
113
114    async fn get_range(
115        &self,
116        url: &Self::Url,
117        start: u64,
118        end: Option<u64>,
119    ) -> Result<Self::Response, Self::Error> {
120        self.get(url.clone())
121            .header(RANGE_HEADER_KEY, format_range_header_bytes(start, end))
122            .send()
123            .await
124    }
125}