1use super::client::*;
16use super::*;
17use crate::download_resume_policy::DownloadResumePolicy;
18use base64::Engine;
19#[cfg(feature = "unstable-stream")]
20use futures::Stream;
21use serde_with::DeserializeAs;
22
23#[derive(Clone, Debug)]
60pub struct ReadObject {
61 inner: std::sync::Arc<StorageInner>,
62 request: crate::model::ReadObjectRequest,
63 options: super::request_options::RequestOptions,
64}
65
66impl ReadObject {
67 pub(crate) fn new<B, O>(inner: std::sync::Arc<StorageInner>, bucket: B, object: O) -> Self
68 where
69 B: Into<String>,
70 O: Into<String>,
71 {
72 let options = inner.options.clone();
73 ReadObject {
74 inner,
75 request: crate::model::ReadObjectRequest::new()
76 .set_bucket(bucket)
77 .set_object(object),
78 options,
79 }
80 }
81
82 pub fn with_generation<T: Into<i64>>(mut self, v: T) -> Self {
85 self.request.generation = v.into();
86 self
87 }
88
89 pub fn with_if_generation_match<T>(mut self, v: T) -> Self
93 where
94 T: Into<i64>,
95 {
96 self.request.if_generation_match = Some(v.into());
97 self
98 }
99
100 pub fn with_if_generation_not_match<T>(mut self, v: T) -> Self
105 where
106 T: Into<i64>,
107 {
108 self.request.if_generation_not_match = Some(v.into());
109 self
110 }
111
112 pub fn with_if_metageneration_match<T>(mut self, v: T) -> Self
115 where
116 T: Into<i64>,
117 {
118 self.request.if_metageneration_match = Some(v.into());
119 self
120 }
121
122 pub fn with_if_metageneration_not_match<T>(mut self, v: T) -> Self
125 where
126 T: Into<i64>,
127 {
128 self.request.if_metageneration_not_match = Some(v.into());
129 self
130 }
131
132 pub fn with_read_offset<T>(mut self, v: T) -> Self
180 where
181 T: Into<i64>,
182 {
183 self.request.read_offset = v.into();
184 self
185 }
186
187 pub fn with_read_limit<T>(mut self, v: T) -> Self
222 where
223 T: Into<i64>,
224 {
225 self.request.read_limit = v.into();
226 self
227 }
228
229 pub fn with_key(mut self, v: KeyAes256) -> Self {
246 self.request.common_object_request_params = Some(v.into());
247 self
248 }
249
250 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
271 self.options.retry_policy = v.into().into();
272 self
273 }
274
275 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
292 mut self,
293 v: V,
294 ) -> Self {
295 self.options.backoff_policy = v.into().into();
296 self
297 }
298
299 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
322 mut self,
323 v: V,
324 ) -> Self {
325 self.options.retry_throttler = v.into().into();
326 self
327 }
328
329 pub fn with_download_resume_policy<V>(mut self, v: V) -> Self
349 where
350 V: DownloadResumePolicy + 'static,
351 {
352 self.options.download_resume_policy = std::sync::Arc::new(v);
353 self
354 }
355
356 pub async fn send(self) -> Result<ReadObjectResponse> {
358 let download = self.clone().download().await?;
359 ReadObjectResponse::new(self, download)
360 }
361
362 async fn download(self) -> Result<reqwest::Response> {
363 let throttler = self.options.retry_throttler.clone();
364 let retry = self.options.retry_policy.clone();
365 let backoff = self.options.backoff_policy.clone();
366
367 gax::retry_loop_internal::retry_loop(
368 async move |_| self.download_attempt().await,
369 async |duration| tokio::time::sleep(duration).await,
370 true,
371 throttler,
372 retry,
373 backoff,
374 )
375 .await
376 }
377
378 async fn download_attempt(&self) -> Result<reqwest::Response> {
379 let builder = self.http_request_builder().await?;
380 let response = builder.send().await.map_err(Error::io)?;
381 if !response.status().is_success() {
382 return gaxi::http::to_http_error(response).await;
383 }
384 Ok(response)
385 }
386
387 async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
388 let bucket = &self.request.bucket;
390 let bucket_id = bucket
391 .as_str()
392 .strip_prefix("projects/_/buckets/")
393 .ok_or_else(|| {
394 Error::binding(format!(
395 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
396 ))
397 })?;
398 let object = &self.request.object;
399
400 let builder = self
402 .inner
403 .client
404 .request(
405 reqwest::Method::GET,
406 format!(
407 "{}/storage/v1/b/{bucket_id}/o/{}",
408 &self.inner.endpoint,
409 enc(object)
410 ),
411 )
412 .query(&[("alt", "media")])
413 .header(
414 "x-goog-api-client",
415 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
416 );
417
418 let builder = if self.request.generation != 0 {
420 builder.query(&[("generation", self.request.generation)])
421 } else {
422 builder
423 };
424 let builder = self
425 .request
426 .if_generation_match
427 .iter()
428 .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
429 let builder = self
430 .request
431 .if_generation_not_match
432 .iter()
433 .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
434 let builder = self
435 .request
436 .if_metageneration_match
437 .iter()
438 .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
439 let builder = self
440 .request
441 .if_metageneration_not_match
442 .iter()
443 .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
444
445 let builder = apply_customer_supplied_encryption_headers(
446 builder,
447 &self.request.common_object_request_params,
448 );
449
450 let builder = match (self.request.read_offset, self.request.read_limit) {
452 (_, l) if l < 0 => Err(RangeError::NegativeLimit),
454 (o, l) if o < 0 && l > 0 => Err(RangeError::NegativeOffsetWithLimit),
456 (0, 0) => Ok(builder),
458 (o, 0) => Ok(builder.header("range", format!("bytes={o}-"))),
461 (o, l) => Ok(builder.header("range", format!("bytes={o}-{}", o + l - 1))),
464 }
465 .map_err(Error::ser)?;
466
467 self.inner.apply_auth_headers(builder).await
468 }
469}
470
471fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
472 headers
473 .get("x-goog-hash")
474 .and_then(|hash| hash.to_str().ok())
475 .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
476 .and_then(|hash| {
477 let hash = hash.trim_start_matches("crc32c=");
478 v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
479 })
480}
481
482fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
483 headers
484 .get("x-goog-hash")
485 .and_then(|hash| hash.to_str().ok())
486 .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
487 .and_then(|hash| {
488 let hash = hash.trim_start_matches("md5=");
489 base64::prelude::BASE64_STANDARD.decode(hash).ok()
490 })
491 .unwrap_or_default()
492}
493
494#[derive(Debug)]
496pub struct ReadObjectResponse {
497 inner: Option<reqwest::Response>,
498 highlights: ObjectHighlights,
499 response_crc32c: Option<u32>,
501 crc32c: u32,
502 range: ReadRange,
503 generation: i64,
504 builder: ReadObject,
505 resume_count: u32,
506}
507
508impl ReadObjectResponse {
509 fn new(builder: ReadObject, inner: reqwest::Response) -> Result<Self> {
510 let full = builder.request.read_offset == 0 && builder.request.read_limit == 0;
511 let response_crc32c = crc32c_from_response(full, inner.status(), inner.headers());
512 let range = response_range(&inner).map_err(Error::deser)?;
513 let generation = response_generation(&inner).map_err(Error::deser)?;
514
515 let headers = inner.headers();
516 let get_as_i64 = |header_name: &str| -> i64 {
517 headers
518 .get(header_name)
519 .and_then(|s| s.to_str().ok())
520 .and_then(|s| s.parse::<i64>().ok())
521 .unwrap_or_default()
522 };
523 let get_as_string = |header_name: &str| -> String {
524 headers
525 .get(header_name)
526 .and_then(|sc| sc.to_str().ok())
527 .map(|sc| sc.to_string())
528 .unwrap_or_default()
529 };
530 let highlights = ObjectHighlights {
531 generation,
532 metageneration: get_as_i64("x-goog-metageneration"),
533 size: get_as_i64("x-goog-stored-content-length"),
534 content_encoding: get_as_string("x-goog-stored-content-encoding"),
535 storage_class: get_as_string("x-goog-storage-class"),
536 content_type: get_as_string("content-type"),
537 content_language: get_as_string("content-language"),
538 content_disposition: get_as_string("content-disposition"),
539 etag: get_as_string("etag"),
540 checksums: headers.get("x-goog-hash").map(|_| {
541 crate::model::ObjectChecksums::new()
542 .set_or_clear_crc32c(headers_to_crc32c(headers))
543 .set_md5_hash(headers_to_md5_hash(headers))
544 }),
545 };
546
547 Ok(Self {
548 inner: Some(inner),
549 highlights,
550 response_crc32c,
551 crc32c: 0, range,
553 generation,
554 builder,
555 resume_count: 0,
556 })
557 }
558
559 pub fn object(&self) -> ObjectHighlights {
581 self.highlights.clone()
582 }
583
584 pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
604 match self.next_attempt().await {
605 None => None,
606 Some(Ok(b)) => Some(Ok(b)),
607 Some(Err(e)) => Box::pin(self.resume(e)).await,
610 }
611 }
612
613 async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
614 let inner = self.inner.as_mut()?;
615 let res = inner.chunk().await.map_err(Error::io);
616 match res {
617 Ok(Some(chunk)) => {
618 if self.response_crc32c.is_some() {
619 self.crc32c = crc32c::crc32c_append(self.crc32c, &chunk);
620 }
621 let len = chunk.len() as u64;
622 if self.range.limit < len {
623 return Some(Err(Error::deser(ReadError::LongRead {
624 expected: self.range.limit,
625 got: len,
626 })));
627 }
628 self.range.limit -= len;
629 self.range.start += len;
630 Some(Ok(chunk))
631 }
632 Ok(None) => {
633 if self.range.limit != 0 {
634 return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
635 }
636 let res = check_crc32c_match(self.crc32c, self.response_crc32c);
637 match res {
638 Err(e) => Some(Err(e)),
639 Ok(()) => None,
640 }
641 }
642 Err(e) => Some(Err(e)),
643 }
644 }
645
646 async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
647 use crate::download_resume_policy::{ResumeQuery, ResumeResult};
648
649 self.inner = None;
651 self.resume_count += 1;
652 let query = ResumeQuery::new(self.resume_count);
653 match self
654 .builder
655 .options
656 .download_resume_policy
657 .on_error(&query, error)
658 {
659 ResumeResult::Continue(_) => {}
660 ResumeResult::Permanent(e) => return Some(Err(e)),
661 ResumeResult::Exhausted(e) => return Some(Err(e)),
662 };
663 self.builder.request.read_offset = self.range.start as i64;
664 self.builder.request.read_limit = self.range.limit as i64;
665 self.builder.request.generation = self.generation;
666 self.inner = match self.builder.clone().download().await {
667 Ok(r) => Some(r),
668 Err(e) => return Some(Err(e)),
669 };
670 self.next().await
671 }
672
673 #[cfg(feature = "unstable-stream")]
674 #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
675 pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
677 use futures::stream::unfold;
678 Box::pin(unfold(Some(self), move |state| async move {
679 if let Some(mut this) = state {
680 if let Some(chunk) = this.next().await {
681 return Some((chunk, Some(this)));
682 }
683 };
684 None
685 }))
686 }
687}
688
689#[derive(Clone, Debug, PartialEq)]
691#[non_exhaustive]
692pub struct ObjectHighlights {
693 pub generation: i64,
695
696 pub metageneration: i64,
701
702 pub size: i64,
705
706 pub content_encoding: String,
709
710 pub checksums: std::option::Option<crate::model::ObjectChecksums>,
715
716 pub storage_class: String,
718
719 pub content_language: String,
722
723 pub content_type: String,
728
729 pub content_disposition: String,
732
733 pub etag: String,
735}
736
737#[derive(thiserror::Error, Debug)]
739#[non_exhaustive]
740enum ReadError {
741 #[error("bad CRC on read: got {got}, want {want}")]
743 BadCrc { got: u32, want: u32 },
744
745 #[error("missing {0} bytes at the end of the stream")]
746 ShortRead(u64),
747
748 #[error("too many bytes received: expected {expected}, stopped download at {got}")]
749 LongRead { got: u64, expected: u64 },
750
751 #[error("unexpected success code {0} in read request, only 200 and 206 are expected")]
753 UnexpectedSuccessCode(u16),
754
755 #[error("the response is missing '{0}', a required header")]
757 MissingHeader(&'static str),
758
759 #[error("the format for header '{0}' is incorrect")]
761 BadHeaderFormat(
762 &'static str,
763 #[source] Box<dyn std::error::Error + Send + Sync + 'static>,
764 ),
765}
766
767fn crc32c_from_response(
768 full_content_requested: bool,
769 status: http::StatusCode,
770 headers: &http::HeaderMap,
771) -> Option<u32> {
772 if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
782 return None;
783 }
784 let stored_encoding = headers
785 .get("x-goog-stored-content-encoding")
786 .and_then(|e| e.to_str().ok())
787 .map_or("", |e| e);
788 let content_encoding = headers
789 .get("content-encoding")
790 .and_then(|e| e.to_str().ok())
791 .map_or("", |e| e);
792 if stored_encoding == "gzip" && content_encoding != "gzip" {
793 return None;
794 }
795 headers_to_crc32c(headers)
796}
797
798fn check_crc32c_match(crc32c: u32, response: Option<u32>) -> Result<()> {
799 if let Some(response) = response {
800 if crc32c != response {
801 return Err(Error::deser(ReadError::BadCrc {
802 got: crc32c,
803 want: response,
804 }));
805 }
806 }
807 Ok(())
808}
809
810fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
811 match response.status() {
812 reqwest::StatusCode::OK => {
813 let header = required_header(response, "content-length")?;
814 let limit = header
815 .parse::<u64>()
816 .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
817 Ok(ReadRange { start: 0, limit })
818 }
819 reqwest::StatusCode::PARTIAL_CONTENT => {
820 let header = required_header(response, "content-range")?;
821 let header = header.strip_prefix("bytes ").ok_or_else(|| {
822 ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
823 })?;
824 let (range, _) = header.split_once('/').ok_or_else(|| {
825 ReadError::BadHeaderFormat("content-range", "missing / separator".into())
826 })?;
827 let (start, end) = range.split_once('-').ok_or_else(|| {
828 ReadError::BadHeaderFormat("content-range", "missing - separator".into())
829 })?;
830 let start = start
831 .parse::<u64>()
832 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
833 let end = end
834 .parse::<u64>()
835 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
836 let end = end + 1;
839 let limit = end
840 .checked_sub(start)
841 .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
842 Ok(ReadRange { start, limit })
843 }
844 s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
845 }
846}
847
848fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
849 let header = required_header(response, "x-goog-generation")?;
850 header
851 .parse::<i64>()
852 .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
853}
854
855fn required_header<'a>(
856 response: &'a reqwest::Response,
857 name: &'static str,
858) -> std::result::Result<&'a str, ReadError> {
859 let header = response
860 .headers()
861 .get(name)
862 .ok_or_else(|| ReadError::MissingHeader(name))?;
863 header
864 .to_str()
865 .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
866}
867
/// The portion of an object that still needs to be received.
#[derive(Debug, PartialEq)]
struct ReadRange {
    /// Offset of the next byte expected from the service.
    start: u64,
    /// Number of bytes still expected.
    limit: u64,
}
873
874#[cfg(test)]
875mod resume_tests;
876
877#[cfg(test)]
878mod tests {
879 use super::client::tests::{create_key_helper, test_builder, test_inner_client};
880 use super::*;
881 use futures::TryStreamExt;
882 use httptest::{Expectation, Server, matchers::*, responders::status_code};
883 use std::collections::HashMap;
884 use std::error::Error;
885 use test_case::test_case;
886
887 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
888
889 #[tokio::test]
891 async fn test_read_is_send_and_static() -> Result {
892 let client = Storage::builder()
893 .with_credentials(auth::credentials::testing::test_credentials())
894 .build()
895 .await?;
896
897 fn need_send<T: Send>(_val: &T) {}
898 fn need_sync<T: Sync>(_val: &T) {}
899 fn need_static<T: 'static>(_val: &T) {}
900
901 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
902 need_send(&read);
903 need_sync(&read);
904 need_static(&read);
905
906 let read = client
907 .read_object("projects/_/buckets/test-bucket", "test-object")
908 .send();
909 need_send(&read);
910 need_static(&read);
911
912 Ok(())
913 }
914 #[tokio::test]
915 async fn read_object_normal() -> Result {
916 let server = Server::run();
917 server.expect(
918 Expectation::matching(all_of![
919 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
920 request::query(url_decoded(contains(("alt", "media")))),
921 ])
922 .respond_with(
923 status_code(200)
924 .body("hello world")
925 .append_header("x-goog-generation", 123456),
926 ),
927 );
928
929 let client = Storage::builder()
930 .with_endpoint(format!("http://{}", server.addr()))
931 .with_credentials(auth::credentials::testing::test_credentials())
932 .build()
933 .await?;
934 let mut reader = client
935 .read_object("projects/_/buckets/test-bucket", "test-object")
936 .send()
937 .await?;
938 let mut got = Vec::new();
939 while let Some(b) = reader.next().await.transpose()? {
940 got.extend_from_slice(&b);
941 }
942 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
943
944 Ok(())
945 }
946
947 #[tokio::test]
948 async fn read_object_metadata() -> Result {
949 const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
950 let server = Server::run();
951 server.expect(
952 Expectation::matching(all_of![
953 request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
954 request::query(url_decoded(contains(("alt", "media")))),
955 ])
956 .respond_with(
957 status_code(200)
958 .body(CONTENTS)
959 .append_header(
960 "x-goog-hash",
961 "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
962 )
963 .append_header("x-goog-generation", 500)
964 .append_header("x-goog-metageneration", "1")
965 .append_header("x-goog-stored-content-length", 30)
966 .append_header("x-goog-stored-content-encoding", "identity")
967 .append_header("x-goog-storage-class", "STANDARD")
968 .append_header("content-language", "en")
969 .append_header("content-type", "text/plain")
970 .append_header("content-disposition", "inline")
971 .append_header("etag", "etagval"),
972 ),
973 );
974
975 let endpoint = server.url("");
976 let client = Storage::builder()
977 .with_endpoint(endpoint.to_string())
978 .with_credentials(auth::credentials::testing::test_credentials())
979 .build()
980 .await?;
981 let reader = client
982 .read_object("projects/_/buckets/test-bucket", "test-object")
983 .send()
984 .await?;
985 let object = reader.object();
986 assert_eq!(object.generation, 500);
987 assert_eq!(object.metageneration, 1);
988 assert_eq!(object.size, 30);
989 assert_eq!(object.content_encoding, "identity");
990 assert_eq!(
991 object.checksums.as_ref().unwrap().crc32c.unwrap(),
992 crc32c::crc32c(CONTENTS.as_bytes())
993 );
994 assert_eq!(
995 object.checksums.as_ref().unwrap().md5_hash,
996 base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
997 );
998
999 Ok(())
1000 }
1001
1002 #[tokio::test]
1003 async fn read_object_stream() -> Result {
1004 let server = Server::run();
1005 server.expect(
1006 Expectation::matching(all_of![
1007 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1008 request::query(url_decoded(contains(("alt", "media")))),
1009 ])
1010 .respond_with(
1011 status_code(200)
1012 .append_header("x-goog-generation", 123456)
1013 .body("hello world"),
1014 ),
1015 );
1016
1017 let client = Storage::builder()
1018 .with_endpoint(format!("http://{}", server.addr()))
1019 .with_credentials(auth::credentials::testing::test_credentials())
1020 .build()
1021 .await?;
1022 let response = client
1023 .read_object("projects/_/buckets/test-bucket", "test-object")
1024 .send()
1025 .await?;
1026 let result: Vec<_> = response.into_stream().try_collect().await?;
1027 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
1028
1029 Ok(())
1030 }
1031
1032 #[tokio::test]
1033 async fn read_object_next_then_consume_response() -> Result {
1034 const BLOCK_SIZE: usize = 500;
1036 let mut contents = Vec::new();
1037 for i in 0..50 {
1038 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
1039 }
1040
1041 let u = crc32c::crc32c(&contents);
1043 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1044
1045 let server = Server::run();
1046 server.expect(
1047 Expectation::matching(all_of![
1048 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1049 request::query(url_decoded(contains(("alt", "media")))),
1050 ])
1051 .times(1)
1052 .respond_with(
1053 status_code(200)
1054 .body(contents.clone())
1055 .append_header("x-goog-hash", format!("crc32c={value}"))
1056 .append_header("x-goog-generation", 123456),
1057 ),
1058 );
1059
1060 let client = Storage::builder()
1061 .with_endpoint(format!("http://{}", server.addr()))
1062 .with_credentials(auth::credentials::testing::test_credentials())
1063 .build()
1064 .await?;
1065
1066 let mut response = client
1068 .read_object("projects/_/buckets/test-bucket", "test-object")
1069 .send()
1070 .await?;
1071
1072 let mut all_bytes = bytes::BytesMut::new();
1073 let chunk = response.next().await.transpose()?.unwrap();
1074 assert!(!chunk.is_empty());
1075 all_bytes.extend(chunk);
1076 use futures::StreamExt;
1077 let mut stream = response.into_stream();
1078 while let Some(chunk) = stream.next().await.transpose()? {
1079 all_bytes.extend(chunk);
1080 }
1081 assert_eq!(all_bytes, contents);
1082
1083 Ok(())
1084 }
1085
1086 #[tokio::test]
1087 async fn read_object_not_found() -> Result {
1088 let server = Server::run();
1089 server.expect(
1090 Expectation::matching(all_of![
1091 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1092 request::query(url_decoded(contains(("alt", "media")))),
1093 ])
1094 .respond_with(status_code(404).body("NOT FOUND")),
1095 );
1096
1097 let client = Storage::builder()
1098 .with_endpoint(format!("http://{}", server.addr()))
1099 .with_credentials(auth::credentials::testing::test_credentials())
1100 .build()
1101 .await?;
1102 let err = client
1103 .read_object("projects/_/buckets/test-bucket", "test-object")
1104 .send()
1105 .await
1106 .expect_err("expected a not found error");
1107 assert_eq!(err.http_status_code(), Some(404));
1108
1109 Ok(())
1110 }
1111
1112 #[tokio::test]
1113 async fn read_object_incorrect_crc32c_check() -> Result {
1114 let u = crc32c::crc32c("goodbye world".as_bytes());
1116 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1117
1118 let server = Server::run();
1119 server.expect(
1120 Expectation::matching(all_of![
1121 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1122 request::query(url_decoded(contains(("alt", "media")))),
1123 ])
1124 .times(3)
1125 .respond_with(
1126 status_code(200)
1127 .body("hello world")
1128 .append_header("x-goog-hash", format!("crc32c={value}"))
1129 .append_header("x-goog-generation", 123456),
1130 ),
1131 );
1132
1133 let expected_got = crc32c::crc32c("hello world".as_bytes()); let expected_want = crc32c::crc32c("goodbye world".as_bytes()); let client = Storage::builder()
1137 .with_endpoint(format!("http://{}", server.addr()))
1138 .with_credentials(auth::credentials::testing::test_credentials())
1139 .build()
1140 .await?;
1141 let mut response = client
1142 .read_object("projects/_/buckets/test-bucket", "test-object")
1143 .send()
1144 .await?;
1145 let mut partial = Vec::new();
1146 let mut err = None;
1147 while let Some(r) = response.next().await {
1148 match r {
1149 Ok(b) => partial.extend_from_slice(&b),
1150 Err(e) => err = Some(e),
1151 };
1152 }
1153 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1154 let err = err.expect("expect error on incorrect crc32c");
1155 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1156 assert!(
1157 matches!(source, Some(&ReadError::BadCrc { got, want }) if got == expected_got && want == expected_want),
1158 "err={err:?}, expected_got={expected_got}, expected_want={expected_want}"
1159 );
1160
1161 let mut response = client
1162 .read_object("projects/_/buckets/test-bucket", "test-object")
1163 .send()
1164 .await?;
1165 let err: crate::Error = async {
1166 {
1167 while (response.next().await.transpose()?).is_some() {}
1168 Ok(())
1169 }
1170 }
1171 .await
1172 .expect_err("expect error on incorrect crc32c");
1173 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1174 assert!(
1175 matches!(source, Some(&ReadError::BadCrc { got, want }) if got == expected_got && want == expected_want),
1176 "err={err:?}, expected_got={expected_got}, expected_want={expected_want}"
1177 );
1178
1179 use futures::TryStreamExt;
1180 let err = client
1181 .read_object("projects/_/buckets/test-bucket", "test-object")
1182 .send()
1183 .await?
1184 .into_stream()
1185 .try_collect::<Vec<bytes::Bytes>>()
1186 .await
1187 .expect_err("expect error on incorrect crc32c");
1188 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1189 assert!(
1190 matches!(source, Some(&ReadError::BadCrc { got, want }) if got == expected_got && want == expected_want),
1191 "err={err:?}, expected_got={expected_got}, expected_want={expected_want}"
1192 );
1193 Ok(())
1194 }
1195
1196 #[tokio::test]
1197 async fn read_object() -> Result {
1198 let inner = test_inner_client(test_builder());
1199 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1200 .http_request_builder()
1201 .await?
1202 .build()?;
1203
1204 assert_eq!(request.method(), reqwest::Method::GET);
1205 assert_eq!(
1206 request.url().as_str(),
1207 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1208 );
1209 Ok(())
1210 }
1211
1212 #[tokio::test]
1213 async fn read_object_error_credentials() -> Result {
1214 let inner = test_inner_client(
1215 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1216 );
1217 let _ = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1218 .http_request_builder()
1219 .await
1220 .inspect_err(|e| assert!(e.is_authentication()))
1221 .expect_err("invalid credentials should err");
1222 Ok(())
1223 }
1224
1225 #[tokio::test]
1226 async fn read_object_bad_bucket() -> Result {
1227 let inner = test_inner_client(test_builder());
1228 ReadObject::new(inner, "malformed", "object")
1229 .http_request_builder()
1230 .await
1231 .expect_err("malformed bucket string should error");
1232 Ok(())
1233 }
1234
1235 #[tokio::test]
1236 async fn read_object_query_params() -> Result {
1237 let inner = test_inner_client(test_builder());
1238 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1239 .with_generation(5)
1240 .with_if_generation_match(10)
1241 .with_if_generation_not_match(20)
1242 .with_if_metageneration_match(30)
1243 .with_if_metageneration_not_match(40)
1244 .http_request_builder()
1245 .await?
1246 .build()?;
1247
1248 assert_eq!(request.method(), reqwest::Method::GET);
1249 let want_pairs: HashMap<String, String> = [
1250 ("alt", "media"),
1251 ("generation", "5"),
1252 ("ifGenerationMatch", "10"),
1253 ("ifGenerationNotMatch", "20"),
1254 ("ifMetagenerationMatch", "30"),
1255 ("ifMetagenerationNotMatch", "40"),
1256 ]
1257 .iter()
1258 .map(|(k, v)| (k.to_string(), v.to_string()))
1259 .collect();
1260 let query_pairs: HashMap<String, String> = request
1261 .url()
1262 .query_pairs()
1263 .map(|param| (param.0.to_string(), param.1.to_string()))
1264 .collect();
1265 assert_eq!(query_pairs.len(), want_pairs.len());
1266 assert_eq!(query_pairs, want_pairs);
1267 Ok(())
1268 }
1269
1270 #[tokio::test]
1271 async fn read_object_headers() -> Result {
1272 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1274
1275 let inner = test_inner_client(test_builder());
1277 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1278 .with_key(KeyAes256::new(&key)?)
1279 .http_request_builder()
1280 .await?
1281 .build()?;
1282
1283 assert_eq!(request.method(), reqwest::Method::GET);
1284 assert_eq!(
1285 request.url().as_str(),
1286 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1287 );
1288
1289 let want = vec![
1290 ("x-goog-encryption-algorithm", "AES256".to_string()),
1291 ("x-goog-encryption-key", key_base64),
1292 ("x-goog-encryption-key-sha256", key_sha256_base64),
1293 ];
1294
1295 for (name, value) in want {
1296 assert_eq!(
1297 request.headers().get(name).unwrap().as_bytes(),
1298 bytes::Bytes::from(value)
1299 );
1300 }
1301 Ok(())
1302 }
1303
1304 #[test_case(0, 0, None; "no headers needed")]
1305 #[test_case(10, 0, Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1306 #[test_case(-2000, 0, Some(&http::HeaderValue::from_static("bytes=-2000-")); "negative offset")]
1307 #[test_case(0, 100, Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1308 #[test_case(1000, 100, Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1309 #[tokio::test]
1310 async fn range_header(offset: i64, limit: i64, want: Option<&http::HeaderValue>) -> Result {
1311 let inner = test_inner_client(test_builder());
1312 let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1313 .with_read_offset(offset)
1314 .with_read_limit(limit)
1315 .http_request_builder()
1316 .await?
1317 .build()?;
1318
1319 assert_eq!(request.method(), reqwest::Method::GET);
1320 assert_eq!(
1321 request.url().as_str(),
1322 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1323 );
1324
1325 assert_eq!(request.headers().get("range"), want);
1326 Ok(())
1327 }
1328
1329 #[test_case(0, -100, RangeError::NegativeLimit; "negative limit")]
1330 #[test_case(-100, 100, RangeError::NegativeOffsetWithLimit; "negative offset with positive limit")]
1331 #[tokio::test]
1332 async fn test_range_header_error(offset: i64, limit: i64, want_err: RangeError) -> Result {
1333 let inner = test_inner_client(test_builder());
1334 let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
1335 .with_read_offset(offset)
1336 .with_read_limit(limit)
1337 .http_request_builder()
1338 .await
1339 .unwrap_err();
1340
1341 assert_eq!(
1342 err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
1343 &want_err
1344 );
1345 Ok(())
1346 }
1347
1348 #[test_case("projects/p", "projects%2Fp")]
1349 #[test_case("kebab-case", "kebab-case")]
1350 #[test_case("dot.name", "dot.name")]
1351 #[test_case("under_score", "under_score")]
1352 #[test_case("tilde~123", "tilde~123")]
1353 #[test_case("exclamation!point!", "exclamation%21point%21")]
1354 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1355 #[test_case("preserve%percent%21", "preserve%percent%21")]
1356 #[test_case(
1357 "testall !#$&'()*+,/:;=?@[]",
1358 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1359 )]
1360 #[tokio::test]
1361 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1362 let inner = test_inner_client(test_builder());
1363 let request = ReadObject::new(inner, "projects/_/buckets/bucket", name)
1364 .http_request_builder()
1365 .await?
1366 .build()?;
1367 let got = request.url().path_segments().unwrap().next_back().unwrap();
1368 assert_eq!(got, want);
1369 Ok(())
1370 }
1371
1372 #[test_case(10, Some(10); "Match values")]
1373 #[test_case(10, None; "None response")]
1374 fn check_crc_success(crc: u32, resp_crc: Option<u32>) {
1375 let res = check_crc32c_match(crc, resp_crc);
1376 assert!(res.is_ok(), "{res:?}");
1377 }
1378
1379 #[test_case(10, 20)]
1380 fn check_crc_error(crc: u32, response: u32) {
1381 let err = check_crc32c_match(crc, Some(response))
1382 .expect_err("mismatched CRC values should result in error");
1383 assert!(err.is_deserialization());
1384 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1385 assert!(
1386 matches!(source, Some(&ReadError::BadCrc { got, want }) if got == crc && want == response),
1387 "{err:?}"
1388 );
1389 }
1390
// Documents the encoding used by the checksum fixtures in these tests: the
// big-endian bytes of the `u32`, base64-encoded with the standard alphabet.
#[test]
fn document_crc32c_values() {
    let big_endian = 1234567890_u32.to_be_bytes();
    let encoded = base64::prelude::BASE64_STANDARD.encode(big_endian);
    assert_eq!(encoded, "SZYC0g==", "{big_endian:?}");
}
1397
1398 #[test_case("", None; "no header")]
1399 #[test_case("crc32c=hello", None; "invalid value")]
1400 #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
1401 #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
1402 #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
1403 #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
1404 fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
1405 let mut headers = http::HeaderMap::new();
1406 if !val.is_empty() {
1407 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1408 }
1409 let got = headers_to_crc32c(&headers);
1410 assert_eq!(got, want);
1411 Ok(())
1412 }
1413
1414 #[test_case("", None; "no header")]
1415 #[test_case("md5=invalid", None; "invalid value")]
1416 #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
1417 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
1418 #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
1419 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
1420 fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
1421 let mut headers = http::HeaderMap::new();
1422 if !val.is_empty() {
1423 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1424 }
1425 let got = headers_to_md5_hash(&headers);
1426 match want {
1427 Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
1428 None => assert!(got.is_empty()),
1429 }
1430 Ok(())
1431 }
1432
1433 #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==")], http::StatusCode::OK, None; "full content not requested")]
1434 #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None; "No x-goog-hash")]
1435 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None; "server uncompressed")]
1436 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32); "both gzip")]
1437 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==")], http::StatusCode::OK, Some(1234567890_u32); "all ok")]
1438 fn test_check_crc_enabled(
1439 full_content_requested: bool,
1440 headers: Vec<(&str, &str)>,
1441 status: http::StatusCode,
1442 want: Option<u32>,
1443 ) -> Result {
1444 let mut header_map = http::HeaderMap::new();
1445 for (key, value) in headers {
1446 header_map.insert(
1447 http::HeaderName::from_bytes(key.as_bytes())?,
1448 http::HeaderValue::from_bytes(value.as_bytes())?,
1449 );
1450 }
1451
1452 let got = crc32c_from_response(full_content_requested, status, &header_map);
1453 assert_eq!(got, want);
1454 Ok(())
1455 }
1456
1457 #[test_case(0)]
1458 #[test_case(1024)]
1459 fn response_range_success(limit: u64) -> Result {
1460 let response = http::Response::builder()
1461 .status(200)
1462 .header("content-length", limit)
1463 .body(Vec::new())?;
1464 let response = reqwest::Response::from(response);
1465 let range = response_range(&response)?;
1466 assert_eq!(range, ReadRange { start: 0, limit });
1467 Ok(())
1468 }
1469
// A 200 response without `content-length` is reported as a missing header.
#[test]
fn response_range_missing() -> Result {
    let response = reqwest::Response::from(
        http::Response::builder().status(200).body(Vec::new())?,
    );
    let failure =
        response_range(&response).expect_err("missing header should result in an error");
    let names_header = matches!(failure, ReadError::MissingHeader(h) if h == "content-length");
    assert!(names_header, "{failure:?}");
    Ok(())
}
1481
1482 #[test_case("")]
1483 #[test_case("abc")]
1484 #[test_case("-123")]
1485 fn response_range_format(value: &'static str) -> Result {
1486 let response = http::Response::builder()
1487 .status(200)
1488 .header("content-length", value)
1489 .body(Vec::new())?;
1490 let response = reqwest::Response::from(response);
1491 let err = response_range(&response).expect_err("header value should result in an error");
1492 assert!(
1493 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
1494 "{err:?}"
1495 );
1496 assert!(err.source().is_some(), "{err:?}");
1497 Ok(())
1498 }
1499
1500 #[test_case(0, 123)]
1501 #[test_case(123, 456)]
1502 fn response_range_partial_success(start: u64, end: u64) -> Result {
1503 let response = http::Response::builder()
1504 .status(206)
1505 .header(
1506 "content-range",
1507 format!("bytes {}-{}/{}", start, end, end + 1),
1508 )
1509 .body(Vec::new())?;
1510 let response = reqwest::Response::from(response);
1511 let range = response_range(&response)?;
1512 assert_eq!(
1513 range,
1514 ReadRange {
1515 start,
1516 limit: (end + 1 - start)
1517 }
1518 );
1519 Ok(())
1520 }
1521
// A 206 response without `content-range` is reported as a missing header.
#[test]
fn response_range_partial_missing() -> Result {
    let response = reqwest::Response::from(
        http::Response::builder().status(206).body(Vec::new())?,
    );
    let failure =
        response_range(&response).expect_err("missing header should result in an error");
    let names_header = matches!(failure, ReadError::MissingHeader(h) if h == "content-range");
    assert!(names_header, "{failure:?}");
    Ok(())
}
1533
1534 #[test_case("")]
1535 #[test_case("123-456/457"; "bad prefix")]
1536 #[test_case("bytes 123-456 457"; "bad separator")]
1537 #[test_case("bytes 123+456/457"; "bad separator [2]")]
1538 #[test_case("bytes abc-456/457"; "start is not numbers")]
1539 #[test_case("bytes 123-cde/457"; "end is not numbers")]
1540 #[test_case("bytes 123-0/457"; "invalid range")]
1541 fn response_range_partial_format(value: &'static str) -> Result {
1542 let response = http::Response::builder()
1543 .status(206)
1544 .header("content-range", value)
1545 .body(Vec::new())?;
1546 let response = reqwest::Response::from(response);
1547 let err = response_range(&response).expect_err("header value should result in an error");
1548 assert!(
1549 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
1550 "{err:?}"
1551 );
1552 assert!(err.source().is_some(), "{err:?}");
1553 Ok(())
1554 }
1555
// A 201 (Created) response is rejected with `UnexpectedSuccessCode` carrying
// that status.
#[test]
fn response_range_bad_response() -> Result {
    let code = reqwest::StatusCode::CREATED;
    let response = reqwest::Response::from(
        http::Response::builder().status(code).body(Vec::new())?,
    );
    let failure = response_range(&response).expect_err("unexpected status creates error");
    let carries_status = matches!(failure, ReadError::UnexpectedSuccessCode(c) if c == code);
    assert!(carries_status, "{failure:?}");
    Ok(())
}
1568
1569 #[test_case(0)]
1570 #[test_case(1024)]
1571 fn response_generation_success(value: i64) -> Result {
1572 let response = http::Response::builder()
1573 .status(200)
1574 .header("x-goog-generation", value)
1575 .body(Vec::new())?;
1576 let response = reqwest::Response::from(response);
1577 let got = response_generation(&response)?;
1578 assert_eq!(got, value);
1579 Ok(())
1580 }
1581
// A response without `x-goog-generation` is reported as a missing header.
#[test]
fn response_generation_missing() -> Result {
    let response = reqwest::Response::from(
        http::Response::builder().status(200).body(Vec::new())?,
    );
    let failure =
        response_generation(&response).expect_err("missing header should result in an error");
    let names_header =
        matches!(failure, ReadError::MissingHeader(h) if h == "x-goog-generation");
    assert!(names_header, "{failure:?}");
    Ok(())
}
1594
1595 #[test_case("")]
1596 #[test_case("abc")]
1597 fn response_generation_format(value: &'static str) -> Result {
1598 let response = http::Response::builder()
1599 .status(200)
1600 .header("x-goog-generation", value)
1601 .body(Vec::new())?;
1602 let response = reqwest::Response::from(response);
1603 let err =
1604 response_generation(&response).expect_err("header value should result in an error");
1605 assert!(
1606 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
1607 "{err:?}"
1608 );
1609 assert!(err.source().is_some(), "{err:?}");
1610 Ok(())
1611 }
1612
// A header value containing a non-ASCII byte (0xFA) makes `required_header`
// fail with `BadHeaderFormat` naming the header, with an underlying source
// error.
#[test]
fn required_header_not_str() -> Result {
    let name = "x-goog-test";
    let opaque_value = http::HeaderValue::from_bytes(b"invalid\xfa")?;
    let response = reqwest::Response::from(
        http::Response::builder()
            .status(200)
            .header(name, opaque_value)
            .body(Vec::new())?,
    );
    let failure =
        required_header(&response, name).expect_err("header value should result in an error");
    let names_header = matches!(failure, ReadError::BadHeaderFormat(h, _) if h == name);
    assert!(names_header, "{failure:?}");
    assert!(failure.source().is_some(), "{failure:?}");
    Ok(())
}
1630}