cloud_copy/backend/
google.rs

1//! Implementation of the Google Cloud Storage backend.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use chrono::DateTime;
7use chrono::Utc;
8use http_cache_stream_reqwest::Cache;
9use http_cache_stream_reqwest::storage::DefaultCacheStorage;
10use reqwest::Body;
11use reqwest::Request;
12use reqwest::Response;
13use reqwest::StatusCode;
14use reqwest::header;
15use reqwest::header::HeaderValue;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::sync::broadcast;
20use tracing::debug;
21use url::Url;
22
23use crate::BLOCK_SIZE_THRESHOLD;
24use crate::Config;
25use crate::Error;
26use crate::GoogleAuthConfig;
27use crate::HttpClient;
28use crate::ONE_MEBIBYTE;
29use crate::Result;
30use crate::TransferEvent;
31use crate::USER_AGENT;
32use crate::UrlExt as _;
33use crate::backend::StorageBackend;
34use crate::backend::Upload;
35use crate::backend::auth::RequestSigner;
36use crate::backend::auth::SignatureProvider;
37use crate::backend::auth::sha256_hex_string;
38use crate::backend::s3::InitiateMultipartUploadResult;
39use crate::backend::s3::ListBucketResult;
40use crate::streams::ByteStream;
41use crate::streams::TransferStream;
42
43/// The root domain for Google Cloud Storage.
44const GOOGLE_ROOT_DOMAIN: &str = "storage.googleapis.com";
45
46/// The maximum number of parts in an upload.
47const MAX_PARTS: u64 = 10000;
48
49/// The minimum size of a part in bytes (5 MiB); applies to every part except
50/// the last.
51const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;
52
53/// The maximum size in bytes (5 GiB) for an upload part.
54const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;
55
56/// The maximum size of a file on Google in bytes (5 TiB).
57const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;
58
59/// The Google date header name.
60const GOOGLE_DATE_HEADER: &str = "x-goog-date";
61
62/// The Google content SHA256 header name.
63const GOOGLE_CONTENT_SHA256_HEADER: &str = "x-goog-content-sha256";
64
65/// Represents a Google-specific copy operation error.
66#[derive(Debug, thiserror::Error)]
67pub enum GoogleError {
68    /// The specified Google Storage block size exceeds the maximum.
69    #[error("Google Storage block size cannot exceed {MAX_PART_SIZE} bytes")]
70    InvalidBlockSize,
71    /// The source size exceeds the supported maximum size.
72    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
73    MaximumSizeExceeded,
74    /// Invalid URL with an `gs` scheme.
75    #[error("invalid URL with `gs` scheme: the URL is not in a supported format")]
76    InvalidScheme,
77    /// The Google Cloud HMAC secret is invalid.
78    #[error("invalid Google Cloud Storage HMAC secret")]
79    InvalidSecretAccessKey,
80    /// The response was missing an ETag header.
81    #[error("response from server was missing an ETag header")]
82    ResponseMissingETag,
83    /// The bucket name in the URL was invalid.
84    #[error("the bucket name specified in the URL is invalid")]
85    InvalidBucketName,
86    /// Unexpected response from server.
87    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
88    UnexpectedResponse {
89        /// The response status code.
90        status: reqwest::StatusCode,
91        /// The deserialization error.
92        error: serde_xml_rs::Error,
93    },
94}
95
96/// Represents a Google Cloud Storage signature provider.
97pub struct GoogleSignatureProvider<'a> {
98    /// The Google Storage authentication configuration.
99    auth: &'a GoogleAuthConfig,
100}
101
102impl SignatureProvider for GoogleSignatureProvider<'_> {
103    fn algorithm(&self) -> &str {
104        "GOOG4-HMAC-SHA256"
105    }
106
107    fn secret_key_prefix(&self) -> &str {
108        "GOOG4"
109    }
110
111    fn request_type(&self) -> &str {
112        "goog4_request"
113    }
114
115    fn region(&self) -> &str {
116        "any"
117    }
118
119    fn service(&self) -> &str {
120        "storage"
121    }
122
123    fn date_header_name(&self) -> &str {
124        GOOGLE_DATE_HEADER
125    }
126
127    fn content_hash_header_name(&self) -> &str {
128        GOOGLE_CONTENT_SHA256_HEADER
129    }
130
131    fn access_key_id(&self) -> &str {
132        &self.auth.access_key
133    }
134
135    fn secret_access_key(&self) -> &str {
136        self.auth.secret.expose_secret()
137    }
138}
139
140/// Appends the authentication header to the request.
141fn append_authentication_header(
142    auth: &GoogleAuthConfig,
143    date: DateTime<Utc>,
144    request: &mut Request,
145) -> Result<()> {
146    let signer = RequestSigner::new(GoogleSignatureProvider { auth });
147    let auth = signer
148        .sign(date, request)
149        .ok_or(GoogleError::InvalidSecretAccessKey)?;
150    request.headers_mut().append(
151        header::AUTHORIZATION,
152        HeaderValue::try_from(auth).expect("value should be valid"),
153    );
154    Ok(())
155}
156
157/// URL extensions for Google Cloud Storage.
158trait UrlExt {
159    /// Extracts the bucket name and object path from the URL.
160    ///
161    /// # Panics
162    ///
163    /// Panics if the URL is not a valid S3 URL.
164    fn bucket_and_path(&self) -> (&str, &str);
165}
166
167impl UrlExt for Url {
168    fn bucket_and_path(&self) -> (&str, &str) {
169        let domain = self.domain().expect("URL should have domain");
170
171        if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
172            // Path-style URL of the form https://storage.googleapis.com/<bucket>/<path>
173            let bucket = self
174                .path_segments()
175                .expect("URL should have path")
176                .next()
177                .expect("URL should have at least one path segment");
178
179            (
180                bucket,
181                self.path()
182                    .strip_prefix('/')
183                    .unwrap()
184                    .strip_prefix(bucket)
185                    .unwrap(),
186            )
187        } else {
188            // Virtual host style URL of the form https://<bucket>.storage.googleapis.com/<path>
189            let Some((bucket, _)) = domain.split_once('.') else {
190                panic!("URL domain does not contain a bucket");
191            };
192
193            (bucket, self.path())
194        }
195    }
196}
197
198/// Extension trait for response.
199trait ResponseExt {
200    /// Converts an error response from Google Cloud Storage into an `Error`.
201    async fn into_error(self) -> Error;
202}
203
204impl ResponseExt for Response {
205    async fn into_error(self) -> Error {
206        /// Represents an error response.
207        #[derive(Default, Deserialize)]
208        #[serde(rename = "Error")]
209        struct ErrorResponse {
210            /// The error message.
211            #[serde(rename = "Message", default)]
212            message: String,
213            /// The error details.
214            #[serde(rename = "Details", default)]
215            details: Option<String>,
216        }
217
218        let status = self.status();
219        let text: String = match self.text().await {
220            Ok(text) => text,
221            Err(e) => return e.into(),
222        };
223
224        if text.is_empty() {
225            return Error::Server {
226                status,
227                message: text,
228            };
229        }
230
231        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
232            Ok(response) => match response.details {
233                Some(details) => {
234                    format!("{message}\ndetails: {details}", message = response.message)
235                }
236                None => response.message,
237            },
238            Err(e) => {
239                return GoogleError::UnexpectedResponse { status, error: e }.into();
240            }
241        };
242
243        Error::Server { status, message }
244    }
245}
246
247/// Represents a completed part of an upload.
248#[derive(Default, Clone, Serialize)]
249#[serde(rename = "Part")]
250pub struct GoogleUploadPart {
251    /// The part number of the upload.
252    #[serde(rename = "PartNumber")]
253    number: u64,
254    /// The ETag of the part.
255    #[serde(rename = "ETag")]
256    etag: String,
257}
258
259/// Represents an Google Cloud Storage file upload.
260pub struct GoogleUpload {
261    /// The configuration to use for the upload.
262    config: Arc<Config>,
263    /// The HTTP client to use for uploading.
264    client: HttpClient,
265    /// The URL of the object being uploaded.
266    url: Url,
267    /// The identifier of this upload.
268    id: String,
269    /// The channel for sending progress updates.
270    events: Option<broadcast::Sender<TransferEvent>>,
271}
272
273impl Upload for GoogleUpload {
274    type Part = GoogleUploadPart;
275
276    async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Self::Part> {
277        // See: https://cloud.google.com/storage/docs/xml-api/put-object-multipart
278
279        debug!(
280            "sending PUT request for block {block} of `{url}`",
281            url = self.url.display()
282        );
283
284        let mut url = self.url.clone();
285
286        {
287            let mut pairs = url.query_pairs_mut();
288            pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
289            pairs.append_pair("uploadId", &self.id);
290        }
291
292        let digest = sha256_hex_string(&bytes);
293        let length = bytes.len();
294        let body = Body::wrap_stream(TransferStream::new(
295            ByteStream::new(bytes),
296            id,
297            block,
298            0,
299            self.events.clone(),
300        ));
301
302        let date = Utc::now();
303        let mut request = self
304            .client
305            .put(url)
306            .header(header::USER_AGENT, USER_AGENT)
307            .header(header::CONTENT_LENGTH, length)
308            .header(header::CONTENT_TYPE, "application/octet-stream")
309            .header(
310                GOOGLE_DATE_HEADER,
311                date.format("%Y%m%dT%H%M%SZ").to_string(),
312            )
313            .header(GOOGLE_CONTENT_SHA256_HEADER, &digest)
314            .body(body)
315            .build()?;
316
317        if let Some(auth) = &self.config.google.auth {
318            append_authentication_header(auth, date, &mut request)?;
319        }
320
321        let response = self.client.execute(request).await?;
322        if !response.status().is_success() {
323            return Err(response.into_error().await);
324        }
325
326        let etag = response
327            .headers()
328            .get(header::ETAG)
329            .and_then(|v| v.to_str().ok())
330            .ok_or(GoogleError::ResponseMissingETag)?;
331
332        Ok(GoogleUploadPart {
333            number: block + 1,
334            etag: etag.to_string(),
335        })
336    }
337
338    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
339        // See: https://cloud.google.com/storage/docs/xml-api/post-object-complete
340
341        /// Represents the request body for completing a multipart upload.
342        #[derive(Serialize)]
343        #[serde(rename = "CompleteMultipartUpload")]
344        struct CompleteUpload<'a> {
345            /// The parts of the upload.
346            #[serde(rename = "Part")]
347            parts: &'a [GoogleUploadPart],
348        }
349
350        debug!(
351            "sending POST request to finalize upload of `{url}`",
352            url = self.url.display()
353        );
354
355        let mut url = self.url.clone();
356
357        {
358            let mut pairs = url.query_pairs_mut();
359            pairs.append_pair("uploadId", &self.id);
360        }
361
362        let body = serde_xml_rs::SerdeXml::new()
363            .to_string(&CompleteUpload { parts })
364            .expect("should serialize");
365
366        let date = Utc::now();
367        let mut request = self
368            .client
369            .post(url)
370            .header(header::USER_AGENT, USER_AGENT)
371            .header(header::CONTENT_LENGTH, body.len())
372            .header(header::CONTENT_TYPE, "application/xml")
373            .header(
374                GOOGLE_DATE_HEADER,
375                date.format("%Y%m%dT%H%M%SZ").to_string(),
376            )
377            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
378            .body(body)
379            .build()?;
380
381        if let Some(auth) = &self.config.google.auth {
382            append_authentication_header(auth, date, &mut request)?;
383        }
384
385        let response = self.client.execute(request).await?;
386        if !response.status().is_success() {
387            return Err(response.into_error().await);
388        }
389
390        Ok(())
391    }
392}
393
394/// Represents the Google Cloud Storage backend.
395pub struct GoogleStorageBackend {
396    /// The config to use for transferring files.
397    config: Arc<Config>,
398    /// The HTTP client to use for transferring files.
399    client: HttpClient,
400    /// The channel for sending transfer events.
401    events: Option<broadcast::Sender<TransferEvent>>,
402}
403
404impl GoogleStorageBackend {
405    /// Constructs a new Google Cloud Storage backend.
406    pub fn new(
407        config: Config,
408        client: HttpClient,
409        events: Option<broadcast::Sender<TransferEvent>>,
410    ) -> Self {
411        Self {
412            config: Arc::new(config),
413            client,
414            events,
415        }
416    }
417}
418
419impl StorageBackend for GoogleStorageBackend {
420    type Upload = GoogleUpload;
421
422    fn config(&self) -> &Config {
423        &self.config
424    }
425
426    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
427        self.client.cache()
428    }
429
430    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
431        &self.events
432    }
433
434    fn block_size(&self, file_size: u64) -> Result<u64> {
435        /// The number of blocks to increment by in search of a block size
436        const BLOCK_COUNT_INCREMENT: u64 = 50;
437
438        // Return the block size if one was specified
439        if let Some(size) = self.config.block_size {
440            if size > MAX_PART_SIZE {
441                return Err(GoogleError::InvalidBlockSize.into());
442            }
443
444            return Ok(size);
445        }
446
447        // Try to balance the number of blocks with the size of the blocks
448        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
449        while num_blocks < MAX_PARTS {
450            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
451            if block_size <= BLOCK_SIZE_THRESHOLD {
452                return Ok(block_size.max(MIN_PART_SIZE));
453            }
454
455            num_blocks += BLOCK_COUNT_INCREMENT;
456        }
457
458        // Couldn't fit the number of blocks within the size threshold; fallback to
459        // whatever will fit
460        let block_size: u64 = file_size.div_ceil(MAX_PARTS);
461        if block_size > MAX_PART_SIZE {
462            return Err(GoogleError::MaximumSizeExceeded.into());
463        }
464
465        Ok(block_size)
466    }
467
468    fn is_supported_url(_: &Config, url: &Url) -> bool {
469        match url.scheme() {
470            "gs" => true,
471            "http" | "https" => {
472                let Some(domain) = url.domain() else {
473                    return false;
474                };
475
476                if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
477                    // Path-style URL of the form http://storage.googleapis.com/<bucket>/<object>
478                    // There must be at least two path segments
479                    return url
480                        .path_segments()
481                        .map(|mut s| s.nth(1).is_some())
482                        .unwrap_or(false);
483                }
484
485                // Virtual host style URL of the form https://<bucket>.storage.googleapis.com/<object>
486                let Some((bucket, domain)) = domain.split_once('.') else {
487                    return false;
488                };
489
490                // There must be at least one path segment
491                !bucket.is_empty()
492                    && domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN)
493                    && url
494                        .path_segments()
495                        .map(|mut s| s.next().is_some())
496                        .unwrap_or(false)
497            }
498            _ => false,
499        }
500    }
501
502    fn rewrite_url(&self, url: Url) -> Result<Url> {
503        match url.scheme() {
504            "gs" => {
505                let bucket = url.host_str().ok_or(GoogleError::InvalidScheme)?;
506                let path = url.path();
507
508                if url.path() == "/" {
509                    return Err(GoogleError::InvalidScheme.into());
510                }
511
512                match (url.query(), url.fragment()) {
513                    (None, None) => format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}"),
514                    (None, Some(fragment)) => {
515                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}#{fragment}")
516                    }
517                    (Some(query), None) => {
518                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}")
519                    }
520                    (Some(query), Some(fragment)) => {
521                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}#{fragment}")
522                    }
523                }
524                .parse()
525                .map_err(|_| GoogleError::InvalidScheme.into())
526            }
527            _ => Ok(url),
528        }
529    }
530
531    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
532        // Append on the segments
533        {
534            let mut existing = url.path_segments_mut().expect("url should have path");
535            existing.pop_if_empty();
536            existing.extend(segments);
537        }
538
539        Ok(url)
540    }
541
542    async fn head(&self, url: Url) -> Result<Response> {
543        debug_assert!(
544            Self::is_supported_url(&self.config, &url),
545            "{url} is not a supported GCS URL",
546            url = url.as_str()
547        );
548
549        debug!("sending HEAD request for `{url}`", url = url.display());
550
551        let date = Utc::now();
552        let mut request = self
553            .client
554            .head(url)
555            .header(header::USER_AGENT, USER_AGENT)
556            .header(
557                GOOGLE_DATE_HEADER,
558                date.format("%Y%m%dT%H%M%SZ").to_string(),
559            )
560            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
561            .build()?;
562
563        if let Some(auth) = &self.config.google.auth {
564            append_authentication_header(auth, date, &mut request)?;
565        }
566
567        let response = self.client.execute(request).await?;
568        if !response.status().is_success() {
569            return Err(response.into_error().await);
570        }
571
572        Ok(response)
573    }
574
575    async fn get(&self, url: Url) -> Result<Response> {
576        debug_assert!(
577            Self::is_supported_url(&self.config, &url),
578            "{url} is not a supported GCS URL",
579            url = url.as_str()
580        );
581
582        debug!("sending GET request for `{url}`", url = url.display());
583
584        let date = Utc::now();
585        let mut request = self
586            .client
587            .get(url)
588            .header(header::USER_AGENT, USER_AGENT)
589            .header(
590                GOOGLE_DATE_HEADER,
591                date.format("%Y%m%dT%H%M%SZ").to_string(),
592            )
593            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
594            .build()?;
595
596        if let Some(auth) = &self.config.google.auth {
597            append_authentication_header(auth, date, &mut request)?;
598        }
599
600        let response = self.client.execute(request).await?;
601        if !response.status().is_success() {
602            return Err(response.into_error().await);
603        }
604
605        Ok(response)
606    }
607
608    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
609        debug_assert!(
610            Self::is_supported_url(&self.config, &url),
611            "{url} is not a supported GCS URL",
612            url = url.as_str()
613        );
614
615        debug!(
616            "sending GET request at offset {offset} for `{url}`",
617            url = url.display(),
618        );
619
620        let date = Utc::now();
621
622        let mut request = self
623            .client
624            .get(url)
625            .header(header::USER_AGENT, USER_AGENT)
626            .header(
627                GOOGLE_DATE_HEADER,
628                date.format("%Y%m%dT%H%M%SZ").to_string(),
629            )
630            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
631            .header(header::RANGE, format!("bytes={offset}-"))
632            .header(header::IF_MATCH, etag)
633            .build()?;
634
635        if let Some(auth) = &self.config.google.auth {
636            append_authentication_header(auth, date, &mut request)?;
637        }
638
639        let response = self.client.execute(request).await?;
640        let status = response.status();
641
642        // Handle precondition failed as remote content modified
643        if status == StatusCode::PRECONDITION_FAILED {
644            return Err(Error::RemoteContentModified);
645        }
646
647        // Handle error response
648        if !status.is_success() {
649            return Err(response.into_error().await);
650        }
651
652        // We expect partial content, otherwise treat as remote content modified
653        if status != StatusCode::PARTIAL_CONTENT {
654            return Err(Error::RemoteContentModified);
655        }
656
657        Ok(response)
658    }
659
660    async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
661        // See: https://cloud.google.com/storage/docs/xml-api/get-bucket-list
662
663        debug_assert!(
664            Self::is_supported_url(&self.config, &url),
665            "{url} is not a supported GCS URL",
666            url = url.as_str()
667        );
668
669        debug!("walking `{url}` as a directory", url = url.display());
670
671        let (bucket, path) = url.bucket_and_path();
672
673        // The prefix should end with `/` to signify a directory.
674        let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
675        prefix.push('/');
676
677        // Format the request to always use the virtual-host style URL
678        url.set_host(Some(&format!("{bucket}.{GOOGLE_ROOT_DOMAIN}")))
679            .map_err(|_| GoogleError::InvalidBucketName)?;
680        url.set_path("/");
681
682        {
683            let mut pairs = url.query_pairs_mut();
684            // Use version 2.0 of the API
685            pairs.append_pair("list-type", "2");
686            // Only return objects with this prefix
687            pairs.append_pair("prefix", &prefix);
688        }
689
690        let date = Utc::now();
691        let mut token = String::new();
692        let mut paths = Vec::new();
693        loop {
694            let mut url = url.clone();
695            if !token.is_empty() {
696                url.query_pairs_mut()
697                    .append_pair("continuation-token", &token);
698            }
699
700            // List the objects with the prefix
701            let mut request = self
702                .client
703                .get(url)
704                .header(header::USER_AGENT, USER_AGENT)
705                .header(
706                    GOOGLE_DATE_HEADER,
707                    date.format("%Y%m%dT%H%M%SZ").to_string(),
708                )
709                .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
710                .build()?;
711
712            if let Some(auth) = &self.config.google.auth {
713                append_authentication_header(auth, date, &mut request)?;
714            }
715
716            let response = self.client.execute(request).await?;
717
718            let status = response.status();
719            if !status.is_success() {
720                return Err(response.into_error().await);
721            }
722
723            let text = response.text().await?;
724            let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
725                Ok(response) => response,
726                Err(e) => {
727                    return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
728                }
729            };
730
731            // If there is only one result and the result is an empty path, then the given
732            // URL was to a file and not a "directory"
733            if paths.is_empty()
734                && results.contents.len() == 1
735                && results.token.is_none()
736                && let Some("") = results.contents[0].key.strip_prefix(&prefix)
737            {
738                return Ok(paths);
739            }
740
741            paths.extend(
742                results
743                    .contents
744                    .into_iter()
745                    .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
746            );
747
748            token = results.token.unwrap_or_default();
749            if token.is_empty() {
750                break;
751            }
752        }
753
754        Ok(paths)
755    }
756
757    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
758        // See: https://cloud.google.com/storage/docs/xml-api/post-object-multipart
759
760        debug_assert!(
761            Self::is_supported_url(&self.config, &url),
762            "{url} is not a supported GCS URL",
763            url = url.as_str()
764        );
765
766        debug!("sending POST request for `{url}`", url = url.display());
767
768        let mut create = url.clone();
769        create.query_pairs_mut().append_key_only("uploads");
770
771        let date = Utc::now();
772
773        create.set_scheme("http").unwrap();
774        create.set_ip_host("127.0.0.1".parse().unwrap()).unwrap();
775        create.set_port(Some(9000)).unwrap();
776
777        let mut request = self
778            .client
779            .post(create)
780            .header(header::USER_AGENT, USER_AGENT)
781            .header(header::CONTENT_LENGTH, "0")
782            .header(
783                GOOGLE_DATE_HEADER,
784                date.format("%Y%m%dT%H%M%SZ").to_string(),
785            )
786            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
787            .build()?;
788
789        if let Some(auth) = &self.config.google.auth {
790            append_authentication_header(auth, date, &mut request)?;
791        }
792
793        let response = self.client.execute(request).await?;
794
795        let status = response.status();
796        if !status.is_success() {
797            return Err(response.into_error().await);
798        }
799
800        let text: String = match response.text().await {
801            Ok(text) => text,
802            Err(e) => return Err(e.into()),
803        };
804
805        let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
806            Ok(response) => response.upload_id,
807            Err(e) => {
808                return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
809            }
810        };
811
812        Ok(GoogleUpload {
813            config: self.config.clone(),
814            client: self.client.clone(),
815            url,
816            id,
817            events: self.events.clone(),
818        })
819    }
820}