firebase_rs_sdk/storage/request/
transport.rs

1use crate::platform::runtime::{self, TimeoutError};
2use crate::storage::error::{internal_error, StorageError, StorageResult};
3use crate::storage::util::is_url;
4#[cfg(not(target_arch = "wasm32"))]
5use bytes::Bytes;
6#[cfg(not(target_arch = "wasm32"))]
7use futures::stream::TryStreamExt;
8use reqwest::{Client, Response, StatusCode, Url};
9use std::collections::HashMap;
10#[cfg(not(target_arch = "wasm32"))]
11use std::io::{Error as IoError, ErrorKind};
12#[cfg(not(target_arch = "wasm32"))]
13use std::pin::Pin;
14use std::time::Duration;
15#[cfg(not(target_arch = "wasm32"))]
16use tokio_util::io::StreamReader;
17
18use super::backoff::{BackoffConfig, BackoffState};
19use super::info::{RequestBody, RequestInfo};
20
21#[derive(Clone, Debug)]
22pub struct ResponsePayload {
23    pub status: StatusCode,
24    pub headers: HashMap<String, String>,
25    pub body: Vec<u8>,
26}
27
28impl ResponsePayload {
29    async fn from_response(response: Response) -> StorageResult<Self> {
30        let status = response.status();
31        let mut headers = HashMap::new();
32        for (key, value) in response.headers().iter() {
33            if let Ok(val) = value.to_str() {
34                headers.insert(key.as_str().to_owned(), val.to_owned());
35            }
36        }
37        let body = response
38            .bytes()
39            .await
40            .map_err(|err| internal_error(format!("failed to read response body: {err}")))?
41            .to_vec();
42        Ok(Self {
43            status,
44            headers,
45            body,
46        })
47    }
48}
49
50#[cfg(not(target_arch = "wasm32"))]
51type DynByteStream = Pin<Box<dyn futures::stream::Stream<Item = Result<Bytes, IoError>> + Send>>;
52
53#[cfg(not(target_arch = "wasm32"))]
54pub type StorageByteStream = StreamReader<DynByteStream, Bytes>;
55
56#[cfg(not(target_arch = "wasm32"))]
57pub struct StreamingResponse {
58    pub status: StatusCode,
59    pub headers: HashMap<String, String>,
60    pub reader: StorageByteStream,
61}
62
63#[derive(Debug)]
64pub enum RequestError {
65    Network(String),
66    Timeout,
67    Fatal(StorageError),
68}
69
70#[derive(Clone)]
71pub struct HttpClient {
72    client: Client,
73    is_using_emulator: bool,
74    backoff: BackoffConfig,
75}
76
77impl HttpClient {
78    pub fn new(is_using_emulator: bool, backoff: BackoffConfig) -> StorageResult<Self> {
79        let client = Client::builder()
80            .build()
81            .map_err(|err| internal_error(format!("failed to build HTTP client: {err}")))?;
82        Ok(Self {
83            client,
84            is_using_emulator,
85            backoff,
86        })
87    }
88
89    pub async fn execute<O>(&self, info: RequestInfo<O>) -> StorageResult<O> {
90        let mut backoff = BackoffState::new(self.backoff.clone());
91
92        loop {
93            if !backoff.has_time_remaining() {
94                return Err(internal_error("storage request timed out"));
95            }
96
97            let delay = backoff.next_delay();
98            if delay > Duration::from_millis(0) {
99                runtime::sleep(delay).await;
100            }
101
102            let result = self.try_once(&info).await;
103
104            match result {
105                Ok(payload) => {
106                    if info.success_codes.contains(&payload.status.as_u16()) {
107                        return (info.response_handler)(payload);
108                    }
109
110                    if should_retry(payload.status, &info) && backoff.can_retry() {
111                        continue;
112                    }
113
114                    return Err(map_failure(payload, &info));
115                }
116                Err(RequestError::Fatal(err)) => return Err(err),
117                Err(RequestError::Timeout) => {
118                    return Err(internal_error("storage request timed out"));
119                }
120                Err(RequestError::Network(reason)) => {
121                    if backoff.can_retry() {
122                        continue;
123                    }
124                    return Err(internal_error(format!(
125                        "network failure after retries: {reason}"
126                    )));
127                }
128            }
129        }
130    }
131
132    async fn try_once<O>(&self, info: &RequestInfo<O>) -> Result<ResponsePayload, RequestError> {
133        let mut url = self.prepare_url(&info.url).map_err(RequestError::Fatal)?;
134        if !info.query_params.is_empty() {
135            {
136                let mut pairs = url.query_pairs_mut();
137                for (k, v) in &info.query_params {
138                    pairs.append_pair(k, v);
139                }
140            }
141        }
142
143        let mut request_builder = self.client.request(info.method.clone(), url);
144
145        for (header, value) in &info.headers {
146            request_builder = request_builder.header(header, value);
147        }
148
149        match &info.body {
150            RequestBody::Bytes(bytes) => {
151                if !bytes.is_empty() {
152                    request_builder = request_builder.body(bytes.clone());
153                }
154            }
155            RequestBody::Text(text) => {
156                if !text.is_empty() {
157                    request_builder = request_builder.body(text.clone());
158                }
159            }
160            RequestBody::Empty => {}
161        }
162
163        let response = send_with_timeout(request_builder, info.timeout).await?;
164
165        ResponsePayload::from_response(response)
166            .await
167            .map_err(RequestError::Fatal)
168    }
169
170    #[cfg(not(target_arch = "wasm32"))]
171    pub async fn execute_streaming<O>(
172        &self,
173        info: RequestInfo<O>,
174    ) -> StorageResult<StreamingResponse> {
175        let mut backoff = BackoffState::new(self.backoff.clone());
176
177        loop {
178            if !backoff.has_time_remaining() {
179                return Err(internal_error("storage request timed out"));
180            }
181
182            let delay = backoff.next_delay();
183            if delay > Duration::from_millis(0) {
184                runtime::sleep(delay).await;
185            }
186
187            match self.try_stream_once(&info).await {
188                Ok(response) => return Ok(response),
189                Err(RequestError::Fatal(err)) => return Err(err),
190                Err(RequestError::Timeout) => {
191                    return Err(internal_error("storage request timed out"));
192                }
193                Err(RequestError::Network(reason)) => {
194                    if backoff.can_retry() {
195                        continue;
196                    }
197                    return Err(internal_error(format!(
198                        "network failure after retries: {reason}"
199                    )));
200                }
201            }
202        }
203    }
204
205    #[cfg(not(target_arch = "wasm32"))]
206    async fn try_stream_once<O>(
207        &self,
208        info: &RequestInfo<O>,
209    ) -> Result<StreamingResponse, RequestError> {
210        let mut url = self.prepare_url(&info.url).map_err(RequestError::Fatal)?;
211        if !info.query_params.is_empty() {
212            let mut pairs = url.query_pairs_mut();
213            for (k, v) in &info.query_params {
214                pairs.append_pair(k, v);
215            }
216        }
217
218        let mut request_builder = self.client.request(info.method.clone(), url);
219        request_builder = request_builder.timeout(info.timeout);
220
221        for (header, value) in &info.headers {
222            request_builder = request_builder.header(header, value);
223        }
224
225        match &info.body {
226            RequestBody::Bytes(bytes) => {
227                if !bytes.is_empty() {
228                    request_builder = request_builder.body(bytes.clone());
229                }
230            }
231            RequestBody::Text(text) => {
232                if !text.is_empty() {
233                    request_builder = request_builder.body(text.clone());
234                }
235            }
236            RequestBody::Empty => {}
237        }
238
239        let response = send_with_timeout(request_builder, info.timeout).await?;
240        let status = response.status();
241
242        if !info.success_codes.contains(&status.as_u16()) {
243            let payload = ResponsePayload::from_response(response)
244                .await
245                .map_err(RequestError::Fatal)?;
246            return Err(RequestError::Fatal(map_failure(payload, info)));
247        }
248
249        let mut headers = HashMap::new();
250        for (key, value) in response.headers().iter() {
251            if let Ok(val) = value.to_str() {
252                headers.insert(key.as_str().to_owned(), val.to_owned());
253            }
254        }
255
256        let stream = response
257            .bytes_stream()
258            .map_err(|err| IoError::new(ErrorKind::Other, err));
259        let stream: DynByteStream = Box::pin(stream);
260        let reader = StreamReader::new(stream);
261
262        Ok(StreamingResponse {
263            status,
264            headers,
265            reader,
266        })
267    }
268
269    fn prepare_url(&self, raw: &str) -> StorageResult<Url> {
270        if is_url(raw) {
271            Url::parse(raw).map_err(|err| internal_error(format!("invalid storage URL: {err}")))
272        } else {
273            let scheme = if self.is_using_emulator {
274                "http"
275            } else {
276                "https"
277            };
278            let formatted = format!("{scheme}://{raw}");
279            Url::parse(&formatted)
280                .map_err(|err| internal_error(format!("invalid storage URL: {err}")))
281        }
282    }
283}
284
285async fn send_with_timeout(
286    builder: reqwest::RequestBuilder,
287    timeout: Duration,
288) -> Result<Response, RequestError> {
289    #[cfg(not(target_arch = "wasm32"))]
290    let send_future = builder.timeout(timeout).send();
291    #[cfg(target_arch = "wasm32")]
292    let send_future = builder.send();
293
294    match runtime::with_timeout(send_future, timeout).await {
295        Ok(result) => result.map_err(map_reqwest_error),
296        Err(TimeoutError) => Err(RequestError::Timeout),
297    }
298}
299
300fn map_reqwest_error(err: reqwest::Error) -> RequestError {
301    if err.is_timeout() {
302        RequestError::Timeout
303    } else {
304        RequestError::Network(err.to_string())
305    }
306}
307
308fn should_retry<O>(status: StatusCode, info: &RequestInfo<O>) -> bool {
309    crate::storage::util::is_retry_status_code(status.as_u16(), &info.additional_retry_codes)
310}
311
312fn map_failure<O>(payload: ResponsePayload, info: &RequestInfo<O>) -> StorageError {
313    let base_error = internal_error(format!(
314        "storage request failed with status {}",
315        payload.status
316    ))
317    .with_status(payload.status.as_u16())
318    .with_server_response(String::from_utf8_lossy(&payload.body).to_string());
319
320    if let Some(handler) = &info.error_handler {
321        handler(payload, base_error)
322    } else {
323        base_error
324    }
325}