1use super::client::*;
16use super::*;
17use crate::error::ReadError;
18use crate::model::ObjectChecksums;
19use crate::model_ext::KeyAes256;
20use crate::model_ext::ObjectHighlights;
21use crate::read_object::ReadObjectResponse;
22use crate::read_resume_policy::ReadResumePolicy;
23use crate::storage::checksum::details::{Md5, validate};
24use crate::storage::request_options::RequestOptions;
25use base64::Engine;
26use serde_with::DeserializeAs;
27
#[derive(Clone, Debug)]
/// Builder for a request that downloads the contents of a single object.
///
/// Configure the request with the `set_*`, `with_*` and `compute_md5`
/// methods, then call `send()` to start the download.
///
/// `S` is the storage stub that executes the request; it defaults to
/// `crate::storage::transport::Storage`.
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// Stub used by `send()` to execute the request.
    stub: std::sync::Arc<S>,
    /// The request message, filled in by the builder methods.
    request: crate::model::ReadObjectRequest,
    /// Per-request options: retry, backoff, throttling, read-resume policy and
    /// checksum computation.
    options: RequestOptions,
}
73
74impl<S> ReadObject<S>
75where
76 S: crate::storage::stub::Storage + 'static,
77{
78 pub(crate) fn new<B, O>(
79 stub: std::sync::Arc<S>,
80 bucket: B,
81 object: O,
82 options: RequestOptions,
83 ) -> Self
84 where
85 B: Into<String>,
86 O: Into<String>,
87 {
88 ReadObject {
89 stub,
90 request: crate::model::ReadObjectRequest::new()
91 .set_bucket(bucket)
92 .set_object(object),
93 options,
94 }
95 }
96
97 pub fn compute_md5(self) -> Self {
126 let mut this = self;
127 this.options.checksum.md5_hash = Some(Md5::default());
128 this
129 }
130
131 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
134 self.request.generation = v.into();
135 self
136 }
137
138 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
142 where
143 T: Into<i64>,
144 {
145 self.request.if_generation_match = Some(v.into());
146 self
147 }
148
149 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
154 where
155 T: Into<i64>,
156 {
157 self.request.if_generation_not_match = Some(v.into());
158 self
159 }
160
161 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
164 where
165 T: Into<i64>,
166 {
167 self.request.if_metageneration_match = Some(v.into());
168 self
169 }
170
171 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
174 where
175 T: Into<i64>,
176 {
177 self.request.if_metageneration_not_match = Some(v.into());
178 self
179 }
180
181 pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
232 self.request.with_range(range);
233 self
234 }
235
236 pub fn set_key(mut self, v: KeyAes256) -> Self {
253 self.request.common_object_request_params = Some(v.into());
254 self
255 }
256
257 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
279 self.options.retry_policy = v.into().into();
280 self
281 }
282
283 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
300 mut self,
301 v: V,
302 ) -> Self {
303 self.options.backoff_policy = v.into().into();
304 self
305 }
306
307 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
330 mut self,
331 v: V,
332 ) -> Self {
333 self.options.retry_throttler = v.into().into();
334 self
335 }
336
337 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
357 where
358 V: ReadResumePolicy + 'static,
359 {
360 self.options.read_resume_policy = std::sync::Arc::new(v);
361 self
362 }
363
364 pub async fn send(self) -> Result<ReadObjectResponse> {
366 self.stub.read_object(self.request, self.options).await
367 }
368}
369
#[derive(Clone, Debug)]
/// Builds and sends the HTTP requests for one object download.
///
/// A clone is used for each (re)request, so `request` can be adjusted between
/// requests (for example, to resume from a byte offset).
pub(crate) struct Reader {
    /// Shared client state: HTTP client, service endpoint and credentials.
    pub inner: std::sync::Arc<StorageInner>,
    /// Object, generation, preconditions, byte range and encryption key to use.
    pub request: crate::model::ReadObjectRequest,
    /// Retry, backoff, throttling, read-resume and checksum options.
    pub options: RequestOptions,
}
377
378impl Reader {
379 async fn read(self) -> Result<reqwest::Response> {
380 let throttler = self.options.retry_throttler.clone();
381 let retry = self.options.retry_policy.clone();
382 let backoff = self.options.backoff_policy.clone();
383
384 gax::retry_loop_internal::retry_loop(
385 async move |_| self.read_attempt().await,
386 async |duration| tokio::time::sleep(duration).await,
387 true,
388 throttler,
389 retry,
390 backoff,
391 )
392 .await
393 }
394
395 async fn read_attempt(&self) -> Result<reqwest::Response> {
396 let builder = self.http_request_builder().await?;
397 let response = builder.send().await.map_err(Error::io)?;
398 if !response.status().is_success() {
399 return gaxi::http::to_http_error(response).await;
400 }
401 Ok(response)
402 }
403
404 async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
405 let bucket = &self.request.bucket;
407 let bucket_id = bucket
408 .as_str()
409 .strip_prefix("projects/_/buckets/")
410 .ok_or_else(|| {
411 Error::binding(format!(
412 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
413 ))
414 })?;
415 let object = &self.request.object;
416
417 let builder = self
419 .inner
420 .client
421 .request(
422 reqwest::Method::GET,
423 format!(
424 "{}/storage/v1/b/{bucket_id}/o/{}",
425 &self.inner.endpoint,
426 enc(object)
427 ),
428 )
429 .query(&[("alt", "media")])
430 .header(
431 "x-goog-api-client",
432 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
433 );
434
435 let builder = if self.request.generation != 0 {
437 builder.query(&[("generation", self.request.generation)])
438 } else {
439 builder
440 };
441 let builder = self
442 .request
443 .if_generation_match
444 .iter()
445 .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
446 let builder = self
447 .request
448 .if_generation_not_match
449 .iter()
450 .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
451 let builder = self
452 .request
453 .if_metageneration_match
454 .iter()
455 .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
456 let builder = self
457 .request
458 .if_metageneration_not_match
459 .iter()
460 .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
461
462 let builder = apply_customer_supplied_encryption_headers(
463 builder,
464 &self.request.common_object_request_params,
465 );
466
467 let builder = match (self.request.read_offset, self.request.read_limit) {
469 (_, l) if l < 0 => {
471 unreachable!("ReadObject build never sets a negative read_limit value")
472 }
473 (o, l) if o < 0 && l > 0 => unreachable!(
475 "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
476 ),
477 (0, 0) => builder,
479 (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
481 (o, 0) => builder.header("range", format!("bytes={o}-")),
484 (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
487 };
488
489 self.inner.apply_auth_headers(builder).await
490 }
491}
492
493fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
494 headers
495 .get("x-goog-hash")
496 .and_then(|hash| hash.to_str().ok())
497 .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
498 .and_then(|hash| {
499 let hash = hash.trim_start_matches("crc32c=");
500 v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
501 })
502}
503
504fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
505 headers
506 .get("x-goog-hash")
507 .and_then(|hash| hash.to_str().ok())
508 .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
509 .and_then(|hash| {
510 let hash = hash.trim_start_matches("md5=");
511 base64::prelude::BASE64_STANDARD.decode(hash).ok()
512 })
513 .unwrap_or_default()
514}
515
#[derive(Debug)]
/// HTTP-transport implementation of `dynamic::ReadObjectResponse`.
///
/// Streams the body of the current HTTP response and tracks how much of the
/// expected byte range is still outstanding. It validates checksums at the end
/// of the stream, and re-issues the request from the last received byte when
/// the resume policy allows it.
pub(crate) struct ReadObjectResponseImpl {
    /// Issues the (re)requests. Its `request` is narrowed on every resume, and
    /// its `options.checksum` accumulates the checksums of the received bytes.
    reader: Reader,
    /// The HTTP response currently being streamed. It is set to `None` when
    /// resuming starts, and stays `None` if the resume is abandoned or fails.
    response: Option<reqwest::Response>,
    /// Object metadata captured from the headers of the first response.
    highlights: ObjectHighlights,
    /// Checksums reported by the first response, used to validate the data.
    /// Left empty when they do not describe the received bytes.
    response_checksums: ObjectChecksums,
    /// The bytes still expected: `start` is the offset of the next byte and
    /// `limit` the number of bytes outstanding.
    range: ReadRange,
    /// Object generation from `x-goog-generation`; resumed requests are pinned
    /// to it.
    generation: i64,
    /// Number of resume attempts made so far.
    resume_count: u32,
}
529
530impl ReadObjectResponseImpl {
531 pub(crate) async fn new(reader: Reader) -> Result<Self> {
532 let response = reader.clone().read().await?;
533
534 let full = reader.request.read_offset == 0 && reader.request.read_limit == 0;
535 let response_checksums =
536 checksums_from_response(full, response.status(), response.headers());
537 let range = response_range(&response).map_err(Error::deser)?;
538 let generation = response_generation(&response).map_err(Error::deser)?;
539
540 let headers = response.headers();
541 let get_as_i64 = |header_name: &str| -> i64 {
542 headers
543 .get(header_name)
544 .and_then(|s| s.to_str().ok())
545 .and_then(|s| s.parse::<i64>().ok())
546 .unwrap_or_default()
547 };
548 let get_as_string = |header_name: &str| -> String {
549 headers
550 .get(header_name)
551 .and_then(|sc| sc.to_str().ok())
552 .map(|sc| sc.to_string())
553 .unwrap_or_default()
554 };
555 let highlights = ObjectHighlights {
556 generation,
557 metageneration: get_as_i64("x-goog-metageneration"),
558 size: get_as_i64("x-goog-stored-content-length"),
559 content_encoding: get_as_string("x-goog-stored-content-encoding"),
560 storage_class: get_as_string("x-goog-storage-class"),
561 content_type: get_as_string("content-type"),
562 content_language: get_as_string("content-language"),
563 content_disposition: get_as_string("content-disposition"),
564 etag: get_as_string("etag"),
565 checksums: headers.get("x-goog-hash").map(|_| {
566 crate::model::ObjectChecksums::new()
567 .set_or_clear_crc32c(headers_to_crc32c(headers))
568 .set_md5_hash(headers_to_md5_hash(headers))
569 }),
570 };
571
572 Ok(Self {
573 reader,
574 response: Some(response),
575 highlights,
576 response_checksums,
578 range,
580 generation,
581 resume_count: 0,
582 })
583 }
584}
585
586#[async_trait::async_trait]
587impl crate::read_object::dynamic::ReadObjectResponse for ReadObjectResponseImpl {
588 fn object(&self) -> ObjectHighlights {
589 self.highlights.clone()
590 }
591
592 async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
593 match self.next_attempt().await {
594 None => None,
595 Some(Ok(b)) => Some(Ok(b)),
596 Some(Err(e)) => Box::pin(self.resume(e)).await,
599 }
600 }
601}
602
impl ReadObjectResponseImpl {
    /// Reads the next chunk from the current HTTP response, without resuming.
    ///
    /// Returns:
    /// - `None` if there is no active response, or if the body ended with all
    ///   expected bytes received and the checksums validated.
    /// - `Some(Ok(chunk))` for each chunk of data.
    /// - `Some(Err(_))` if reading the body fails, the service sends more bytes
    ///   than expected (`LongRead`), the body ends early (`ShortRead`), or the
    ///   computed checksums do not match the reported ones (`ChecksumMismatch`).
    async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
        let response = self.response.as_mut()?;
        let res = response.chunk().await.map_err(Error::io);
        match res {
            Ok(Some(chunk)) => {
                // Fold the chunk into the running checksums, passing the offset
                // at which this chunk starts.
                self.reader
                    .options
                    .checksum
                    .update(self.range.start, &chunk);
                let len = chunk.len() as u64;
                // The service returned more bytes than the range it announced.
                if self.range.limit < len {
                    return Some(Err(Error::deser(ReadError::LongRead {
                        expected: self.range.limit,
                        got: len,
                    })));
                }
                // Advance the outstanding range; a resumed request starts here.
                self.range.limit -= len;
                self.range.start += len;
                Some(Ok(chunk))
            }
            Ok(None) => {
                // The body ended before all expected bytes arrived. This is
                // reported as an I/O error, unlike `LongRead` above.
                // NOTE(review): presumably so the resume policy treats it as
                // resumable; confirm in ReadResumePolicy.
                if self.range.limit != 0 {
                    return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
                }
                // All expected bytes were received: validate the checksums.
                let computed = self.reader.options.checksum.finalize();
                let res = validate(&self.response_checksums, &Some(computed));
                match res {
                    Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
                    Ok(()) => None,
                }
            }
            Err(e) => Some(Err(e)),
        }
    }

    /// Tries to recover from `error` by re-requesting the bytes not yet received.
    ///
    /// The current response is dropped first. The read-resume policy then
    /// decides, given `error` and the number of resume attempts, whether to
    /// continue; if it does not, the policy's error is returned. Otherwise the
    /// request is narrowed to the outstanding range, pinned to the generation
    /// of the first response, and sent again (with retries), and the next
    /// chunk is returned via `next()`, which may resume again.
    ///
    /// If the new request fails, its error is returned and `self.response`
    /// stays `None`, so later calls to `next()` return `None`.
    async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
        use crate::read_object::dynamic::ReadObjectResponse;
        use crate::read_resume_policy::{ResumeQuery, ResumeResult};

        self.response = None;
        self.resume_count += 1;
        let query = ResumeQuery::new(self.resume_count);
        match self
            .reader
            .options
            .read_resume_policy
            .on_error(&query, error)
        {
            ResumeResult::Continue(_) => {}
            ResumeResult::Permanent(e) => return Some(Err(e)),
            ResumeResult::Exhausted(e) => return Some(Err(e)),
        };
        // Request only the outstanding bytes, from the same object generation.
        self.reader.request.read_offset = self.range.start as i64;
        self.reader.request.read_limit = self.range.limit as i64;
        self.reader.request.generation = self.generation;
        self.response = match self.reader.clone().read().await {
            Ok(r) => Some(r),
            Err(e) => return Some(Err(e)),
        };
        self.next().await
    }
}
667
668fn checksums_from_response(
683 full_content_requested: bool,
684 status: http::StatusCode,
685 headers: &http::HeaderMap,
686) -> ObjectChecksums {
687 let checksums = ObjectChecksums::new();
688 if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
689 return checksums;
690 }
691 let stored_encoding = headers
692 .get("x-goog-stored-content-encoding")
693 .and_then(|e| e.to_str().ok())
694 .map_or("", |e| e);
695 let content_encoding = headers
696 .get("content-encoding")
697 .and_then(|e| e.to_str().ok())
698 .map_or("", |e| e);
699 if stored_encoding == "gzip" && content_encoding != "gzip" {
700 return checksums;
701 }
702 checksums
703 .set_or_clear_crc32c(headers_to_crc32c(headers))
704 .set_md5_hash(headers_to_md5_hash(headers))
705}
706
707fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
708 match response.status() {
709 reqwest::StatusCode::OK => {
710 let header = required_header(response, "content-length")?;
711 let limit = header
712 .parse::<u64>()
713 .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
714 Ok(ReadRange { start: 0, limit })
715 }
716 reqwest::StatusCode::PARTIAL_CONTENT => {
717 let header = required_header(response, "content-range")?;
718 let header = header.strip_prefix("bytes ").ok_or_else(|| {
719 ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
720 })?;
721 let (range, _) = header.split_once('/').ok_or_else(|| {
722 ReadError::BadHeaderFormat("content-range", "missing / separator".into())
723 })?;
724 let (start, end) = range.split_once('-').ok_or_else(|| {
725 ReadError::BadHeaderFormat("content-range", "missing - separator".into())
726 })?;
727 let start = start
728 .parse::<u64>()
729 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
730 let end = end
731 .parse::<u64>()
732 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
733 let end = end + 1;
736 let limit = end
737 .checked_sub(start)
738 .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
739 Ok(ReadRange { start, limit })
740 }
741 s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
742 }
743}
744
745fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
746 let header = required_header(response, "x-goog-generation")?;
747 header
748 .parse::<i64>()
749 .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
750}
751
752fn required_header<'a>(
753 response: &'a reqwest::Response,
754 name: &'static str,
755) -> std::result::Result<&'a str, ReadError> {
756 let header = response
757 .headers()
758 .get(name)
759 .ok_or_else(|| ReadError::MissingHeader(name))?;
760 header
761 .to_str()
762 .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
763}
764
#[derive(Debug, PartialEq)]
/// A contiguous range of object bytes, as returned by (or still expected from)
/// a download response.
struct ReadRange {
    /// Offset of the first byte in the range.
    start: u64,
    /// Number of bytes in the range.
    limit: u64,
}
770
771#[cfg(test)]
772mod resume_tests;
773
774#[cfg(test)]
775mod tests {
776 use super::client::tests::{test_builder, test_inner_client};
777 use super::*;
778 use crate::error::ChecksumMismatch;
779 use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
780 use futures::TryStreamExt;
781 use httptest::{Expectation, Server, matchers::*, responders::status_code};
782 use std::collections::HashMap;
783 use std::error::Error;
784 use std::sync::Arc;
785 use test_case::test_case;
786
787 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
788
789 async fn http_request_builder(
790 inner: Arc<StorageInner>,
791 builder: ReadObject,
792 ) -> crate::Result<reqwest::RequestBuilder> {
793 let reader = Reader {
794 inner,
795 request: builder.request,
796 options: builder.options,
797 };
798 reader.http_request_builder().await
799 }
800
801 #[tokio::test]
803 async fn test_read_is_send_and_static() -> Result {
804 let client = Storage::builder()
805 .with_credentials(auth::credentials::testing::test_credentials())
806 .build()
807 .await?;
808
809 fn need_send<T: Send>(_val: &T) {}
810 fn need_sync<T: Sync>(_val: &T) {}
811 fn need_static<T: 'static>(_val: &T) {}
812
813 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
814 need_send(&read);
815 need_sync(&read);
816 need_static(&read);
817
818 let read = client
819 .read_object("projects/_/buckets/test-bucket", "test-object")
820 .send();
821 need_send(&read);
822 need_static(&read);
823
824 Ok(())
825 }
826 #[tokio::test]
827 async fn read_object_normal() -> Result {
828 let server = Server::run();
829 server.expect(
830 Expectation::matching(all_of![
831 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
832 request::query(url_decoded(contains(("alt", "media")))),
833 ])
834 .respond_with(
835 status_code(200)
836 .body("hello world")
837 .append_header("x-goog-generation", 123456),
838 ),
839 );
840
841 let client = Storage::builder()
842 .with_endpoint(format!("http://{}", server.addr()))
843 .with_credentials(auth::credentials::testing::test_credentials())
844 .build()
845 .await?;
846 let mut reader = client
847 .read_object("projects/_/buckets/test-bucket", "test-object")
848 .send()
849 .await?;
850 let mut got = Vec::new();
851 while let Some(b) = reader.next().await.transpose()? {
852 got.extend_from_slice(&b);
853 }
854 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
855
856 Ok(())
857 }
858
859 #[tokio::test]
860 async fn read_object_metadata() -> Result {
861 const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
862 let server = Server::run();
863 server.expect(
864 Expectation::matching(all_of![
865 request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
866 request::query(url_decoded(contains(("alt", "media")))),
867 ])
868 .respond_with(
869 status_code(200)
870 .body(CONTENTS)
871 .append_header(
872 "x-goog-hash",
873 "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
874 )
875 .append_header("x-goog-generation", 500)
876 .append_header("x-goog-metageneration", "1")
877 .append_header("x-goog-stored-content-length", 30)
878 .append_header("x-goog-stored-content-encoding", "identity")
879 .append_header("x-goog-storage-class", "STANDARD")
880 .append_header("content-language", "en")
881 .append_header("content-type", "text/plain")
882 .append_header("content-disposition", "inline")
883 .append_header("etag", "etagval"),
884 ),
885 );
886
887 let endpoint = server.url("");
888 let client = Storage::builder()
889 .with_endpoint(endpoint.to_string())
890 .with_credentials(auth::credentials::testing::test_credentials())
891 .build()
892 .await?;
893 let reader = client
894 .read_object("projects/_/buckets/test-bucket", "test-object")
895 .send()
896 .await?;
897 let object = reader.object();
898 assert_eq!(object.generation, 500);
899 assert_eq!(object.metageneration, 1);
900 assert_eq!(object.size, 30);
901 assert_eq!(object.content_encoding, "identity");
902 assert_eq!(
903 object.checksums.as_ref().unwrap().crc32c.unwrap(),
904 crc32c::crc32c(CONTENTS.as_bytes())
905 );
906 assert_eq!(
907 object.checksums.as_ref().unwrap().md5_hash,
908 base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
909 );
910
911 Ok(())
912 }
913
914 #[tokio::test]
915 async fn read_object_stream() -> Result {
916 let server = Server::run();
917 server.expect(
918 Expectation::matching(all_of![
919 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
920 request::query(url_decoded(contains(("alt", "media")))),
921 ])
922 .respond_with(
923 status_code(200)
924 .append_header("x-goog-generation", 123456)
925 .body("hello world"),
926 ),
927 );
928
929 let client = Storage::builder()
930 .with_endpoint(format!("http://{}", server.addr()))
931 .with_credentials(auth::credentials::testing::test_credentials())
932 .build()
933 .await?;
934 let response = client
935 .read_object("projects/_/buckets/test-bucket", "test-object")
936 .send()
937 .await?;
938 let result: Vec<_> = response.into_stream().try_collect().await?;
939 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
940
941 Ok(())
942 }
943
944 #[tokio::test]
945 async fn read_object_next_then_consume_response() -> Result {
946 const BLOCK_SIZE: usize = 500;
948 let mut contents = Vec::new();
949 for i in 0..50 {
950 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
951 }
952
953 let u = crc32c::crc32c(&contents);
955 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
956
957 let server = Server::run();
958 server.expect(
959 Expectation::matching(all_of![
960 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
961 request::query(url_decoded(contains(("alt", "media")))),
962 ])
963 .times(1)
964 .respond_with(
965 status_code(200)
966 .body(contents.clone())
967 .append_header("x-goog-hash", format!("crc32c={value}"))
968 .append_header("x-goog-generation", 123456),
969 ),
970 );
971
972 let client = Storage::builder()
973 .with_endpoint(format!("http://{}", server.addr()))
974 .with_credentials(auth::credentials::testing::test_credentials())
975 .build()
976 .await?;
977
978 let mut response = client
980 .read_object("projects/_/buckets/test-bucket", "test-object")
981 .send()
982 .await?;
983
984 let mut all_bytes = bytes::BytesMut::new();
985 let chunk = response.next().await.transpose()?.unwrap();
986 assert!(!chunk.is_empty());
987 all_bytes.extend(chunk);
988 use futures::StreamExt;
989 let mut stream = response.into_stream();
990 while let Some(chunk) = stream.next().await.transpose()? {
991 all_bytes.extend(chunk);
992 }
993 assert_eq!(all_bytes, contents);
994
995 Ok(())
996 }
997
998 #[tokio::test]
999 async fn read_object_not_found() -> Result {
1000 let server = Server::run();
1001 server.expect(
1002 Expectation::matching(all_of![
1003 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1004 request::query(url_decoded(contains(("alt", "media")))),
1005 ])
1006 .respond_with(status_code(404).body("NOT FOUND")),
1007 );
1008
1009 let client = Storage::builder()
1010 .with_endpoint(format!("http://{}", server.addr()))
1011 .with_credentials(auth::credentials::testing::test_credentials())
1012 .build()
1013 .await?;
1014 let err = client
1015 .read_object("projects/_/buckets/test-bucket", "test-object")
1016 .send()
1017 .await
1018 .expect_err("expected a not found error");
1019 assert_eq!(err.http_status_code(), Some(404));
1020
1021 Ok(())
1022 }
1023
1024 #[tokio::test]
1025 async fn read_object_incorrect_crc32c_check() -> Result {
1026 let u = crc32c::crc32c("goodbye world".as_bytes());
1028 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1029
1030 let server = Server::run();
1031 server.expect(
1032 Expectation::matching(all_of![
1033 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1034 request::query(url_decoded(contains(("alt", "media")))),
1035 ])
1036 .times(3)
1037 .respond_with(
1038 status_code(200)
1039 .body("hello world")
1040 .append_header("x-goog-hash", format!("crc32c={value}"))
1041 .append_header("x-goog-generation", 123456),
1042 ),
1043 );
1044
1045 let client = Storage::builder()
1046 .with_endpoint(format!("http://{}", server.addr()))
1047 .with_credentials(auth::credentials::testing::test_credentials())
1048 .build()
1049 .await?;
1050 let mut response = client
1051 .read_object("projects/_/buckets/test-bucket", "test-object")
1052 .send()
1053 .await?;
1054 let mut partial = Vec::new();
1055 let mut err = None;
1056 while let Some(r) = response.next().await {
1057 match r {
1058 Ok(b) => partial.extend_from_slice(&b),
1059 Err(e) => err = Some(e),
1060 };
1061 }
1062 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1063 let err = err.expect("expect error on incorrect crc32c");
1064 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1065 assert!(
1066 matches!(
1067 source,
1068 Some(&ReadError::ChecksumMismatch(
1069 ChecksumMismatch::Crc32c { .. }
1070 ))
1071 ),
1072 "err={err:?}"
1073 );
1074
1075 let mut response = client
1076 .read_object("projects/_/buckets/test-bucket", "test-object")
1077 .send()
1078 .await?;
1079 let err: crate::Error = async {
1080 {
1081 while (response.next().await.transpose()?).is_some() {}
1082 Ok(())
1083 }
1084 }
1085 .await
1086 .expect_err("expect error on incorrect crc32c");
1087 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1088 assert!(
1089 matches!(
1090 source,
1091 Some(&ReadError::ChecksumMismatch(
1092 ChecksumMismatch::Crc32c { .. }
1093 ))
1094 ),
1095 "err={err:?}"
1096 );
1097
1098 use futures::TryStreamExt;
1099 let err = client
1100 .read_object("projects/_/buckets/test-bucket", "test-object")
1101 .send()
1102 .await?
1103 .into_stream()
1104 .try_collect::<Vec<bytes::Bytes>>()
1105 .await
1106 .expect_err("expect error on incorrect crc32c");
1107 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1108 assert!(
1109 matches!(
1110 source,
1111 Some(&ReadError::ChecksumMismatch(
1112 ChecksumMismatch::Crc32c { .. }
1113 ))
1114 ),
1115 "err={err:?}"
1116 );
1117 Ok(())
1118 }
1119
1120 #[tokio::test]
1121 async fn read_object_incorrect_md5_check() -> Result {
1122 let digest = md5::compute("goodbye world".as_bytes());
1124 let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1125
1126 let server = Server::run();
1127 server.expect(
1128 Expectation::matching(all_of![
1129 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1130 request::query(url_decoded(contains(("alt", "media")))),
1131 ])
1132 .times(1)
1133 .respond_with(
1134 status_code(200)
1135 .body("hello world")
1136 .append_header("x-goog-hash", format!("md5={value}"))
1137 .append_header("x-goog-generation", 123456),
1138 ),
1139 );
1140
1141 let client = Storage::builder()
1142 .with_endpoint(format!("http://{}", server.addr()))
1143 .with_credentials(auth::credentials::testing::test_credentials())
1144 .build()
1145 .await?;
1146 let mut response = client
1147 .read_object("projects/_/buckets/test-bucket", "test-object")
1148 .compute_md5()
1149 .send()
1150 .await?;
1151 let mut partial = Vec::new();
1152 let mut err = None;
1153 while let Some(r) = response.next().await {
1154 match r {
1155 Ok(b) => partial.extend_from_slice(&b),
1156 Err(e) => err = Some(e),
1157 };
1158 }
1159 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1160 let err = err.expect("expect error on incorrect md5");
1161 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1162 assert!(
1163 matches!(
1164 source,
1165 Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1166 ),
1167 "err={err:?}"
1168 );
1169
1170 Ok(())
1171 }
1172
1173 #[tokio::test]
1174 async fn read_object() -> Result {
1175 let inner = test_inner_client(test_builder());
1176 let stub = crate::storage::transport::Storage::new(inner.clone());
1177 let builder = ReadObject::new(
1178 stub,
1179 "projects/_/buckets/bucket",
1180 "object",
1181 inner.options.clone(),
1182 );
1183 let request = http_request_builder(inner, builder).await?.build()?;
1184
1185 assert_eq!(request.method(), reqwest::Method::GET);
1186 assert_eq!(
1187 request.url().as_str(),
1188 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1189 );
1190 Ok(())
1191 }
1192
1193 #[tokio::test]
1194 async fn read_object_error_credentials() -> Result {
1195 let inner = test_inner_client(
1196 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1197 );
1198 let stub = crate::storage::transport::Storage::new(inner.clone());
1199 let builder = ReadObject::new(
1200 stub,
1201 "projects/_/buckets/bucket",
1202 "object",
1203 inner.options.clone(),
1204 );
1205 let _ = http_request_builder(inner, builder)
1206 .await
1207 .inspect_err(|e| assert!(e.is_authentication()))
1208 .expect_err("invalid credentials should err");
1209 Ok(())
1210 }
1211
1212 #[tokio::test]
1213 async fn read_object_bad_bucket() -> Result {
1214 let inner = test_inner_client(test_builder());
1215 let stub = crate::storage::transport::Storage::new(inner.clone());
1216 let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1217 let _ = http_request_builder(inner, builder)
1218 .await
1219 .expect_err("malformed bucket string should error");
1220 Ok(())
1221 }
1222
1223 #[tokio::test]
1224 async fn read_object_query_params() -> Result {
1225 let inner = test_inner_client(test_builder());
1226 let stub = crate::storage::transport::Storage::new(inner.clone());
1227 let builder = ReadObject::new(
1228 stub,
1229 "projects/_/buckets/bucket",
1230 "object",
1231 inner.options.clone(),
1232 )
1233 .set_generation(5)
1234 .set_if_generation_match(10)
1235 .set_if_generation_not_match(20)
1236 .set_if_metageneration_match(30)
1237 .set_if_metageneration_not_match(40);
1238 let request = http_request_builder(inner, builder).await?.build()?;
1239
1240 assert_eq!(request.method(), reqwest::Method::GET);
1241 let want_pairs: HashMap<String, String> = [
1242 ("alt", "media"),
1243 ("generation", "5"),
1244 ("ifGenerationMatch", "10"),
1245 ("ifGenerationNotMatch", "20"),
1246 ("ifMetagenerationMatch", "30"),
1247 ("ifMetagenerationNotMatch", "40"),
1248 ]
1249 .iter()
1250 .map(|(k, v)| (k.to_string(), v.to_string()))
1251 .collect();
1252 let query_pairs: HashMap<String, String> = request
1253 .url()
1254 .query_pairs()
1255 .map(|param| (param.0.to_string(), param.1.to_string()))
1256 .collect();
1257 assert_eq!(query_pairs.len(), want_pairs.len());
1258 assert_eq!(query_pairs, want_pairs);
1259 Ok(())
1260 }
1261
1262 #[tokio::test]
1263 async fn read_object_headers() -> Result {
1264 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1266
1267 let inner = test_inner_client(test_builder());
1269 let stub = crate::storage::transport::Storage::new(inner.clone());
1270 let builder = ReadObject::new(
1271 stub,
1272 "projects/_/buckets/bucket",
1273 "object",
1274 inner.options.clone(),
1275 )
1276 .set_key(KeyAes256::new(&key)?);
1277 let request = http_request_builder(inner, builder).await?.build()?;
1278
1279 assert_eq!(request.method(), reqwest::Method::GET);
1280 assert_eq!(
1281 request.url().as_str(),
1282 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1283 );
1284
1285 let want = vec![
1286 ("x-goog-encryption-algorithm", "AES256".to_string()),
1287 ("x-goog-encryption-key", key_base64),
1288 ("x-goog-encryption-key-sha256", key_sha256_base64),
1289 ];
1290
1291 for (name, value) in want {
1292 assert_eq!(
1293 request.headers().get(name).unwrap().as_bytes(),
1294 bytes::Bytes::from(value)
1295 );
1296 }
1297 Ok(())
1298 }
1299
1300 #[test_case(ReadRange::all(), None; "no headers needed")]
1301 #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1302 #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1303 #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1304 #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1305 #[tokio::test]
1306 async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1307 let inner = test_inner_client(test_builder());
1308 let stub = crate::storage::transport::Storage::new(inner.clone());
1309 let builder = ReadObject::new(
1310 stub,
1311 "projects/_/buckets/bucket",
1312 "object",
1313 inner.options.clone(),
1314 )
1315 .set_read_range(input.clone());
1316 let request = http_request_builder(inner, builder).await?.build()?;
1317
1318 assert_eq!(request.method(), reqwest::Method::GET);
1319 assert_eq!(
1320 request.url().as_str(),
1321 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1322 );
1323
1324 assert_eq!(request.headers().get("range"), want);
1325 Ok(())
1326 }
1327
1328 #[test_case("projects/p", "projects%2Fp")]
1329 #[test_case("kebab-case", "kebab-case")]
1330 #[test_case("dot.name", "dot.name")]
1331 #[test_case("under_score", "under_score")]
1332 #[test_case("tilde~123", "tilde~123")]
1333 #[test_case("exclamation!point!", "exclamation%21point%21")]
1334 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1335 #[test_case("preserve%percent%21", "preserve%percent%21")]
1336 #[test_case(
1337 "testall !#$&'()*+,/:;=?@[]",
1338 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1339 )]
1340 #[tokio::test]
1341 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1342 let inner = test_inner_client(test_builder());
1343 let stub = crate::storage::transport::Storage::new(inner.clone());
1344 let builder = ReadObject::new(
1345 stub,
1346 "projects/_/buckets/bucket",
1347 name,
1348 inner.options.clone(),
1349 );
1350 let request = http_request_builder(inner, builder).await?.build()?;
1351 let got = request.url().path_segments().unwrap().next_back().unwrap();
1352 assert_eq!(got, want);
1353 Ok(())
1354 }
1355
// Documents the encoding used by the CRC32C fixtures in this module: the
// checksum's big-endian bytes, base64-encoded with padding.
#[test]
fn document_crc32c_values() {
    let bytes = 1234567890_u32.to_be_bytes();
    let encoded = base64::prelude::BASE64_STANDARD.encode(bytes);
    assert_eq!(encoded, "SZYC0g==", "{bytes:?}");
}
1362
1363 #[test_case("", None; "no header")]
1364 #[test_case("crc32c=hello", None; "invalid value")]
1365 #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
1366 #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
1367 #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
1368 #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
1369 fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
1370 let mut headers = http::HeaderMap::new();
1371 if !val.is_empty() {
1372 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1373 }
1374 let got = headers_to_crc32c(&headers);
1375 assert_eq!(got, want);
1376 Ok(())
1377 }
1378
1379 #[test_case("", None; "no header")]
1380 #[test_case("md5=invalid", None; "invalid value")]
1381 #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
1382 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
1383 #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
1384 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
1385 fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
1386 let mut headers = http::HeaderMap::new();
1387 if !val.is_empty() {
1388 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1389 }
1390 let got = headers_to_md5_hash(&headers);
1391 match want {
1392 Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
1393 None => assert!(got.is_empty()),
1394 }
1395 Ok(())
1396 }
1397
1398 #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
1399 #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
1400 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
1401 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
1402 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
1403 fn test_checksums_validation_enabled(
1404 full_content_requested: bool,
1405 headers: Vec<(&str, &str)>,
1406 status: http::StatusCode,
1407 want_crc32c: Option<u32>,
1408 want_md5: &str,
1409 ) -> Result {
1410 let mut header_map = http::HeaderMap::new();
1411 for (key, value) in headers {
1412 header_map.insert(
1413 http::HeaderName::from_bytes(key.as_bytes())?,
1414 http::HeaderValue::from_bytes(value.as_bytes())?,
1415 );
1416 }
1417
1418 let got = checksums_from_response(full_content_requested, status, &header_map);
1419 assert_eq!(got.crc32c, want_crc32c);
1420 assert_eq!(
1421 got.md5_hash,
1422 base64::prelude::BASE64_STANDARD.decode(want_md5)?
1423 );
1424 Ok(())
1425 }
1426
1427 #[test_case(0)]
1428 #[test_case(1024)]
1429 fn response_range_success(limit: u64) -> Result {
1430 let response = http::Response::builder()
1431 .status(200)
1432 .header("content-length", limit)
1433 .body(Vec::new())?;
1434 let response = reqwest::Response::from(response);
1435 let range = response_range(&response)?;
1436 assert_eq!(range, super::ReadRange { start: 0, limit });
1437 Ok(())
1438 }
1439
// A `200 OK` response without `content-length` must be rejected with
// `ReadError::MissingHeader` naming that header.
#[test]
fn response_range_missing() -> Result {
    let response =
        reqwest::Response::from(http::Response::builder().status(200).body(Vec::new())?);
    let err = response_range(&response).expect_err("missing header should result in an error");
    let is_expected = matches!(err, ReadError::MissingHeader(h) if h == "content-length");
    assert!(is_expected, "{err:?}");
    Ok(())
}
1451
1452 #[test_case("")]
1453 #[test_case("abc")]
1454 #[test_case("-123")]
1455 fn response_range_format(value: &'static str) -> Result {
1456 let response = http::Response::builder()
1457 .status(200)
1458 .header("content-length", value)
1459 .body(Vec::new())?;
1460 let response = reqwest::Response::from(response);
1461 let err = response_range(&response).expect_err("header value should result in an error");
1462 assert!(
1463 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
1464 "{err:?}"
1465 );
1466 assert!(err.source().is_some(), "{err:?}");
1467 Ok(())
1468 }
1469
1470 #[test_case(0, 123)]
1471 #[test_case(123, 456)]
1472 fn response_range_partial_success(start: u64, end: u64) -> Result {
1473 let response = http::Response::builder()
1474 .status(206)
1475 .header(
1476 "content-range",
1477 format!("bytes {}-{}/{}", start, end, end + 1),
1478 )
1479 .body(Vec::new())?;
1480 let response = reqwest::Response::from(response);
1481 let range = response_range(&response)?;
1482 assert_eq!(
1483 range,
1484 super::ReadRange {
1485 start,
1486 limit: (end + 1 - start)
1487 }
1488 );
1489 Ok(())
1490 }
1491
// A `206 Partial Content` response without `content-range` must be rejected
// with `ReadError::MissingHeader` naming that header.
#[test]
fn response_range_partial_missing() -> Result {
    let partial = http::Response::builder().status(206).body(Vec::new())?;
    let err = response_range(&reqwest::Response::from(partial))
        .expect_err("missing header should result in an error");
    let is_expected = matches!(err, ReadError::MissingHeader(h) if h == "content-range");
    assert!(is_expected, "{err:?}");
    Ok(())
}
1503
1504 #[test_case("")]
1505 #[test_case("123-456/457"; "bad prefix")]
1506 #[test_case("bytes 123-456 457"; "bad separator")]
1507 #[test_case("bytes 123+456/457"; "bad separator [2]")]
1508 #[test_case("bytes abc-456/457"; "start is not numbers")]
1509 #[test_case("bytes 123-cde/457"; "end is not numbers")]
1510 #[test_case("bytes 123-0/457"; "invalid range")]
1511 fn response_range_partial_format(value: &'static str) -> Result {
1512 let response = http::Response::builder()
1513 .status(206)
1514 .header("content-range", value)
1515 .body(Vec::new())?;
1516 let response = reqwest::Response::from(response);
1517 let err = response_range(&response).expect_err("header value should result in an error");
1518 assert!(
1519 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
1520 "{err:?}"
1521 );
1522 assert!(err.source().is_some(), "{err:?}");
1523 Ok(())
1524 }
1525
// A `201 Created` response is not accepted by `response_range`. It must be
// reported as `ReadError::UnexpectedSuccessCode` carrying that status.
#[test]
fn response_range_bad_response() -> Result {
    let status = reqwest::StatusCode::CREATED;
    let response =
        reqwest::Response::from(http::Response::builder().status(status).body(Vec::new())?);
    let err = response_range(&response).expect_err("unexpected status creates error");
    let is_expected = matches!(err, ReadError::UnexpectedSuccessCode(c) if c == status);
    assert!(is_expected, "{err:?}");
    Ok(())
}
1538
1539 #[test_case(0)]
1540 #[test_case(1024)]
1541 fn response_generation_success(value: i64) -> Result {
1542 let response = http::Response::builder()
1543 .status(200)
1544 .header("x-goog-generation", value)
1545 .body(Vec::new())?;
1546 let response = reqwest::Response::from(response);
1547 let got = response_generation(&response)?;
1548 assert_eq!(got, value);
1549 Ok(())
1550 }
1551
// Without `x-goog-generation`, `response_generation` must fail with
// `ReadError::MissingHeader` naming that header.
#[test]
fn response_generation_missing() -> Result {
    let bare = http::Response::builder().status(200).body(Vec::new())?;
    let err = response_generation(&reqwest::Response::from(bare))
        .expect_err("missing header should result in an error");
    let is_expected = matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation");
    assert!(is_expected, "{err:?}");
    Ok(())
}
1564
1565 #[test_case("")]
1566 #[test_case("abc")]
1567 fn response_generation_format(value: &'static str) -> Result {
1568 let response = http::Response::builder()
1569 .status(200)
1570 .header("x-goog-generation", value)
1571 .body(Vec::new())?;
1572 let response = reqwest::Response::from(response);
1573 let err =
1574 response_generation(&response).expect_err("header value should result in an error");
1575 assert!(
1576 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
1577 "{err:?}"
1578 );
1579 assert!(err.source().is_some(), "{err:?}");
1580 Ok(())
1581 }
1582
// A header value containing the non-ASCII byte `0xfa` cannot be read as a
// string. `required_header` must report `ReadError::BadHeaderFormat` for it,
// and the error must expose a `source()`.
#[test]
fn required_header_not_str() -> Result {
    const NAME: &str = "x-goog-test";
    let raw_value = http::HeaderValue::from_bytes(b"invalid\xfa")?;
    let response = reqwest::Response::from(
        http::Response::builder()
            .status(200)
            .header(NAME, raw_value)
            .body(Vec::new())?,
    );
    let err =
        required_header(&response, NAME).expect_err("header value should result in an error");
    let is_expected = matches!(err, ReadError::BadHeaderFormat(h, _) if h == NAME);
    assert!(is_expected, "{err:?}");
    assert!(err.source().is_some(), "{err:?}");
    Ok(())
}
1599 }
1600}