google_apis_common/
lib.rs

1pub mod auth;
2pub mod field_mask;
3pub mod serde;
4pub mod url;
5
6pub use auth::{GetToken, NoToken};
7pub use field_mask::FieldMask;
8
9use std::io::{Cursor, Read, Seek, SeekFrom, Write};
10use std::str::FromStr;
11use std::time::Duration;
12
13use hyper::header::{HeaderMap, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE, USER_AGENT};
14use hyper::Method;
15use hyper::StatusCode;
16use mime::Mime;
17use tokio::time::sleep;
18
/// Line terminator written between header lines and around the boundaries of
/// multipart bodies produced by this module.
const LINE_ENDING: &str = "\r\n";

/// The boxed HTTP body type used throughout this crate: it yields
/// `hyper::body::Bytes` data and reports failures as `hyper::Error`.
pub type Body = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;

/// An HTTP response whose payload is a [`Body`].
pub type Response = hyper::Response<Body>;

/// The `hyper_util` legacy HTTP client, generic over the connector `C` and
/// sending [`Body`] payloads.
pub type Client<C> = hyper_util::client::legacy::Client<C, Body>;
29
30/// A connector.
31pub trait Connector:
32    hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
33{
34}
35
36impl<T> Connector for T where
37    T: hyper_util::client::legacy::connect::Connect + Clone + Send + Sync + 'static
38{
39}
40
/// Tells the caller how to proceed after a failed request attempt.
///
/// Values of this type are returned by the error callbacks of a delegate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Retry {
    /// Signal you don't want to retry
    Abort,
    /// Signals you want to retry after the given duration
    After(Duration),
}
47
/// The protocol used to transfer media to the server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UploadProtocol {
    /// The simple upload protocol.
    Simple,
    /// The resumable upload protocol.
    Resumable,
}
53
/// Identifies the Hub. There is only one per library; this trait is supposed
/// to make the intended use more explicit.
/// The hub gives easy access to all resource methods.
pub trait Hub {}

/// Identifies types for building methods of a particular resource type.
pub trait MethodsBuilder {}

/// Identifies types which represent builders for a particular resource method.
pub trait CallBuilder {}

/// Identifies types which can be inserted and deleted.
/// Types with this trait are most commonly used by clients of this API.
pub trait Resource {}

/// Identifies types which are used in API responses.
pub trait ResponseResult {}

/// Identifies types which are used in API requests.
pub trait RequestValue {}

/// Identifies types which are not actually used by the API.
/// This might be a bug within the Google API schema.
pub trait UnusedType {}

/// Identifies types which are only used as part of other types, which
/// usually carry the `Resource` trait.
pub trait Part {}

/// Identifies types which are only used by other types internally.
/// They have no special meaning; this trait just marks them for completeness.
pub trait NestedType {}
86
/// Bundles [`Read`], [`Seek`] and [`Send`] into one trait so that a seekable
/// reader can be passed around as a single trait object.
pub trait ReadSeek
where
    Self: Seek + Read + Send,
{
}

/// Every type that is seekable, readable and sendable is a `ReadSeek`.
impl<R> ReadSeek for R where R: Seek + Read + Send {}
90
/// A trait for all types that can convert themselves into a *parts* string
pub trait ToParts {
    /// Returns the *parts* string for `self`.
    fn to_parts(&self) -> String;
}
95
/// A trait specifying functionality to help control any request performed by the API.
/// The trait has a conservative default implementation.
///
/// It contains methods to deal with all common issues, as well as with the ones related to
/// uploading media.
pub trait Delegate: Send {
    /// Called at the beginning of any API request. The delegate should store the method
    /// information if it is interested in knowing more context when further calls to it
    /// are made.
    /// The matching `finished()` call will always be made, no matter whether or not the API
    /// request was successful. That way, the delegate may easily maintain a clean state
    /// between various API calls.
    ///
    /// The default implementation does nothing.
    fn begin(&mut self, _info: MethodInfo) {}

    /// Called whenever there is an [HttpError](hyper_util::client::legacy::Error), usually if
    /// there are network problems.
    ///
    /// If you choose to retry after a duration, the duration should be chosen using the
    /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff).
    ///
    /// Returns the retry information. The default implementation aborts.
    fn http_error(&mut self, _err: &hyper_util::client::legacy::Error) -> Retry {
        Retry::Abort
    }

    /// Called whenever there is the need for your application's API key after
    /// the official authenticator implementation didn't provide one, for some reason.
    /// If this method returns `None` as well, the underlying operation will fail.
    ///
    /// The default implementation returns `None`.
    fn api_key(&mut self) -> Option<String> {
        None
    }

    /// Called whenever the Authenticator didn't yield a token. The delegate
    /// may attempt to provide one, or just take it as general information about the
    /// impending failure.
    /// The given error provides information about why the token couldn't be acquired in the
    /// first place.
    ///
    /// The default implementation hands the error straight back.
    fn token(
        &mut self,
        e: Box<dyn std::error::Error + Send + Sync>,
    ) -> std::result::Result<Option<String>, Box<dyn std::error::Error + Send + Sync>> {
        Err(e)
    }

    /// Called during resumable uploads to provide a URL for the impending upload.
    /// It was saved after a previous call to `store_upload_url(...)`, and if not `None`,
    /// will be used instead of asking the server for a new upload URL.
    /// This is useful in case a previous resumable upload was aborted/canceled, but should now
    /// be resumed.
    /// The returned URL will be used exactly once - if it fails again and the delegate allows
    /// to retry, we will ask the server for a new upload URL.
    ///
    /// The default implementation returns `None`.
    fn upload_url(&mut self) -> Option<String> {
        None
    }

    /// Called after we have retrieved a new upload URL for a resumable upload to store it
    /// in case we fail or cancel. That way, we can attempt to resume the upload later,
    /// see `upload_url()`.
    /// It will also be called with `None` after a successful upload, which allows the delegate
    /// to forget the URL. That way, we will not attempt to resume an upload that has already
    /// finished.
    ///
    /// The default implementation ignores the URL.
    fn store_upload_url(&mut self, url: Option<&str>) {
        let _ = url;
    }

    /// Called whenever a server response could not be decoded from json.
    /// It's for informational purposes only, the caller will return with an error
    /// accordingly.
    ///
    /// # Arguments
    ///
    /// * `json_encoded_value` - The json-encoded value which failed to decode.
    /// * `json_decode_error`  - The decoder error
    fn response_json_decode_error(
        &mut self,
        json_encoded_value: &str,
        json_decode_error: &serde_json::Error,
    ) {
        let _ = json_encoded_value;
        let _ = json_decode_error;
    }

    /// Called whenever the http request returns with a non-success status code.
    /// This can involve authentication issues, or anything else that very much
    /// depends on the used API method.
    /// The delegate should check the status, header and decoded json error to decide
    /// whether to retry or not. In the latter case, the underlying call will fail.
    ///
    /// If you choose to retry after a duration, the duration should be chosen using the
    /// [exponential backoff algorithm](http://en.wikipedia.org/wiki/Exponential_backoff).
    ///
    /// The default implementation aborts.
    fn http_failure(&mut self, _: &Response, _err: Option<&serde_json::Value>) -> Retry {
        Retry::Abort
    }

    /// Called prior to sending the main request of the given method. It can be used to time
    /// the call or to print progress information.
    /// It's also useful as you can be sure that a request will definitely be made.
    fn pre_request(&mut self) {}

    /// Return the size of each chunk of a resumable upload.
    /// Must be a power of two, with 1<<18 being the smallest allowed chunk size.
    /// Will be called once before starting any resumable upload.
    ///
    /// The default implementation returns `1 << 23`.
    fn chunk_size(&mut self) -> u64 {
        1 << 23
    }

    /// Called before the given chunk is uploaded to the server.
    /// If true is returned, the upload will be interrupted.
    /// However, it may be resumable if you stored the upload URL in a previous call
    /// to `store_upload_url()`.
    ///
    /// The default implementation never cancels.
    fn cancel_chunk_upload(&mut self, chunk: &ContentRange) -> bool {
        let _ = chunk;
        false
    }

    /// Called before the API request method returns, in every case. It can be used to clean up
    /// internal state between calls to the API.
    /// This call always has a matching call to `begin(...)`.
    ///
    /// # Arguments
    ///
    /// * `is_success` - a true value indicates the operation was successful. If false, you should
    ///   discard all values stored during `store_upload_url`.
    fn finished(&mut self, is_success: bool) {
        let _ = is_success;
    }
}
223
224/// A delegate with a conservative default implementation, which is used if no other delegate is
225/// set.
226#[derive(Default)]
227pub struct DefaultDelegate;
228
229impl Delegate for DefaultDelegate {}
230
/// The error type returned by the calls of this crate, see [`Result`].
#[derive(Debug)]
pub enum Error {
    /// The http connection failed
    HttpError(hyper_util::client::legacy::Error),

    /// An attempt was made to upload a resource with size stored in field `.0`
    /// even though the maximum upload size is what is stored in field `.1`.
    UploadSizeLimitExceeded(u64, u64),

    /// Represents information about a request that was not understood by the server.
    /// Details are included.
    BadRequest(serde_json::Value),

    /// We needed an API key for authentication, but didn't obtain one.
    /// Neither through the authenticator, nor through the Delegate.
    MissingAPIKey,

    /// We required a Token, but didn't get one from the Authenticator
    MissingToken(Box<dyn std::error::Error + Send + Sync>),

    /// The delegate instructed to cancel the operation
    Cancelled,

    /// An additional, free form field clashed with one of the built-in optional ones
    FieldClash(&'static str),

    /// Shows that we failed to decode the server response.
    /// This can happen if the protocol changes in conjunction with strict json decoding.
    /// Field `.0` is the text that failed to decode, field `.1` the decoder error.
    JsonDecodeError(String, serde_json::Error),

    /// Indicates an HTTP response with a non-success status code
    Failure(Response),

    /// An IO error occurred while reading a stream into memory
    Io(std::io::Error),
}
267
268impl std::fmt::Display for Error {
269    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
270        match self {
271            Error::Io(err) => err.fmt(f),
272            Error::HttpError(err) => err.fmt(f),
273            Error::UploadSizeLimitExceeded(resource_size, max_size) => writeln!(
274                f,
275                "The media size {resource_size} exceeds the maximum allowed upload size of {max_size}"
276            ),
277            Error::MissingAPIKey => {
278                writeln!(
279                    f,
280                    "The application's API key was not found in the configuration"
281                )?;
282                writeln!(
283                    f,
284                    "It is used as there are no Scopes defined for this method."
285                )
286            }
287            Error::BadRequest(message) => writeln!(f, "Bad Request: {message}"),
288            Error::MissingToken(e) => writeln!(f, "Token retrieval failed: {e}"),
289            Error::Cancelled => writeln!(f, "Operation cancelled by delegate"),
290            Error::FieldClash(field) => writeln!(
291                f,
292                "The custom parameter '{field}' is already provided natively by the CallBuilder."
293            ),
294            Error::JsonDecodeError(json_str, err) => writeln!(f, "{err}: {json_str}"),
295            Error::Failure(response) => {
296                writeln!(f, "Http status indicates failure: {response:?}")
297            }
298        }
299    }
300}
301
302impl std::error::Error for Error {
303    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
304        match *self {
305            Error::HttpError(ref err) => err.source(),
306            Error::JsonDecodeError(_, ref err) => err.source(),
307            _ => None,
308        }
309    }
310}
311
312impl From<std::io::Error> for Error {
313    fn from(err: std::io::Error) -> Self {
314        Error::Io(err)
315    }
316}
317
/// A universal result type used as return for all calls; the error type is
/// always this crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
320
321/// Contains information about an API request.
322pub struct MethodInfo {
323    pub id: &'static str,
324    pub http_method: Method,
325}
326
327const BOUNDARY: &str = "MDuXWGyeE33QFXGchb2VFWc4Z7945d";
328
329/// Provides a `Read` interface that converts multiple parts into the protocol
330/// identified by [RFC2387](https://tools.ietf.org/html/rfc2387).
331/// **Note**: This implementation is just as rich as it needs to be to perform uploads
332/// to google APIs, and might not be a fully-featured implementation.
333#[derive(Default)]
334pub struct MultiPartReader<'a> {
335    raw_parts: Vec<(HeaderMap, &'a mut (dyn Read + Send))>,
336    current_part: Option<(Cursor<Vec<u8>>, &'a mut (dyn Read + Send))>,
337    last_part_boundary: Option<Cursor<Vec<u8>>>,
338}
339
340impl<'a> MultiPartReader<'a> {
341    // TODO: This should be an associated constant
342    /// Returns the mime-type representing our multi-part message.
343    /// Use it with the ContentType header.
344    pub fn mime_type() -> Mime {
345        Mime::from_str(&format!("multipart/related;boundary={BOUNDARY}")).expect("valid mimetype")
346    }
347
348    /// Reserve memory for exactly the given amount of parts
349    pub fn reserve_exact(&mut self, cap: usize) {
350        self.raw_parts.reserve_exact(cap);
351    }
352
353    /// Add a new part to the queue of parts to be read on the first `read` call.
354    ///
355    /// # Arguments
356    ///
357    /// `headers` - identifying the body of the part. It's similar to the header
358    ///             in an ordinary single-part call, and should thus contain the
359    ///             same information.
360    /// `reader`  - a reader providing the part's body
361    /// `size`    - the amount of bytes provided by the reader. It will be put onto the header as
362    ///             content-size.
363    /// `mime`    - It will be put onto the content type
364    pub fn add_part(
365        &mut self,
366        reader: &'a mut (dyn Read + Send),
367        size: u64,
368        mime_type: Mime,
369    ) -> &mut MultiPartReader<'a> {
370        let mut headers = HeaderMap::new();
371        headers.insert(
372            CONTENT_TYPE,
373            hyper::header::HeaderValue::from_str(mime_type.as_ref()).unwrap(),
374        );
375        headers.insert(CONTENT_LENGTH, size.into());
376        self.raw_parts.push((headers, reader));
377        self
378    }
379
380    /// Returns true if we are totally used
381    fn is_depleted(&self) -> bool {
382        self.raw_parts.is_empty()
383            && self.current_part.is_none()
384            && self.last_part_boundary.is_none()
385    }
386
387    /// Returns true if we are handling our last part
388    fn is_last_part(&self) -> bool {
389        self.raw_parts.is_empty() && self.current_part.is_some()
390    }
391}
392
393impl Read for MultiPartReader<'_> {
394    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
395        match (
396            self.raw_parts.len(),
397            self.current_part.is_none(),
398            self.last_part_boundary.is_none(),
399        ) {
400            (_, _, false) => {
401                let br = self
402                    .last_part_boundary
403                    .as_mut()
404                    .unwrap()
405                    .read(buf)
406                    .unwrap_or(0);
407                if br < buf.len() {
408                    self.last_part_boundary = None;
409                }
410                return Ok(br);
411            }
412            (0, true, true) => return Ok(0),
413            (n, true, _) if n > 0 => {
414                use std::fmt::Write as _;
415                let (headers, reader) = self.raw_parts.remove(0);
416
417                let mut encoded_headers = String::new();
418                for (k, v) in &headers {
419                    if !encoded_headers.is_empty() {
420                        encoded_headers.push_str(LINE_ENDING);
421                    }
422
423                    write!(encoded_headers, "{}: {}", k, v.to_str().unwrap())
424                        .map_err(std::io::Error::other)?;
425                }
426
427                let mut c = Cursor::new(Vec::<u8>::new());
428                //TODO: The first line ending should be omitted for the first part,
429                // fortunately Google's API serves don't seem to mind.
430                (write!(
431                    &mut c,
432                    "{LINE_ENDING}--{BOUNDARY}{LINE_ENDING}{encoded_headers}{LINE_ENDING}{LINE_ENDING}"
433                ))?;
434                c.rewind()?;
435                self.current_part = Some((c, reader));
436            }
437            _ => {}
438        }
439
440        // read headers as long as possible
441        let (hb, rr) = {
442            let &mut (ref mut c, ref mut reader) = self.current_part.as_mut().unwrap();
443            let b = c.read(buf).unwrap_or(0);
444            (b, reader.read(&mut buf[b..]))
445        };
446
447        match rr {
448            Ok(bytes_read) => {
449                if hb < buf.len() && bytes_read == 0 {
450                    if self.is_last_part() {
451                        // before clearing the last part, we will add the boundary that
452                        // will be written last
453                        self.last_part_boundary = Some(Cursor::new(
454                            format!("{LINE_ENDING}--{BOUNDARY}--{LINE_ENDING}").into_bytes(),
455                        ))
456                    }
457                    // We are depleted - this can trigger the next part to come in
458                    self.current_part = None;
459                }
460                let mut total_bytes_read = hb + bytes_read;
461                while total_bytes_read < buf.len() && !self.is_depleted() {
462                    match self.read(&mut buf[total_bytes_read..]) {
463                        Ok(br) => total_bytes_read += br,
464                        Err(err) => return Err(err),
465                    }
466                }
467                Ok(total_bytes_read)
468            }
469            Err(err) => {
470                // fail permanently
471                self.current_part = None;
472                self.last_part_boundary = None;
473                self.raw_parts.clear();
474                Err(err)
475            }
476        }
477    }
478}
479
480/// The `X-Upload-Content-Type` header.
481///
482/// Generated via rustc --pretty expanded -Z unstable-options, and manually
483/// processed to be more readable.
484#[derive(PartialEq, Eq, Debug, Clone)]
485pub struct XUploadContentType(pub Mime);
486
487impl std::ops::Deref for XUploadContentType {
488    type Target = Mime;
489    fn deref(&self) -> &Mime {
490        &self.0
491    }
492}
493impl std::ops::DerefMut for XUploadContentType {
494    fn deref_mut(&mut self) -> &mut Mime {
495        &mut self.0
496    }
497}
498impl std::fmt::Display for XUploadContentType {
499    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
500        std::fmt::Display::fmt(&**self, f)
501    }
502}
503
/// A byte range given by the index of its first and last byte, printed and parsed
/// as `first-last`.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Chunk {
    /// Index of the first byte of the range.
    pub first: u64,
    /// Index of the last byte of the range.
    pub last: u64,
}

impl std::fmt::Display for Chunk {
    /// Writes the range as `first-last`.
    ///
    /// Errors reported by the formatter are passed on to the caller instead of
    /// being discarded.
    fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(fmt, "{}-{}", self.first, self.last)
    }
}

impl FromStr for Chunk {
    type Err = &'static str;

    /// Parses a range of the form `first-last`.
    ///
    /// NOTE: only implements `%i-%i`, not `*`
    ///
    /// # Errors
    ///
    /// Returns a static message if `s` does not consist of exactly two `-`-separated
    /// parts, or if either part is not an unsigned integer.
    fn from_str(s: &str) -> std::result::Result<Chunk, &'static str> {
        let mut parts = s.split('-');
        let (Some(first), Some(last), None) = (parts.next(), parts.next(), parts.next()) else {
            return Err("Expected two parts: %i-%i");
        };

        let first = first
            .parse::<u64>()
            .map_err(|_| "Couldn't parse 'first' as digit")?;
        let last = last
            .parse::<u64>()
            .map_err(|_| "Couldn't parse 'last' as digit")?;

        Ok(Chunk { first, last })
    }
}
538
539/// Implements the Content-Range header, for serialization only
540#[derive(Clone, PartialEq, Eq, Debug)]
541pub struct ContentRange {
542    pub range: Option<Chunk>,
543    pub total_length: u64,
544}
545
546impl ContentRange {
547    pub fn header_value(&self) -> String {
548        format!(
549            "bytes {}/{}",
550            match self.range {
551                Some(ref c) => format!("{c}"),
552                None => "*".to_string(),
553            },
554            self.total_length
555        )
556    }
557}
558
559#[derive(Clone, PartialEq, Eq, Debug)]
560pub struct RangeResponseHeader(pub Chunk);
561
562impl RangeResponseHeader {
563    fn from_bytes(raw: &[u8]) -> Self {
564        if !raw.is_empty() {
565            if let Ok(s) = std::str::from_utf8(raw) {
566                const PREFIX: &str = "bytes ";
567                if let Some(stripped) = s.strip_prefix(PREFIX) {
568                    if let Ok(c) = <Chunk as FromStr>::from_str(stripped) {
569                        return RangeResponseHeader(c);
570                    }
571                }
572            }
573        }
574
575        panic!("Unable to parse Range header {raw:?}")
576    }
577}
578
579/// A utility type to perform a resumable upload from start to end.
580pub struct ResumableUploadHelper<'a, A: 'a, C>
581where
582    C: Connector,
583{
584    pub client: &'a Client<C>,
585    pub delegate: &'a mut dyn Delegate,
586    pub start_at: Option<u64>,
587    pub auth: &'a A,
588    pub user_agent: &'a str,
589    pub auth_header: String,
590    pub url: &'a str,
591    pub reader: &'a mut dyn ReadSeek,
592    pub media_type: Mime,
593    pub content_length: u64,
594}
595
596impl<A, C> ResumableUploadHelper<'_, A, C>
597where
598    C: Connector,
599{
600    async fn query_transfer_status(
601        &mut self,
602    ) -> std::result::Result<u64, std::result::Result<Response, hyper_util::client::legacy::Error>>
603    {
604        loop {
605            match self
606                .client
607                .request(
608                    hyper::Request::builder()
609                        .method(hyper::Method::POST)
610                        .uri(self.url)
611                        .header(USER_AGENT, self.user_agent.to_string())
612                        .header(
613                            "Content-Range",
614                            ContentRange {
615                                range: None,
616                                total_length: self.content_length,
617                            }
618                            .header_value(),
619                        )
620                        .header(AUTHORIZATION, self.auth_header.clone())
621                        .body(to_body::<String>(None))
622                        .unwrap(),
623                )
624                .await
625            {
626                Ok(r) => {
627                    // 308 = resume-incomplete == PermanentRedirect
628                    let headers = r.headers().clone();
629                    let h: RangeResponseHeader = match headers.get("Range") {
630                        Some(hh) if r.status() == StatusCode::PERMANENT_REDIRECT => {
631                            RangeResponseHeader::from_bytes(hh.as_bytes())
632                        }
633                        None | Some(_) => {
634                            let (parts, body) = r.into_parts();
635                            let body = to_body(to_bytes(body).await);
636                            let response = Response::from_parts(parts, body);
637                            if let Retry::After(d) = self.delegate.http_failure(&response, None) {
638                                sleep(d).await;
639                                continue;
640                            }
641                            return Err(Ok(response));
642                        }
643                    };
644                    return Ok(h.0.last);
645                }
646                Err(err) => {
647                    if let Retry::After(d) = self.delegate.http_error(&err) {
648                        sleep(d).await;
649                        continue;
650                    }
651                    return Err(Err(err));
652                }
653            }
654        }
655    }
656
657    /// returns None if operation was cancelled by delegate, or the HttpResult.
658    /// It can be that we return the result just because we didn't understand the status code -
659    /// caller should check for status himself before assuming it's OK to use
660    pub async fn upload(
661        &mut self,
662    ) -> Option<std::result::Result<Response, hyper_util::client::legacy::Error>> {
663        let mut start = match self.start_at {
664            Some(s) => s,
665            None => match self.query_transfer_status().await {
666                Ok(s) => s,
667                Err(result) => return Some(result),
668            },
669        };
670
671        const MIN_CHUNK_SIZE: u64 = 1 << 18;
672        let chunk_size = match self.delegate.chunk_size() {
673            cs if cs > MIN_CHUNK_SIZE => cs,
674            _ => MIN_CHUNK_SIZE,
675        };
676
677        loop {
678            self.reader.seek(SeekFrom::Start(start)).unwrap();
679
680            let request_size = match self.content_length - start {
681                rs if rs > chunk_size => chunk_size,
682                rs => rs,
683            };
684
685            let mut section_reader = self.reader.take(request_size);
686            let mut bytes = vec![];
687            section_reader.read_to_end(&mut bytes).unwrap();
688            let range_header = ContentRange {
689                range: Some(Chunk {
690                    first: start,
691                    last: start + request_size - 1,
692                }),
693                total_length: self.content_length,
694            };
695            if self.delegate.cancel_chunk_upload(&range_header) {
696                return None;
697            }
698            match self
699                .client
700                .request(
701                    hyper::Request::builder()
702                        .uri(self.url)
703                        .method(hyper::Method::POST)
704                        .header("Content-Range", range_header.header_value())
705                        .header(CONTENT_TYPE, format!("{}", self.media_type))
706                        .header(USER_AGENT, self.user_agent.to_string())
707                        .body(to_body(bytes.into()))
708                        .unwrap(),
709                )
710                .await
711            {
712                Ok(response) => {
713                    start += request_size;
714
715                    if response.status() == StatusCode::PERMANENT_REDIRECT {
716                        continue;
717                    }
718
719                    let (parts, body) = response.into_parts();
720                    let success = parts.status.is_success();
721                    let bytes = to_bytes(body).await.unwrap_or_default();
722                    let error = if !success {
723                        serde_json::from_str(&to_string(&bytes)).ok()
724                    } else {
725                        None
726                    };
727                    let response = to_response(parts, bytes.into());
728
729                    if !success {
730                        if let Retry::After(d) =
731                            self.delegate.http_failure(&response, error.as_ref())
732                        {
733                            sleep(d).await;
734                            continue;
735                        }
736                    }
737                    return Some(Ok(response));
738                }
739                Err(err) => {
740                    if let Retry::After(d) = self.delegate.http_error(&err) {
741                        sleep(d).await;
742                        continue;
743                    }
744                    return Some(Err(err));
745                }
746            }
747        }
748    }
749}
750
751// TODO(ST): Allow sharing common code between program types
752pub fn remove_json_null_values(value: &mut serde_json::value::Value) {
753    match value {
754        serde_json::value::Value::Object(map) => {
755            map.retain(|_, value| !value.is_null());
756            map.values_mut().for_each(remove_json_null_values);
757        }
758        serde_json::value::Value::Array(arr) => {
759            arr.retain(|value| !value.is_null());
760            arr.iter_mut().for_each(remove_json_null_values);
761        }
762        _ => {}
763    }
764}
765
766#[doc(hidden)]
767pub fn to_body<T>(bytes: Option<T>) -> Body
768where
769    T: Into<hyper::body::Bytes>,
770{
771    use http_body_util::BodyExt;
772
773    fn falliable(_: std::convert::Infallible) -> hyper::Error {
774        unreachable!()
775    }
776
777    let bytes = bytes.map(Into::into).unwrap_or_default();
778    Body::new(http_body_util::Full::from(bytes).map_err(falliable))
779}
780
781#[doc(hidden)]
782pub async fn to_bytes<T>(body: T) -> Option<hyper::body::Bytes>
783where
784    T: hyper::body::Body,
785{
786    use http_body_util::BodyExt;
787    body.collect().await.ok().map(|value| value.to_bytes())
788}
789
790#[doc(hidden)]
791pub fn to_string(bytes: &hyper::body::Bytes) -> std::borrow::Cow<'_, str> {
792    String::from_utf8_lossy(bytes)
793}
794
795#[doc(hidden)]
796pub fn to_response<T>(parts: http::response::Parts, body: Option<T>) -> Response
797where
798    T: Into<hyper::body::Bytes>,
799{
800    Response::from_parts(parts, to_body(body))
801}
802
/// Unit tests for the helpers of this module.
#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::str::FromStr;

    use ::serde::{Deserialize, Serialize};

    use super::*;

    /// Checks how optional, required and renamed fields serialize and deserialize.
    #[test]
    fn serde() {
        #[derive(Default, Serialize, Deserialize)]
        struct Foo {
            opt: Option<String>,
            req: u32,
            opt_vec: Option<Vec<String>>,
            vec: Vec<String>,
        }

        let f: Foo = Default::default();
        serde_json::to_string(&f).unwrap(); // should work

        // `opt_vec` is absent from the input, which is accepted for an `Option` field.
        let j = "{\"opt\":null,\"req\":0,\"vec\":[]}";
        let _f: Foo = serde_json::from_str(j).unwrap();

        // This fails, unless 'vec' is optional
        // let j = "{\"opt\":null,\"req\":0}";
        // let f: Foo = serde_json::from_str(j).unwrap();

        #[derive(Default, Serialize, Deserialize)]
        struct Bar {
            #[serde(rename = "snooSnoo")]
            snoo_snoo: String,
        }
        serde_json::to_string(&<Bar as Default>::default()).unwrap();

        // The renamed field is read from its json name.
        let j = "{\"snooSnoo\":\"foo\"}";
        let b: Bar = serde_json::from_str(j).unwrap();
        assert_eq!(b.snoo_snoo, "foo");

        // We can't have unknown fields with structs.
        // #[derive(Default, Serialize, Deserialize)]
        // struct BarOpt {
        //     #[serde(rename="snooSnoo")]
        //     snoo_snoo: Option<String>
        // }
        // let j = "{\"snooSnoo\":\"foo\",\"foo\":\"bar\"}";
        // let b: BarOpt = serde_json::from_str(&j).unwrap();
    }

    /// A well-formed `first-last` string parses into the matching `Chunk`.
    #[test]
    fn byte_range_from_str() {
        assert_eq!(
            <Chunk as FromStr>::from_str("2-42"),
            Ok(Chunk { first: 2, last: 42 })
        )
    }

    /// A `&mut dyn Delegate` can be passed where `Send` is required.
    #[test]
    fn dyn_delegate_is_send() {
        fn with_send(_x: impl Send) {}

        let mut dd = DefaultDelegate;
        let dlg: &mut dyn Delegate = &mut dd;
        with_send(dlg);
    }

    /// The multipart mime type is `multipart/related` and carries our boundary.
    #[test]
    fn test_mime() {
        let mime = MultiPartReader::mime_type();

        assert_eq!(mime::MULTIPART, mime.type_());
        assert_eq!("related", mime.subtype());
        assert_eq!(
            Some(BOUNDARY),
            mime.get_param("boundary").map(|x| x.as_str())
        );
    }
}