1use std::borrow::Cow;
4use std::sync::Arc;
5
6use bytes::Bytes;
7use chrono::DateTime;
8use chrono::Utc;
9use http_cache_stream_reqwest::Cache;
10use http_cache_stream_reqwest::storage::DefaultCacheStorage;
11use reqwest::Body;
12use reqwest::Request;
13use reqwest::Response;
14use reqwest::StatusCode;
15use reqwest::header;
16use reqwest::header::HeaderValue;
17use secrecy::ExposeSecret;
18use serde::Deserialize;
19use serde::Serialize;
20use tokio::sync::broadcast;
21use tracing::debug;
22use url::Url;
23
24use crate::BLOCK_SIZE_THRESHOLD;
25use crate::Config;
26use crate::Error;
27use crate::GoogleAuthConfig;
28use crate::HttpClient;
29use crate::ONE_MEBIBYTE;
30use crate::Result;
31use crate::TransferEvent;
32use crate::USER_AGENT;
33use crate::UrlExt as _;
34use crate::backend::StorageBackend;
35use crate::backend::Upload;
36use crate::backend::auth::RequestSigner;
37use crate::backend::auth::SignatureProvider;
38use crate::backend::auth::sha256_hex_string;
39use crate::backend::s3::InitiateMultipartUploadResult;
40use crate::backend::s3::ListBucketResult;
41use crate::streams::ByteStream;
42use crate::streams::TransferStream;
43
/// The root domain of the Google Cloud Storage XML API.
///
/// Supported URLs are either path style (`https://storage.googleapis.com/<bucket>/...`)
/// or virtual-hosted style (`https://<bucket>.storage.googleapis.com/...`).
const GOOGLE_ROOT_DOMAIN: &str = "storage.googleapis.com";

/// The maximum number of parts in a multipart upload.
const MAX_PARTS: u64 = 10000;

/// The minimum size of an upload part (5 MiB).
///
/// Automatically computed block sizes are clamped to at least this value.
const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;

/// The maximum size of an upload part (5 GiB).
const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;

/// The maximum size of a file that can be uploaded (5 TiB).
const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;

/// The header carrying the request timestamp (`%Y%m%dT%H%M%SZ`) used when
/// signing requests.
const GOOGLE_DATE_HEADER: &str = "x-goog-date";

/// The header carrying the hex-encoded SHA-256 digest of the request body.
const GOOGLE_CONTENT_SHA256_HEADER: &str = "x-goog-content-sha256";
65
/// Errors specific to the Google Cloud Storage backend.
#[derive(Debug, thiserror::Error)]
pub enum GoogleError {
    /// A configured block size is larger than [`MAX_PART_SIZE`].
    #[error("Google Storage block size cannot exceed {MAX_PART_SIZE} bytes")]
    InvalidBlockSize,
    /// The file to upload is too large to be split into parts that fit the
    /// service limits.
    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
    MaximumSizeExceeded,
    /// A `gs` URL could not be rewritten into an HTTPS URL.
    #[error("invalid URL with `gs` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// The request signer failed to produce a signature from the configured
    /// HMAC secret.
    #[error("invalid Google Cloud Storage HMAC secret")]
    InvalidSecretAccessKey,
    /// A successful part upload response did not contain a readable `ETag`
    /// header.
    #[error("response from server was missing an ETag header")]
    ResponseMissingETag,
    /// The bucket name could not be used as part of a host name.
    #[error("the bucket name specified in the URL is invalid")]
    InvalidBucketName,
    /// The server's response body could not be deserialized as the expected
    /// XML document.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The HTTP status of the response.
        status: reqwest::StatusCode,
        /// The XML deserialization error.
        error: serde_xml_rs::Error,
    },
}
96
/// A [`SignatureProvider`] that signs Google Cloud Storage requests using the
/// `GOOG4-HMAC-SHA256` scheme.
pub struct GoogleSignatureProvider<'a> {
    /// The authentication configuration supplying the HMAC access key and
    /// secret.
    auth: &'a GoogleAuthConfig,
}
102
impl SignatureProvider for GoogleSignatureProvider<'_> {
    // Google's V4 HMAC signing algorithm identifier.
    fn algorithm(&self) -> &str {
        "GOOG4-HMAC-SHA256"
    }

    // Prefix combined with the secret when deriving the signing key
    // (presumably by `RequestSigner`; confirm there).
    fn secret_key_prefix(&self) -> &str {
        "GOOG4"
    }

    // Request-type terminator of the signing scope.
    fn request_type(&self) -> &str {
        "goog4_request"
    }

    // A fixed placeholder region; no region is derived from the URL.
    fn region(&self) -> &str {
        "any"
    }

    // Service name used in the signing scope.
    fn service(&self) -> &str {
        "storage"
    }

    // Header carrying the request timestamp.
    fn date_header_name(&self) -> &str {
        GOOGLE_DATE_HEADER
    }

    // Header carrying the hex-encoded SHA-256 digest of the body.
    fn content_hash_header_name(&self) -> &str {
        GOOGLE_CONTENT_SHA256_HEADER
    }

    // The HMAC access key ID from the configuration.
    fn access_key_id(&self) -> &str {
        &self.auth.access_key
    }

    // The HMAC secret, exposed only for the duration of signing.
    fn secret_access_key(&self) -> &str {
        self.auth.secret.expose_secret()
    }
}
140
141fn append_authentication_header(
143 auth: &GoogleAuthConfig,
144 date: DateTime<Utc>,
145 request: &mut Request,
146) -> Result<()> {
147 let signer = RequestSigner::new(GoogleSignatureProvider { auth });
148 let auth = signer
149 .sign(date, request)
150 .ok_or(GoogleError::InvalidSecretAccessKey)?;
151 request.headers_mut().append(
152 header::AUTHORIZATION,
153 HeaderValue::try_from(auth).expect("value should be valid"),
154 );
155 Ok(())
156}
157
158trait UrlExt {
160 fn bucket_and_path(&self) -> (&str, &str);
166}
167
168impl UrlExt for Url {
169 fn bucket_and_path(&self) -> (&str, &str) {
170 let domain = self.domain().expect("URL should have domain");
171
172 if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
173 let bucket = self
175 .path_segments()
176 .expect("URL should have path")
177 .next()
178 .expect("URL should have at least one path segment");
179
180 (
181 bucket,
182 self.path()
183 .strip_prefix('/')
184 .unwrap()
185 .strip_prefix(bucket)
186 .unwrap(),
187 )
188 } else {
189 let Some((bucket, _)) = domain.split_once('.') else {
191 panic!("URL domain does not contain a bucket");
192 };
193
194 (bucket, self.path())
195 }
196 }
197}
198
199trait ResponseExt {
201 async fn into_error(self) -> Error;
203}
204
205impl ResponseExt for Response {
206 async fn into_error(self) -> Error {
207 #[derive(Default, Deserialize)]
209 #[serde(rename = "Error")]
210 struct ErrorResponse {
211 #[serde(rename = "Message", default)]
213 message: String,
214 #[serde(rename = "Details", default)]
216 details: Option<String>,
217 }
218
219 let status = self.status();
220 let text: String = match self.text().await {
221 Ok(text) => text,
222 Err(e) => return e.into(),
223 };
224
225 if text.is_empty() {
226 return Error::Server {
227 status,
228 message: text,
229 };
230 }
231
232 let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
233 Ok(response) => match response.details {
234 Some(details) => {
235 format!("{message}\ndetails: {details}", message = response.message)
236 }
237 None => response.message,
238 },
239 Err(e) => {
240 return GoogleError::UnexpectedResponse { status, error: e }.into();
241 }
242 };
243
244 Error::Server { status, message }
245 }
246}
247
/// A completed part of a multipart upload.
///
/// Serialized as a `Part` element in the document sent when the upload is
/// finalized.
#[derive(Default, Clone, Serialize)]
#[serde(rename = "Part")]
pub struct GoogleUploadPart {
    /// The 1-based part number (the block index plus one).
    #[serde(rename = "PartNumber")]
    number: u64,
    /// The ETag the server returned for the uploaded part.
    #[serde(rename = "ETag")]
    etag: String,
}
259
/// An in-progress multipart upload to Google Cloud Storage.
pub struct GoogleUpload {
    /// The transfer configuration (consulted for authentication).
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The URL of the object being uploaded.
    url: Url,
    /// The upload ID returned by the server when the upload was created.
    id: String,
    /// The optional channel on which transfer events are sent.
    events: Option<broadcast::Sender<TransferEvent>>,
}
273
274impl Upload for GoogleUpload {
275 type Part = GoogleUploadPart;
276
277 async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Option<Self::Part>> {
278 debug!(
281 "sending PUT request for block {block} of `{url}`",
282 url = self.url.display()
283 );
284
285 let mut url = self.url.clone();
286
287 {
288 let mut pairs = url.query_pairs_mut();
289 pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
290 pairs.append_pair("uploadId", &self.id);
291 }
292
293 let digest = sha256_hex_string(&bytes);
294 let length = bytes.len();
295 let body = Body::wrap_stream(TransferStream::new(
296 ByteStream::new(bytes),
297 id,
298 block,
299 0,
300 self.events.clone(),
301 ));
302
303 let date = Utc::now();
304 let mut request = self
305 .client
306 .put(url)
307 .header(header::USER_AGENT, USER_AGENT)
308 .header(header::CONTENT_LENGTH, length)
309 .header(header::CONTENT_TYPE, "application/octet-stream")
310 .header(
311 GOOGLE_DATE_HEADER,
312 date.format("%Y%m%dT%H%M%SZ").to_string(),
313 )
314 .header(GOOGLE_CONTENT_SHA256_HEADER, &digest)
315 .body(body)
316 .build()?;
317
318 if let Some(auth) = &self.config.google.auth {
319 append_authentication_header(auth, date, &mut request)?;
320 }
321
322 let response = self.client.execute(request).await?;
323 if !response.status().is_success() {
324 return Err(response.into_error().await);
325 }
326
327 let etag = response
328 .headers()
329 .get(header::ETAG)
330 .and_then(|v| v.to_str().ok())
331 .ok_or(GoogleError::ResponseMissingETag)?;
332
333 Ok(Some(GoogleUploadPart {
334 number: block + 1,
335 etag: etag.to_string(),
336 }))
337 }
338
339 async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
340 #[derive(Serialize)]
344 #[serde(rename = "CompleteMultipartUpload")]
345 struct CompleteUpload<'a> {
346 #[serde(rename = "Part")]
348 parts: &'a [GoogleUploadPart],
349 }
350
351 debug!(
352 "sending POST request to finalize upload of `{url}`",
353 url = self.url.display()
354 );
355
356 let mut url = self.url.clone();
357
358 {
359 let mut pairs = url.query_pairs_mut();
360 pairs.append_pair("uploadId", &self.id);
361 }
362
363 let body = serde_xml_rs::SerdeXml::new()
364 .to_string(&CompleteUpload { parts })
365 .expect("should serialize");
366
367 let date = Utc::now();
368 let mut request = self
369 .client
370 .post(url)
371 .header(header::USER_AGENT, USER_AGENT)
372 .header(header::CONTENT_LENGTH, body.len())
373 .header(header::CONTENT_TYPE, "application/xml")
374 .header(
375 GOOGLE_DATE_HEADER,
376 date.format("%Y%m%dT%H%M%SZ").to_string(),
377 )
378 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
379 .body(body)
380 .build()?;
381
382 if let Some(auth) = &self.config.google.auth {
383 append_authentication_header(auth, date, &mut request)?;
384 }
385
386 let response = self.client.execute(request).await?;
387 if !response.status().is_success() {
388 return Err(response.into_error().await);
389 }
390
391 Ok(())
392 }
393}
394
/// A storage backend for Google Cloud Storage.
pub struct GoogleStorageBackend {
    /// The transfer configuration, shared with uploads created by this backend.
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The optional channel on which transfer events are sent.
    events: Option<broadcast::Sender<TransferEvent>>,
}
404
405impl GoogleStorageBackend {
406 pub fn new(
408 config: Config,
409 client: HttpClient,
410 events: Option<broadcast::Sender<TransferEvent>>,
411 ) -> Self {
412 Self {
413 config: Arc::new(config),
414 client,
415 events,
416 }
417 }
418}
419
420impl StorageBackend for GoogleStorageBackend {
421 type Upload = GoogleUpload;
422
423 fn config(&self) -> &Config {
424 &self.config
425 }
426
427 fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
428 self.client.cache()
429 }
430
431 fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
432 &self.events
433 }
434
435 fn block_size(&self, file_size: u64) -> Result<u64> {
436 const BLOCK_COUNT_INCREMENT: u64 = 50;
438
439 if let Some(size) = self.config.block_size {
441 if size > MAX_PART_SIZE {
442 return Err(GoogleError::InvalidBlockSize.into());
443 }
444
445 return Ok(size);
446 }
447
448 let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
450 while num_blocks < MAX_PARTS {
451 let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
452 if block_size <= BLOCK_SIZE_THRESHOLD {
453 return Ok(block_size.max(MIN_PART_SIZE));
454 }
455
456 num_blocks += BLOCK_COUNT_INCREMENT;
457 }
458
459 let block_size: u64 = file_size.div_ceil(MAX_PARTS);
462 if block_size > MAX_PART_SIZE {
463 return Err(GoogleError::MaximumSizeExceeded.into());
464 }
465
466 Ok(block_size)
467 }
468
469 fn is_supported_url(_: &Config, url: &Url) -> bool {
470 match url.scheme() {
471 "gs" => true,
472 "http" | "https" => {
473 let Some(domain) = url.domain() else {
474 return false;
475 };
476
477 if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
478 return url
481 .path_segments()
482 .map(|mut s| s.nth(1).is_some())
483 .unwrap_or(false);
484 }
485
486 let Some((bucket, domain)) = domain.split_once('.') else {
488 return false;
489 };
490
491 !bucket.is_empty()
493 && domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN)
494 && url
495 .path_segments()
496 .map(|mut s| s.next().is_some())
497 .unwrap_or(false)
498 }
499 _ => false,
500 }
501 }
502
503 fn rewrite_url<'a>(_: &Config, url: &'a Url) -> Result<Cow<'a, Url>> {
504 match url.scheme() {
505 "gs" => {
506 let bucket = url.host_str().ok_or(GoogleError::InvalidScheme)?;
507 let path = url.path();
508
509 if url.path() == "/" {
510 return Err(GoogleError::InvalidScheme.into());
511 }
512
513 match (url.query(), url.fragment()) {
514 (None, None) => format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}"),
515 (None, Some(fragment)) => {
516 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}#{fragment}")
517 }
518 (Some(query), None) => {
519 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}")
520 }
521 (Some(query), Some(fragment)) => {
522 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}#{fragment}")
523 }
524 }
525 .parse()
526 .map(Cow::Owned)
527 .map_err(|_| GoogleError::InvalidScheme.into())
528 }
529 _ => Ok(Cow::Borrowed(url)),
530 }
531 }
532
533 fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
534 {
536 let mut existing = url.path_segments_mut().expect("url should have path");
537 existing.pop_if_empty();
538 existing.extend(segments);
539 }
540
541 Ok(url)
542 }
543
544 async fn head(&self, url: Url, must_exist: bool) -> Result<Response> {
545 debug_assert!(
546 Self::is_supported_url(&self.config, &url),
547 "{url} is not a supported GCS URL",
548 url = url.as_str()
549 );
550
551 debug!("sending HEAD request for `{url}`", url = url.display());
552
553 let date = Utc::now();
554 let mut request = self
555 .client
556 .head(url)
557 .header(header::USER_AGENT, USER_AGENT)
558 .header(
559 GOOGLE_DATE_HEADER,
560 date.format("%Y%m%dT%H%M%SZ").to_string(),
561 )
562 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
563 .build()?;
564
565 if let Some(auth) = &self.config.google.auth {
566 append_authentication_header(auth, date, &mut request)?;
567 }
568
569 let response = self.client.execute(request).await?;
570 if !response.status().is_success() {
571 if !must_exist && response.status() == StatusCode::NOT_FOUND {
573 return Ok(response);
574 }
575
576 return Err(response.into_error().await);
577 }
578
579 Ok(response)
580 }
581
582 async fn get(&self, url: Url) -> Result<Response> {
583 debug_assert!(
584 Self::is_supported_url(&self.config, &url),
585 "{url} is not a supported GCS URL",
586 url = url.as_str()
587 );
588
589 debug!("sending GET request for `{url}`", url = url.display());
590
591 let date = Utc::now();
592 let mut request = self
593 .client
594 .get(url)
595 .header(header::USER_AGENT, USER_AGENT)
596 .header(
597 GOOGLE_DATE_HEADER,
598 date.format("%Y%m%dT%H%M%SZ").to_string(),
599 )
600 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
601 .build()?;
602
603 if let Some(auth) = &self.config.google.auth {
604 append_authentication_header(auth, date, &mut request)?;
605 }
606
607 let response = self.client.execute(request).await?;
608 if !response.status().is_success() {
609 return Err(response.into_error().await);
610 }
611
612 Ok(response)
613 }
614
615 async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
616 debug_assert!(
617 Self::is_supported_url(&self.config, &url),
618 "{url} is not a supported GCS URL",
619 url = url.as_str()
620 );
621
622 debug!(
623 "sending GET request at offset {offset} for `{url}`",
624 url = url.display(),
625 );
626
627 let date = Utc::now();
628
629 let mut request = self
630 .client
631 .get(url)
632 .header(header::USER_AGENT, USER_AGENT)
633 .header(
634 GOOGLE_DATE_HEADER,
635 date.format("%Y%m%dT%H%M%SZ").to_string(),
636 )
637 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
638 .header(header::RANGE, format!("bytes={offset}-"))
639 .header(header::IF_MATCH, etag)
640 .build()?;
641
642 if let Some(auth) = &self.config.google.auth {
643 append_authentication_header(auth, date, &mut request)?;
644 }
645
646 let response = self.client.execute(request).await?;
647 let status = response.status();
648
649 if status == StatusCode::PRECONDITION_FAILED {
651 return Err(Error::RemoteContentModified);
652 }
653
654 if !status.is_success() {
656 return Err(response.into_error().await);
657 }
658
659 if status != StatusCode::PARTIAL_CONTENT {
661 return Err(Error::RemoteContentModified);
662 }
663
664 Ok(response)
665 }
666
667 async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
668 debug_assert!(
671 Self::is_supported_url(&self.config, &url),
672 "{url} is not a supported GCS URL",
673 url = url.as_str()
674 );
675
676 debug!("walking `{url}` as a directory", url = url.display());
677
678 let (bucket, path) = url.bucket_and_path();
679
680 let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
682 prefix.push('/');
683
684 url.set_host(Some(&format!("{bucket}.{GOOGLE_ROOT_DOMAIN}")))
686 .map_err(|_| GoogleError::InvalidBucketName)?;
687 url.set_path("/");
688
689 {
690 let mut pairs = url.query_pairs_mut();
691 pairs.append_pair("list-type", "2");
693 pairs.append_pair("prefix", &prefix);
695 }
696
697 let date = Utc::now();
698 let mut token = String::new();
699 let mut paths = Vec::new();
700 loop {
701 let mut url = url.clone();
702 if !token.is_empty() {
703 url.query_pairs_mut()
704 .append_pair("continuation-token", &token);
705 }
706
707 let mut request = self
709 .client
710 .get(url)
711 .header(header::USER_AGENT, USER_AGENT)
712 .header(
713 GOOGLE_DATE_HEADER,
714 date.format("%Y%m%dT%H%M%SZ").to_string(),
715 )
716 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
717 .build()?;
718
719 if let Some(auth) = &self.config.google.auth {
720 append_authentication_header(auth, date, &mut request)?;
721 }
722
723 let response = self.client.execute(request).await?;
724
725 let status = response.status();
726 if !status.is_success() {
727 return Err(response.into_error().await);
728 }
729
730 let text = response.text().await?;
731 let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
732 Ok(response) => response,
733 Err(e) => {
734 return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
735 }
736 };
737
738 if paths.is_empty()
741 && results.contents.len() == 1
742 && results.token.is_none()
743 && let Some("") = results.contents[0].key.strip_prefix(&prefix)
744 {
745 return Ok(paths);
746 }
747
748 paths.extend(
749 results
750 .contents
751 .into_iter()
752 .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
753 );
754
755 token = results.token.unwrap_or_default();
756 if token.is_empty() {
757 break;
758 }
759 }
760
761 Ok(paths)
762 }
763
764 async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
765 debug_assert!(
768 Self::is_supported_url(&self.config, &url),
769 "{url} is not a supported GCS URL",
770 url = url.as_str()
771 );
772
773 if !self.config.overwrite {
776 let response = self.head(url.clone(), false).await?;
777 if response.status() != StatusCode::NOT_FOUND {
778 return Err(Error::RemoteDestinationExists(url));
779 }
780 }
781
782 debug!("sending POST request for `{url}`", url = url.display());
783
784 let mut create = url.clone();
785 create.query_pairs_mut().append_key_only("uploads");
786
787 let date = Utc::now();
788
789 create.set_scheme("http").unwrap();
790 create.set_ip_host("127.0.0.1".parse().unwrap()).unwrap();
791 create.set_port(Some(9000)).unwrap();
792
793 let mut request = self
794 .client
795 .post(create)
796 .header(header::USER_AGENT, USER_AGENT)
797 .header(header::CONTENT_LENGTH, "0")
798 .header(
799 GOOGLE_DATE_HEADER,
800 date.format("%Y%m%dT%H%M%SZ").to_string(),
801 )
802 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
803 .build()?;
804
805 if let Some(auth) = &self.config.google.auth {
806 append_authentication_header(auth, date, &mut request)?;
807 }
808
809 let response = self.client.execute(request).await?;
810 let status = response.status();
811 if !status.is_success() {
812 return Err(response.into_error().await);
813 }
814
815 let text: String = match response.text().await {
816 Ok(text) => text,
817 Err(e) => return Err(e.into()),
818 };
819
820 let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
821 Ok(response) => response.upload_id,
822 Err(e) => {
823 return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
824 }
825 };
826
827 Ok(GoogleUpload {
828 config: self.config.clone(),
829 client: self.client.clone(),
830 url,
831 id,
832 events: self.events.clone(),
833 })
834 }
835}