1mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::attempt_info::AttemptInfo;
27use gaxi::http::HttpRequestBuilder;
28use gaxi::http::reqwest::{HeaderValue, Method, Response};
29use google_cloud_gax::options::internal::{PathTemplate, RequestOptionsExt, ResourceName};
30
/// Request builder for reading the contents of a single object.
///
/// Instances are created by the crate-private `new()` constructor, refined
/// with the `set_*`, `compute_*` and `with_*` methods, and executed with
/// `send()`.
///
/// The type parameter `S` is the stub that performs the read; it defaults to
/// the crate's transport implementation.
#[derive(Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// Shared handle to the stub that `send()` delegates to.
    stub: std::sync::Arc<S>,
    /// The request under construction (bucket, object, preconditions, range, key).
    request: crate::model::ReadObjectRequest,
    /// Per-request options (checksums, retry configuration, user agent, ...).
    options: RequestOptions,
}
76
77impl<S> Clone for ReadObject<S>
78where
79 S: crate::storage::stub::Storage + 'static,
80{
81 fn clone(&self) -> Self {
82 Self {
83 stub: self.stub.clone(),
84 request: self.request.clone(),
85 options: self.options.clone(),
86 }
87 }
88}
89
90impl<S> ReadObject<S>
91where
92 S: crate::storage::stub::Storage + 'static,
93{
94 pub(crate) fn new<B, O>(
95 stub: std::sync::Arc<S>,
96 bucket: B,
97 object: O,
98 options: RequestOptions,
99 ) -> Self
100 where
101 B: Into<String>,
102 O: Into<String>,
103 {
104 ReadObject {
105 stub,
106 request: crate::model::ReadObjectRequest::new()
107 .set_bucket(bucket)
108 .set_object(object),
109 options,
110 }
111 }
112
113 pub fn compute_md5(self) -> Self {
142 let mut this = self;
143 this.options.checksum.md5_hash = Some(Md5::default());
144 this
145 }
146
147 pub fn compute_crc32c(mut self, enable: bool) -> Self {
172 if enable {
173 self.options
174 .checksum
175 .crc32c
176 .get_or_insert_with(Default::default);
177 } else {
178 self.options.checksum.crc32c = None;
179 }
180 self
181 }
182
183 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
186 self.request.generation = v.into();
187 self
188 }
189
190 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
194 where
195 T: Into<i64>,
196 {
197 self.request.if_generation_match = Some(v.into());
198 self
199 }
200
201 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
206 where
207 T: Into<i64>,
208 {
209 self.request.if_generation_not_match = Some(v.into());
210 self
211 }
212
213 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
216 where
217 T: Into<i64>,
218 {
219 self.request.if_metageneration_match = Some(v.into());
220 self
221 }
222
223 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
226 where
227 T: Into<i64>,
228 {
229 self.request.if_metageneration_not_match = Some(v.into());
230 self
231 }
232
233 pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
284 self.request.with_range(range);
285 self
286 }
287
288 pub fn set_key(mut self, v: KeyAes256) -> Self {
305 self.request.common_object_request_params = Some(v.into());
306 self
307 }
308
309 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
331 mut self,
332 v: V,
333 ) -> Self {
334 self.options.retry_policy = v.into().into();
335 self
336 }
337
338 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
355 mut self,
356 v: V,
357 ) -> Self {
358 self.options.backoff_policy = v.into().into();
359 self
360 }
361
362 pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
385 mut self,
386 v: V,
387 ) -> Self {
388 self.options.retry_throttler = v.into().into();
389 self
390 }
391
392 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
412 where
413 V: ReadResumePolicy + 'static,
414 {
415 self.options.set_read_resume_policy(std::sync::Arc::new(v));
416 self
417 }
418
419 pub fn with_automatic_decompression(mut self, v: bool) -> Self {
442 self.options.automatic_decompression = v;
443 self
444 }
445
446 pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
461 self.options.user_agent = Some(user_agent.into());
462 self
463 }
464
465 pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
485 self.options.set_quota_project(project);
486 self
487 }
488
489 pub async fn send(self) -> Result<ReadObjectResponse> {
491 self.stub.read_object(self.request, self.options).await
492 }
493}
494
/// Crate-internal state needed to issue (and re-issue) the HTTP request for an
/// object read.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    /// Shared client state; provides the HTTP client used to build requests.
    pub inner: std::sync::Arc<StorageInner>,
    /// The read request (bucket, object, preconditions, range, key).
    pub request: crate::model::ReadObjectRequest,
    /// Options for this request (retry configuration, decompression, ...).
    pub options: RequestOptions,
}
502
503impl Reader {
504 async fn read(self) -> Result<Response> {
505 let throttler = self.options.retry_throttler.clone();
506 let retry = self.options.retry_policy.clone();
507 let backoff = self.options.backoff_policy.clone();
508 let mut count = 0;
509 let inner = async move |_| {
510 let current = count;
511 count += 1;
512 self.read_attempt(current).await
513 };
514
515 google_cloud_gax::retry_loop_internal::retry_loop(
516 inner,
517 async |duration| tokio::time::sleep(duration).await,
518 true,
519 throttler,
520 retry,
521 backoff,
522 )
523 .await
524 }
525
526 async fn read_attempt(&self, attempt_count: u32) -> Result<Response> {
527 let builder = self.http_request_builder().await?;
528 let options = self
529 .options
530 .gax()
531 .insert_extension(PathTemplate("/storage/v1/b/{bucket}/o/{object}"))
532 .insert_extension(ResourceName(format!(
533 "//storage.googleapis.com/{}",
534 self.request.bucket
535 )));
536 let response = builder
537 .send(options, AttemptInfo::new(attempt_count))
538 .await?;
539 if !response.status().is_success() {
540 return gaxi::http::to_http_error(response).await;
541 }
542 Ok(response)
543 }
544
545 async fn http_request_builder(&self) -> Result<HttpRequestBuilder> {
546 let bucket = &self.request.bucket;
548 let bucket_id = bucket
549 .as_str()
550 .strip_prefix("projects/_/buckets/")
551 .ok_or_else(|| {
552 Error::binding(format!(
553 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
554 ))
555 })?;
556 let object = &self.request.object;
557
558 let builder = self
560 .inner
561 .client
562 .http_builder(
563 Method::GET,
564 &format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
565 )
566 .query("alt", "media")
567 .header(
568 "x-goog-api-client",
569 HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
570 );
571
572 let builder = if self.options.automatic_decompression {
573 builder
574 } else {
575 builder.header("accept-encoding", HeaderValue::from_static("gzip"))
581 };
582
583 let builder = if self.request.generation != 0 {
585 builder.query("generation", self.request.generation)
586 } else {
587 builder
588 };
589 let builder = self
590 .request
591 .if_generation_match
592 .iter()
593 .fold(builder, |b, v| b.query("ifGenerationMatch", v));
594 let builder = self
595 .request
596 .if_generation_not_match
597 .iter()
598 .fold(builder, |b, v| b.query("ifGenerationNotMatch", v));
599 let builder = self
600 .request
601 .if_metageneration_match
602 .iter()
603 .fold(builder, |b, v| b.query("ifMetagenerationMatch", v));
604 let builder = self
605 .request
606 .if_metageneration_not_match
607 .iter()
608 .fold(builder, |b, v| b.query("ifMetagenerationNotMatch", v));
609
610 let builder = apply_customer_supplied_encryption_headers(
611 builder,
612 &self.request.common_object_request_params,
613 );
614
615 let builder = match (self.request.read_offset, self.request.read_limit) {
617 (_, l) if l < 0 => {
619 unreachable!("ReadObject build never sets a negative read_limit value")
620 }
621 (o, l) if o < 0 && l > 0 => unreachable!(
623 "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
624 ),
625 (0, 0) => builder,
627 (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
629 (o, 0) => builder.header("range", format!("bytes={o}-")),
632 (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
635 };
636
637 Ok(builder)
638 }
639
640 fn is_gunzipped(response: &Response) -> bool {
641 const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
654 use http::header::WARNING;
655 if response
656 .headers()
657 .get(TRANSFORMATION)
658 .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
659 {
660 return true;
661 }
662 response
663 .headers()
664 .get(WARNING)
665 .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
666 }
667
668 pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
669 let response = self.clone().read().await?;
670 if Self::is_gunzipped(&response) {
671 return Ok(ReadObjectResponse::new(Box::new(
672 non_resumable::NonResumableResponse::new(response)?,
673 )));
674 }
675 Ok(ReadObjectResponse::new(Box::new(
676 resumable::ResumableResponse::new(self, response)?,
677 )))
678 }
679}
680
681#[cfg(test)]
682mod resume_tests;
683
684#[cfg(test)]
685mod tests {
686 use super::client::tests::{test_builder, test_inner_client};
687 use super::*;
688 use crate::error::{ChecksumMismatch, ReadError};
689 use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
690 use base64::Engine;
691 use futures::TryStreamExt;
692 use google_cloud_auth::credentials::{
693 CacheableResource, Credentials, EntityTag, anonymous::Builder as Anonymous,
694 testing::error_credentials,
695 };
696 use httptest::{Expectation, Server, all_of, cycle, matchers::*, responders::status_code};
697 use std::collections::HashMap;
698 use std::error::Error;
699 use std::sync::Arc;
700 use test_case::test_case;
701
702 type Result = anyhow::Result<()>;
703
    // Generates `MockCredentials`, a mock `CredentialsProvider` whose
    // `headers()` and `universe_domain()` results are scripted by each test.
    mockall::mock! {
        #[derive(Debug)]
        Credentials {}
        impl google_cloud_auth::credentials::CredentialsProvider for Credentials {
            async fn headers(
                &self,
                extensions: http::Extensions,
            ) -> std::result::Result<
                google_cloud_auth::credentials::CacheableResource<http::HeaderMap>,
                google_cloud_gax::error::CredentialsError,
            >;
            async fn universe_domain(&self) -> Option<String>;
        }
    }
718
719 async fn http_request_builder(
720 inner: Arc<StorageInner>,
721 builder: ReadObject,
722 ) -> crate::Result<HttpRequestBuilder> {
723 let reader = Reader {
724 inner,
725 request: builder.request,
726 options: builder.options,
727 };
728 reader.http_request_builder().await
729 }
730
731 #[tokio::test]
732 async fn test_clone() {
733 let inner = test_inner_client(test_builder()).await;
734 let stub = crate::storage::transport::Storage::new_test(inner.clone());
735 let options = {
736 let mut o = RequestOptions::new();
737 o.set_resumable_upload_threshold(12345_usize);
738 o
739 };
740 let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
741
742 let clone = builder.clone();
743 assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
744 assert_eq!(clone.request, builder.request);
745 assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
746 }
747
748 #[tokio::test]
750 async fn test_read_is_send_and_static() -> Result {
751 let client = Storage::builder()
752 .with_credentials(Anonymous::new().build())
753 .build()
754 .await?;
755
756 fn need_send<T: Send>(_val: &T) {}
757 fn need_sync<T: Sync>(_val: &T) {}
758 fn need_static<T: 'static>(_val: &T) {}
759
760 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
761 need_send(&read);
762 need_sync(&read);
763 need_static(&read);
764
765 let read = client
766 .read_object("projects/_/buckets/test-bucket", "test-object")
767 .send();
768 need_send(&read);
769 need_static(&read);
770
771 Ok(())
772 }
773
774 #[tokio::test]
775 async fn read_object_normal() -> Result {
776 let server = Server::run();
777 server.expect(
778 Expectation::matching(all_of![
779 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
780 request::headers(contains(("accept-encoding", "gzip"))),
781 request::query(url_decoded(contains(("alt", "media")))),
782 ])
783 .respond_with(
784 status_code(200)
785 .body("hello world")
786 .append_header("x-goog-generation", 123456),
787 ),
788 );
789
790 let client = Storage::builder()
791 .with_endpoint(format!("http://{}", server.addr()))
792 .with_credentials(Anonymous::new().build())
793 .build()
794 .await?;
795 let mut reader = client
796 .read_object("projects/_/buckets/test-bucket", "test-object")
797 .send()
798 .await?;
799 let mut got = Vec::new();
800 while let Some(b) = reader.next().await.transpose()? {
801 got.extend_from_slice(&b);
802 }
803 assert_eq!(got, b"hello world");
804
805 Ok(())
806 }
807
808 #[tokio::test]
809 async fn read_object_stream() -> Result {
810 let server = Server::run();
811 server.expect(
812 Expectation::matching(all_of![
813 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
814 request::query(url_decoded(contains(("alt", "media")))),
815 ])
816 .respond_with(
817 status_code(200)
818 .append_header("x-goog-generation", 123456)
819 .body("hello world"),
820 ),
821 );
822
823 let client = Storage::builder()
824 .with_endpoint(format!("http://{}", server.addr()))
825 .with_credentials(Anonymous::new().build())
826 .build()
827 .await?;
828 let response = client
829 .read_object("projects/_/buckets/test-bucket", "test-object")
830 .send()
831 .await?;
832 let result: Vec<_> = response.into_stream().try_collect().await?;
833 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
834
835 Ok(())
836 }
837
838 #[tokio::test]
839 async fn read_object_next_then_consume_response() -> Result {
840 const BLOCK_SIZE: usize = 500;
842 let mut contents = Vec::new();
843 for i in 0..50 {
844 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
845 }
846
847 let u = crc32c::crc32c(&contents);
849 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
850
851 let server = Server::run();
852 server.expect(
853 Expectation::matching(all_of![
854 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
855 request::query(url_decoded(contains(("alt", "media")))),
856 ])
857 .times(1)
858 .respond_with(
859 status_code(200)
860 .body(contents.clone())
861 .append_header("x-goog-hash", format!("crc32c={value}"))
862 .append_header("x-goog-generation", 123456),
863 ),
864 );
865
866 let client = Storage::builder()
867 .with_endpoint(format!("http://{}", server.addr()))
868 .with_credentials(Anonymous::new().build())
869 .build()
870 .await?;
871
872 let mut response = client
874 .read_object("projects/_/buckets/test-bucket", "test-object")
875 .send()
876 .await?;
877
878 let mut all_bytes = bytes::BytesMut::new();
879 let chunk = response.next().await.transpose()?.unwrap();
880 assert!(!chunk.is_empty());
881 all_bytes.extend(chunk);
882 use futures::StreamExt;
883 let mut stream = response.into_stream();
884 while let Some(chunk) = stream.next().await.transpose()? {
885 all_bytes.extend(chunk);
886 }
887 assert_eq!(all_bytes, contents);
888
889 Ok(())
890 }
891
892 #[tokio::test]
893 async fn read_object_not_found() -> Result {
894 let server = Server::run();
895 server.expect(
896 Expectation::matching(all_of![
897 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
898 request::query(url_decoded(contains(("alt", "media")))),
899 ])
900 .respond_with(status_code(404).body("NOT FOUND")),
901 );
902
903 let client = Storage::builder()
904 .with_endpoint(format!("http://{}", server.addr()))
905 .with_credentials(Anonymous::new().build())
906 .build()
907 .await?;
908 let err = client
909 .read_object("projects/_/buckets/test-bucket", "test-object")
910 .send()
911 .await
912 .expect_err("expected a not found error");
913 assert_eq!(err.http_status_code(), Some(404));
914
915 Ok(())
916 }
917
    /// A CRC32C mismatch is reported as `ChecksumMismatch::Crc32c` through
    /// each of the three ways of consuming the response: collecting results
    /// from `next()`, propagating errors from `next()` with `?`, and
    /// collecting the stream returned by `into_stream()`.
    #[tokio::test]
    async fn read_object_incorrect_crc32c_check() -> Result {
        // Checksum of a different payload than the one the server returns.
        let u = crc32c::crc32c("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            // One request for each of the three reads below.
            .times(3)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        // Read 1: keep both the data and the error produced by `next()`.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        // The data is still delivered; the mismatch arrives as an error item.
        assert_eq!(partial, b"hello world");
        let err = err.expect("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // Read 2: propagate the first error from `next()` with `?`.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let err: crate::Error = async {
            {
                while (response.next().await.transpose()?).is_some() {}
                Ok(())
            }
        }
        .await
        .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // Read 3: consume the response as a stream.
        use futures::TryStreamExt;
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await
            .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );
        Ok(())
    }
1013
1014 #[tokio::test]
1015 async fn read_object_disable_crc32c() -> Result {
1016 let inner = test_inner_client(test_builder()).await;
1017 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1018
1019 let builder = ReadObject::new(
1020 stub,
1021 "projects/_/buckets/bucket",
1022 "object",
1023 inner.options.clone(),
1024 )
1025 .compute_crc32c(false);
1026
1027 assert!(builder.options.checksum.crc32c.is_none());
1028 Ok(())
1029 }
1030
1031 #[tokio::test]
1032 async fn read_object_enable_crc32c() -> Result {
1033 let inner = test_inner_client(test_builder()).await;
1034 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1035
1036 let builder = ReadObject::new(
1037 stub,
1038 "projects/_/buckets/bucket",
1039 "object",
1040 inner.options.clone(),
1041 )
1042 .compute_crc32c(false) .compute_crc32c(true);
1044
1045 assert!(builder.options.checksum.crc32c.is_some());
1046 Ok(())
1047 }
1048
1049 #[tokio::test]
1050 async fn read_object_disable_crc32c_ignores_mismatch() -> Result {
1051 let server = Server::run();
1052 let u = crc32c::crc32c("goodbye world".as_bytes());
1054 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1055 server.expect(
1056 Expectation::matching(all_of![
1057 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1058 request::query(url_decoded(contains(("alt", "media")))),
1059 ])
1060 .respond_with(
1061 status_code(200)
1062 .body("hello world")
1063 .append_header("x-goog-hash", format!("crc32c={value}"))
1064 .append_header("x-goog-generation", 123456),
1065 ),
1066 );
1067
1068 let client = Storage::builder()
1069 .with_endpoint(format!("http://{}", server.addr()))
1070 .with_credentials(Anonymous::new().build())
1071 .build()
1072 .await?;
1073 let mut response = client
1074 .read_object("projects/_/buckets/test-bucket", "test-object")
1075 .compute_crc32c(false)
1076 .send()
1077 .await?;
1078 let mut got = Vec::new();
1079 while let Some(b) = response.next().await.transpose()? {
1080 got.extend_from_slice(&b);
1081 }
1082
1083 assert_eq!(got, b"hello world");
1084 Ok(())
1085 }
1086
1087 #[tokio::test]
1088 async fn read_object_incorrect_md5_check() -> Result {
1089 let digest = md5::compute("goodbye world".as_bytes());
1091 let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1092
1093 let server = Server::run();
1094 server.expect(
1095 Expectation::matching(all_of![
1096 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1097 request::query(url_decoded(contains(("alt", "media")))),
1098 ])
1099 .times(1)
1100 .respond_with(
1101 status_code(200)
1102 .body("hello world")
1103 .append_header("x-goog-hash", format!("md5={value}"))
1104 .append_header("x-goog-generation", 123456),
1105 ),
1106 );
1107
1108 let client = Storage::builder()
1109 .with_endpoint(format!("http://{}", server.addr()))
1110 .with_credentials(Anonymous::new().build())
1111 .build()
1112 .await?;
1113 let mut response = client
1114 .read_object("projects/_/buckets/test-bucket", "test-object")
1115 .compute_md5()
1116 .send()
1117 .await?;
1118 let mut partial = Vec::new();
1119 let mut err = None;
1120 while let Some(r) = response.next().await {
1121 match r {
1122 Ok(b) => partial.extend_from_slice(&b),
1123 Err(e) => err = Some(e),
1124 };
1125 }
1126 assert_eq!(partial, b"hello world");
1127 let err = err.expect("expect error on incorrect md5");
1128 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1129 assert!(
1130 matches!(
1131 source,
1132 Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1133 ),
1134 "err={err:?}"
1135 );
1136
1137 Ok(())
1138 }
1139
1140 #[tokio::test]
1141 async fn read_object_with_user_agent() -> Result {
1142 use http::header::USER_AGENT;
1143
1144 let user_agent = "quick_fox_lazy_dog/1.2.3";
1145 let server = Server::run();
1146 server.expect(
1147 Expectation::matching(all_of![
1148 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1149 request::headers(contains(("accept-encoding", "gzip"))),
1150 request::headers(contains((USER_AGENT.as_str(), user_agent))),
1151 request::query(url_decoded(contains(("alt", "media")))),
1152 ])
1153 .respond_with(
1154 status_code(200)
1155 .body("hello world")
1156 .append_header("x-goog-generation", 123456),
1157 ),
1158 );
1159
1160 let client = Storage::builder()
1161 .with_endpoint(format!("http://{}", server.addr()))
1162 .with_credentials(Anonymous::new().build())
1163 .build()
1164 .await?;
1165 let mut reader = client
1166 .read_object("projects/_/buckets/test-bucket", "test-object")
1167 .with_user_agent(user_agent)
1168 .send()
1169 .await?;
1170 let mut got = Vec::new();
1171 while let Some(b) = reader.next().await.transpose()? {
1172 got.extend_from_slice(&b);
1173 }
1174 assert_eq!(got, b"hello world");
1175
1176 Ok(())
1177 }
1178
1179 #[tokio::test]
1180 async fn read_object_with_quota_project() -> Result {
1181 const PROJECT_NAME: &str = "project_lazy_dog";
1182 let server = Server::run();
1183 server.expect(
1184 Expectation::matching(all_of![
1185 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1186 request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1187 ])
1188 .respond_with(
1189 status_code(200)
1190 .body("hello world")
1191 .append_header("x-goog-generation", 123456),
1192 ),
1193 );
1194
1195 let client = Storage::builder()
1196 .with_endpoint(format!("http://{}", server.addr()))
1197 .with_credentials(Anonymous::new().build())
1198 .build()
1199 .await?;
1200 let mut reader = client
1201 .read_object("projects/_/buckets/test-bucket", "test-object")
1202 .with_quota_project(PROJECT_NAME)
1203 .send()
1204 .await?;
1205 let mut got = Vec::new();
1206 while let Some(b) = reader.next().await.transpose()? {
1207 got.extend_from_slice(&b);
1208 }
1209 assert_eq!(got, b"hello world");
1210
1211 Ok(())
1212 }
1213
    /// When the credentials supply their own `x-goog-user-project` header and
    /// the request sets a quota project, only the request's value is sent.
    #[tokio::test]
    async fn read_object_strips_credential_quota_project() -> Result {
        const PROJECT_NAME: &str = "project_lazy_dog";
        const CRED_QUOTA_PROJECT: &str = "cred_quota_project";
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                // The per-request project must be present ...
                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
                // ... and the one supplied by the credentials must be absent.
                request::headers(not(contains(("x-goog-user-project", CRED_QUOTA_PROJECT)))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-generation", 123456),
            ),
        );

        // Credentials returning both an authorization header and their own
        // quota project header.
        let mut mock = MockCredentials::new();
        mock.expect_headers().returning(|_exts: http::Extensions| {
            let mut map = http::HeaderMap::new();
            map.insert(
                http::header::AUTHORIZATION,
                http::HeaderValue::from_static("Bearer test-token"),
            );
            map.insert(
                "x-goog-user-project",
                http::HeaderValue::from_static(CRED_QUOTA_PROJECT),
            );
            Ok(CacheableResource::New {
                data: map,
                entity_tag: EntityTag::default(),
            })
        });
        mock.expect_universe_domain().returning(|| None);

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Credentials::from(mock))
            .build()
            .await?;
        let mut reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .with_quota_project(PROJECT_NAME)
            .send()
            .await?;
        let mut got = Vec::new();
        while let Some(b) = reader.next().await.transpose()? {
            got.extend_from_slice(&b);
        }
        assert_eq!(got, b"hello world");

        Ok(())
    }
1269
1270 #[tokio::test]
1271 async fn read_object_retry_preserves_quota_project() -> Result {
1272 const PROJECT_NAME: &str = "project_lazy_dog";
1273 let server = Server::run();
1274 server.expect(
1275 Expectation::matching(all_of![
1276 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1277 request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1278 ])
1279 .times(2)
1280 .respond_with(cycle![
1281 status_code(503),
1282 status_code(200)
1283 .body("hello")
1284 .append_header("x-goog-generation", 1)
1285 ]),
1286 );
1287
1288 let client = Storage::builder()
1289 .with_endpoint(format!("http://{}", server.addr()))
1290 .with_credentials(Anonymous::new().build())
1291 .build()
1292 .await?;
1293 let mut reader = client
1294 .read_object("projects/_/buckets/test-bucket", "test-object")
1295 .with_quota_project(PROJECT_NAME)
1296 .send()
1297 .await?;
1298 let mut got = Vec::new();
1299 while let Some(b) = reader.next().await.transpose()? {
1300 got.extend_from_slice(&b);
1301 }
1302 assert_eq!(got, b"hello");
1303
1304 Ok(())
1305 }
1306
1307 #[tokio::test]
1308 async fn read_object() -> Result {
1309 let inner = test_inner_client(test_builder()).await;
1310 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1311 let builder = ReadObject::new(
1312 stub,
1313 "projects/_/buckets/bucket",
1314 "object",
1315 inner.options.clone(),
1316 );
1317 let request = http_request_builder(inner, builder)
1318 .await?
1319 .build_for_tests()
1320 .await?;
1321
1322 assert_eq!(request.method(), Method::GET);
1323 assert_eq!(
1324 request.url().as_str(),
1325 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1326 );
1327 Ok(())
1328 }
1329
1330 #[tokio::test]
1331 async fn read_object_error_credentials() -> Result {
1332 let inner =
1333 test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
1334 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1335 let builder = ReadObject::new(
1336 stub,
1337 "projects/_/buckets/bucket",
1338 "object",
1339 inner.options.clone(),
1340 );
1341 let _ = http_request_builder(inner, builder)
1342 .await?
1343 .build_for_tests()
1344 .await
1345 .inspect_err(|e| assert!(e.is_authentication()))
1346 .expect_err("invalid credentials should err");
1347 Ok(())
1348 }
1349
1350 #[tokio::test]
1351 async fn read_object_bad_bucket() -> Result {
1352 let inner = test_inner_client(test_builder()).await;
1353 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1354 let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1355 let _ = http_request_builder(inner, builder)
1356 .await
1357 .expect_err("malformed bucket string should error");
1358 Ok(())
1359 }
1360
1361 #[tokio::test]
1362 async fn read_object_query_params() -> Result {
1363 let inner = test_inner_client(test_builder()).await;
1364 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1365 let builder = ReadObject::new(
1366 stub,
1367 "projects/_/buckets/bucket",
1368 "object",
1369 inner.options.clone(),
1370 )
1371 .set_generation(5)
1372 .set_if_generation_match(10)
1373 .set_if_generation_not_match(20)
1374 .set_if_metageneration_match(30)
1375 .set_if_metageneration_not_match(40);
1376 let request = http_request_builder(inner, builder)
1377 .await?
1378 .build_for_tests()
1379 .await?;
1380
1381 assert_eq!(request.method(), Method::GET);
1382 let want_pairs: HashMap<String, String> = [
1383 ("alt", "media"),
1384 ("generation", "5"),
1385 ("ifGenerationMatch", "10"),
1386 ("ifGenerationNotMatch", "20"),
1387 ("ifMetagenerationMatch", "30"),
1388 ("ifMetagenerationNotMatch", "40"),
1389 ]
1390 .iter()
1391 .map(|(k, v)| (k.to_string(), v.to_string()))
1392 .collect();
1393 let query_pairs: HashMap<String, String> = request
1394 .url()
1395 .query_pairs()
1396 .map(|param| (param.0.to_string(), param.1.to_string()))
1397 .collect();
1398 assert_eq!(query_pairs.len(), want_pairs.len());
1399 assert_eq!(query_pairs, want_pairs);
1400 Ok(())
1401 }
1402
1403 #[tokio::test]
1404 async fn read_object_default_headers() -> Result {
1405 let inner = test_inner_client(test_builder()).await;
1407 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1408 let builder = ReadObject::new(
1409 stub,
1410 "projects/_/buckets/bucket",
1411 "object",
1412 inner.options.clone(),
1413 );
1414 let request = http_request_builder(inner, builder)
1415 .await?
1416 .build_for_tests()
1417 .await?;
1418
1419 assert_eq!(request.method(), Method::GET);
1420 assert_eq!(
1421 request.url().as_str(),
1422 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1423 );
1424
1425 let want = [("accept-encoding", "gzip")];
1426 let headers = request.headers();
1427 for (name, value) in want {
1428 assert_eq!(
1429 headers.get(name).and_then(|h| h.to_str().ok()),
1430 Some(value),
1431 "{request:?}"
1432 );
1433 }
1434 Ok(())
1435 }
1436
1437 #[tokio::test]
1438 async fn read_object_automatic_decompression_headers() -> Result {
1439 let inner = test_inner_client(test_builder()).await;
1441 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1442 let builder = ReadObject::new(
1443 stub,
1444 "projects/_/buckets/bucket",
1445 "object",
1446 inner.options.clone(),
1447 )
1448 .with_automatic_decompression(true);
1449 let request = http_request_builder(inner, builder)
1450 .await?
1451 .build_for_tests()
1452 .await?;
1453
1454 assert_eq!(request.method(), Method::GET);
1455 assert_eq!(
1456 request.url().as_str(),
1457 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1458 );
1459
1460 let headers = request.headers();
1461 assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1462 Ok(())
1463 }
1464
1465 #[tokio::test]
1466 async fn read_object_encryption_headers() -> Result {
1467 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1469
1470 let inner = test_inner_client(test_builder()).await;
1472 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1473 let builder = ReadObject::new(
1474 stub,
1475 "projects/_/buckets/bucket",
1476 "object",
1477 inner.options.clone(),
1478 )
1479 .set_key(KeyAes256::new(&key)?);
1480 let request = http_request_builder(inner, builder)
1481 .await?
1482 .build_for_tests()
1483 .await?;
1484
1485 assert_eq!(request.method(), Method::GET);
1486 assert_eq!(
1487 request.url().as_str(),
1488 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1489 );
1490
1491 let want = [
1492 ("x-goog-encryption-algorithm", "AES256".to_string()),
1493 ("x-goog-encryption-key", key_base64),
1494 ("x-goog-encryption-key-sha256", key_sha256_base64),
1495 ];
1496
1497 let headers = request.headers();
1498 for (name, value) in want {
1499 assert_eq!(
1500 headers.get(name).and_then(|h| h.to_str().ok()),
1501 Some(value.as_str())
1502 );
1503 }
1504 Ok(())
1505 }
1506
1507 #[test_case(ReadRange::all(), None; "no headers needed")]
1508 #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1509 #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1510 #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1511 #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1512 #[tokio::test]
1513 async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1514 let inner = test_inner_client(test_builder()).await;
1515 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1516 let builder = ReadObject::new(
1517 stub,
1518 "projects/_/buckets/bucket",
1519 "object",
1520 inner.options.clone(),
1521 )
1522 .set_read_range(input.clone());
1523 let request = http_request_builder(inner, builder)
1524 .await?
1525 .build_for_tests()
1526 .await?;
1527
1528 assert_eq!(request.method(), Method::GET);
1529 assert_eq!(
1530 request.url().as_str(),
1531 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1532 );
1533
1534 assert_eq!(request.headers().get("range"), want);
1535 Ok(())
1536 }
1537
1538 #[test_case("projects/p", "projects%2Fp")]
1539 #[test_case("kebab-case", "kebab-case")]
1540 #[test_case("dot.name", "dot.name")]
1541 #[test_case("under_score", "under_score")]
1542 #[test_case("tilde~123", "tilde~123")]
1543 #[test_case("exclamation!point!", "exclamation%21point%21")]
1544 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1545 #[test_case("preserve%percent%21", "preserve%percent%21")]
1546 #[test_case(
1547 "testall !#$&'()*+,/:;=?@[]",
1548 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1549 )]
1550 #[tokio::test]
1551 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1552 let inner = test_inner_client(test_builder()).await;
1553 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1554 let builder = ReadObject::new(
1555 stub,
1556 "projects/_/buckets/bucket",
1557 name,
1558 inner.options.clone(),
1559 );
1560 let request = http_request_builder(inner, builder)
1561 .await?
1562 .build_for_tests()
1563 .await?;
1564 let got = request.url().path_segments().unwrap().next_back().unwrap();
1565 assert_eq!(got, want);
1566 Ok(())
1567 }
1568
1569 #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1570 #[test_case("x-guploader-response-body-transformations", "no match", false)]
1571 #[test_case("warning", "214 UploadServer gunzipped", true)]
1572 #[test_case("warning", "no match", false)]
1573 #[test_case("unused", "unused", false)]
1574 fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1575 let response = http::Response::builder()
1576 .status(200)
1577 .header(name, value)
1578 .body(Vec::new())?;
1579 let response = Response::from(response);
1580 let got = Reader::is_gunzipped(&response);
1581 assert_eq!(got, want, "{response:?}");
1582 Ok(())
1583 }
1584}