1mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::attempt_info::AttemptInfo;
27use gaxi::http::HttpRequestBuilder;
28use gaxi::http::reqwest::{HeaderValue, Method, Response};
29use google_cloud_gax::options::internal::{PathTemplate, RequestOptionsExt, ResourceName};
30
31#[derive(Debug)]
68pub struct ReadObject<S = crate::storage::transport::Storage>
69where
70 S: crate::storage::stub::Storage + 'static,
71{
72 stub: std::sync::Arc<S>,
73 request: crate::model::ReadObjectRequest,
74 options: RequestOptions,
75}
76
77impl<S> Clone for ReadObject<S>
78where
79 S: crate::storage::stub::Storage + 'static,
80{
81 fn clone(&self) -> Self {
82 Self {
83 stub: self.stub.clone(),
84 request: self.request.clone(),
85 options: self.options.clone(),
86 }
87 }
88}
89
90impl<S> ReadObject<S>
91where
92 S: crate::storage::stub::Storage + 'static,
93{
94 pub(crate) fn new<B, O>(
95 stub: std::sync::Arc<S>,
96 bucket: B,
97 object: O,
98 options: RequestOptions,
99 ) -> Self
100 where
101 B: Into<String>,
102 O: Into<String>,
103 {
104 ReadObject {
105 stub,
106 request: crate::model::ReadObjectRequest::new()
107 .set_bucket(bucket)
108 .set_object(object),
109 options,
110 }
111 }
112
113 pub fn compute_md5(self) -> Self {
142 let mut this = self;
143 this.options.checksum.md5_hash = Some(Md5::default());
144 this
145 }
146
147 pub fn compute_crc32c(mut self, enable: bool) -> Self {
172 if enable {
173 self.options
174 .checksum
175 .crc32c
176 .get_or_insert_with(Default::default);
177 } else {
178 self.options.checksum.crc32c = None;
179 }
180 self
181 }
182
183 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
186 self.request.generation = v.into();
187 self
188 }
189
190 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
194 where
195 T: Into<i64>,
196 {
197 self.request.if_generation_match = Some(v.into());
198 self
199 }
200
201 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
206 where
207 T: Into<i64>,
208 {
209 self.request.if_generation_not_match = Some(v.into());
210 self
211 }
212
213 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
216 where
217 T: Into<i64>,
218 {
219 self.request.if_metageneration_match = Some(v.into());
220 self
221 }
222
223 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
226 where
227 T: Into<i64>,
228 {
229 self.request.if_metageneration_not_match = Some(v.into());
230 self
231 }
232
233 pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
284 self.request.with_range(range);
285 self
286 }
287
288 pub fn set_key(mut self, v: KeyAes256) -> Self {
305 self.request.common_object_request_params = Some(v.into());
306 self
307 }
308
309 pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
331 mut self,
332 v: V,
333 ) -> Self {
334 self.options.retry_policy = v.into().into();
335 self
336 }
337
338 pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
355 mut self,
356 v: V,
357 ) -> Self {
358 self.options.backoff_policy = v.into().into();
359 self
360 }
361
362 pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
385 mut self,
386 v: V,
387 ) -> Self {
388 self.options.retry_throttler = v.into().into();
389 self
390 }
391
392 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
412 where
413 V: ReadResumePolicy + 'static,
414 {
415 self.options.set_read_resume_policy(std::sync::Arc::new(v));
416 self
417 }
418
419 pub fn with_automatic_decompression(mut self, v: bool) -> Self {
442 self.options.automatic_decompression = v;
443 self
444 }
445
446 pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
461 self.options.user_agent = Some(user_agent.into());
462 self
463 }
464
465 pub async fn send(self) -> Result<ReadObjectResponse> {
467 self.stub.read_object(self.request, self.options).await
468 }
469}
470
471#[derive(Clone, Debug)]
473pub(crate) struct Reader {
474 pub inner: std::sync::Arc<StorageInner>,
475 pub request: crate::model::ReadObjectRequest,
476 pub options: RequestOptions,
477}
478
479impl Reader {
480 async fn read(self) -> Result<Response> {
481 let throttler = self.options.retry_throttler.clone();
482 let retry = self.options.retry_policy.clone();
483 let backoff = self.options.backoff_policy.clone();
484 let mut count = 0;
485 let inner = async move |_| {
486 let current = count;
487 count += 1;
488 self.read_attempt(current).await
489 };
490
491 google_cloud_gax::retry_loop_internal::retry_loop(
492 inner,
493 async |duration| tokio::time::sleep(duration).await,
494 true,
495 throttler,
496 retry,
497 backoff,
498 )
499 .await
500 }
501
502 async fn read_attempt(&self, attempt_count: u32) -> Result<Response> {
503 let builder = self.http_request_builder().await?;
504 let options = self
505 .options
506 .gax()
507 .insert_extension(PathTemplate("/storage/v1/b/{bucket}/o/{object}"))
508 .insert_extension(ResourceName(format!(
509 "//storage.googleapis.com/{}",
510 self.request.bucket
511 )));
512 let response = builder
513 .send(options, AttemptInfo::new(attempt_count))
514 .await?;
515 if !response.status().is_success() {
516 return gaxi::http::to_http_error(response).await;
517 }
518 Ok(response)
519 }
520
521 async fn http_request_builder(&self) -> Result<HttpRequestBuilder> {
522 let bucket = &self.request.bucket;
524 let bucket_id = bucket
525 .as_str()
526 .strip_prefix("projects/_/buckets/")
527 .ok_or_else(|| {
528 Error::binding(format!(
529 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
530 ))
531 })?;
532 let object = &self.request.object;
533
534 let builder = self
536 .inner
537 .client
538 .http_builder(
539 Method::GET,
540 &format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
541 )
542 .query("alt", "media")
543 .header(
544 "x-goog-api-client",
545 HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
546 );
547
548 let builder = if self.options.automatic_decompression {
549 builder
550 } else {
551 builder.header("accept-encoding", HeaderValue::from_static("gzip"))
557 };
558
559 let builder = if self.request.generation != 0 {
561 builder.query("generation", self.request.generation)
562 } else {
563 builder
564 };
565 let builder = self
566 .request
567 .if_generation_match
568 .iter()
569 .fold(builder, |b, v| b.query("ifGenerationMatch", v));
570 let builder = self
571 .request
572 .if_generation_not_match
573 .iter()
574 .fold(builder, |b, v| b.query("ifGenerationNotMatch", v));
575 let builder = self
576 .request
577 .if_metageneration_match
578 .iter()
579 .fold(builder, |b, v| b.query("ifMetagenerationMatch", v));
580 let builder = self
581 .request
582 .if_metageneration_not_match
583 .iter()
584 .fold(builder, |b, v| b.query("ifMetagenerationNotMatch", v));
585
586 let builder = apply_customer_supplied_encryption_headers(
587 builder,
588 &self.request.common_object_request_params,
589 );
590
591 let builder = match (self.request.read_offset, self.request.read_limit) {
593 (_, l) if l < 0 => {
595 unreachable!("ReadObject build never sets a negative read_limit value")
596 }
597 (o, l) if o < 0 && l > 0 => unreachable!(
599 "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
600 ),
601 (0, 0) => builder,
603 (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
605 (o, 0) => builder.header("range", format!("bytes={o}-")),
608 (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
611 };
612
613 Ok(builder)
614 }
615
616 fn is_gunzipped(response: &Response) -> bool {
617 const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
630 use http::header::WARNING;
631 if response
632 .headers()
633 .get(TRANSFORMATION)
634 .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
635 {
636 return true;
637 }
638 response
639 .headers()
640 .get(WARNING)
641 .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
642 }
643
644 pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
645 let response = self.clone().read().await?;
646 if Self::is_gunzipped(&response) {
647 return Ok(ReadObjectResponse::new(Box::new(
648 non_resumable::NonResumableResponse::new(response)?,
649 )));
650 }
651 Ok(ReadObjectResponse::new(Box::new(
652 resumable::ResumableResponse::new(self, response)?,
653 )))
654 }
655}
656
657#[cfg(test)]
658mod resume_tests;
659
660#[cfg(test)]
661mod tests {
662 use super::client::tests::{test_builder, test_inner_client};
663 use super::*;
664 use crate::error::{ChecksumMismatch, ReadError};
665 use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
666 use base64::Engine;
667 use futures::TryStreamExt;
668 use google_cloud_auth::credentials::{
669 anonymous::Builder as Anonymous, testing::error_credentials,
670 };
671 use httptest::{Expectation, Server, matchers::*, responders::status_code};
672 use std::collections::HashMap;
673 use std::error::Error;
674 use std::sync::Arc;
675 use test_case::test_case;
676
677 type Result = anyhow::Result<()>;
678
679 async fn http_request_builder(
680 inner: Arc<StorageInner>,
681 builder: ReadObject,
682 ) -> crate::Result<HttpRequestBuilder> {
683 let reader = Reader {
684 inner,
685 request: builder.request,
686 options: builder.options,
687 };
688 reader.http_request_builder().await
689 }
690
691 #[tokio::test]
692 async fn test_clone() {
693 let inner = test_inner_client(test_builder()).await;
694 let stub = crate::storage::transport::Storage::new_test(inner.clone());
695 let options = {
696 let mut o = RequestOptions::new();
697 o.set_resumable_upload_threshold(12345_usize);
698 o
699 };
700 let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
701
702 let clone = builder.clone();
703 assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
704 assert_eq!(clone.request, builder.request);
705 assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
706 }
707
708 #[tokio::test]
710 async fn test_read_is_send_and_static() -> Result {
711 let client = Storage::builder()
712 .with_credentials(Anonymous::new().build())
713 .build()
714 .await?;
715
716 fn need_send<T: Send>(_val: &T) {}
717 fn need_sync<T: Sync>(_val: &T) {}
718 fn need_static<T: 'static>(_val: &T) {}
719
720 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
721 need_send(&read);
722 need_sync(&read);
723 need_static(&read);
724
725 let read = client
726 .read_object("projects/_/buckets/test-bucket", "test-object")
727 .send();
728 need_send(&read);
729 need_static(&read);
730
731 Ok(())
732 }
733
734 #[tokio::test]
735 async fn read_object_normal() -> Result {
736 let server = Server::run();
737 server.expect(
738 Expectation::matching(all_of![
739 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
740 request::headers(contains(("accept-encoding", "gzip"))),
741 request::query(url_decoded(contains(("alt", "media")))),
742 ])
743 .respond_with(
744 status_code(200)
745 .body("hello world")
746 .append_header("x-goog-generation", 123456),
747 ),
748 );
749
750 let client = Storage::builder()
751 .with_endpoint(format!("http://{}", server.addr()))
752 .with_credentials(Anonymous::new().build())
753 .build()
754 .await?;
755 let mut reader = client
756 .read_object("projects/_/buckets/test-bucket", "test-object")
757 .send()
758 .await?;
759 let mut got = Vec::new();
760 while let Some(b) = reader.next().await.transpose()? {
761 got.extend_from_slice(&b);
762 }
763 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
764
765 Ok(())
766 }
767
768 #[tokio::test]
769 async fn read_object_stream() -> Result {
770 let server = Server::run();
771 server.expect(
772 Expectation::matching(all_of![
773 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
774 request::query(url_decoded(contains(("alt", "media")))),
775 ])
776 .respond_with(
777 status_code(200)
778 .append_header("x-goog-generation", 123456)
779 .body("hello world"),
780 ),
781 );
782
783 let client = Storage::builder()
784 .with_endpoint(format!("http://{}", server.addr()))
785 .with_credentials(Anonymous::new().build())
786 .build()
787 .await?;
788 let response = client
789 .read_object("projects/_/buckets/test-bucket", "test-object")
790 .send()
791 .await?;
792 let result: Vec<_> = response.into_stream().try_collect().await?;
793 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
794
795 Ok(())
796 }
797
798 #[tokio::test]
799 async fn read_object_next_then_consume_response() -> Result {
800 const BLOCK_SIZE: usize = 500;
802 let mut contents = Vec::new();
803 for i in 0..50 {
804 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
805 }
806
807 let u = crc32c::crc32c(&contents);
809 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
810
811 let server = Server::run();
812 server.expect(
813 Expectation::matching(all_of![
814 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
815 request::query(url_decoded(contains(("alt", "media")))),
816 ])
817 .times(1)
818 .respond_with(
819 status_code(200)
820 .body(contents.clone())
821 .append_header("x-goog-hash", format!("crc32c={value}"))
822 .append_header("x-goog-generation", 123456),
823 ),
824 );
825
826 let client = Storage::builder()
827 .with_endpoint(format!("http://{}", server.addr()))
828 .with_credentials(Anonymous::new().build())
829 .build()
830 .await?;
831
832 let mut response = client
834 .read_object("projects/_/buckets/test-bucket", "test-object")
835 .send()
836 .await?;
837
838 let mut all_bytes = bytes::BytesMut::new();
839 let chunk = response.next().await.transpose()?.unwrap();
840 assert!(!chunk.is_empty());
841 all_bytes.extend(chunk);
842 use futures::StreamExt;
843 let mut stream = response.into_stream();
844 while let Some(chunk) = stream.next().await.transpose()? {
845 all_bytes.extend(chunk);
846 }
847 assert_eq!(all_bytes, contents);
848
849 Ok(())
850 }
851
852 #[tokio::test]
853 async fn read_object_not_found() -> Result {
854 let server = Server::run();
855 server.expect(
856 Expectation::matching(all_of![
857 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
858 request::query(url_decoded(contains(("alt", "media")))),
859 ])
860 .respond_with(status_code(404).body("NOT FOUND")),
861 );
862
863 let client = Storage::builder()
864 .with_endpoint(format!("http://{}", server.addr()))
865 .with_credentials(Anonymous::new().build())
866 .build()
867 .await?;
868 let err = client
869 .read_object("projects/_/buckets/test-bucket", "test-object")
870 .send()
871 .await
872 .expect_err("expected a not found error");
873 assert_eq!(err.http_status_code(), Some(404));
874
875 Ok(())
876 }
877
878 #[tokio::test]
879 async fn read_object_incorrect_crc32c_check() -> Result {
880 let u = crc32c::crc32c("goodbye world".as_bytes());
882 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
883
884 let server = Server::run();
885 server.expect(
886 Expectation::matching(all_of![
887 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
888 request::query(url_decoded(contains(("alt", "media")))),
889 ])
890 .times(3)
891 .respond_with(
892 status_code(200)
893 .body("hello world")
894 .append_header("x-goog-hash", format!("crc32c={value}"))
895 .append_header("x-goog-generation", 123456),
896 ),
897 );
898
899 let client = Storage::builder()
900 .with_endpoint(format!("http://{}", server.addr()))
901 .with_credentials(Anonymous::new().build())
902 .build()
903 .await?;
904 let mut response = client
905 .read_object("projects/_/buckets/test-bucket", "test-object")
906 .send()
907 .await?;
908 let mut partial = Vec::new();
909 let mut err = None;
910 while let Some(r) = response.next().await {
911 match r {
912 Ok(b) => partial.extend_from_slice(&b),
913 Err(e) => err = Some(e),
914 };
915 }
916 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
917 let err = err.expect("expect error on incorrect crc32c");
918 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
919 assert!(
920 matches!(
921 source,
922 Some(&ReadError::ChecksumMismatch(
923 ChecksumMismatch::Crc32c { .. }
924 ))
925 ),
926 "err={err:?}"
927 );
928
929 let mut response = client
930 .read_object("projects/_/buckets/test-bucket", "test-object")
931 .send()
932 .await?;
933 let err: crate::Error = async {
934 {
935 while (response.next().await.transpose()?).is_some() {}
936 Ok(())
937 }
938 }
939 .await
940 .expect_err("expect error on incorrect crc32c");
941 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
942 assert!(
943 matches!(
944 source,
945 Some(&ReadError::ChecksumMismatch(
946 ChecksumMismatch::Crc32c { .. }
947 ))
948 ),
949 "err={err:?}"
950 );
951
952 use futures::TryStreamExt;
953 let err = client
954 .read_object("projects/_/buckets/test-bucket", "test-object")
955 .send()
956 .await?
957 .into_stream()
958 .try_collect::<Vec<bytes::Bytes>>()
959 .await
960 .expect_err("expect error on incorrect crc32c");
961 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
962 assert!(
963 matches!(
964 source,
965 Some(&ReadError::ChecksumMismatch(
966 ChecksumMismatch::Crc32c { .. }
967 ))
968 ),
969 "err={err:?}"
970 );
971 Ok(())
972 }
973
974 #[tokio::test]
975 async fn read_object_disable_crc32c() -> Result {
976 let inner = test_inner_client(test_builder()).await;
977 let stub = crate::storage::transport::Storage::new_test(inner.clone());
978
979 let builder = ReadObject::new(
980 stub,
981 "projects/_/buckets/bucket",
982 "object",
983 inner.options.clone(),
984 )
985 .compute_crc32c(false);
986
987 assert!(builder.options.checksum.crc32c.is_none());
988 Ok(())
989 }
990
991 #[tokio::test]
992 async fn read_object_enable_crc32c() -> Result {
993 let inner = test_inner_client(test_builder()).await;
994 let stub = crate::storage::transport::Storage::new_test(inner.clone());
995
996 let builder = ReadObject::new(
997 stub,
998 "projects/_/buckets/bucket",
999 "object",
1000 inner.options.clone(),
1001 )
1002 .compute_crc32c(false) .compute_crc32c(true);
1004
1005 assert!(builder.options.checksum.crc32c.is_some());
1006 Ok(())
1007 }
1008
1009 #[tokio::test]
1010 async fn read_object_disable_crc32c_ignores_mismatch() -> Result {
1011 let server = Server::run();
1012 let u = crc32c::crc32c("goodbye world".as_bytes());
1014 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1015 server.expect(
1016 Expectation::matching(all_of![
1017 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1018 request::query(url_decoded(contains(("alt", "media")))),
1019 ])
1020 .respond_with(
1021 status_code(200)
1022 .body("hello world")
1023 .append_header("x-goog-hash", format!("crc32c={value}"))
1024 .append_header("x-goog-generation", 123456),
1025 ),
1026 );
1027
1028 let client = Storage::builder()
1029 .with_endpoint(format!("http://{}", server.addr()))
1030 .with_credentials(Anonymous::new().build())
1031 .build()
1032 .await?;
1033 let mut response = client
1034 .read_object("projects/_/buckets/test-bucket", "test-object")
1035 .compute_crc32c(false)
1036 .send()
1037 .await?;
1038 let mut got = Vec::new();
1039 while let Some(b) = response.next().await.transpose()? {
1040 got.extend_from_slice(&b);
1041 }
1042
1043 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
1044 Ok(())
1045 }
1046
1047 #[tokio::test]
1048 async fn read_object_incorrect_md5_check() -> Result {
1049 let digest = md5::compute("goodbye world".as_bytes());
1051 let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1052
1053 let server = Server::run();
1054 server.expect(
1055 Expectation::matching(all_of![
1056 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1057 request::query(url_decoded(contains(("alt", "media")))),
1058 ])
1059 .times(1)
1060 .respond_with(
1061 status_code(200)
1062 .body("hello world")
1063 .append_header("x-goog-hash", format!("md5={value}"))
1064 .append_header("x-goog-generation", 123456),
1065 ),
1066 );
1067
1068 let client = Storage::builder()
1069 .with_endpoint(format!("http://{}", server.addr()))
1070 .with_credentials(Anonymous::new().build())
1071 .build()
1072 .await?;
1073 let mut response = client
1074 .read_object("projects/_/buckets/test-bucket", "test-object")
1075 .compute_md5()
1076 .send()
1077 .await?;
1078 let mut partial = Vec::new();
1079 let mut err = None;
1080 while let Some(r) = response.next().await {
1081 match r {
1082 Ok(b) => partial.extend_from_slice(&b),
1083 Err(e) => err = Some(e),
1084 };
1085 }
1086 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1087 let err = err.expect("expect error on incorrect md5");
1088 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1089 assert!(
1090 matches!(
1091 source,
1092 Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1093 ),
1094 "err={err:?}"
1095 );
1096
1097 Ok(())
1098 }
1099
1100 #[tokio::test]
1101 async fn read_object_with_user_agent() -> Result {
1102 use http::header::USER_AGENT;
1103
1104 let user_agent = "quick_fox_lazy_dog/1.2.3";
1105 let server = Server::run();
1106 server.expect(
1107 Expectation::matching(all_of![
1108 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1109 request::headers(contains(("accept-encoding", "gzip"))),
1110 request::headers(contains((USER_AGENT.as_str(), user_agent))),
1111 request::query(url_decoded(contains(("alt", "media")))),
1112 ])
1113 .respond_with(
1114 status_code(200)
1115 .body("hello world")
1116 .append_header("x-goog-generation", 123456),
1117 ),
1118 );
1119
1120 let client = Storage::builder()
1121 .with_endpoint(format!("http://{}", server.addr()))
1122 .with_credentials(Anonymous::new().build())
1123 .build()
1124 .await?;
1125 let mut reader = client
1126 .read_object("projects/_/buckets/test-bucket", "test-object")
1127 .with_user_agent(user_agent)
1128 .send()
1129 .await?;
1130 let mut got = Vec::new();
1131 while let Some(b) = reader.next().await.transpose()? {
1132 got.extend_from_slice(&b);
1133 }
1134 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
1135
1136 Ok(())
1137 }
1138
1139 #[tokio::test]
1140 async fn read_object() -> Result {
1141 let inner = test_inner_client(test_builder()).await;
1142 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1143 let builder = ReadObject::new(
1144 stub,
1145 "projects/_/buckets/bucket",
1146 "object",
1147 inner.options.clone(),
1148 );
1149 let request = http_request_builder(inner, builder)
1150 .await?
1151 .build_for_tests()
1152 .await?;
1153
1154 assert_eq!(request.method(), Method::GET);
1155 assert_eq!(
1156 request.url().as_str(),
1157 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1158 );
1159 Ok(())
1160 }
1161
1162 #[tokio::test]
1163 async fn read_object_error_credentials() -> Result {
1164 let inner =
1165 test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
1166 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1167 let builder = ReadObject::new(
1168 stub,
1169 "projects/_/buckets/bucket",
1170 "object",
1171 inner.options.clone(),
1172 );
1173 let _ = http_request_builder(inner, builder)
1174 .await?
1175 .build_for_tests()
1176 .await
1177 .inspect_err(|e| assert!(e.is_authentication()))
1178 .expect_err("invalid credentials should err");
1179 Ok(())
1180 }
1181
1182 #[tokio::test]
1183 async fn read_object_bad_bucket() -> Result {
1184 let inner = test_inner_client(test_builder()).await;
1185 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1186 let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1187 let _ = http_request_builder(inner, builder)
1188 .await
1189 .expect_err("malformed bucket string should error");
1190 Ok(())
1191 }
1192
1193 #[tokio::test]
1194 async fn read_object_query_params() -> Result {
1195 let inner = test_inner_client(test_builder()).await;
1196 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1197 let builder = ReadObject::new(
1198 stub,
1199 "projects/_/buckets/bucket",
1200 "object",
1201 inner.options.clone(),
1202 )
1203 .set_generation(5)
1204 .set_if_generation_match(10)
1205 .set_if_generation_not_match(20)
1206 .set_if_metageneration_match(30)
1207 .set_if_metageneration_not_match(40);
1208 let request = http_request_builder(inner, builder)
1209 .await?
1210 .build_for_tests()
1211 .await?;
1212
1213 assert_eq!(request.method(), Method::GET);
1214 let want_pairs: HashMap<String, String> = [
1215 ("alt", "media"),
1216 ("generation", "5"),
1217 ("ifGenerationMatch", "10"),
1218 ("ifGenerationNotMatch", "20"),
1219 ("ifMetagenerationMatch", "30"),
1220 ("ifMetagenerationNotMatch", "40"),
1221 ]
1222 .iter()
1223 .map(|(k, v)| (k.to_string(), v.to_string()))
1224 .collect();
1225 let query_pairs: HashMap<String, String> = request
1226 .url()
1227 .query_pairs()
1228 .map(|param| (param.0.to_string(), param.1.to_string()))
1229 .collect();
1230 assert_eq!(query_pairs.len(), want_pairs.len());
1231 assert_eq!(query_pairs, want_pairs);
1232 Ok(())
1233 }
1234
1235 #[tokio::test]
1236 async fn read_object_default_headers() -> Result {
1237 let inner = test_inner_client(test_builder()).await;
1239 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1240 let builder = ReadObject::new(
1241 stub,
1242 "projects/_/buckets/bucket",
1243 "object",
1244 inner.options.clone(),
1245 );
1246 let request = http_request_builder(inner, builder)
1247 .await?
1248 .build_for_tests()
1249 .await?;
1250
1251 assert_eq!(request.method(), Method::GET);
1252 assert_eq!(
1253 request.url().as_str(),
1254 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1255 );
1256
1257 let want = [("accept-encoding", "gzip")];
1258 let headers = request.headers();
1259 for (name, value) in want {
1260 assert_eq!(
1261 headers.get(name).and_then(|h| h.to_str().ok()),
1262 Some(value),
1263 "{request:?}"
1264 );
1265 }
1266 Ok(())
1267 }
1268
1269 #[tokio::test]
1270 async fn read_object_automatic_decompression_headers() -> Result {
1271 let inner = test_inner_client(test_builder()).await;
1273 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1274 let builder = ReadObject::new(
1275 stub,
1276 "projects/_/buckets/bucket",
1277 "object",
1278 inner.options.clone(),
1279 )
1280 .with_automatic_decompression(true);
1281 let request = http_request_builder(inner, builder)
1282 .await?
1283 .build_for_tests()
1284 .await?;
1285
1286 assert_eq!(request.method(), Method::GET);
1287 assert_eq!(
1288 request.url().as_str(),
1289 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1290 );
1291
1292 let headers = request.headers();
1293 assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1294 Ok(())
1295 }
1296
1297 #[tokio::test]
1298 async fn read_object_encryption_headers() -> Result {
1299 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1301
1302 let inner = test_inner_client(test_builder()).await;
1304 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1305 let builder = ReadObject::new(
1306 stub,
1307 "projects/_/buckets/bucket",
1308 "object",
1309 inner.options.clone(),
1310 )
1311 .set_key(KeyAes256::new(&key)?);
1312 let request = http_request_builder(inner, builder)
1313 .await?
1314 .build_for_tests()
1315 .await?;
1316
1317 assert_eq!(request.method(), Method::GET);
1318 assert_eq!(
1319 request.url().as_str(),
1320 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1321 );
1322
1323 let want = [
1324 ("x-goog-encryption-algorithm", "AES256".to_string()),
1325 ("x-goog-encryption-key", key_base64),
1326 ("x-goog-encryption-key-sha256", key_sha256_base64),
1327 ];
1328
1329 let headers = request.headers();
1330 for (name, value) in want {
1331 assert_eq!(
1332 headers.get(name).and_then(|h| h.to_str().ok()),
1333 Some(value.as_str())
1334 );
1335 }
1336 Ok(())
1337 }
1338
1339 #[test_case(ReadRange::all(), None; "no headers needed")]
1340 #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1341 #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1342 #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1343 #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1344 #[tokio::test]
1345 async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1346 let inner = test_inner_client(test_builder()).await;
1347 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1348 let builder = ReadObject::new(
1349 stub,
1350 "projects/_/buckets/bucket",
1351 "object",
1352 inner.options.clone(),
1353 )
1354 .set_read_range(input.clone());
1355 let request = http_request_builder(inner, builder)
1356 .await?
1357 .build_for_tests()
1358 .await?;
1359
1360 assert_eq!(request.method(), Method::GET);
1361 assert_eq!(
1362 request.url().as_str(),
1363 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1364 );
1365
1366 assert_eq!(request.headers().get("range"), want);
1367 Ok(())
1368 }
1369
1370 #[test_case("projects/p", "projects%2Fp")]
1371 #[test_case("kebab-case", "kebab-case")]
1372 #[test_case("dot.name", "dot.name")]
1373 #[test_case("under_score", "under_score")]
1374 #[test_case("tilde~123", "tilde~123")]
1375 #[test_case("exclamation!point!", "exclamation%21point%21")]
1376 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1377 #[test_case("preserve%percent%21", "preserve%percent%21")]
1378 #[test_case(
1379 "testall !#$&'()*+,/:;=?@[]",
1380 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1381 )]
1382 #[tokio::test]
1383 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1384 let inner = test_inner_client(test_builder()).await;
1385 let stub = crate::storage::transport::Storage::new_test(inner.clone());
1386 let builder = ReadObject::new(
1387 stub,
1388 "projects/_/buckets/bucket",
1389 name,
1390 inner.options.clone(),
1391 );
1392 let request = http_request_builder(inner, builder)
1393 .await?
1394 .build_for_tests()
1395 .await?;
1396 let got = request.url().path_segments().unwrap().next_back().unwrap();
1397 assert_eq!(got, want);
1398 Ok(())
1399 }
1400
1401 #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1402 #[test_case("x-guploader-response-body-transformations", "no match", false)]
1403 #[test_case("warning", "214 UploadServer gunzipped", true)]
1404 #[test_case("warning", "no match", false)]
1405 #[test_case("unused", "unused", false)]
1406 fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1407 let response = http::Response::builder()
1408 .status(200)
1409 .header(name, value)
1410 .body(Vec::new())?;
1411 let response = Response::from(response);
1412 let got = Reader::is_gunzipped(&response);
1413 assert_eq!(got, want, "{response:?}");
1414 Ok(())
1415 }
1416}