firebase_rs_sdk/storage/request/
transport.rs1use crate::platform::runtime::{self, TimeoutError};
2use crate::storage::error::{internal_error, StorageError, StorageResult};
3use crate::storage::util::is_url;
4#[cfg(not(target_arch = "wasm32"))]
5use bytes::Bytes;
6#[cfg(not(target_arch = "wasm32"))]
7use futures::stream::TryStreamExt;
8use reqwest::{Client, Response, StatusCode, Url};
9use std::collections::HashMap;
10#[cfg(not(target_arch = "wasm32"))]
11use std::io::{Error as IoError, ErrorKind};
12#[cfg(not(target_arch = "wasm32"))]
13use std::pin::Pin;
14use std::time::Duration;
15#[cfg(not(target_arch = "wasm32"))]
16use tokio_util::io::StreamReader;
17
18use super::backoff::{BackoffConfig, BackoffState};
19use super::info::{RequestBody, RequestInfo};
20
/// A fully buffered HTTP response: status, headers and the complete body.
#[derive(Clone, Debug)]
pub struct ResponsePayload {
    /// HTTP status code reported by the server.
    pub status: StatusCode,
    /// Response headers keyed by header name. Only values that
    /// `HeaderValue::to_str` accepts are stored; others are omitted.
    pub headers: HashMap<String, String>,
    /// Raw response body bytes.
    pub body: Vec<u8>,
}
27
28impl ResponsePayload {
29 async fn from_response(response: Response) -> StorageResult<Self> {
30 let status = response.status();
31 let mut headers = HashMap::new();
32 for (key, value) in response.headers().iter() {
33 if let Ok(val) = value.to_str() {
34 headers.insert(key.as_str().to_owned(), val.to_owned());
35 }
36 }
37 let body = response
38 .bytes()
39 .await
40 .map_err(|err| internal_error(format!("failed to read response body: {err}")))?
41 .to_vec();
42 Ok(Self {
43 status,
44 headers,
45 body,
46 })
47 }
48}
49
/// Boxed, pinned, `Send` stream of byte chunks whose errors are `std::io::Error`.
/// Only available on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
type DynByteStream = Pin<Box<dyn futures::stream::Stream<Item = Result<Bytes, IoError>> + Send>>;
52
/// `tokio_util` `StreamReader` over a [`DynByteStream`], used as the body
/// reader of a [`StreamingResponse`]. Only available on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
pub type StorageByteStream = StreamReader<DynByteStream, Bytes>;
55
/// HTTP response whose body has not been buffered and is exposed through a
/// reader instead. Only available on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
pub struct StreamingResponse {
    /// HTTP status code reported by the server.
    pub status: StatusCode,
    /// Response headers keyed by header name.
    pub headers: HashMap<String, String>,
    /// Reader over the response body stream.
    pub reader: StorageByteStream,
}
62
/// Outcome of a single failed request attempt, used by the retry loops in
/// `HttpClient` to decide whether another attempt is made.
#[derive(Debug)]
pub enum RequestError {
    /// Transport-level failure described by the contained message; the retry
    /// loops try again while the backoff state allows it.
    Network(String),
    /// The attempt exceeded its timeout; the retry loops stop immediately.
    Timeout,
    /// Non-recoverable failure; the contained error is returned to the caller
    /// unchanged.
    Fatal(StorageError),
}
69
/// HTTP transport for storage requests: a `reqwest` client plus the retry
/// configuration applied to every request it executes.
#[derive(Clone)]
pub struct HttpClient {
    /// Underlying `reqwest` client used to send every request.
    client: Client,
    /// When `true`, scheme-less URLs are sent over `http` instead of `https`.
    is_using_emulator: bool,
    /// Backoff configuration cloned into a fresh `BackoffState` per request.
    backoff: BackoffConfig,
}
76
77impl HttpClient {
78 pub fn new(is_using_emulator: bool, backoff: BackoffConfig) -> StorageResult<Self> {
79 let client = Client::builder()
80 .build()
81 .map_err(|err| internal_error(format!("failed to build HTTP client: {err}")))?;
82 Ok(Self {
83 client,
84 is_using_emulator,
85 backoff,
86 })
87 }
88
89 pub async fn execute<O>(&self, info: RequestInfo<O>) -> StorageResult<O> {
90 let mut backoff = BackoffState::new(self.backoff.clone());
91
92 loop {
93 if !backoff.has_time_remaining() {
94 return Err(internal_error("storage request timed out"));
95 }
96
97 let delay = backoff.next_delay();
98 if delay > Duration::from_millis(0) {
99 runtime::sleep(delay).await;
100 }
101
102 let result = self.try_once(&info).await;
103
104 match result {
105 Ok(payload) => {
106 if info.success_codes.contains(&payload.status.as_u16()) {
107 return (info.response_handler)(payload);
108 }
109
110 if should_retry(payload.status, &info) && backoff.can_retry() {
111 continue;
112 }
113
114 return Err(map_failure(payload, &info));
115 }
116 Err(RequestError::Fatal(err)) => return Err(err),
117 Err(RequestError::Timeout) => {
118 return Err(internal_error("storage request timed out"));
119 }
120 Err(RequestError::Network(reason)) => {
121 if backoff.can_retry() {
122 continue;
123 }
124 return Err(internal_error(format!(
125 "network failure after retries: {reason}"
126 )));
127 }
128 }
129 }
130 }
131
132 async fn try_once<O>(&self, info: &RequestInfo<O>) -> Result<ResponsePayload, RequestError> {
133 let mut url = self.prepare_url(&info.url).map_err(RequestError::Fatal)?;
134 if !info.query_params.is_empty() {
135 {
136 let mut pairs = url.query_pairs_mut();
137 for (k, v) in &info.query_params {
138 pairs.append_pair(k, v);
139 }
140 }
141 }
142
143 let mut request_builder = self.client.request(info.method.clone(), url);
144
145 for (header, value) in &info.headers {
146 request_builder = request_builder.header(header, value);
147 }
148
149 match &info.body {
150 RequestBody::Bytes(bytes) => {
151 if !bytes.is_empty() {
152 request_builder = request_builder.body(bytes.clone());
153 }
154 }
155 RequestBody::Text(text) => {
156 if !text.is_empty() {
157 request_builder = request_builder.body(text.clone());
158 }
159 }
160 RequestBody::Empty => {}
161 }
162
163 let response = send_with_timeout(request_builder, info.timeout).await?;
164
165 ResponsePayload::from_response(response)
166 .await
167 .map_err(RequestError::Fatal)
168 }
169
170 #[cfg(not(target_arch = "wasm32"))]
171 pub async fn execute_streaming<O>(
172 &self,
173 info: RequestInfo<O>,
174 ) -> StorageResult<StreamingResponse> {
175 let mut backoff = BackoffState::new(self.backoff.clone());
176
177 loop {
178 if !backoff.has_time_remaining() {
179 return Err(internal_error("storage request timed out"));
180 }
181
182 let delay = backoff.next_delay();
183 if delay > Duration::from_millis(0) {
184 runtime::sleep(delay).await;
185 }
186
187 match self.try_stream_once(&info).await {
188 Ok(response) => return Ok(response),
189 Err(RequestError::Fatal(err)) => return Err(err),
190 Err(RequestError::Timeout) => {
191 return Err(internal_error("storage request timed out"));
192 }
193 Err(RequestError::Network(reason)) => {
194 if backoff.can_retry() {
195 continue;
196 }
197 return Err(internal_error(format!(
198 "network failure after retries: {reason}"
199 )));
200 }
201 }
202 }
203 }
204
205 #[cfg(not(target_arch = "wasm32"))]
206 async fn try_stream_once<O>(
207 &self,
208 info: &RequestInfo<O>,
209 ) -> Result<StreamingResponse, RequestError> {
210 let mut url = self.prepare_url(&info.url).map_err(RequestError::Fatal)?;
211 if !info.query_params.is_empty() {
212 let mut pairs = url.query_pairs_mut();
213 for (k, v) in &info.query_params {
214 pairs.append_pair(k, v);
215 }
216 }
217
218 let mut request_builder = self.client.request(info.method.clone(), url);
219 request_builder = request_builder.timeout(info.timeout);
220
221 for (header, value) in &info.headers {
222 request_builder = request_builder.header(header, value);
223 }
224
225 match &info.body {
226 RequestBody::Bytes(bytes) => {
227 if !bytes.is_empty() {
228 request_builder = request_builder.body(bytes.clone());
229 }
230 }
231 RequestBody::Text(text) => {
232 if !text.is_empty() {
233 request_builder = request_builder.body(text.clone());
234 }
235 }
236 RequestBody::Empty => {}
237 }
238
239 let response = send_with_timeout(request_builder, info.timeout).await?;
240 let status = response.status();
241
242 if !info.success_codes.contains(&status.as_u16()) {
243 let payload = ResponsePayload::from_response(response)
244 .await
245 .map_err(RequestError::Fatal)?;
246 return Err(RequestError::Fatal(map_failure(payload, info)));
247 }
248
249 let mut headers = HashMap::new();
250 for (key, value) in response.headers().iter() {
251 if let Ok(val) = value.to_str() {
252 headers.insert(key.as_str().to_owned(), val.to_owned());
253 }
254 }
255
256 let stream = response
257 .bytes_stream()
258 .map_err(|err| IoError::new(ErrorKind::Other, err));
259 let stream: DynByteStream = Box::pin(stream);
260 let reader = StreamReader::new(stream);
261
262 Ok(StreamingResponse {
263 status,
264 headers,
265 reader,
266 })
267 }
268
269 fn prepare_url(&self, raw: &str) -> StorageResult<Url> {
270 if is_url(raw) {
271 Url::parse(raw).map_err(|err| internal_error(format!("invalid storage URL: {err}")))
272 } else {
273 let scheme = if self.is_using_emulator {
274 "http"
275 } else {
276 "https"
277 };
278 let formatted = format!("{scheme}://{raw}");
279 Url::parse(&formatted)
280 .map_err(|err| internal_error(format!("invalid storage URL: {err}")))
281 }
282 }
283}
284
285async fn send_with_timeout(
286 builder: reqwest::RequestBuilder,
287 timeout: Duration,
288) -> Result<Response, RequestError> {
289 #[cfg(not(target_arch = "wasm32"))]
290 let send_future = builder.timeout(timeout).send();
291 #[cfg(target_arch = "wasm32")]
292 let send_future = builder.send();
293
294 match runtime::with_timeout(send_future, timeout).await {
295 Ok(result) => result.map_err(map_reqwest_error),
296 Err(TimeoutError) => Err(RequestError::Timeout),
297 }
298}
299
300fn map_reqwest_error(err: reqwest::Error) -> RequestError {
301 if err.is_timeout() {
302 RequestError::Timeout
303 } else {
304 RequestError::Network(err.to_string())
305 }
306}
307
308fn should_retry<O>(status: StatusCode, info: &RequestInfo<O>) -> bool {
309 crate::storage::util::is_retry_status_code(status.as_u16(), &info.additional_retry_codes)
310}
311
312fn map_failure<O>(payload: ResponsePayload, info: &RequestInfo<O>) -> StorageError {
313 let base_error = internal_error(format!(
314 "storage request failed with status {}",
315 payload.status
316 ))
317 .with_status(payload.status.as_u16())
318 .with_server_response(String::from_utf8_lossy(&payload.body).to_string());
319
320 if let Some(handler) = &info.error_handler {
321 handler(payload, base_error)
322 } else {
323 base_error
324 }
325}