cloud_copy/backend/
google.rs

1//! Implementation of the Google Cloud Storage backend.
2
3use std::borrow::Cow;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use chrono::DateTime;
8use chrono::Utc;
9use http_cache_stream_reqwest::Cache;
10use http_cache_stream_reqwest::storage::DefaultCacheStorage;
11use reqwest::Body;
12use reqwest::Request;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use reqwest::header::HeaderValue;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::broadcast;
21use tracing::debug;
22use url::Url;
23
24use crate::BLOCK_SIZE_THRESHOLD;
25use crate::Config;
26use crate::Error;
27use crate::GoogleAuthConfig;
28use crate::HttpClient;
29use crate::ONE_MEBIBYTE;
30use crate::Result;
31use crate::TransferEvent;
32use crate::USER_AGENT;
33use crate::UrlExt as _;
34use crate::backend::StorageBackend;
35use crate::backend::Upload;
36use crate::backend::auth::RequestSigner;
37use crate::backend::auth::SignatureProvider;
38use crate::backend::auth::sha256_hex_string;
39use crate::backend::s3::InitiateMultipartUploadResult;
40use crate::backend::s3::ListBucketResult;
41use crate::streams::ByteStream;
42use crate::streams::TransferStream;
43
44/// The root domain for Google Cloud Storage.
45const GOOGLE_ROOT_DOMAIN: &str = "storage.googleapis.com";
46
47/// The maximum number of parts in an upload.
48const MAX_PARTS: u64 = 10000;
49
50/// The minimum size of a part in bytes (5 MiB); applies to every part except
51/// the last.
52const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;
53
54/// The maximum size in bytes (5 GiB) for an upload part.
55const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;
56
57/// The maximum size of a file on Google in bytes (5 TiB).
58const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;
59
60/// The Google date header name.
61const GOOGLE_DATE_HEADER: &str = "x-goog-date";
62
63/// The Google content SHA256 header name.
64const GOOGLE_CONTENT_SHA256_HEADER: &str = "x-goog-content-sha256";
65
66/// Represents a Google-specific copy operation error.
67#[derive(Debug, thiserror::Error)]
68pub enum GoogleError {
69    /// The specified Google Storage block size exceeds the maximum.
70    #[error("Google Storage block size cannot exceed {MAX_PART_SIZE} bytes")]
71    InvalidBlockSize,
72    /// The source size exceeds the supported maximum size.
73    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
74    MaximumSizeExceeded,
75    /// Invalid URL with an `gs` scheme.
76    #[error("invalid URL with `gs` scheme: the URL is not in a supported format")]
77    InvalidScheme,
78    /// The Google Cloud HMAC secret is invalid.
79    #[error("invalid Google Cloud Storage HMAC secret")]
80    InvalidSecretAccessKey,
81    /// The response was missing an ETag header.
82    #[error("response from server was missing an ETag header")]
83    ResponseMissingETag,
84    /// The bucket name in the URL was invalid.
85    #[error("the bucket name specified in the URL is invalid")]
86    InvalidBucketName,
87    /// Unexpected response from server.
88    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
89    UnexpectedResponse {
90        /// The response status code.
91        status: reqwest::StatusCode,
92        /// The deserialization error.
93        error: serde_xml_rs::Error,
94    },
95}
96
97/// Represents a Google Cloud Storage signature provider.
98pub struct GoogleSignatureProvider<'a> {
99    /// The Google Storage authentication configuration.
100    auth: &'a GoogleAuthConfig,
101}
102
103impl SignatureProvider for GoogleSignatureProvider<'_> {
104    fn algorithm(&self) -> &str {
105        "GOOG4-HMAC-SHA256"
106    }
107
108    fn secret_key_prefix(&self) -> &str {
109        "GOOG4"
110    }
111
112    fn request_type(&self) -> &str {
113        "goog4_request"
114    }
115
116    fn region(&self) -> &str {
117        "any"
118    }
119
120    fn service(&self) -> &str {
121        "storage"
122    }
123
124    fn date_header_name(&self) -> &str {
125        GOOGLE_DATE_HEADER
126    }
127
128    fn content_hash_header_name(&self) -> &str {
129        GOOGLE_CONTENT_SHA256_HEADER
130    }
131
132    fn access_key_id(&self) -> &str {
133        &self.auth.access_key
134    }
135
136    fn secret_access_key(&self) -> &str {
137        self.auth.secret.expose_secret()
138    }
139}
140
141/// Appends the authentication header to the request.
142fn append_authentication_header(
143    auth: &GoogleAuthConfig,
144    date: DateTime<Utc>,
145    request: &mut Request,
146) -> Result<()> {
147    let signer = RequestSigner::new(GoogleSignatureProvider { auth });
148    let auth = signer
149        .sign(date, request)
150        .ok_or(GoogleError::InvalidSecretAccessKey)?;
151    request.headers_mut().append(
152        header::AUTHORIZATION,
153        HeaderValue::try_from(auth).expect("value should be valid"),
154    );
155    Ok(())
156}
157
/// URL extensions for Google Cloud Storage.
trait UrlExt {
    /// Extracts the bucket name and object path from the URL.
    ///
    /// Both path-style URLs (`https://storage.googleapis.com/<bucket>/<path>`)
    /// and virtual-host style URLs (`https://<bucket>.storage.googleapis.com/<path>`)
    /// are handled. The returned path is percent-encoded and, when non-empty,
    /// starts with `/`.
    ///
    /// # Panics
    ///
    /// Panics if the URL is not a valid Google Cloud Storage URL (for example,
    /// it has no domain, or a virtual-host domain without a bucket label).
    fn bucket_and_path(&self) -> (&str, &str);
}
167
168impl UrlExt for Url {
169    fn bucket_and_path(&self) -> (&str, &str) {
170        let domain = self.domain().expect("URL should have domain");
171
172        if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
173            // Path-style URL of the form https://storage.googleapis.com/<bucket>/<path>
174            let bucket = self
175                .path_segments()
176                .expect("URL should have path")
177                .next()
178                .expect("URL should have at least one path segment");
179
180            (
181                bucket,
182                self.path()
183                    .strip_prefix('/')
184                    .unwrap()
185                    .strip_prefix(bucket)
186                    .unwrap(),
187            )
188        } else {
189            // Virtual host style URL of the form https://<bucket>.storage.googleapis.com/<path>
190            let Some((bucket, _)) = domain.split_once('.') else {
191                panic!("URL domain does not contain a bucket");
192            };
193
194            (bucket, self.path())
195        }
196    }
197}
198
199/// Extension trait for response.
200trait ResponseExt {
201    /// Converts an error response from Google Cloud Storage into an `Error`.
202    async fn into_error(self) -> Error;
203}
204
205impl ResponseExt for Response {
206    async fn into_error(self) -> Error {
207        /// Represents an error response.
208        #[derive(Default, Deserialize)]
209        #[serde(rename = "Error")]
210        struct ErrorResponse {
211            /// The error message.
212            #[serde(rename = "Message", default)]
213            message: String,
214            /// The error details.
215            #[serde(rename = "Details", default)]
216            details: Option<String>,
217        }
218
219        let status = self.status();
220        let text: String = match self.text().await {
221            Ok(text) => text,
222            Err(e) => return e.into(),
223        };
224
225        if text.is_empty() {
226            return Error::Server {
227                status,
228                message: text,
229            };
230        }
231
232        let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
233            Ok(response) => match response.details {
234                Some(details) => {
235                    format!("{message}\ndetails: {details}", message = response.message)
236                }
237                None => response.message,
238            },
239            Err(e) => {
240                return GoogleError::UnexpectedResponse { status, error: e }.into();
241            }
242        };
243
244        Error::Server { status, message }
245    }
246}
247
248/// Represents a completed part of an upload.
249#[derive(Default, Clone, Serialize)]
250#[serde(rename = "Part")]
251pub struct GoogleUploadPart {
252    /// The part number of the upload.
253    #[serde(rename = "PartNumber")]
254    number: u64,
255    /// The ETag of the part.
256    #[serde(rename = "ETag")]
257    etag: String,
258}
259
260/// Represents an Google Cloud Storage file upload.
261pub struct GoogleUpload {
262    /// The configuration to use for the upload.
263    config: Arc<Config>,
264    /// The HTTP client to use for uploading.
265    client: HttpClient,
266    /// The URL of the object being uploaded.
267    url: Url,
268    /// The identifier of this upload.
269    id: String,
270    /// The channel for sending progress updates.
271    events: Option<broadcast::Sender<TransferEvent>>,
272}
273
274impl Upload for GoogleUpload {
275    type Part = GoogleUploadPart;
276
277    async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Option<Self::Part>> {
278        // See: https://cloud.google.com/storage/docs/xml-api/put-object-multipart
279
280        debug!(
281            "sending PUT request for block {block} of `{url}`",
282            url = self.url.display()
283        );
284
285        let mut url = self.url.clone();
286
287        {
288            let mut pairs = url.query_pairs_mut();
289            pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
290            pairs.append_pair("uploadId", &self.id);
291        }
292
293        let digest = sha256_hex_string(&bytes);
294        let length = bytes.len();
295        let body = Body::wrap_stream(TransferStream::new(
296            ByteStream::new(bytes),
297            id,
298            block,
299            0,
300            self.events.clone(),
301        ));
302
303        let date = Utc::now();
304        let mut request = self
305            .client
306            .put(url)
307            .header(header::USER_AGENT, USER_AGENT)
308            .header(header::CONTENT_LENGTH, length)
309            .header(header::CONTENT_TYPE, "application/octet-stream")
310            .header(
311                GOOGLE_DATE_HEADER,
312                date.format("%Y%m%dT%H%M%SZ").to_string(),
313            )
314            .header(GOOGLE_CONTENT_SHA256_HEADER, &digest)
315            .body(body)
316            .build()?;
317
318        if let Some(auth) = &self.config.google.auth {
319            append_authentication_header(auth, date, &mut request)?;
320        }
321
322        let response = self.client.execute(request).await?;
323        if !response.status().is_success() {
324            return Err(response.into_error().await);
325        }
326
327        let etag = response
328            .headers()
329            .get(header::ETAG)
330            .and_then(|v| v.to_str().ok())
331            .ok_or(GoogleError::ResponseMissingETag)?;
332
333        Ok(Some(GoogleUploadPart {
334            number: block + 1,
335            etag: etag.to_string(),
336        }))
337    }
338
339    async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
340        // See: https://cloud.google.com/storage/docs/xml-api/post-object-complete
341
342        /// Represents the request body for completing a multipart upload.
343        #[derive(Serialize)]
344        #[serde(rename = "CompleteMultipartUpload")]
345        struct CompleteUpload<'a> {
346            /// The parts of the upload.
347            #[serde(rename = "Part")]
348            parts: &'a [GoogleUploadPart],
349        }
350
351        debug!(
352            "sending POST request to finalize upload of `{url}`",
353            url = self.url.display()
354        );
355
356        let mut url = self.url.clone();
357
358        {
359            let mut pairs = url.query_pairs_mut();
360            pairs.append_pair("uploadId", &self.id);
361        }
362
363        let body = serde_xml_rs::SerdeXml::new()
364            .to_string(&CompleteUpload { parts })
365            .expect("should serialize");
366
367        let date = Utc::now();
368        let mut request = self
369            .client
370            .post(url)
371            .header(header::USER_AGENT, USER_AGENT)
372            .header(header::CONTENT_LENGTH, body.len())
373            .header(header::CONTENT_TYPE, "application/xml")
374            .header(
375                GOOGLE_DATE_HEADER,
376                date.format("%Y%m%dT%H%M%SZ").to_string(),
377            )
378            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
379            .body(body)
380            .build()?;
381
382        if let Some(auth) = &self.config.google.auth {
383            append_authentication_header(auth, date, &mut request)?;
384        }
385
386        let response = self.client.execute(request).await?;
387        if !response.status().is_success() {
388            return Err(response.into_error().await);
389        }
390
391        Ok(())
392    }
393}
394
395/// Represents the Google Cloud Storage backend.
396pub struct GoogleStorageBackend {
397    /// The config to use for transferring files.
398    config: Arc<Config>,
399    /// The HTTP client to use for transferring files.
400    client: HttpClient,
401    /// The channel for sending transfer events.
402    events: Option<broadcast::Sender<TransferEvent>>,
403}
404
405impl GoogleStorageBackend {
406    /// Constructs a new Google Cloud Storage backend.
407    pub fn new(
408        config: Config,
409        client: HttpClient,
410        events: Option<broadcast::Sender<TransferEvent>>,
411    ) -> Self {
412        Self {
413            config: Arc::new(config),
414            client,
415            events,
416        }
417    }
418}
419
420impl StorageBackend for GoogleStorageBackend {
421    type Upload = GoogleUpload;
422
423    fn config(&self) -> &Config {
424        &self.config
425    }
426
427    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
428        self.client.cache()
429    }
430
431    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
432        &self.events
433    }
434
435    fn block_size(&self, file_size: u64) -> Result<u64> {
436        /// The number of blocks to increment by in search of a block size
437        const BLOCK_COUNT_INCREMENT: u64 = 50;
438
439        // Return the block size if one was specified
440        if let Some(size) = self.config.block_size {
441            if size > MAX_PART_SIZE {
442                return Err(GoogleError::InvalidBlockSize.into());
443            }
444
445            return Ok(size);
446        }
447
448        // Try to balance the number of blocks with the size of the blocks
449        let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
450        while num_blocks < MAX_PARTS {
451            let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
452            if block_size <= BLOCK_SIZE_THRESHOLD {
453                return Ok(block_size.max(MIN_PART_SIZE));
454            }
455
456            num_blocks += BLOCK_COUNT_INCREMENT;
457        }
458
459        // Couldn't fit the number of blocks within the size threshold; fallback to
460        // whatever will fit
461        let block_size: u64 = file_size.div_ceil(MAX_PARTS);
462        if block_size > MAX_PART_SIZE {
463            return Err(GoogleError::MaximumSizeExceeded.into());
464        }
465
466        Ok(block_size)
467    }
468
469    fn is_supported_url(_: &Config, url: &Url) -> bool {
470        match url.scheme() {
471            "gs" => true,
472            "http" | "https" => {
473                let Some(domain) = url.domain() else {
474                    return false;
475                };
476
477                if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
478                    // Path-style URL of the form http://storage.googleapis.com/<bucket>/<object>
479                    // There must be at least two path segments
480                    return url
481                        .path_segments()
482                        .map(|mut s| s.nth(1).is_some())
483                        .unwrap_or(false);
484                }
485
486                // Virtual host style URL of the form https://<bucket>.storage.googleapis.com/<object>
487                let Some((bucket, domain)) = domain.split_once('.') else {
488                    return false;
489                };
490
491                // There must be at least one path segment
492                !bucket.is_empty()
493                    && domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN)
494                    && url
495                        .path_segments()
496                        .map(|mut s| s.next().is_some())
497                        .unwrap_or(false)
498            }
499            _ => false,
500        }
501    }
502
503    fn rewrite_url<'a>(_: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
504        match url.scheme() {
505            "gs" => {
506                let bucket = url.host_str().ok_or(GoogleError::InvalidScheme)?;
507                let path = url.path();
508
509                if url.path() == "/" {
510                    return Err(GoogleError::InvalidScheme.into());
511                }
512
513                match (url.query(), url.fragment()) {
514                    (None, None) => format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}"),
515                    (None, Some(fragment)) => {
516                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}#{fragment}")
517                    }
518                    (Some(query), None) => {
519                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}")
520                    }
521                    (Some(query), Some(fragment)) => {
522                        format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}#{fragment}")
523                    }
524                }
525                .parse()
526                .map(Cow::Owned)
527                .map_err(|_| GoogleError::InvalidScheme.into())
528            }
529            _ => Ok(Cow::Borrowed(url)),
530        }
531    }
532
533    fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
534        // Append on the segments
535        {
536            let mut existing = url.path_segments_mut().expect("url should have path");
537            existing.pop_if_empty();
538            existing.extend(segments);
539        }
540
541        Ok(url)
542    }
543
544    async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
545        debug_assert!(
546            Self::is_supported_url(&self.config, &url),
547            "{url} is not a supported GCS URL",
548            url = url.as_str()
549        );
550
551        debug!("sending HEAD request for `{url}`", url = url.display());
552
553        let date = Utc::now();
554        let mut request = self
555            .client
556            .head(url)
557            .header(header::USER_AGENT, USER_AGENT)
558            .header(
559                GOOGLE_DATE_HEADER,
560                date.format("%Y%m%dT%H%M%SZ").to_string(),
561            )
562            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
563            .build()?;
564
565        if let Some(auth) = &self.config.google.auth {
566            append_authentication_header(auth, date, &mut request)?;
567        }
568
569        let response = self.client.execute(request).await?;
570        if !response.status().is_success() {
571            // If the resource isn't required to exist and it's a 404, return the response.
572            if !must_exist && response.status() == StatusCode::NOT_FOUND {
573                return Ok(response);
574            }
575
576            return Err(response.into_error().await);
577        }
578
579        Ok(response)
580    }
581
582    async fn get(&self, url: Url) -> Result<Response> {
583        debug_assert!(
584            Self::is_supported_url(&self.config, &url),
585            "{url} is not a supported GCS URL",
586            url = url.as_str()
587        );
588
589        debug!("sending GET request for `{url}`", url = url.display());
590
591        let date = Utc::now();
592        let mut request = self
593            .client
594            .get(url)
595            .header(header::USER_AGENT, USER_AGENT)
596            .header(
597                GOOGLE_DATE_HEADER,
598                date.format("%Y%m%dT%H%M%SZ").to_string(),
599            )
600            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
601            .build()?;
602
603        if let Some(auth) = &self.config.google.auth {
604            append_authentication_header(auth, date, &mut request)?;
605        }
606
607        let response = self.client.execute(request).await?;
608        if !response.status().is_success() {
609            return Err(response.into_error().await);
610        }
611
612        Ok(response)
613    }
614
615    async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
616        debug_assert!(
617            Self::is_supported_url(&self.config, &url),
618            "{url} is not a supported GCS URL",
619            url = url.as_str()
620        );
621
622        debug!(
623            "sending GET request at offset {offset} for `{url}`",
624            url = url.display(),
625        );
626
627        let date = Utc::now();
628
629        let mut request = self
630            .client
631            .get(url)
632            .header(header::USER_AGENT, USER_AGENT)
633            .header(
634                GOOGLE_DATE_HEADER,
635                date.format("%Y%m%dT%H%M%SZ").to_string(),
636            )
637            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
638            .header(header::RANGE, format!("bytes={offset}-"))
639            .header(header::IF_MATCH, etag)
640            .build()?;
641
642        if let Some(auth) = &self.config.google.auth {
643            append_authentication_header(auth, date, &mut request)?;
644        }
645
646        let response = self.client.execute(request).await?;
647        let status = response.status();
648
649        // Handle precondition failed as remote content modified
650        if status == StatusCode::PRECONDITION_FAILED {
651            return Err(Error::RemoteContentModified);
652        }
653
654        // Handle error response
655        if !status.is_success() {
656            return Err(response.into_error().await);
657        }
658
659        // We expect partial content, otherwise treat as remote content modified
660        if status != StatusCode::PARTIAL_CONTENT {
661            return Err(Error::RemoteContentModified);
662        }
663
664        Ok(response)
665    }
666
667    async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
668        // See: https://cloud.google.com/storage/docs/xml-api/get-bucket-list
669
670        debug_assert!(
671            Self::is_supported_url(&self.config, &url),
672            "{url} is not a supported GCS URL",
673            url = url.as_str()
674        );
675
676        debug!("walking `{url}` as a directory", url = url.display());
677
678        let (bucket, path) = url.bucket_and_path();
679
680        // The prefix should end with `/` to signify a directory.
681        let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
682        prefix.push('/');
683
684        // Format the request to always use the virtual-host style URL
685        url.set_host(Some(&format!("{bucket}.{GOOGLE_ROOT_DOMAIN}")))
686            .map_err(|_| GoogleError::InvalidBucketName)?;
687        url.set_path("/");
688
689        {
690            let mut pairs = url.query_pairs_mut();
691            // Use version 2.0 of the API
692            pairs.append_pair("list-type", "2");
693            // Only return objects with this prefix
694            pairs.append_pair("prefix", &prefix);
695        }
696
697        let date = Utc::now();
698        let mut token = String::new();
699        let mut paths = Vec::new();
700        loop {
701            let mut url = url.clone();
702            if !token.is_empty() {
703                url.query_pairs_mut()
704                    .append_pair("continuation-token", &token);
705            }
706
707            // List the objects with the prefix
708            let mut request = self
709                .client
710                .get(url)
711                .header(header::USER_AGENT, USER_AGENT)
712                .header(
713                    GOOGLE_DATE_HEADER,
714                    date.format("%Y%m%dT%H%M%SZ").to_string(),
715                )
716                .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
717                .build()?;
718
719            if let Some(auth) = &self.config.google.auth {
720                append_authentication_header(auth, date, &mut request)?;
721            }
722
723            let response = self.client.execute(request).await?;
724
725            let status = response.status();
726            if !status.is_success() {
727                return Err(response.into_error().await);
728            }
729
730            let text = response.text().await?;
731            let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
732                Ok(response) => response,
733                Err(e) => {
734                    return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
735                }
736            };
737
738            // If there is only one result and the result is an empty path, then the given
739            // URL was to a file and not a "directory"
740            if paths.is_empty()
741                && results.contents.len() == 1
742                && results.token.is_none()
743                && let Some("") = results.contents[0].key.strip_prefix(&prefix)
744            {
745                return Ok(paths);
746            }
747
748            paths.extend(
749                results
750                    .contents
751                    .into_iter()
752                    .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
753            );
754
755            token = results.token.unwrap_or_default();
756            if token.is_empty() {
757                break;
758            }
759        }
760
761        Ok(paths)
762    }
763
764    async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
765        // See: https://cloud.google.com/storage/docs/xml-api/post-object-multipart
766
767        debug_assert!(
768            Self::is_supported_url(&self.config, &url),
769            "{url} is not a supported GCS URL",
770            url = url.as_str()
771        );
772
773        // GCS doesn't support conditional requests for `CreateMultipartUpload`.
774        // Therefore, we must issue a HEAD request for the object if not overwriting.
775        if !self.config.overwrite {
776            let response = self.head(url.clone(), false).await?;
777            if response.status() != StatusCode::NOT_FOUND {
778                return Err(Error::RemoteDestinationExists(url));
779            }
780        }
781
782        debug!("sending POST request for `{url}`", url = url.display());
783
784        let mut create = url.clone();
785        create.query_pairs_mut().append_key_only("uploads");
786
787        let date = Utc::now();
788
789        create.set_scheme("http").unwrap();
790        create.set_ip_host("127.0.0.1".parse().unwrap()).unwrap();
791        create.set_port(Some(9000)).unwrap();
792
793        let mut request = self
794            .client
795            .post(create)
796            .header(header::USER_AGENT, USER_AGENT)
797            .header(header::CONTENT_LENGTH, "0")
798            .header(
799                GOOGLE_DATE_HEADER,
800                date.format("%Y%m%dT%H%M%SZ").to_string(),
801            )
802            .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
803            .build()?;
804
805        if let Some(auth) = &self.config.google.auth {
806            append_authentication_header(auth, date, &mut request)?;
807        }
808
809        let response = self.client.execute(request).await?;
810        let status = response.status();
811        if !status.is_success() {
812            return Err(response.into_error().await);
813        }
814
815        let text: String = match response.text().await {
816            Ok(text) => text,
817            Err(e) => return Err(e.into()),
818        };
819
820        let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
821            Ok(response) => response.upload_id,
822            Err(e) => {
823                return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
824            }
825        };
826
827        Ok(GoogleUpload {
828            config: self.config.clone(),
829            client: self.client.clone(),
830            url,
831            id,
832            events: self.events.clone(),
833        })
834    }
835}