stream_download/http/
reqwest_client.rs1use std::str::FromStr;
4use std::sync::LazyLock;
5
6use bytes::Bytes;
7use futures_util::Stream;
8use reqwest::header::{self, AsHeaderName, HeaderMap};
9use tracing::warn;
10
11use super::{DecodeError, RANGE_HEADER_KEY, format_range_header_bytes};
12use crate::http::{Client, ClientResponse, ResponseHeaders};
13
14impl ResponseHeaders for HeaderMap {
15 fn header(&self, name: &str) -> Option<&str> {
16 get_header_str(self, name)
17 }
18}
19
20fn get_header_str<K: AsHeaderName>(headers: &HeaderMap, key: K) -> Option<&str> {
21 headers.get(key).and_then(|val| {
22 val.to_str()
23 .inspect_err(|e| warn!("error converting header value: {e:?}"))
24 .ok()
25 })
26}
27
28#[derive(thiserror::Error, Debug)]
30#[error("Failed to fetch: {source}")]
31pub struct FetchError {
32 #[source]
33 source: reqwest::Error,
34 response: reqwest::Response,
35}
36
37impl FetchError {
38 pub fn source(&self) -> &reqwest::Error {
40 &self.source
41 }
42
43 pub fn response(&self) -> &reqwest::Response {
45 &self.response
46 }
47}
48
49impl DecodeError for FetchError {
50 async fn decode_error(self) -> String {
51 match self.response.text().await {
52 Ok(text) => format!("{}: {text}", self.source),
53 Err(e) => format!("{}. Error decoding response: {e}", self.source),
54 }
55 }
56}
57
58impl ClientResponse for reqwest::Response {
59 type ResponseError = FetchError;
60 type StreamError = reqwest::Error;
61 type Headers = HeaderMap;
62
63 fn content_length(&self) -> Option<u64> {
64 get_header_str(self.headers(), header::CONTENT_LENGTH).and_then(|content_length| {
65 u64::from_str(content_length)
66 .inspect_err(|e| warn!("invalid content length value: {e:?}"))
67 .ok()
68 })
69 }
70
71 fn content_type(&self) -> Option<&str> {
72 get_header_str(self.headers(), header::CONTENT_TYPE)
73 }
74
75 fn headers(&self) -> Self::Headers {
76 self.headers().clone()
77 }
78
79 fn into_result(self) -> Result<Self, Self::ResponseError> {
80 if let Err(error) = self.error_for_status_ref() {
81 Err(FetchError {
82 source: error,
83 response: self,
84 })
85 } else {
86 Ok(self)
87 }
88 }
89
90 fn stream(
91 self,
92 ) -> Box<dyn Stream<Item = Result<Bytes, Self::StreamError>> + Unpin + Send + Sync> {
93 Box::new(self.bytes_stream())
94 }
95}
96
97static CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);
99
100impl Client for reqwest::Client {
101 type Url = reqwest::Url;
102 type Response = reqwest::Response;
103 type Error = reqwest::Error;
104 type Headers = HeaderMap;
105
106 fn create() -> Self {
107 CLIENT.clone()
108 }
109
110 async fn get(&self, url: &Self::Url) -> Result<Self::Response, Self::Error> {
111 self.get(url.clone()).send().await
112 }
113
114 async fn get_range(
115 &self,
116 url: &Self::Url,
117 start: u64,
118 end: Option<u64>,
119 ) -> Result<Self::Response, Self::Error> {
120 self.get(url.clone())
121 .header(RANGE_HEADER_KEY, format_range_header_bytes(start, end))
122 .send()
123 .await
124 }
125}