1use super::client::*;
16use super::*;
17use crate::download_resume_policy::DownloadResumePolicy;
18use crate::error::{RangeError, ReadError};
19use crate::model::ObjectChecksums;
20use crate::storage::checksum::{
21 ChecksumEngine,
22 details::{Crc32c, Md5, validate},
23};
24use base64::Engine;
25#[cfg(feature = "unstable-stream")]
26use futures::Stream;
27use serde_with::DeserializeAs;
28
29#[derive(Clone, Debug)]
66pub struct ReadObject<C = Crc32c> {
67 inner: std::sync::Arc<StorageInner>,
68 request: crate::model::ReadObjectRequest,
69 options: super::request_options::RequestOptions,
70 checksum: C,
71}
72
73impl ReadObject<Crc32c> {
74 pub(crate) fn new<B, O>(inner: std::sync::Arc<StorageInner>, bucket: B, object: O) -> Self
75 where
76 B: Into<String>,
77 O: Into<String>,
78 {
79 let options = inner.options.clone();
80 ReadObject {
81 inner,
82 request: crate::model::ReadObjectRequest::new()
83 .set_bucket(bucket)
84 .set_object(object),
85 options,
86 checksum: Crc32c::default(),
87 }
88 }
89
90 pub fn compute_md5(self) -> ReadObject<Md5<Crc32c>> {
119 self.switch_checksum(Md5::from_inner)
120 }
121}
122
123impl<C> ReadObject<C>
124where
125 C: Clone + ChecksumEngine + Send,
126{
127 fn switch_checksum<F, U>(self, new: F) -> ReadObject<U>
128 where
129 F: FnOnce(C) -> U,
130 {
131 ReadObject {
132 inner: self.inner,
133 request: self.request,
134 options: self.options,
135 checksum: new(self.checksum),
136 }
137 }
138
139 pub fn with_generation<T: Into<i64>>(mut self, v: T) -> Self {
142 self.request.generation = v.into();
143 self
144 }
145
146 pub fn with_if_generation_match<T>(mut self, v: T) -> Self
150 where
151 T: Into<i64>,
152 {
153 self.request.if_generation_match = Some(v.into());
154 self
155 }
156
157 pub fn with_if_generation_not_match<T>(mut self, v: T) -> Self
162 where
163 T: Into<i64>,
164 {
165 self.request.if_generation_not_match = Some(v.into());
166 self
167 }
168
169 pub fn with_if_metageneration_match<T>(mut self, v: T) -> Self
172 where
173 T: Into<i64>,
174 {
175 self.request.if_metageneration_match = Some(v.into());
176 self
177 }
178
179 pub fn with_if_metageneration_not_match<T>(mut self, v: T) -> Self
182 where
183 T: Into<i64>,
184 {
185 self.request.if_metageneration_not_match = Some(v.into());
186 self
187 }
188
189 pub fn with_read_offset<T>(mut self, v: T) -> Self
237 where
238 T: Into<i64>,
239 {
240 self.request.read_offset = v.into();
241 self
242 }
243
244 pub fn with_read_limit<T>(mut self, v: T) -> Self
279 where
280 T: Into<i64>,
281 {
282 self.request.read_limit = v.into();
283 self
284 }
285
286 pub fn with_key(mut self, v: KeyAes256) -> Self {
303 self.request.common_object_request_params = Some(v.into());
304 self
305 }
306
307 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
328 self.options.retry_policy = v.into().into();
329 self
330 }
331
332 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
349 mut self,
350 v: V,
351 ) -> Self {
352 self.options.backoff_policy = v.into().into();
353 self
354 }
355
356 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
379 mut self,
380 v: V,
381 ) -> Self {
382 self.options.retry_throttler = v.into().into();
383 self
384 }
385
386 pub fn with_download_resume_policy<V>(mut self, v: V) -> Self
406 where
407 V: DownloadResumePolicy + 'static,
408 {
409 self.options.download_resume_policy = std::sync::Arc::new(v);
410 self
411 }
412
413 pub async fn send(self) -> Result<ReadObjectResponse<C>> {
415 let download = self.clone().download().await?;
416 ReadObjectResponse::new(self, download)
417 }
418
419 async fn download(self) -> Result<reqwest::Response> {
420 let throttler = self.options.retry_throttler.clone();
421 let retry = self.options.retry_policy.clone();
422 let backoff = self.options.backoff_policy.clone();
423
424 gax::retry_loop_internal::retry_loop(
425 async move |_| self.download_attempt().await,
426 async |duration| tokio::time::sleep(duration).await,
427 true,
428 throttler,
429 retry,
430 backoff,
431 )
432 .await
433 }
434
435 async fn download_attempt(&self) -> Result<reqwest::Response> {
436 let builder = self.http_request_builder().await?;
437 let response = builder.send().await.map_err(Error::io)?;
438 if !response.status().is_success() {
439 return gaxi::http::to_http_error(response).await;
440 }
441 Ok(response)
442 }
443
444 async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
445 let bucket = &self.request.bucket;
447 let bucket_id = bucket
448 .as_str()
449 .strip_prefix("projects/_/buckets/")
450 .ok_or_else(|| {
451 Error::binding(format!(
452 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
453 ))
454 })?;
455 let object = &self.request.object;
456
457 let builder = self
459 .inner
460 .client
461 .request(
462 reqwest::Method::GET,
463 format!(
464 "{}/storage/v1/b/{bucket_id}/o/{}",
465 &self.inner.endpoint,
466 enc(object)
467 ),
468 )
469 .query(&[("alt", "media")])
470 .header(
471 "x-goog-api-client",
472 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
473 );
474
475 let builder = if self.request.generation != 0 {
477 builder.query(&[("generation", self.request.generation)])
478 } else {
479 builder
480 };
481 let builder = self
482 .request
483 .if_generation_match
484 .iter()
485 .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
486 let builder = self
487 .request
488 .if_generation_not_match
489 .iter()
490 .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
491 let builder = self
492 .request
493 .if_metageneration_match
494 .iter()
495 .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
496 let builder = self
497 .request
498 .if_metageneration_not_match
499 .iter()
500 .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
501
502 let builder = apply_customer_supplied_encryption_headers(
503 builder,
504 &self.request.common_object_request_params,
505 );
506
507 let builder = match (self.request.read_offset, self.request.read_limit) {
509 (_, l) if l < 0 => Err(RangeError::NegativeLimit),
511 (o, l) if o < 0 && l > 0 => Err(RangeError::NegativeOffsetWithLimit),
513 (0, 0) => Ok(builder),
515 (o, 0) => Ok(builder.header("range", format!("bytes={o}-"))),
518 (o, l) => Ok(builder.header("range", format!("bytes={o}-{}", o + l - 1))),
521 }
522 .map_err(Error::ser)?;
523
524 self.inner.apply_auth_headers(builder).await
525 }
526}
527
528fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
529 headers
530 .get("x-goog-hash")
531 .and_then(|hash| hash.to_str().ok())
532 .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
533 .and_then(|hash| {
534 let hash = hash.trim_start_matches("crc32c=");
535 v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
536 })
537}
538
539fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
540 headers
541 .get("x-goog-hash")
542 .and_then(|hash| hash.to_str().ok())
543 .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
544 .and_then(|hash| {
545 let hash = hash.trim_start_matches("md5=");
546 base64::prelude::BASE64_STANDARD.decode(hash).ok()
547 })
548 .unwrap_or_default()
549}
550
551#[derive(Debug)]
553pub struct ReadObjectResponse<C> {
554 inner: Option<reqwest::Response>,
555 highlights: ObjectHighlights,
556 response_checksums: ObjectChecksums,
558 range: ReadRange,
560 generation: i64,
561 builder: ReadObject<C>,
562 resume_count: u32,
563}
564
565impl<C> ReadObjectResponse<C>
566where
567 C: ChecksumEngine + Clone + Send,
568{
569 fn new(builder: ReadObject<C>, inner: reqwest::Response) -> Result<Self> {
570 let full = builder.request.read_offset == 0 && builder.request.read_limit == 0;
571 let response_checksums = checksums_from_response(full, inner.status(), inner.headers());
572 let range = response_range(&inner).map_err(Error::deser)?;
573 let generation = response_generation(&inner).map_err(Error::deser)?;
574
575 let headers = inner.headers();
576 let get_as_i64 = |header_name: &str| -> i64 {
577 headers
578 .get(header_name)
579 .and_then(|s| s.to_str().ok())
580 .and_then(|s| s.parse::<i64>().ok())
581 .unwrap_or_default()
582 };
583 let get_as_string = |header_name: &str| -> String {
584 headers
585 .get(header_name)
586 .and_then(|sc| sc.to_str().ok())
587 .map(|sc| sc.to_string())
588 .unwrap_or_default()
589 };
590 let highlights = ObjectHighlights {
591 generation,
592 metageneration: get_as_i64("x-goog-metageneration"),
593 size: get_as_i64("x-goog-stored-content-length"),
594 content_encoding: get_as_string("x-goog-stored-content-encoding"),
595 storage_class: get_as_string("x-goog-storage-class"),
596 content_type: get_as_string("content-type"),
597 content_language: get_as_string("content-language"),
598 content_disposition: get_as_string("content-disposition"),
599 etag: get_as_string("etag"),
600 checksums: headers.get("x-goog-hash").map(|_| {
601 crate::model::ObjectChecksums::new()
602 .set_or_clear_crc32c(headers_to_crc32c(headers))
603 .set_md5_hash(headers_to_md5_hash(headers))
604 }),
605 };
606
607 Ok(Self {
608 inner: Some(inner),
609 highlights,
610 response_checksums,
612 range,
614 generation,
615 builder,
616 resume_count: 0,
617 })
618 }
619}
620
621impl<C> ReadObjectResponse<C>
622where
623 C: ChecksumEngine + Clone + Send,
624{
625 pub fn object(&self) -> ObjectHighlights {
647 self.highlights.clone()
648 }
649
650 pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
670 match self.next_attempt().await {
671 None => None,
672 Some(Ok(b)) => Some(Ok(b)),
673 Some(Err(e)) => Box::pin(self.resume(e)).await,
676 }
677 }
678
679 async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
680 let inner = self.inner.as_mut()?;
681 let res = inner.chunk().await.map_err(Error::io);
682 match res {
683 Ok(Some(chunk)) => {
684 self.builder.checksum.update(self.range.start, &chunk);
685 let len = chunk.len() as u64;
686 if self.range.limit < len {
687 return Some(Err(Error::deser(ReadError::LongRead {
688 expected: self.range.limit,
689 got: len,
690 })));
691 }
692 self.range.limit -= len;
693 self.range.start += len;
694 Some(Ok(chunk))
695 }
696 Ok(None) => {
697 if self.range.limit != 0 {
698 return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
699 }
700 let computed = self.builder.checksum.finalize();
701 let res = validate(&self.response_checksums, &Some(computed));
702 match res {
703 Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
704 Ok(()) => None,
705 }
706 }
707 Err(e) => Some(Err(e)),
708 }
709 }
710
711 async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
712 use crate::download_resume_policy::{ResumeQuery, ResumeResult};
713
714 self.inner = None;
716 self.resume_count += 1;
717 let query = ResumeQuery::new(self.resume_count);
718 match self
719 .builder
720 .options
721 .download_resume_policy
722 .on_error(&query, error)
723 {
724 ResumeResult::Continue(_) => {}
725 ResumeResult::Permanent(e) => return Some(Err(e)),
726 ResumeResult::Exhausted(e) => return Some(Err(e)),
727 };
728 self.builder.request.read_offset = self.range.start as i64;
729 self.builder.request.read_limit = self.range.limit as i64;
730 self.builder.request.generation = self.generation;
731 self.inner = match self.builder.clone().download().await {
732 Ok(r) => Some(r),
733 Err(e) => return Some(Err(e)),
734 };
735 self.next().await
736 }
737
738 #[cfg(feature = "unstable-stream")]
739 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
740 pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
742 use futures::stream::unfold;
743 Box::pin(unfold(Some(self), move |state| async move {
744 if let Some(mut this) = state {
745 if let Some(chunk) = this.next().await {
746 return Some((chunk, Some(this)));
747 }
748 };
749 None
750 }))
751 }
752}
753
754#[derive(Clone, Debug, PartialEq)]
756#[non_exhaustive]
757pub struct ObjectHighlights {
758 pub generation: i64,
760
761 pub metageneration: i64,
766
767 pub size: i64,
771
772 pub content_encoding: String,
776
777 pub checksums: std::option::Option<crate::model::ObjectChecksums>,
782
783 pub storage_class: String,
785
786 pub content_language: String,
790
791 pub content_type: String,
797
798 pub content_disposition: String,
802
803 pub etag: String,
805}
806
807fn checksums_from_response(
819 full_content_requested: bool,
820 status: http::StatusCode,
821 headers: &http::HeaderMap,
822) -> ObjectChecksums {
823 let checksums = ObjectChecksums::new();
824 if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
825 return checksums;
826 }
827 let stored_encoding = headers
828 .get("x-goog-stored-content-encoding")
829 .and_then(|e| e.to_str().ok())
830 .map_or("", |e| e);
831 let content_encoding = headers
832 .get("content-encoding")
833 .and_then(|e| e.to_str().ok())
834 .map_or("", |e| e);
835 if stored_encoding == "gzip" && content_encoding != "gzip" {
836 return checksums;
837 }
838 checksums
839 .set_or_clear_crc32c(headers_to_crc32c(headers))
840 .set_md5_hash(headers_to_md5_hash(headers))
841}
842
843fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
844 match response.status() {
845 reqwest::StatusCode::OK => {
846 let header = required_header(response, "content-length")?;
847 let limit = header
848 .parse::<u64>()
849 .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
850 Ok(ReadRange { start: 0, limit })
851 }
852 reqwest::StatusCode::PARTIAL_CONTENT => {
853 let header = required_header(response, "content-range")?;
854 let header = header.strip_prefix("bytes ").ok_or_else(|| {
855 ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
856 })?;
857 let (range, _) = header.split_once('/').ok_or_else(|| {
858 ReadError::BadHeaderFormat("content-range", "missing / separator".into())
859 })?;
860 let (start, end) = range.split_once('-').ok_or_else(|| {
861 ReadError::BadHeaderFormat("content-range", "missing - separator".into())
862 })?;
863 let start = start
864 .parse::<u64>()
865 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
866 let end = end
867 .parse::<u64>()
868 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
869 let end = end + 1;
872 let limit = end
873 .checked_sub(start)
874 .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
875 Ok(ReadRange { start, limit })
876 }
877 s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
878 }
879}
880
881fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
882 let header = required_header(response, "x-goog-generation")?;
883 header
884 .parse::<i64>()
885 .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
886}
887
888fn required_header<'a>(
889 response: &'a reqwest::Response,
890 name: &'static str,
891) -> std::result::Result<&'a str, ReadError> {
892 let header = response
893 .headers()
894 .get(name)
895 .ok_or_else(|| ReadError::MissingHeader(name))?;
896 header
897 .to_str()
898 .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
899}
900
/// The part of the object still expected from the current download.
///
/// Both fields advance as chunks arrive, so after an interruption they
/// describe exactly the bytes a resumed request must fetch.
#[derive(Debug, PartialEq)]
struct ReadRange {
    /// Offset, within the object, of the next byte to receive.
    start: u64,
    /// Number of bytes still to receive.
    limit: u64,
}
906
907#[cfg(test)]
908mod resume_tests;
909
910#[cfg(test)]
911mod tests {
912 use super::client::tests::{create_key_helper, test_builder, test_inner_client};
913 use super::*;
914 use crate::error::ChecksumMismatch;
915 use futures::TryStreamExt;
916 use httptest::{Expectation, Server, matchers::*, responders::status_code};
917 use std::collections::HashMap;
918 use std::error::Error;
919 use test_case::test_case;
920
921 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
922
923 #[tokio::test]
925 async fn test_read_is_send_and_static() -> Result {
926 let client = Storage::builder()
927 .with_credentials(auth::credentials::testing::test_credentials())
928 .build()
929 .await?;
930
931 fn need_send<T: Send>(_val: &T) {}
932 fn need_sync<T: Sync>(_val: &T) {}
933 fn need_static<T: 'static>(_val: &T) {}
934
935 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
936 need_send(&read);
937 need_sync(&read);
938 need_static(&read);
939
940 let read = client
941 .read_object("projects/_/buckets/test-bucket", "test-object")
942 .send();
943 need_send(&read);
944 need_static(&read);
945
946 Ok(())
947 }
948 #[tokio::test]
949 async fn read_object_normal() -> Result {
950 let server = Server::run();
951 server.expect(
952 Expectation::matching(all_of![
953 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
954 request::query(url_decoded(contains(("alt", "media")))),
955 ])
956 .respond_with(
957 status_code(200)
958 .body("hello world")
959 .append_header("x-goog-generation", 123456),
960 ),
961 );
962
963 let client = Storage::builder()
964 .with_endpoint(format!("http://{}", server.addr()))
965 .with_credentials(auth::credentials::testing::test_credentials())
966 .build()
967 .await?;
968 let mut reader = client
969 .read_object("projects/_/buckets/test-bucket", "test-object")
970 .send()
971 .await?;
972 let mut got = Vec::new();
973 while let Some(b) = reader.next().await.transpose()? {
974 got.extend_from_slice(&b);
975 }
976 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
977
978 Ok(())
979 }
980
981 #[tokio::test]
982 async fn read_object_metadata() -> Result {
983 const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
984 let server = Server::run();
985 server.expect(
986 Expectation::matching(all_of![
987 request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
988 request::query(url_decoded(contains(("alt", "media")))),
989 ])
990 .respond_with(
991 status_code(200)
992 .body(CONTENTS)
993 .append_header(
994 "x-goog-hash",
995 "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
996 )
997 .append_header("x-goog-generation", 500)
998 .append_header("x-goog-metageneration", "1")
999 .append_header("x-goog-stored-content-length", 30)
1000 .append_header("x-goog-stored-content-encoding", "identity")
1001 .append_header("x-goog-storage-class", "STANDARD")
1002 .append_header("content-language", "en")
1003 .append_header("content-type", "text/plain")
1004 .append_header("content-disposition", "inline")
1005 .append_header("etag", "etagval"),
1006 ),
1007 );
1008
1009 let endpoint = server.url("");
1010 let client = Storage::builder()
1011 .with_endpoint(endpoint.to_string())
1012 .with_credentials(auth::credentials::testing::test_credentials())
1013 .build()
1014 .await?;
1015 let reader = client
1016 .read_object("projects/_/buckets/test-bucket", "test-object")
1017 .send()
1018 .await?;
1019 let object = reader.object();
1020 assert_eq!(object.generation, 500);
1021 assert_eq!(object.metageneration, 1);
1022 assert_eq!(object.size, 30);
1023 assert_eq!(object.content_encoding, "identity");
1024 assert_eq!(
1025 object.checksums.as_ref().unwrap().crc32c.unwrap(),
1026 crc32c::crc32c(CONTENTS.as_bytes())
1027 );
1028 assert_eq!(
1029 object.checksums.as_ref().unwrap().md5_hash,
1030 base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
1031 );
1032
1033 Ok(())
1034 }
1035
1036 #[tokio::test]
1037 async fn read_object_stream() -> Result {
1038 let server = Server::run();
1039 server.expect(
1040 Expectation::matching(all_of![
1041 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1042 request::query(url_decoded(contains(("alt", "media")))),
1043 ])
1044 .respond_with(
1045 status_code(200)
1046 .append_header("x-goog-generation", 123456)
1047 .body("hello world"),
1048 ),
1049 );
1050
1051 let client = Storage::builder()
1052 .with_endpoint(format!("http://{}", server.addr()))
1053 .with_credentials(auth::credentials::testing::test_credentials())
1054 .build()
1055 .await?;
1056 let response = client
1057 .read_object("projects/_/buckets/test-bucket", "test-object")
1058 .send()
1059 .await?;
1060 let result: Vec<_> = response.into_stream().try_collect().await?;
1061 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
1062
1063 Ok(())
1064 }
1065
1066 #[tokio::test]
1067 async fn read_object_next_then_consume_response() -> Result {
1068 const BLOCK_SIZE: usize = 500;
1070 let mut contents = Vec::new();
1071 for i in 0..50 {
1072 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
1073 }
1074
1075 let u = crc32c::crc32c(&contents);
1077 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1078
1079 let server = Server::run();
1080 server.expect(
1081 Expectation::matching(all_of![
1082 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1083 request::query(url_decoded(contains(("alt", "media")))),
1084 ])
1085 .times(1)
1086 .respond_with(
1087 status_code(200)
1088 .body(contents.clone())
1089 .append_header("x-goog-hash", format!("crc32c={value}"))
1090 .append_header("x-goog-generation", 123456),
1091 ),
1092 );
1093
1094 let client = Storage::builder()
1095 .with_endpoint(format!("http://{}", server.addr()))
1096 .with_credentials(auth::credentials::testing::test_credentials())
1097 .build()
1098 .await?;
1099
1100 let mut response = client
1102 .read_object("projects/_/buckets/test-bucket", "test-object")
1103 .send()
1104 .await?;
1105
1106 let mut all_bytes = bytes::BytesMut::new();
1107 let chunk = response.next().await.transpose()?.unwrap();
1108 assert!(!chunk.is_empty());
1109 all_bytes.extend(chunk);
1110 use futures::StreamExt;
1111 let mut stream = response.into_stream();
1112 while let Some(chunk) = stream.next().await.transpose()? {
1113 all_bytes.extend(chunk);
1114 }
1115 assert_eq!(all_bytes, contents);
1116
1117 Ok(())
1118 }
1119
1120 #[tokio::test]
1121 async fn read_object_not_found() -> Result {
1122 let server = Server::run();
1123 server.expect(
1124 Expectation::matching(all_of![
1125 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1126 request::query(url_decoded(contains(("alt", "media")))),
1127 ])
1128 .respond_with(status_code(404).body("NOT FOUND")),
1129 );
1130
1131 let client = Storage::builder()
1132 .with_endpoint(format!("http://{}", server.addr()))
1133 .with_credentials(auth::credentials::testing::test_credentials())
1134 .build()
1135 .await?;
1136 let err = client
1137 .read_object("projects/_/buckets/test-bucket", "test-object")
1138 .send()
1139 .await
1140 .expect_err("expected a not found error");
1141 assert_eq!(err.http_status_code(), Some(404));
1142
1143 Ok(())
1144 }
1145
1146 #[tokio::test]
1147 async fn read_object_incorrect_crc32c_check() -> Result {
1148 let u = crc32c::crc32c("goodbye world".as_bytes());
1150 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1151
1152 let server = Server::run();
1153 server.expect(
1154 Expectation::matching(all_of![
1155 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1156 request::query(url_decoded(contains(("alt", "media")))),
1157 ])
1158 .times(3)
1159 .respond_with(
1160 status_code(200)
1161 .body("hello world")
1162 .append_header("x-goog-hash", format!("crc32c={value}"))
1163 .append_header("x-goog-generation", 123456),
1164 ),
1165 );
1166
1167 let client = Storage::builder()
1168 .with_endpoint(format!("http://{}", server.addr()))
1169 .with_credentials(auth::credentials::testing::test_credentials())
1170 .build()
1171 .await?;
1172 let mut response = client
1173 .read_object("projects/_/buckets/test-bucket", "test-object")
1174 .send()
1175 .await?;
1176 let mut partial = Vec::new();
1177 let mut err = None;
1178 while let Some(r) = response.next().await {
1179 match r {
1180 Ok(b) => partial.extend_from_slice(&b),
1181 Err(e) => err = Some(e),
1182 };
1183 }
1184 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1185 let err = err.expect("expect error on incorrect crc32c");
1186 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1187 assert!(
1188 matches!(
1189 source,
1190 Some(&ReadError::ChecksumMismatch(
1191 ChecksumMismatch::Crc32c { .. }
1192 ))
1193 ),
1194 "err={err:?}"
1195 );
1196
1197 let mut response = client
1198 .read_object("projects/_/buckets/test-bucket", "test-object")
1199 .send()
1200 .await?;
1201 let err: crate::Error = async {
1202 {
1203 while (response.next().await.transpose()?).is_some() {}
1204 Ok(())
1205 }
1206 }
1207 .await
1208 .expect_err("expect error on incorrect crc32c");
1209 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1210 assert!(
1211 matches!(
1212 source,
1213 Some(&ReadError::ChecksumMismatch(
1214 ChecksumMismatch::Crc32c { .. }
1215 ))
1216 ),
1217 "err={err:?}"
1218 );
1219
1220 use futures::TryStreamExt;
1221 let err = client
1222 .read_object("projects/_/buckets/test-bucket", "test-object")
1223 .send()
1224 .await?
1225 .into_stream()
1226 .try_collect::<Vec<bytes::Bytes>>()
1227 .await
1228 .expect_err("expect error on incorrect crc32c");
1229 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1230 assert!(
1231 matches!(
1232 source,
1233 Some(&ReadError::ChecksumMismatch(
1234 ChecksumMismatch::Crc32c { .. }
1235 ))
1236 ),
1237 "err={err:?}"
1238 );
1239 Ok(())
1240 }
1241
1242 #[tokio::test]
1243 async fn read_object_incorrect_md5_check() -> Result {
1244 let digest = md5::compute("goodbye world".as_bytes());
1246 let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1247
1248 let server = Server::run();
1249 server.expect(
1250 Expectation::matching(all_of![
1251 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1252 request::query(url_decoded(contains(("alt", "media")))),
1253 ])
1254 .times(1)
1255 .respond_with(
1256 status_code(200)
1257 .body("hello world")
1258 .append_header("x-goog-hash", format!("md5={value}"))
1259 .append_header("x-goog-generation", 123456),
1260 ),
1261 );
1262
1263 let client = Storage::builder()
1264 .with_endpoint(format!("http://{}", server.addr()))
1265 .with_credentials(auth::credentials::testing::test_credentials())
1266 .build()
1267 .await?;
1268 let mut response = client
1269 .read_object("projects/_/buckets/test-bucket", "test-object")
1270 .compute_md5()
1271 .send()
1272 .await?;
1273 let mut partial = Vec::new();
1274 let mut err = None;
1275 while let Some(r) = response.next().await {
1276 match r {
1277 Ok(b) => partial.extend_from_slice(&b),
1278 Err(e) => err = Some(e),
1279 };
1280 }
1281 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1282 let err = err.expect("expect error on incorrect md5");
1283 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1284 assert!(
1285 matches!(
1286 source,
1287 Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1288 ),
1289 "err={err:?}"
1290 );
1291
1292 Ok(())
1293 }
1294
1295 #[tokio::test]
1296 async fn read_object() -> Result {
1297 let inner = test_inner_client(test_builder());
1298 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1299 .http_request_builder()
1300 .await?
1301 .build()?;
1302
1303 assert_eq!(request.method(), reqwest::Method::GET);
1304 assert_eq!(
1305 request.url().as_str(),
1306 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1307 );
1308 Ok(())
1309 }
1310
1311 #[tokio::test]
1312 async fn read_object_error_credentials() -> Result {
1313 let inner = test_inner_client(
1314 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1315 );
1316 let _ = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1317 .http_request_builder()
1318 .await
1319 .inspect_err(|e| assert!(e.is_authentication()))
1320 .expect_err("invalid credentials should err");
1321 Ok(())
1322 }
1323
1324 #[tokio::test]
1325 async fn read_object_bad_bucket() -> Result {
1326 let inner = test_inner_client(test_builder());
1327 ReadObject::new(inner, "malformed", "object")
1328 .http_request_builder()
1329 .await
1330 .expect_err("malformed bucket string should error");
1331 Ok(())
1332 }
1333
1334 #[tokio::test]
1335 async fn read_object_query_params() -> Result {
1336 let inner = test_inner_client(test_builder());
1337 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1338 .with_generation(5)
1339 .with_if_generation_match(10)
1340 .with_if_generation_not_match(20)
1341 .with_if_metageneration_match(30)
1342 .with_if_metageneration_not_match(40)
1343 .http_request_builder()
1344 .await?
1345 .build()?;
1346
1347 assert_eq!(request.method(), reqwest::Method::GET);
1348 let want_pairs: HashMap<String, String> = [
1349 ("alt", "media"),
1350 ("generation", "5"),
1351 ("ifGenerationMatch", "10"),
1352 ("ifGenerationNotMatch", "20"),
1353 ("ifMetagenerationMatch", "30"),
1354 ("ifMetagenerationNotMatch", "40"),
1355 ]
1356 .iter()
1357 .map(|(k, v)| (k.to_string(), v.to_string()))
1358 .collect();
1359 let query_pairs: HashMap<String, String> = request
1360 .url()
1361 .query_pairs()
1362 .map(|param| (param.0.to_string(), param.1.to_string()))
1363 .collect();
1364 assert_eq!(query_pairs.len(), want_pairs.len());
1365 assert_eq!(query_pairs, want_pairs);
1366 Ok(())
1367 }
1368
1369 #[tokio::test]
1370 async fn read_object_headers() -> Result {
1371 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1373
1374 let inner = test_inner_client(test_builder());
1376 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1377 .with_key(KeyAes256::new(&key)?)
1378 .http_request_builder()
1379 .await?
1380 .build()?;
1381
1382 assert_eq!(request.method(), reqwest::Method::GET);
1383 assert_eq!(
1384 request.url().as_str(),
1385 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1386 );
1387
1388 let want = vec![
1389 ("x-goog-encryption-algorithm", "AES256".to_string()),
1390 ("x-goog-encryption-key", key_base64),
1391 ("x-goog-encryption-key-sha256", key_sha256_base64),
1392 ];
1393
1394 for (name, value) in want {
1395 assert_eq!(
1396 request.headers().get(name).unwrap().as_bytes(),
1397 bytes::Bytes::from(value)
1398 );
1399 }
1400 Ok(())
1401 }
1402
1403 #[test_case(0, 0, None; "no headers needed")]
1404 #[test_case(10, 0, Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1405 #[test_case(-2000, 0, Some(&http::HeaderValue::from_static("bytes=-2000-")); "negative offset")]
1406 #[test_case(0, 100, Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1407 #[test_case(1000, 100, Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1408 #[tokio::test]
1409 async fn range_header(offset: i64, limit: i64, want: Option<&http::HeaderValue>) -> Result {
1410 let inner = test_inner_client(test_builder());
1411 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1412 .with_read_offset(offset)
1413 .with_read_limit(limit)
1414 .http_request_builder()
1415 .await?
1416 .build()?;
1417
1418 assert_eq!(request.method(), reqwest::Method::GET);
1419 assert_eq!(
1420 request.url().as_str(),
1421 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1422 );
1423
1424 assert_eq!(request.headers().get("range"), want);
1425 Ok(())
1426 }
1427
1428 #[tokio::test]
1429 async fn range_header_negative_limit() -> Result {
1430 let inner = test_inner_client(test_builder());
1431 let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1432 .with_read_limit(-100)
1433 .http_request_builder()
1434 .await
1435 .unwrap_err();
1436
1437 assert!(
1438 matches!(
1439 err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
1440 RangeError::NegativeLimit
1441 ),
1442 "{err:?}"
1443 );
1444 Ok(())
1445 }
1446
1447 #[tokio::test]
1448 async fn range_header_negative_offset_with_limit() -> Result {
1449 let inner = test_inner_client(test_builder());
1450 let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1451 .with_read_offset(-100)
1452 .with_read_limit(100)
1453 .http_request_builder()
1454 .await
1455 .unwrap_err();
1456
1457 assert!(
1458 matches!(
1459 err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
1460 RangeError::NegativeOffsetWithLimit
1461 ),
1462 "{err:?}"
1463 );
1464 Ok(())
1465 }
1466
1467 #[test_case("projects/p", "projects%2Fp")]
1468 #[test_case("kebab-case", "kebab-case")]
1469 #[test_case("dot.name", "dot.name")]
1470 #[test_case("under_score", "under_score")]
1471 #[test_case("tilde~123", "tilde~123")]
1472 #[test_case("exclamation!point!", "exclamation%21point%21")]
1473 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1474 #[test_case("preserve%percent%21", "preserve%percent%21")]
1475 #[test_case(
1476 "testall !#$&'()*+,/:;=?@[]",
1477 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1478 )]
1479 #[tokio::test]
1480 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1481 let inner = test_inner_client(test_builder());
1482 let request = ReadObject::new(inner, "projects/_/buckets/bucket", name)
1483 .http_request_builder()
1484 .await?
1485 .build()?;
1486 let got = request.url().path_segments().unwrap().next_back().unwrap();
1487 assert_eq!(got, want);
1488 Ok(())
1489 }
1490
1491 #[test]
1492 fn document_crc32c_values() {
1493 let bytes = (1234567890_u32).to_be_bytes();
1494 let base64 = base64::prelude::BASE64_STANDARD.encode(bytes);
1495 assert_eq!(base64, "SZYC0g==", "{bytes:?}");
1496 }
1497
1498 #[test_case("", None; "no header")]
1499 #[test_case("crc32c=hello", None; "invalid value")]
1500 #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
1501 #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
1502 #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
1503 #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
1504 fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
1505 let mut headers = http::HeaderMap::new();
1506 if !val.is_empty() {
1507 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1508 }
1509 let got = headers_to_crc32c(&headers);
1510 assert_eq!(got, want);
1511 Ok(())
1512 }
1513
1514 #[test_case("", None; "no header")]
1515 #[test_case("md5=invalid", None; "invalid value")]
1516 #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
1517 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
1518 #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
1519 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
1520 fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
1521 let mut headers = http::HeaderMap::new();
1522 if !val.is_empty() {
1523 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1524 }
1525 let got = headers_to_md5_hash(&headers);
1526 match want {
1527 Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
1528 None => assert!(got.is_empty()),
1529 }
1530 Ok(())
1531 }
1532
1533 #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
1534 #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
1535 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
1536 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
1537 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
1538 fn test_checksums_validation_enabled(
1539 full_content_requested: bool,
1540 headers: Vec<(&str, &str)>,
1541 status: http::StatusCode,
1542 want_crc32c: Option<u32>,
1543 want_md5: &str,
1544 ) -> Result {
1545 let mut header_map = http::HeaderMap::new();
1546 for (key, value) in headers {
1547 header_map.insert(
1548 http::HeaderName::from_bytes(key.as_bytes())?,
1549 http::HeaderValue::from_bytes(value.as_bytes())?,
1550 );
1551 }
1552
1553 let got = checksums_from_response(full_content_requested, status, &header_map);
1554 assert_eq!(got.crc32c, want_crc32c);
1555 assert_eq!(
1556 got.md5_hash,
1557 base64::prelude::BASE64_STANDARD.decode(want_md5)?
1558 );
1559 Ok(())
1560 }
1561
1562 #[test_case(0)]
1563 #[test_case(1024)]
1564 fn response_range_success(limit: u64) -> Result {
1565 let response = http::Response::builder()
1566 .status(200)
1567 .header("content-length", limit)
1568 .body(Vec::new())?;
1569 let response = reqwest::Response::from(response);
1570 let range = response_range(&response)?;
1571 assert_eq!(range, ReadRange { start: 0, limit });
1572 Ok(())
1573 }
1574
1575 #[test]
1576 fn response_range_missing() -> Result {
1577 let response = http::Response::builder().status(200).body(Vec::new())?;
1578 let response = reqwest::Response::from(response);
1579 let err = response_range(&response).expect_err("missing header should result in an error");
1580 assert!(
1581 matches!(err, ReadError::MissingHeader(h) if h == "content-length"),
1582 "{err:?}"
1583 );
1584 Ok(())
1585 }
1586
1587 #[test_case("")]
1588 #[test_case("abc")]
1589 #[test_case("-123")]
1590 fn response_range_format(value: &'static str) -> Result {
1591 let response = http::Response::builder()
1592 .status(200)
1593 .header("content-length", value)
1594 .body(Vec::new())?;
1595 let response = reqwest::Response::from(response);
1596 let err = response_range(&response).expect_err("header value should result in an error");
1597 assert!(
1598 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
1599 "{err:?}"
1600 );
1601 assert!(err.source().is_some(), "{err:?}");
1602 Ok(())
1603 }
1604
1605 #[test_case(0, 123)]
1606 #[test_case(123, 456)]
1607 fn response_range_partial_success(start: u64, end: u64) -> Result {
1608 let response = http::Response::builder()
1609 .status(206)
1610 .header(
1611 "content-range",
1612 format!("bytes {}-{}/{}", start, end, end + 1),
1613 )
1614 .body(Vec::new())?;
1615 let response = reqwest::Response::from(response);
1616 let range = response_range(&response)?;
1617 assert_eq!(
1618 range,
1619 ReadRange {
1620 start,
1621 limit: (end + 1 - start)
1622 }
1623 );
1624 Ok(())
1625 }
1626
1627 #[test]
1628 fn response_range_partial_missing() -> Result {
1629 let response = http::Response::builder().status(206).body(Vec::new())?;
1630 let response = reqwest::Response::from(response);
1631 let err = response_range(&response).expect_err("missing header should result in an error");
1632 assert!(
1633 matches!(err, ReadError::MissingHeader(h) if h == "content-range"),
1634 "{err:?}"
1635 );
1636 Ok(())
1637 }
1638
1639 #[test_case("")]
1640 #[test_case("123-456/457"; "bad prefix")]
1641 #[test_case("bytes 123-456 457"; "bad separator")]
1642 #[test_case("bytes 123+456/457"; "bad separator [2]")]
1643 #[test_case("bytes abc-456/457"; "start is not numbers")]
1644 #[test_case("bytes 123-cde/457"; "end is not numbers")]
1645 #[test_case("bytes 123-0/457"; "invalid range")]
1646 fn response_range_partial_format(value: &'static str) -> Result {
1647 let response = http::Response::builder()
1648 .status(206)
1649 .header("content-range", value)
1650 .body(Vec::new())?;
1651 let response = reqwest::Response::from(response);
1652 let err = response_range(&response).expect_err("header value should result in an error");
1653 assert!(
1654 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
1655 "{err:?}"
1656 );
1657 assert!(err.source().is_some(), "{err:?}");
1658 Ok(())
1659 }
1660
1661 #[test]
1662 fn response_range_bad_response() -> Result {
1663 let code = reqwest::StatusCode::CREATED;
1664 let response = http::Response::builder().status(code).body(Vec::new())?;
1665 let response = reqwest::Response::from(response);
1666 let err = response_range(&response).expect_err("unexpected status creates error");
1667 assert!(
1668 matches!(err, ReadError::UnexpectedSuccessCode(c) if c == code),
1669 "{err:?}"
1670 );
1671 Ok(())
1672 }
1673
1674 #[test_case(0)]
1675 #[test_case(1024)]
1676 fn response_generation_success(value: i64) -> Result {
1677 let response = http::Response::builder()
1678 .status(200)
1679 .header("x-goog-generation", value)
1680 .body(Vec::new())?;
1681 let response = reqwest::Response::from(response);
1682 let got = response_generation(&response)?;
1683 assert_eq!(got, value);
1684 Ok(())
1685 }
1686
1687 #[test]
1688 fn response_generation_missing() -> Result {
1689 let response = http::Response::builder().status(200).body(Vec::new())?;
1690 let response = reqwest::Response::from(response);
1691 let err =
1692 response_generation(&response).expect_err("missing header should result in an error");
1693 assert!(
1694 matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation"),
1695 "{err:?}"
1696 );
1697 Ok(())
1698 }
1699
1700 #[test_case("")]
1701 #[test_case("abc")]
1702 fn response_generation_format(value: &'static str) -> Result {
1703 let response = http::Response::builder()
1704 .status(200)
1705 .header("x-goog-generation", value)
1706 .body(Vec::new())?;
1707 let response = reqwest::Response::from(response);
1708 let err =
1709 response_generation(&response).expect_err("header value should result in an error");
1710 assert!(
1711 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
1712 "{err:?}"
1713 );
1714 assert!(err.source().is_some(), "{err:?}");
1715 Ok(())
1716 }
1717
1718 #[test]
1719 fn required_header_not_str() -> Result {
1720 let name = "x-goog-test";
1721 let response = http::Response::builder()
1722 .status(200)
1723 .header(name, http::HeaderValue::from_bytes(b"invalid\xfa")?)
1724 .body(Vec::new())?;
1725 let response = reqwest::Response::from(response);
1726 let err =
1727 required_header(&response, name).expect_err("header value should result in an error");
1728 assert!(
1729 matches!(err, ReadError::BadHeaderFormat(h, _) if h == name),
1730 "{err:?}"
1731 );
1732 assert!(err.source().is_some(), "{err:?}");
1733 Ok(())
1734 }
1735}