async_google_apis_common/
http.rs

1use crate::*;
2use anyhow::Context;
3use tokio::io::AsyncSeekExt;
4
5pub trait AsyncWriteUnpin: tokio::io::AsyncWrite + std::marker::Unpin + Send + Sync {}
6
7impl<T> AsyncWriteUnpin for T
8where T: tokio::io::AsyncWrite + std::marker::Unpin + Send + Sync {}
9
10fn body_to_str(b: hyper::body::Bytes) -> String {
11    String::from_utf8(b.to_vec()).unwrap_or("[UTF-8 decode failed]".into())
12}
13
14/// This type is used as type parameter to the following functions, when `rq` is `None`.
15#[derive(Debug, Serialize)]
16pub struct EmptyRequest {}
17
18/// This type is used as type parameter for when no response is expected.
19#[derive(Debug, Deserialize, Clone, Default)]
20pub struct EmptyResponse {}
21
22/// Result of a method that can (but doesn't always) download data.
23#[derive(Debug, PartialEq)]
24pub enum DownloadResult<T: DeserializeOwned + std::fmt::Debug> {
25    /// Downloaded data has been written to the supplied Writer.
26    Downloaded,
27    /// A structured response has been returned.
28    Response(T),
29}
30
31/// The Content-Type header is set automatically to application/json.
32pub async fn do_request<
33    Req: Serialize + std::fmt::Debug,
34    Resp: DeserializeOwned + Clone + Default,
35>(
36    cl: &TlsClient,
37    path: &str,
38    headers: &[(hyper::header::HeaderName, String)],
39    http_method: &str,
40    rq: Option<Req>,
41) -> Result<Resp> {
42    use futures::future::FutureExt;
43    do_request_with_headers(cl, path, headers, http_method, rq)
44        .map(|r| r.map(|t| t.0))
45        .await
46}
47
48/// The Content-Type header is set automatically to application/json. Also returns response
49/// headers.
50pub async fn do_request_with_headers<
51    Req: Serialize + std::fmt::Debug,
52    Resp: DeserializeOwned + Clone + Default,
53>(
54    cl: &TlsClient,
55    path: &str,
56    headers: &[(hyper::header::HeaderName, String)],
57    http_method: &str,
58    rq: Option<Req>,
59) -> Result<(Resp, hyper::HeaderMap)> {
60    let mut reqb = hyper::Request::builder().uri(path).method(http_method);
61    for (k, v) in headers {
62        reqb = reqb.header(k, v);
63    }
64    reqb = reqb.header("Content-Type", "application/json");
65    let body_str;
66    if let Some(rq) = rq {
67        body_str = serde_json::to_string(&rq).context(format!("{:?}", rq))?;
68    } else {
69        body_str = "".to_string();
70    }
71
72    let body;
73    if body_str == "null" {
74        body = hyper::Body::from("");
75    } else {
76        body = hyper::Body::from(body_str);
77    }
78
79    let http_request = reqb.body(body)?;
80
81    debug!("do_request: Launching HTTP request: {:?}", http_request);
82
83    let http_response = cl.request(http_request).await?;
84    let status = http_response.status();
85
86    debug!(
87        "do_request: HTTP response with status {} received: {:?}",
88        status, http_response
89    );
90
91    let headers = http_response.headers().clone();
92    let response_body = hyper::body::to_bytes(http_response.into_body()).await?;
93    if !status.is_success() {
94        Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into())
95    } else {
96        // Evaluate body_to_str lazily
97        if response_body.len() > 0 {
98            serde_json::from_reader(response_body.as_ref())
99                .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
100                .map(|r| (r, headers))
101        } else {
102            Ok((Default::default(), headers))
103        }
104    }
105}
106
107/// The Content-Length header is set automatically.
108pub async fn do_upload_multipart<
109    Req: Serialize + std::fmt::Debug,
110    Resp: DeserializeOwned + Clone,
111>(
112    cl: &TlsClient,
113    path: &str,
114    headers: &[(hyper::header::HeaderName, String)],
115    http_method: &str,
116    req: Option<Req>,
117    data: hyper::body::Bytes,
118) -> Result<Resp> {
119    let mut reqb = hyper::Request::builder().uri(path).method(http_method);
120    for (k, v) in headers {
121        reqb = reqb.header(k, v);
122    }
123
124    let data = multipart::format_multipart(&req, data)?;
125    reqb = reqb.header("Content-Length", data.as_ref().len());
126    reqb = reqb.header(
127        "Content-Type",
128        format!("multipart/related; boundary={}", multipart::MIME_BOUNDARY),
129    );
130
131    let body = hyper::Body::from(data.as_ref().to_vec());
132    let http_request = reqb.body(body)?;
133    debug!(
134        "do_upload_multipart: Launching HTTP request: {:?}",
135        http_request
136    );
137    let http_response = cl.request(http_request).await?;
138    let status = http_response.status();
139    debug!(
140        "do_upload_multipart: HTTP response with status {} received: {:?}",
141        status, http_response
142    );
143    let response_body = hyper::body::to_bytes(http_response.into_body()).await?;
144
145    if !status.is_success() {
146        Err(ApiError::HTTPResponseError(status, body_to_str(response_body)).into())
147    } else {
148        serde_json::from_reader(response_body.as_ref())
149            .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
150    }
151}
152
153/// An ongoing download.
154///
155/// Note that this does not necessarily result in a download. It is returned by all API methods
156/// that are capable of downloading data. Whether a download takes place is determined by the
157/// `Content-Type` sent by the server; frequently, the parameters sent in the request determine
158/// whether the server starts a download (`Content-Type: whatever`) or sends a response
159/// (`Content-Type: application/json`).
160pub struct Download<'a, Request, Response> {
161    cl: &'a TlsClient,
162    http_method: String,
163    uri: hyper::Uri,
164    rq: Option<&'a Request>,
165    headers: Vec<(hyper::header::HeaderName, String)>,
166
167    _marker: std::marker::PhantomData<Response>,
168}
169
170impl<'a, Request: Serialize + std::fmt::Debug, Response: DeserializeOwned + std::fmt::Debug>
171    Download<'a, Request, Response>
172{
173    /// Trivial adapter for `download()`: Store downloaded data into a `Vec<u8>`.
174    pub async fn do_it_to_buf(&mut self, buf: &mut Vec<u8>) -> Result<DownloadResult<Response>> {
175        self.do_it(Some(buf)).await
176    }
177
178    /// Run the actual download, streaming the response into the supplied `dst`. If the server
179    /// responded with a `Response` object, no download is started; the response is wrapped in the
180    /// `DownloadResult<Response>` object.
181    ///
182    /// Whether a download takes place or you receive a structured `Response` (i.e. a JSON object)
183    /// depends on the `Content-Type` sent by the server. It is an error to attempt a download
184    /// without specifying `dst`. Often, whether a download takes place is influenced by the
185    /// request parameters. For example, `alt = media` is frequently used in Google APIs to
186    /// indicate that a download is expected.
187    pub async fn do_it(
188        &mut self,
189        dst: Option<&mut (dyn AsyncWriteUnpin)>,
190    ) -> Result<DownloadResult<Response>> {
191        use std::str::FromStr;
192
193        let mut http_response;
194        let mut n_redirects = 0;
195        let mut uri = self.uri.clone();
196
197        // Follow redirects.
198        loop {
199            let mut reqb = hyper::Request::builder()
200                .uri(&uri)
201                .method(self.http_method.as_str());
202            for (k, v) in self.headers.iter() {
203                reqb = reqb.header(k, v);
204            }
205
206            let body;
207            if let Some(rq) = self.rq.take() {
208                body = hyper::Body::from(
209                    serde_json::to_string(&rq).context(format!("{:?}", self.rq))?,
210                );
211            } else {
212                body = hyper::Body::from("");
213            }
214
215            let http_request = reqb.body(body)?;
216            debug!(
217                "Download::do_it: Redirect {}, Launching HTTP request: {:?}",
218                n_redirects, http_request
219            );
220
221            http_response = Some(self.cl.request(http_request).await?);
222            let status = http_response.as_ref().unwrap().status();
223            debug!(
224                "Download::do_it: Redirect {}, HTTP response with status {} received: {:?}",
225                n_redirects, status, http_response
226            );
227
228            // Server returns data - either download or structured response (JSON).
229            if status.is_success() {
230                let headers = http_response.as_ref().unwrap().headers();
231
232                // Check if an object was returned.
233                if let Some(ct) = headers.get(hyper::header::CONTENT_TYPE) {
234                    if ct.to_str()?.contains("application/json") {
235                        let response_body =
236                            hyper::body::to_bytes(http_response.unwrap().into_body()).await?;
237                        return serde_json::from_reader(response_body.as_ref())
238                            .map_err(|e| anyhow::Error::from(e).context(body_to_str(response_body)))
239                            .map(DownloadResult::Response);
240                    }
241                }
242
243                if let Some(dst) = dst {
244                    use tokio::io::AsyncWriteExt;
245                    let mut response_body = http_response.unwrap().into_body();
246                    while let Some(chunk) = tokio_stream::StreamExt::next(&mut response_body).await
247                    {
248                        let chunk = chunk?;
249                        // Chunks often contain just a few kilobytes.
250                        // info!("received chunk with size {}", chunk.as_ref().len());
251                        dst.write(chunk.as_ref()).await?;
252                    }
253                    return Ok(DownloadResult::Downloaded);
254                } else {
255                    return Err(ApiError::DataAvailableError(format!(
256                        "No `dst` was supplied to download data to. Content-Type: {:?}",
257                        headers.get(hyper::header::CONTENT_TYPE)
258                    ))
259                    .into());
260                }
261
262            // Server redirects us.
263            } else if status.is_redirection() {
264                n_redirects += 1;
265                let new_location = http_response
266                    .as_ref()
267                    .unwrap()
268                    .headers()
269                    .get(hyper::header::LOCATION);
270                if new_location.is_none() {
271                    return Err(ApiError::RedirectError(format!(
272                        "Redirect doesn't contain a Location: header"
273                    ))
274                    .into());
275                }
276                uri = hyper::Uri::from_str(new_location.unwrap().to_str()?)?;
277                continue;
278            } else if !status.is_success() {
279                return Err(ApiError::HTTPResponseError(
280                    status,
281                    body_to_str(hyper::body::to_bytes(http_response.unwrap().into_body()).await?),
282                )
283                .into());
284            }
285
286            // Too many redirects.
287            if n_redirects > 5 {
288                return Err(ApiError::HTTPTooManyRedirectsError.into());
289            }
290        }
291    }
292}
293
294pub async fn do_download<
295    'a,
296    Req: Serialize + std::fmt::Debug,
297    Resp: DeserializeOwned + std::fmt::Debug,
298>(
299    cl: &'a TlsClient,
300    path: &str,
301    headers: Vec<(hyper::header::HeaderName, String)>,
302    http_method: String,
303    rq: Option<&'a Req>,
304) -> Result<Download<'a, Req, Resp>> {
305    use std::str::FromStr;
306    Ok(Download {
307        cl: cl,
308        http_method: http_method,
309        uri: hyper::Uri::from_str(path)?,
310        rq: rq,
311        headers: headers,
312        _marker: Default::default(),
313    })
314}
315
316/// A resumable upload in progress, useful for sending large objects.
317pub struct ResumableUpload<'client, Response: DeserializeOwned> {
318    dest: hyper::Uri,
319    cl: &'client TlsClient,
320    max_chunksize: usize,
321    _resp: std::marker::PhantomData<Response>,
322}
323
324fn format_content_range(from: usize, to: usize, total: usize) -> String {
325    format!("bytes {}-{}/{}", from, to, total)
326}
327
328fn parse_response_range(rng: &str) -> Option<(usize, usize)> {
329    if let Some(main) = rng.strip_prefix("bytes=") {
330        let mut parts = main.split("-");
331        let (first, second) = (parts.next(), parts.next());
332        if first.is_none() || second.is_none() {
333            return None;
334        }
335        Some((
336            usize::from_str_radix(first.unwrap(), 10).unwrap_or(0),
337            usize::from_str_radix(second.unwrap(), 10).unwrap_or(0),
338        ))
339    } else {
340        None
341    }
342}
343
344impl<'client, Response: DeserializeOwned> ResumableUpload<'client, Response> {
345    pub fn new(
346        to: hyper::Uri,
347        cl: &'client TlsClient,
348        max_chunksize: usize,
349    ) -> ResumableUpload<'client, Response> {
350        ResumableUpload {
351            dest: to,
352            cl: cl,
353            max_chunksize: max_chunksize,
354            _resp: Default::default(),
355        }
356    }
357    pub fn set_max_chunksize(&mut self, size: usize) -> Result<&mut Self> {
358        if size % (1024 * 256) != 0 {
359            Err(ApiError::InputDataError(
360                "ResumableUpload: max_chunksize must be multiple of 256 KiB.".into(),
361            )
362            .into())
363        } else {
364            self.max_chunksize = size;
365            Ok(self)
366        }
367    }
368
369    /// Upload data from a reader; use only if the reader cannot be seeked. Memory usage is higher,
370    /// because data needs to be cached if the server hasn't accepted all data.
371    pub async fn upload<R: tokio::io::AsyncRead + std::marker::Unpin>(
372        &self,
373        mut f: R,
374        size: usize,
375    ) -> Result<Response> {
376        use tokio::io::AsyncReadExt;
377
378        // Cursor to current position in stream.
379        let mut current = 0;
380        // Buffer portion that we couldn't send previously.
381        let mut previously_unsent = None;
382        loop {
383            let chunksize = if (size - current) > self.max_chunksize {
384                self.max_chunksize
385            } else {
386                size - current
387            };
388
389            let mut buf: Vec<u8>;
390            let read_from_stream;
391            if let Some(buf2) = previously_unsent.take() {
392                buf = buf2;
393                read_from_stream = buf.len();
394            } else {
395                buf = vec![0 as u8; chunksize];
396                // Move buffer into body.
397                read_from_stream = f.read_exact(&mut buf).await?;
398                buf.resize(read_from_stream, 0);
399            }
400
401            let reqb = hyper::Request::builder()
402                .uri(self.dest.clone())
403                .method(hyper::Method::PUT)
404                .header(hyper::header::CONTENT_LENGTH, read_from_stream)
405                .header(
406                    hyper::header::CONTENT_RANGE,
407                    format_content_range(current, current + read_from_stream - 1, size),
408                )
409                .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
410            let request = reqb.body(hyper::Body::from(buf[..].to_vec()))?;
411            debug!("upload_file: Launching HTTP request: {:?}", request);
412
413            let response = self.cl.request(request).await?;
414            debug!("upload_file: Received response: {:?}", response);
415
416            let status = response.status();
417            // 308 means: continue upload.
418            if !status.is_success() && status.as_u16() != 308 {
419                debug!("upload_file: Encountered error: {}", status);
420                return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
421                    body_to_str(hyper::body::to_bytes(response.into_body()).await?),
422                );
423            }
424
425            let sent;
426            if let Some(rng) = response.headers().get(hyper::header::RANGE) {
427                if let Some((_, to)) = parse_response_range(rng.to_str()?) {
428                    sent = to + 1 - current;
429                    if sent < read_from_stream {
430                        previously_unsent = Some(buf.split_off(sent));
431                    }
432                    current = to + 1;
433                } else {
434                    sent = read_from_stream;
435                    current += read_from_stream;
436                }
437            } else {
438                sent = read_from_stream;
439                current += read_from_stream;
440            }
441
442            debug!(
443                "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
444                chunksize, sent, size, self.dest
445            );
446
447            if current >= size {
448                let headers = response.headers().clone();
449                let response_body = hyper::body::to_bytes(response.into_body()).await?;
450
451                if !status.is_success() {
452                    return Err(Error::from(ApiError::HTTPResponseError(
453                        status,
454                        body_to_str(response_body),
455                    ))
456                    .context(format!("{:?}", headers)));
457                } else {
458                    return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
459                        anyhow::Error::from(e)
460                            .context(body_to_str(response_body))
461                            .context(format!("{:?}", headers))
462                    });
463                }
464            }
465        }
466    }
467    /// Upload content from a file. This is most efficient if you have an actual file, as seek can
468    /// be used in case the server didn't accept all data.
469    pub async fn upload_file(&self, mut f: tokio::fs::File) -> Result<Response> {
470        use tokio::io::AsyncReadExt;
471
472        let len = f.metadata().await?.len() as usize;
473        let mut current = 0;
474        loop {
475            let chunksize = if (len - current) > self.max_chunksize {
476                self.max_chunksize
477            } else {
478                len - current
479            };
480
481            f.seek(std::io::SeekFrom::Start(current as u64)).await?;
482
483            let mut buf = vec![0 as u8; chunksize];
484            // Move buffer into body.
485            let read_from_stream = f.read_exact(&mut buf).await?;
486            buf.resize(read_from_stream, 0);
487
488            let reqb = hyper::Request::builder()
489                .uri(self.dest.clone())
490                .method(hyper::Method::PUT)
491                .header(hyper::header::CONTENT_LENGTH, read_from_stream)
492                .header(
493                    hyper::header::CONTENT_RANGE,
494                    format_content_range(current, current + read_from_stream - 1, len),
495                )
496                .header(hyper::header::CONTENT_TYPE, "application/octet-stream");
497            let request = reqb.body(hyper::Body::from(buf))?;
498            debug!("upload_file: Launching HTTP request: {:?}", request);
499
500            let response = self.cl.request(request).await?;
501            debug!("upload_file: Received response: {:?}", response);
502
503            let status = response.status();
504            // 308 means: continue upload.
505            if !status.is_success() && status.as_u16() != 308 {
506                debug!("upload_file: Encountered error: {}", status);
507                return Err(ApiError::HTTPResponseError(status, status.to_string())).context(
508                    body_to_str(hyper::body::to_bytes(response.into_body()).await?),
509                );
510            }
511
512            let sent;
513            if let Some(rng) = response.headers().get(hyper::header::RANGE) {
514                if let Some((_, to)) = parse_response_range(rng.to_str()?) {
515                    sent = to + 1 - current;
516                    current = to + 1;
517                } else {
518                    sent = read_from_stream;
519                    current += read_from_stream;
520                }
521            } else {
522                // This can also happen if response code is 200.
523                sent = read_from_stream;
524                current += read_from_stream;
525            }
526
527            debug!(
528                "upload_file: Sent {} bytes (successful: {}) of total {} to {}",
529                chunksize, sent, len, self.dest
530            );
531
532            if current >= len {
533                let headers = response.headers().clone();
534                let response_body = hyper::body::to_bytes(response.into_body()).await?;
535
536                if !status.is_success() {
537                    return Err(Error::from(ApiError::HTTPResponseError(
538                        status,
539                        body_to_str(response_body),
540                    ))
541                    .context(format!("{:?}", headers)));
542                } else {
543                    return serde_json::from_reader(response_body.as_ref()).map_err(|e| {
544                        anyhow::Error::from(e)
545                            .context(body_to_str(response_body))
546                            .context(format!("{:?}", headers))
547                    });
548                }
549            }
550        }
551    }
552}