1use std::sync::Arc;
4
5use bytes::Bytes;
6use chrono::DateTime;
7use chrono::Utc;
8use http_cache_stream_reqwest::Cache;
9use http_cache_stream_reqwest::storage::DefaultCacheStorage;
10use reqwest::Body;
11use reqwest::Request;
12use reqwest::Response;
13use reqwest::StatusCode;
14use reqwest::header;
15use reqwest::header::HeaderValue;
16use secrecy::ExposeSecret;
17use serde::Deserialize;
18use serde::Serialize;
19use tokio::sync::broadcast;
20use tracing::debug;
21use url::Url;
22
23use crate::BLOCK_SIZE_THRESHOLD;
24use crate::Config;
25use crate::Error;
26use crate::GoogleAuthConfig;
27use crate::HttpClient;
28use crate::ONE_MEBIBYTE;
29use crate::Result;
30use crate::TransferEvent;
31use crate::USER_AGENT;
32use crate::UrlExt as _;
33use crate::backend::StorageBackend;
34use crate::backend::Upload;
35use crate::backend::auth::RequestSigner;
36use crate::backend::auth::SignatureProvider;
37use crate::backend::auth::sha256_hex_string;
38use crate::backend::s3::InitiateMultipartUploadResult;
39use crate::backend::s3::ListBucketResult;
40use crate::streams::ByteStream;
41use crate::streams::TransferStream;
42
43const GOOGLE_ROOT_DOMAIN: &str = "storage.googleapis.com";
45
46const MAX_PARTS: u64 = 10000;
48
49const MIN_PART_SIZE: u64 = 5 * ONE_MEBIBYTE;
52
53const MAX_PART_SIZE: u64 = MIN_PART_SIZE * 1024;
55
56const MAX_FILE_SIZE: u64 = MAX_PART_SIZE * 1024;
58
59const GOOGLE_DATE_HEADER: &str = "x-goog-date";
61
62const GOOGLE_CONTENT_SHA256_HEADER: &str = "x-goog-content-sha256";
64
/// Represents an error specific to the Google Cloud Storage backend.
#[derive(Debug, thiserror::Error)]
pub enum GoogleError {
    /// The configured block size is larger than `MAX_PART_SIZE`.
    #[error("Google Storage block size cannot exceed {MAX_PART_SIZE} bytes")]
    InvalidBlockSize,
    /// The source file is too large for any block size within `MAX_PART_SIZE`
    /// when split into `MAX_PARTS` blocks.
    #[error("the size of the source file exceeds the supported maximum of {MAX_FILE_SIZE} bytes")]
    MaximumSizeExceeded,
    /// A `gs` URL could not be rewritten into an `https` URL.
    #[error("invalid URL with `gs` scheme: the URL is not in a supported format")]
    InvalidScheme,
    /// The request signer was unable to produce a signature.
    #[error("invalid Google Cloud Storage HMAC secret")]
    InvalidSecretAccessKey,
    /// A successful part upload response did not carry a usable `ETag` header.
    #[error("response from server was missing an ETag header")]
    ResponseMissingETag,
    /// The bucket name could not be set as the host of a URL.
    #[error("the bucket name specified in the URL is invalid")]
    InvalidBucketName,
    /// The body of a response could not be deserialized from XML.
    #[error("unexpected {status} response from server: failed to deserialize response contents: {error}", status = .status.as_u16())]
    UnexpectedResponse {
        /// The status code of the response.
        status: reqwest::StatusCode,
        /// The XML deserialization error.
        error: serde_xml_rs::Error,
    },
}
95
/// Provides the Google-specific parameters used to sign requests.
pub struct GoogleSignatureProvider<'a> {
    /// The authentication configuration supplying the access key and secret.
    auth: &'a GoogleAuthConfig,
}
101
// Supplies the fixed `GOOG4` signing parameters along with the credentials
// taken from the borrowed authentication configuration.
impl SignatureProvider for GoogleSignatureProvider<'_> {
    /// Returns the name of the signing algorithm.
    fn algorithm(&self) -> &str {
        "GOOG4-HMAC-SHA256"
    }

    /// Returns the prefix associated with the secret key.
    fn secret_key_prefix(&self) -> &str {
        "GOOG4"
    }

    /// Returns the request type string.
    fn request_type(&self) -> &str {
        "goog4_request"
    }

    /// Returns the region; a fixed placeholder value is used.
    fn region(&self) -> &str {
        "any"
    }

    /// Returns the service name.
    fn service(&self) -> &str {
        "storage"
    }

    /// Returns the name of the header that carries the request date.
    fn date_header_name(&self) -> &str {
        GOOGLE_DATE_HEADER
    }

    /// Returns the name of the header that carries the content digest.
    fn content_hash_header_name(&self) -> &str {
        GOOGLE_CONTENT_SHA256_HEADER
    }

    /// Returns the access key from the authentication configuration.
    fn access_key_id(&self) -> &str {
        &self.auth.access_key
    }

    /// Returns the exposed secret from the authentication configuration.
    fn secret_access_key(&self) -> &str {
        self.auth.secret.expose_secret()
    }
}
139
140fn append_authentication_header(
142 auth: &GoogleAuthConfig,
143 date: DateTime<Utc>,
144 request: &mut Request,
145) -> Result<()> {
146 let signer = RequestSigner::new(GoogleSignatureProvider { auth });
147 let auth = signer
148 .sign(date, request)
149 .ok_or(GoogleError::InvalidSecretAccessKey)?;
150 request.headers_mut().append(
151 header::AUTHORIZATION,
152 HeaderValue::try_from(auth).expect("value should be valid"),
153 );
154 Ok(())
155}
156
/// Extension trait for extracting Google Cloud Storage components from a URL.
trait UrlExt {
    /// Splits the URL into its bucket name and object path.
    ///
    /// # Panics
    ///
    /// Implementations may panic if the URL is not in a supported format.
    fn bucket_and_path(&self) -> (&str, &str);
}
166
167impl UrlExt for Url {
168 fn bucket_and_path(&self) -> (&str, &str) {
169 let domain = self.domain().expect("URL should have domain");
170
171 if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
172 let bucket = self
174 .path_segments()
175 .expect("URL should have path")
176 .next()
177 .expect("URL should have at least one path segment");
178
179 (
180 bucket,
181 self.path()
182 .strip_prefix('/')
183 .unwrap()
184 .strip_prefix(bucket)
185 .unwrap(),
186 )
187 } else {
188 let Some((bucket, _)) = domain.split_once('.') else {
190 panic!("URL domain does not contain a bucket");
191 };
192
193 (bucket, self.path())
194 }
195 }
196}
197
/// Extension trait for converting an unsuccessful response into an error.
trait ResponseExt {
    /// Consumes the response and converts it into an [`Error`].
    async fn into_error(self) -> Error;
}
203
204impl ResponseExt for Response {
205 async fn into_error(self) -> Error {
206 #[derive(Default, Deserialize)]
208 #[serde(rename = "Error")]
209 struct ErrorResponse {
210 #[serde(rename = "Message", default)]
212 message: String,
213 #[serde(rename = "Details", default)]
215 details: Option<String>,
216 }
217
218 let status = self.status();
219 let text: String = match self.text().await {
220 Ok(text) => text,
221 Err(e) => return e.into(),
222 };
223
224 if text.is_empty() {
225 return Error::Server {
226 status,
227 message: text,
228 };
229 }
230
231 let message = match serde_xml_rs::from_str::<ErrorResponse>(&text) {
232 Ok(response) => match response.details {
233 Some(details) => {
234 format!("{message}\ndetails: {details}", message = response.message)
235 }
236 None => response.message,
237 },
238 Err(e) => {
239 return GoogleError::UnexpectedResponse { status, error: e }.into();
240 }
241 };
242
243 Error::Server { status, message }
244 }
245}
246
/// Represents a completed part of an upload.
///
/// Serialized as a `Part` element when finalizing the upload.
#[derive(Default, Clone, Serialize)]
#[serde(rename = "Part")]
pub struct GoogleUploadPart {
    /// The part number; `put` sets this to the block number plus one.
    #[serde(rename = "PartNumber")]
    number: u64,
    /// The `ETag` header value returned when the part was uploaded.
    #[serde(rename = "ETag")]
    etag: String,
}
258
/// Represents an in-progress upload to Google Cloud Storage.
pub struct GoogleUpload {
    /// The shared configuration; provides the optional Google authentication.
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The URL of the object being uploaded.
    url: Url,
    /// The upload identifier returned when the upload was created.
    id: String,
    /// The optional channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
272
273impl Upload for GoogleUpload {
274 type Part = GoogleUploadPart;
275
276 async fn put(&self, id: u64, block: u64, bytes: Bytes) -> Result<Self::Part> {
277 debug!(
280 "sending PUT request for block {block} of `{url}`",
281 url = self.url.display()
282 );
283
284 let mut url = self.url.clone();
285
286 {
287 let mut pairs = url.query_pairs_mut();
288 pairs.append_pair("partNumber", &format!("{number}", number = block + 1));
289 pairs.append_pair("uploadId", &self.id);
290 }
291
292 let digest = sha256_hex_string(&bytes);
293 let length = bytes.len();
294 let body = Body::wrap_stream(TransferStream::new(
295 ByteStream::new(bytes),
296 id,
297 block,
298 0,
299 self.events.clone(),
300 ));
301
302 let date = Utc::now();
303 let mut request = self
304 .client
305 .put(url)
306 .header(header::USER_AGENT, USER_AGENT)
307 .header(header::CONTENT_LENGTH, length)
308 .header(header::CONTENT_TYPE, "application/octet-stream")
309 .header(
310 GOOGLE_DATE_HEADER,
311 date.format("%Y%m%dT%H%M%SZ").to_string(),
312 )
313 .header(GOOGLE_CONTENT_SHA256_HEADER, &digest)
314 .body(body)
315 .build()?;
316
317 if let Some(auth) = &self.config.google.auth {
318 append_authentication_header(auth, date, &mut request)?;
319 }
320
321 let response = self.client.execute(request).await?;
322 if !response.status().is_success() {
323 return Err(response.into_error().await);
324 }
325
326 let etag = response
327 .headers()
328 .get(header::ETAG)
329 .and_then(|v| v.to_str().ok())
330 .ok_or(GoogleError::ResponseMissingETag)?;
331
332 Ok(GoogleUploadPart {
333 number: block + 1,
334 etag: etag.to_string(),
335 })
336 }
337
338 async fn finalize(&self, parts: &[Self::Part]) -> Result<()> {
339 #[derive(Serialize)]
343 #[serde(rename = "CompleteMultipartUpload")]
344 struct CompleteUpload<'a> {
345 #[serde(rename = "Part")]
347 parts: &'a [GoogleUploadPart],
348 }
349
350 debug!(
351 "sending POST request to finalize upload of `{url}`",
352 url = self.url.display()
353 );
354
355 let mut url = self.url.clone();
356
357 {
358 let mut pairs = url.query_pairs_mut();
359 pairs.append_pair("uploadId", &self.id);
360 }
361
362 let body = serde_xml_rs::SerdeXml::new()
363 .to_string(&CompleteUpload { parts })
364 .expect("should serialize");
365
366 let date = Utc::now();
367 let mut request = self
368 .client
369 .post(url)
370 .header(header::USER_AGENT, USER_AGENT)
371 .header(header::CONTENT_LENGTH, body.len())
372 .header(header::CONTENT_TYPE, "application/xml")
373 .header(
374 GOOGLE_DATE_HEADER,
375 date.format("%Y%m%dT%H%M%SZ").to_string(),
376 )
377 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string(&body))
378 .body(body)
379 .build()?;
380
381 if let Some(auth) = &self.config.google.auth {
382 append_authentication_header(auth, date, &mut request)?;
383 }
384
385 let response = self.client.execute(request).await?;
386 if !response.status().is_success() {
387 return Err(response.into_error().await);
388 }
389
390 Ok(())
391 }
392}
393
/// Represents the Google Cloud Storage backend.
pub struct GoogleStorageBackend {
    /// The shared configuration.
    config: Arc<Config>,
    /// The HTTP client used to send requests.
    client: HttpClient,
    /// The optional channel for sending transfer events.
    events: Option<broadcast::Sender<TransferEvent>>,
}
403
404impl GoogleStorageBackend {
405 pub fn new(
407 config: Config,
408 client: HttpClient,
409 events: Option<broadcast::Sender<TransferEvent>>,
410 ) -> Self {
411 Self {
412 config: Arc::new(config),
413 client,
414 events,
415 }
416 }
417}
418
419impl StorageBackend for GoogleStorageBackend {
420 type Upload = GoogleUpload;
421
    /// Returns the configuration used by the backend.
    fn config(&self) -> &Config {
        &self.config
    }
425
    /// Returns the cache of the underlying HTTP client, if it has one.
    fn cache(&self) -> Option<&Cache<DefaultCacheStorage>> {
        self.client.cache()
    }
429
    /// Returns the optional channel for sending transfer events.
    fn events(&self) -> &Option<broadcast::Sender<TransferEvent>> {
        &self.events
    }
433
434 fn block_size(&self, file_size: u64) -> Result<u64> {
435 const BLOCK_COUNT_INCREMENT: u64 = 50;
437
438 if let Some(size) = self.config.block_size {
440 if size > MAX_PART_SIZE {
441 return Err(GoogleError::InvalidBlockSize.into());
442 }
443
444 return Ok(size);
445 }
446
447 let mut num_blocks: u64 = BLOCK_COUNT_INCREMENT;
449 while num_blocks < MAX_PARTS {
450 let block_size = file_size.div_ceil(num_blocks).next_power_of_two();
451 if block_size <= BLOCK_SIZE_THRESHOLD {
452 return Ok(block_size.max(MIN_PART_SIZE));
453 }
454
455 num_blocks += BLOCK_COUNT_INCREMENT;
456 }
457
458 let block_size: u64 = file_size.div_ceil(MAX_PARTS);
461 if block_size > MAX_PART_SIZE {
462 return Err(GoogleError::MaximumSizeExceeded.into());
463 }
464
465 Ok(block_size)
466 }
467
468 fn is_supported_url(_: &Config, url: &Url) -> bool {
469 match url.scheme() {
470 "gs" => true,
471 "http" | "https" => {
472 let Some(domain) = url.domain() else {
473 return false;
474 };
475
476 if domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN) {
477 return url
480 .path_segments()
481 .map(|mut s| s.nth(1).is_some())
482 .unwrap_or(false);
483 }
484
485 let Some((bucket, domain)) = domain.split_once('.') else {
487 return false;
488 };
489
490 !bucket.is_empty()
492 && domain.eq_ignore_ascii_case(GOOGLE_ROOT_DOMAIN)
493 && url
494 .path_segments()
495 .map(|mut s| s.next().is_some())
496 .unwrap_or(false)
497 }
498 _ => false,
499 }
500 }
501
502 fn rewrite_url(&self, url: Url) -> Result<Url> {
503 match url.scheme() {
504 "gs" => {
505 let bucket = url.host_str().ok_or(GoogleError::InvalidScheme)?;
506 let path = url.path();
507
508 if url.path() == "/" {
509 return Err(GoogleError::InvalidScheme.into());
510 }
511
512 match (url.query(), url.fragment()) {
513 (None, None) => format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}"),
514 (None, Some(fragment)) => {
515 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}#{fragment}")
516 }
517 (Some(query), None) => {
518 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}")
519 }
520 (Some(query), Some(fragment)) => {
521 format!("https://{bucket}.{GOOGLE_ROOT_DOMAIN}{path}?{query}#{fragment}")
522 }
523 }
524 .parse()
525 .map_err(|_| GoogleError::InvalidScheme.into())
526 }
527 _ => Ok(url),
528 }
529 }
530
531 fn join_url<'a>(&self, mut url: Url, segments: impl Iterator<Item = &'a str>) -> Result<Url> {
532 {
534 let mut existing = url.path_segments_mut().expect("url should have path");
535 existing.pop_if_empty();
536 existing.extend(segments);
537 }
538
539 Ok(url)
540 }
541
542 async fn head(&self, url: Url) -> Result<Response> {
543 debug_assert!(
544 Self::is_supported_url(&self.config, &url),
545 "{url} is not a supported GCS URL",
546 url = url.as_str()
547 );
548
549 debug!("sending HEAD request for `{url}`", url = url.display());
550
551 let date = Utc::now();
552 let mut request = self
553 .client
554 .head(url)
555 .header(header::USER_AGENT, USER_AGENT)
556 .header(
557 GOOGLE_DATE_HEADER,
558 date.format("%Y%m%dT%H%M%SZ").to_string(),
559 )
560 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
561 .build()?;
562
563 if let Some(auth) = &self.config.google.auth {
564 append_authentication_header(auth, date, &mut request)?;
565 }
566
567 let response = self.client.execute(request).await?;
568 if !response.status().is_success() {
569 return Err(response.into_error().await);
570 }
571
572 Ok(response)
573 }
574
575 async fn get(&self, url: Url) -> Result<Response> {
576 debug_assert!(
577 Self::is_supported_url(&self.config, &url),
578 "{url} is not a supported GCS URL",
579 url = url.as_str()
580 );
581
582 debug!("sending GET request for `{url}`", url = url.display());
583
584 let date = Utc::now();
585 let mut request = self
586 .client
587 .get(url)
588 .header(header::USER_AGENT, USER_AGENT)
589 .header(
590 GOOGLE_DATE_HEADER,
591 date.format("%Y%m%dT%H%M%SZ").to_string(),
592 )
593 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
594 .build()?;
595
596 if let Some(auth) = &self.config.google.auth {
597 append_authentication_header(auth, date, &mut request)?;
598 }
599
600 let response = self.client.execute(request).await?;
601 if !response.status().is_success() {
602 return Err(response.into_error().await);
603 }
604
605 Ok(response)
606 }
607
608 async fn get_at_offset(&self, url: Url, etag: &str, offset: u64) -> Result<Response> {
609 debug_assert!(
610 Self::is_supported_url(&self.config, &url),
611 "{url} is not a supported GCS URL",
612 url = url.as_str()
613 );
614
615 debug!(
616 "sending GET request at offset {offset} for `{url}`",
617 url = url.display(),
618 );
619
620 let date = Utc::now();
621
622 let mut request = self
623 .client
624 .get(url)
625 .header(header::USER_AGENT, USER_AGENT)
626 .header(
627 GOOGLE_DATE_HEADER,
628 date.format("%Y%m%dT%H%M%SZ").to_string(),
629 )
630 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
631 .header(header::RANGE, format!("bytes={offset}-"))
632 .header(header::IF_MATCH, etag)
633 .build()?;
634
635 if let Some(auth) = &self.config.google.auth {
636 append_authentication_header(auth, date, &mut request)?;
637 }
638
639 let response = self.client.execute(request).await?;
640 let status = response.status();
641
642 if status == StatusCode::PRECONDITION_FAILED {
644 return Err(Error::RemoteContentModified);
645 }
646
647 if !status.is_success() {
649 return Err(response.into_error().await);
650 }
651
652 if status != StatusCode::PARTIAL_CONTENT {
654 return Err(Error::RemoteContentModified);
655 }
656
657 Ok(response)
658 }
659
660 async fn walk(&self, mut url: Url) -> Result<Vec<String>> {
661 debug_assert!(
664 Self::is_supported_url(&self.config, &url),
665 "{url} is not a supported GCS URL",
666 url = url.as_str()
667 );
668
669 debug!("walking `{url}` as a directory", url = url.display());
670
671 let (bucket, path) = url.bucket_and_path();
672
673 let mut prefix = path.strip_prefix('/').unwrap_or(path).to_string();
675 prefix.push('/');
676
677 url.set_host(Some(&format!("{bucket}.{GOOGLE_ROOT_DOMAIN}")))
679 .map_err(|_| GoogleError::InvalidBucketName)?;
680 url.set_path("/");
681
682 {
683 let mut pairs = url.query_pairs_mut();
684 pairs.append_pair("list-type", "2");
686 pairs.append_pair("prefix", &prefix);
688 }
689
690 let date = Utc::now();
691 let mut token = String::new();
692 let mut paths = Vec::new();
693 loop {
694 let mut url = url.clone();
695 if !token.is_empty() {
696 url.query_pairs_mut()
697 .append_pair("continuation-token", &token);
698 }
699
700 let mut request = self
702 .client
703 .get(url)
704 .header(header::USER_AGENT, USER_AGENT)
705 .header(
706 GOOGLE_DATE_HEADER,
707 date.format("%Y%m%dT%H%M%SZ").to_string(),
708 )
709 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
710 .build()?;
711
712 if let Some(auth) = &self.config.google.auth {
713 append_authentication_header(auth, date, &mut request)?;
714 }
715
716 let response = self.client.execute(request).await?;
717
718 let status = response.status();
719 if !status.is_success() {
720 return Err(response.into_error().await);
721 }
722
723 let text = response.text().await?;
724 let results: ListBucketResult = match serde_xml_rs::from_str(&text) {
725 Ok(response) => response,
726 Err(e) => {
727 return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
728 }
729 };
730
731 if paths.is_empty()
734 && results.contents.len() == 1
735 && results.token.is_none()
736 && let Some("") = results.contents[0].key.strip_prefix(&prefix)
737 {
738 return Ok(paths);
739 }
740
741 paths.extend(
742 results
743 .contents
744 .into_iter()
745 .map(|c| c.key.strip_prefix(&prefix).map(Into::into).unwrap_or(c.key)),
746 );
747
748 token = results.token.unwrap_or_default();
749 if token.is_empty() {
750 break;
751 }
752 }
753
754 Ok(paths)
755 }
756
757 async fn new_upload(&self, url: Url) -> Result<Self::Upload> {
758 debug_assert!(
761 Self::is_supported_url(&self.config, &url),
762 "{url} is not a supported GCS URL",
763 url = url.as_str()
764 );
765
766 debug!("sending POST request for `{url}`", url = url.display());
767
768 let mut create = url.clone();
769 create.query_pairs_mut().append_key_only("uploads");
770
771 let date = Utc::now();
772
773 create.set_scheme("http").unwrap();
774 create.set_ip_host("127.0.0.1".parse().unwrap()).unwrap();
775 create.set_port(Some(9000)).unwrap();
776
777 let mut request = self
778 .client
779 .post(create)
780 .header(header::USER_AGENT, USER_AGENT)
781 .header(header::CONTENT_LENGTH, "0")
782 .header(
783 GOOGLE_DATE_HEADER,
784 date.format("%Y%m%dT%H%M%SZ").to_string(),
785 )
786 .header(GOOGLE_CONTENT_SHA256_HEADER, sha256_hex_string([]))
787 .build()?;
788
789 if let Some(auth) = &self.config.google.auth {
790 append_authentication_header(auth, date, &mut request)?;
791 }
792
793 let response = self.client.execute(request).await?;
794
795 let status = response.status();
796 if !status.is_success() {
797 return Err(response.into_error().await);
798 }
799
800 let text: String = match response.text().await {
801 Ok(text) => text,
802 Err(e) => return Err(e.into()),
803 };
804
805 let id = match serde_xml_rs::from_str::<InitiateMultipartUploadResult>(&text) {
806 Ok(response) => response.upload_id,
807 Err(e) => {
808 return Err(GoogleError::UnexpectedResponse { status, error: e }.into());
809 }
810 };
811
812 Ok(GoogleUpload {
813 config: self.config.clone(),
814 client: self.client.clone(),
815 url,
816 id,
817 events: self.events.clone(),
818 })
819 }
820}