1use super::client::*;
16use super::*;
17use crate::error::ReadError;
18use crate::model::ObjectChecksums;
19use crate::model_ext::KeyAes256;
20use crate::model_ext::ObjectHighlights;
21use crate::read_object::ReadObjectResponse;
22use crate::read_resume_policy::ReadResumePolicy;
23use crate::storage::checksum::details::{Md5, validate};
24use crate::storage::request_options::RequestOptions;
25use base64::Engine;
26use serde_with::DeserializeAs;
27
28#[derive(Clone, Debug)]
65pub struct ReadObject<S = crate::storage::transport::Storage>
66where
67 S: crate::storage::stub::Storage + 'static,
68{
69 stub: std::sync::Arc<S>,
70 request: crate::model::ReadObjectRequest,
71 options: RequestOptions,
72}
73
74impl<S> ReadObject<S>
75where
76 S: crate::storage::stub::Storage + 'static,
77{
78 pub(crate) fn new<B, O>(
79 stub: std::sync::Arc<S>,
80 bucket: B,
81 object: O,
82 options: RequestOptions,
83 ) -> Self
84 where
85 B: Into<String>,
86 O: Into<String>,
87 {
88 ReadObject {
89 stub,
90 request: crate::model::ReadObjectRequest::new()
91 .set_bucket(bucket)
92 .set_object(object),
93 options,
94 }
95 }
96
97 pub fn compute_md5(self) -> Self {
126 let mut this = self;
127 this.options.checksum.md5_hash = Some(Md5::default());
128 this
129 }
130
131 pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
134 self.request.generation = v.into();
135 self
136 }
137
138 pub fn set_if_generation_match<T>(mut self, v: T) -> Self
142 where
143 T: Into<i64>,
144 {
145 self.request.if_generation_match = Some(v.into());
146 self
147 }
148
149 pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
154 where
155 T: Into<i64>,
156 {
157 self.request.if_generation_not_match = Some(v.into());
158 self
159 }
160
161 pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
164 where
165 T: Into<i64>,
166 {
167 self.request.if_metageneration_match = Some(v.into());
168 self
169 }
170
171 pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
174 where
175 T: Into<i64>,
176 {
177 self.request.if_metageneration_not_match = Some(v.into());
178 self
179 }
180
181 pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
232 self.request.with_range(range);
233 self
234 }
235
236 pub fn set_key(mut self, v: KeyAes256) -> Self {
253 self.request.common_object_request_params = Some(v.into());
254 self
255 }
256
257 pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
279 self.options.retry_policy = v.into().into();
280 self
281 }
282
283 pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
300 mut self,
301 v: V,
302 ) -> Self {
303 self.options.backoff_policy = v.into().into();
304 self
305 }
306
307 pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
330 mut self,
331 v: V,
332 ) -> Self {
333 self.options.retry_throttler = v.into().into();
334 self
335 }
336
337 pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
357 where
358 V: ReadResumePolicy + 'static,
359 {
360 self.options.read_resume_policy = std::sync::Arc::new(v);
361 self
362 }
363
364 pub async fn send(self) -> Result<ReadObjectResponse> {
366 self.stub.read_object(self.request, self.options).await
367 }
368}
369
370#[derive(Clone, Debug)]
372pub(crate) struct Reader {
373 pub inner: std::sync::Arc<StorageInner>,
374 pub request: crate::model::ReadObjectRequest,
375 pub options: RequestOptions,
376}
377
378impl Reader {
379 async fn read(self) -> Result<reqwest::Response> {
380 let throttler = self.options.retry_throttler.clone();
381 let retry = self.options.retry_policy.clone();
382 let backoff = self.options.backoff_policy.clone();
383
384 gax::retry_loop_internal::retry_loop(
385 async move |_| self.read_attempt().await,
386 async |duration| tokio::time::sleep(duration).await,
387 true,
388 throttler,
389 retry,
390 backoff,
391 )
392 .await
393 }
394
395 async fn read_attempt(&self) -> Result<reqwest::Response> {
396 let builder = self.http_request_builder().await?;
397 let response = builder.send().await.map_err(Error::io)?;
398 if !response.status().is_success() {
399 return gaxi::http::to_http_error(response).await;
400 }
401 Ok(response)
402 }
403
404 async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
405 let bucket = &self.request.bucket;
407 let bucket_id = bucket
408 .as_str()
409 .strip_prefix("projects/_/buckets/")
410 .ok_or_else(|| {
411 Error::binding(format!(
412 "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
413 ))
414 })?;
415 let object = &self.request.object;
416
417 let builder = self
419 .inner
420 .client
421 .request(
422 reqwest::Method::GET,
423 format!(
424 "{}/storage/v1/b/{bucket_id}/o/{}",
425 &self.inner.endpoint,
426 enc(object)
427 ),
428 )
429 .query(&[("alt", "media")])
430 .header(
431 "x-goog-api-client",
432 reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
433 );
434
435 let builder = if self.request.generation != 0 {
437 builder.query(&[("generation", self.request.generation)])
438 } else {
439 builder
440 };
441 let builder = self
442 .request
443 .if_generation_match
444 .iter()
445 .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
446 let builder = self
447 .request
448 .if_generation_not_match
449 .iter()
450 .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
451 let builder = self
452 .request
453 .if_metageneration_match
454 .iter()
455 .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
456 let builder = self
457 .request
458 .if_metageneration_not_match
459 .iter()
460 .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
461
462 let builder = apply_customer_supplied_encryption_headers(
463 builder,
464 &self.request.common_object_request_params,
465 );
466
467 let builder = match (self.request.read_offset, self.request.read_limit) {
469 (_, l) if l < 0 => {
471 unreachable!("ReadObject build never sets a negative read_limit value")
472 }
473 (o, l) if o < 0 && l > 0 => unreachable!(
475 "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
476 ),
477 (0, 0) => builder,
479 (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
481 (o, 0) => builder.header("range", format!("bytes={o}-")),
484 (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
487 };
488
489 self.inner.apply_auth_headers(builder).await
490 }
491}
492
493fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
494 headers
495 .get("x-goog-hash")
496 .and_then(|hash| hash.to_str().ok())
497 .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
498 .and_then(|hash| {
499 let hash = hash.trim_start_matches("crc32c=");
500 v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
501 })
502}
503
504fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
505 headers
506 .get("x-goog-hash")
507 .and_then(|hash| hash.to_str().ok())
508 .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
509 .and_then(|hash| {
510 let hash = hash.trim_start_matches("md5=");
511 base64::prelude::BASE64_STANDARD.decode(hash).ok()
512 })
513 .unwrap_or_default()
514}
515
516#[derive(Debug)]
518pub(crate) struct ReadObjectResponseImpl {
519 reader: Reader,
520 response: Option<reqwest::Response>,
521 highlights: ObjectHighlights,
522 response_checksums: ObjectChecksums,
524 range: ReadRange,
526 generation: i64,
527 resume_count: u32,
528}
529
530impl ReadObjectResponseImpl {
531 pub(crate) async fn new(reader: Reader) -> Result<Self> {
532 let response = reader.clone().read().await?;
533
534 let full = reader.request.read_offset == 0 && reader.request.read_limit == 0;
535 let response_checksums =
536 checksums_from_response(full, response.status(), response.headers());
537 let range = response_range(&response).map_err(Error::deser)?;
538 let generation = response_generation(&response).map_err(Error::deser)?;
539
540 let headers = response.headers();
541 let get_as_i64 = |header_name: &str| -> i64 {
542 headers
543 .get(header_name)
544 .and_then(|s| s.to_str().ok())
545 .and_then(|s| s.parse::<i64>().ok())
546 .unwrap_or_default()
547 };
548 let get_as_string = |header_name: &str| -> String {
549 headers
550 .get(header_name)
551 .and_then(|sc| sc.to_str().ok())
552 .map(|sc| sc.to_string())
553 .unwrap_or_default()
554 };
555 let highlights = ObjectHighlights {
556 generation,
557 metageneration: get_as_i64("x-goog-metageneration"),
558 size: get_as_i64("x-goog-stored-content-length"),
559 content_encoding: get_as_string("x-goog-stored-content-encoding"),
560 storage_class: get_as_string("x-goog-storage-class"),
561 content_type: get_as_string("content-type"),
562 content_language: get_as_string("content-language"),
563 content_disposition: get_as_string("content-disposition"),
564 etag: get_as_string("etag"),
565 checksums: headers.get("x-goog-hash").map(|_| {
566 crate::model::ObjectChecksums::new()
567 .set_or_clear_crc32c(headers_to_crc32c(headers))
568 .set_md5_hash(headers_to_md5_hash(headers))
569 }),
570 };
571
572 Ok(Self {
573 reader,
574 response: Some(response),
575 highlights,
576 response_checksums,
578 range,
580 generation,
581 resume_count: 0,
582 })
583 }
584}
585
586#[async_trait::async_trait]
587impl crate::read_object::dynamic::ReadObjectResponse for ReadObjectResponseImpl {
588 fn object(&self) -> ObjectHighlights {
589 self.highlights.clone()
590 }
591
592 async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
593 match self.next_attempt().await {
594 None => None,
595 Some(Ok(b)) => Some(Ok(b)),
596 Some(Err(e)) => Box::pin(self.resume(e)).await,
599 }
600 }
601}
602
603impl ReadObjectResponseImpl {
604 async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
605 let response = self.response.as_mut()?;
606 let res = response.chunk().await.map_err(Error::io);
607 match res {
608 Ok(Some(chunk)) => {
609 self.reader
610 .options
611 .checksum
612 .update(self.range.start, &chunk);
613 let len = chunk.len() as u64;
614 if self.range.limit < len {
615 return Some(Err(Error::deser(ReadError::LongRead {
616 expected: self.range.limit,
617 got: len,
618 })));
619 }
620 self.range.limit -= len;
621 self.range.start += len;
622 Some(Ok(chunk))
623 }
624 Ok(None) => {
625 if self.range.limit != 0 {
626 return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
627 }
628 let computed = self.reader.options.checksum.finalize();
629 let res = validate(&self.response_checksums, &Some(computed));
630 match res {
631 Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
632 Ok(()) => None,
633 }
634 }
635 Err(e) => Some(Err(e)),
636 }
637 }
638
639 async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
640 use crate::read_object::dynamic::ReadObjectResponse;
641 use crate::read_resume_policy::{ResumeQuery, ResumeResult};
642
643 self.response = None;
645 self.resume_count += 1;
646 let query = ResumeQuery::new(self.resume_count);
647 match self
648 .reader
649 .options
650 .read_resume_policy
651 .on_error(&query, error)
652 {
653 ResumeResult::Continue(_) => {}
654 ResumeResult::Permanent(e) => return Some(Err(e)),
655 ResumeResult::Exhausted(e) => return Some(Err(e)),
656 };
657 self.reader.request.read_offset = self.range.start as i64;
658 self.reader.request.read_limit = self.range.limit as i64;
659 self.reader.request.generation = self.generation;
660 self.response = match self.reader.clone().read().await {
661 Ok(r) => Some(r),
662 Err(e) => return Some(Err(e)),
663 };
664 self.next().await
665 }
666}
667
668fn checksums_from_response(
683 full_content_requested: bool,
684 status: http::StatusCode,
685 headers: &http::HeaderMap,
686) -> ObjectChecksums {
687 let checksums = ObjectChecksums::new();
688 if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
689 return checksums;
690 }
691 let stored_encoding = headers
692 .get("x-goog-stored-content-encoding")
693 .and_then(|e| e.to_str().ok())
694 .map_or("", |e| e);
695 let content_encoding = headers
696 .get("content-encoding")
697 .and_then(|e| e.to_str().ok())
698 .map_or("", |e| e);
699 if stored_encoding == "gzip" && content_encoding != "gzip" {
700 return checksums;
701 }
702 checksums
703 .set_or_clear_crc32c(headers_to_crc32c(headers))
704 .set_md5_hash(headers_to_md5_hash(headers))
705}
706
707fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
708 match response.status() {
709 reqwest::StatusCode::OK => {
710 let header = required_header(response, "content-length")?;
711 let limit = header
712 .parse::<u64>()
713 .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
714 Ok(ReadRange { start: 0, limit })
715 }
716 reqwest::StatusCode::PARTIAL_CONTENT => {
717 let header = required_header(response, "content-range")?;
718 let header = header.strip_prefix("bytes ").ok_or_else(|| {
719 ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
720 })?;
721 let (range, _) = header.split_once('/').ok_or_else(|| {
722 ReadError::BadHeaderFormat("content-range", "missing / separator".into())
723 })?;
724 let (start, end) = range.split_once('-').ok_or_else(|| {
725 ReadError::BadHeaderFormat("content-range", "missing - separator".into())
726 })?;
727 let start = start
728 .parse::<u64>()
729 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
730 let end = end
731 .parse::<u64>()
732 .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
733 let end = end + 1;
736 let limit = end
737 .checked_sub(start)
738 .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
739 Ok(ReadRange { start, limit })
740 }
741 s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
742 }
743}
744
745fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
746 let header = required_header(response, "x-goog-generation")?;
747 header
748 .parse::<i64>()
749 .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
750}
751
752fn required_header<'a>(
753 response: &'a reqwest::Response,
754 name: &'static str,
755) -> std::result::Result<&'a str, ReadError> {
756 let header = response
757 .headers()
758 .get(name)
759 .ok_or_else(|| ReadError::MissingHeader(name))?;
760 header
761 .to_str()
762 .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
763}
764
/// A contiguous span of object bytes.
///
/// `start` is the offset of the first byte and `limit` is the number of bytes.
#[derive(PartialEq, Debug)]
struct ReadRange {
    /// Offset of the first byte in the span.
    start: u64,
    /// Number of bytes in the span.
    limit: u64,
}
770
771#[cfg(test)]
772mod resume_tests;
773
774#[cfg(test)]
775mod tests {
776 use super::client::tests::{test_builder, test_inner_client};
777 use super::*;
778 use crate::error::ChecksumMismatch;
779 use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
780 use auth::credentials::anonymous::Builder as Anonymous;
781 use futures::TryStreamExt;
782 use httptest::{Expectation, Server, matchers::*, responders::status_code};
783 use std::collections::HashMap;
784 use std::error::Error;
785 use std::sync::Arc;
786 use test_case::test_case;
787
788 type Result = std::result::Result<(), Box<dyn std::error::Error>>;
789
790 async fn http_request_builder(
791 inner: Arc<StorageInner>,
792 builder: ReadObject,
793 ) -> crate::Result<reqwest::RequestBuilder> {
794 let reader = Reader {
795 inner,
796 request: builder.request,
797 options: builder.options,
798 };
799 reader.http_request_builder().await
800 }
801
802 #[tokio::test]
804 async fn test_read_is_send_and_static() -> Result {
805 let client = Storage::builder()
806 .with_credentials(Anonymous::new().build())
807 .build()
808 .await?;
809
810 fn need_send<T: Send>(_val: &T) {}
811 fn need_sync<T: Sync>(_val: &T) {}
812 fn need_static<T: 'static>(_val: &T) {}
813
814 let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
815 need_send(&read);
816 need_sync(&read);
817 need_static(&read);
818
819 let read = client
820 .read_object("projects/_/buckets/test-bucket", "test-object")
821 .send();
822 need_send(&read);
823 need_static(&read);
824
825 Ok(())
826 }
827 #[tokio::test]
828 async fn read_object_normal() -> Result {
829 let server = Server::run();
830 server.expect(
831 Expectation::matching(all_of![
832 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
833 request::query(url_decoded(contains(("alt", "media")))),
834 ])
835 .respond_with(
836 status_code(200)
837 .body("hello world")
838 .append_header("x-goog-generation", 123456),
839 ),
840 );
841
842 let client = Storage::builder()
843 .with_endpoint(format!("http://{}", server.addr()))
844 .with_credentials(Anonymous::new().build())
845 .build()
846 .await?;
847 let mut reader = client
848 .read_object("projects/_/buckets/test-bucket", "test-object")
849 .send()
850 .await?;
851 let mut got = Vec::new();
852 while let Some(b) = reader.next().await.transpose()? {
853 got.extend_from_slice(&b);
854 }
855 assert_eq!(bytes::Bytes::from_owner(got), "hello world");
856
857 Ok(())
858 }
859
860 #[tokio::test]
861 async fn read_object_metadata() -> Result {
862 const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
863 let server = Server::run();
864 server.expect(
865 Expectation::matching(all_of![
866 request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
867 request::query(url_decoded(contains(("alt", "media")))),
868 ])
869 .respond_with(
870 status_code(200)
871 .body(CONTENTS)
872 .append_header(
873 "x-goog-hash",
874 "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
875 )
876 .append_header("x-goog-generation", 500)
877 .append_header("x-goog-metageneration", "1")
878 .append_header("x-goog-stored-content-length", 30)
879 .append_header("x-goog-stored-content-encoding", "identity")
880 .append_header("x-goog-storage-class", "STANDARD")
881 .append_header("content-language", "en")
882 .append_header("content-type", "text/plain")
883 .append_header("content-disposition", "inline")
884 .append_header("etag", "etagval"),
885 ),
886 );
887
888 let endpoint = server.url("");
889 let client = Storage::builder()
890 .with_endpoint(endpoint.to_string())
891 .with_credentials(Anonymous::new().build())
892 .build()
893 .await?;
894 let reader = client
895 .read_object("projects/_/buckets/test-bucket", "test-object")
896 .send()
897 .await?;
898 let object = reader.object();
899 assert_eq!(object.generation, 500);
900 assert_eq!(object.metageneration, 1);
901 assert_eq!(object.size, 30);
902 assert_eq!(object.content_encoding, "identity");
903 assert_eq!(
904 object.checksums.as_ref().unwrap().crc32c.unwrap(),
905 crc32c::crc32c(CONTENTS.as_bytes())
906 );
907 assert_eq!(
908 object.checksums.as_ref().unwrap().md5_hash,
909 base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
910 );
911
912 Ok(())
913 }
914
915 #[tokio::test]
916 async fn read_object_stream() -> Result {
917 let server = Server::run();
918 server.expect(
919 Expectation::matching(all_of![
920 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
921 request::query(url_decoded(contains(("alt", "media")))),
922 ])
923 .respond_with(
924 status_code(200)
925 .append_header("x-goog-generation", 123456)
926 .body("hello world"),
927 ),
928 );
929
930 let client = Storage::builder()
931 .with_endpoint(format!("http://{}", server.addr()))
932 .with_credentials(Anonymous::new().build())
933 .build()
934 .await?;
935 let response = client
936 .read_object("projects/_/buckets/test-bucket", "test-object")
937 .send()
938 .await?;
939 let result: Vec<_> = response.into_stream().try_collect().await?;
940 assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
941
942 Ok(())
943 }
944
945 #[tokio::test]
946 async fn read_object_next_then_consume_response() -> Result {
947 const BLOCK_SIZE: usize = 500;
949 let mut contents = Vec::new();
950 for i in 0..50 {
951 contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
952 }
953
954 let u = crc32c::crc32c(&contents);
956 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
957
958 let server = Server::run();
959 server.expect(
960 Expectation::matching(all_of![
961 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
962 request::query(url_decoded(contains(("alt", "media")))),
963 ])
964 .times(1)
965 .respond_with(
966 status_code(200)
967 .body(contents.clone())
968 .append_header("x-goog-hash", format!("crc32c={value}"))
969 .append_header("x-goog-generation", 123456),
970 ),
971 );
972
973 let client = Storage::builder()
974 .with_endpoint(format!("http://{}", server.addr()))
975 .with_credentials(Anonymous::new().build())
976 .build()
977 .await?;
978
979 let mut response = client
981 .read_object("projects/_/buckets/test-bucket", "test-object")
982 .send()
983 .await?;
984
985 let mut all_bytes = bytes::BytesMut::new();
986 let chunk = response.next().await.transpose()?.unwrap();
987 assert!(!chunk.is_empty());
988 all_bytes.extend(chunk);
989 use futures::StreamExt;
990 let mut stream = response.into_stream();
991 while let Some(chunk) = stream.next().await.transpose()? {
992 all_bytes.extend(chunk);
993 }
994 assert_eq!(all_bytes, contents);
995
996 Ok(())
997 }
998
999 #[tokio::test]
1000 async fn read_object_not_found() -> Result {
1001 let server = Server::run();
1002 server.expect(
1003 Expectation::matching(all_of![
1004 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1005 request::query(url_decoded(contains(("alt", "media")))),
1006 ])
1007 .respond_with(status_code(404).body("NOT FOUND")),
1008 );
1009
1010 let client = Storage::builder()
1011 .with_endpoint(format!("http://{}", server.addr()))
1012 .with_credentials(Anonymous::new().build())
1013 .build()
1014 .await?;
1015 let err = client
1016 .read_object("projects/_/buckets/test-bucket", "test-object")
1017 .send()
1018 .await
1019 .expect_err("expected a not found error");
1020 assert_eq!(err.http_status_code(), Some(404));
1021
1022 Ok(())
1023 }
1024
1025 #[tokio::test]
1026 async fn read_object_incorrect_crc32c_check() -> Result {
1027 let u = crc32c::crc32c("goodbye world".as_bytes());
1029 let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1030
1031 let server = Server::run();
1032 server.expect(
1033 Expectation::matching(all_of![
1034 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1035 request::query(url_decoded(contains(("alt", "media")))),
1036 ])
1037 .times(3)
1038 .respond_with(
1039 status_code(200)
1040 .body("hello world")
1041 .append_header("x-goog-hash", format!("crc32c={value}"))
1042 .append_header("x-goog-generation", 123456),
1043 ),
1044 );
1045
1046 let client = Storage::builder()
1047 .with_endpoint(format!("http://{}", server.addr()))
1048 .with_credentials(Anonymous::new().build())
1049 .build()
1050 .await?;
1051 let mut response = client
1052 .read_object("projects/_/buckets/test-bucket", "test-object")
1053 .send()
1054 .await?;
1055 let mut partial = Vec::new();
1056 let mut err = None;
1057 while let Some(r) = response.next().await {
1058 match r {
1059 Ok(b) => partial.extend_from_slice(&b),
1060 Err(e) => err = Some(e),
1061 };
1062 }
1063 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1064 let err = err.expect("expect error on incorrect crc32c");
1065 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1066 assert!(
1067 matches!(
1068 source,
1069 Some(&ReadError::ChecksumMismatch(
1070 ChecksumMismatch::Crc32c { .. }
1071 ))
1072 ),
1073 "err={err:?}"
1074 );
1075
1076 let mut response = client
1077 .read_object("projects/_/buckets/test-bucket", "test-object")
1078 .send()
1079 .await?;
1080 let err: crate::Error = async {
1081 {
1082 while (response.next().await.transpose()?).is_some() {}
1083 Ok(())
1084 }
1085 }
1086 .await
1087 .expect_err("expect error on incorrect crc32c");
1088 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1089 assert!(
1090 matches!(
1091 source,
1092 Some(&ReadError::ChecksumMismatch(
1093 ChecksumMismatch::Crc32c { .. }
1094 ))
1095 ),
1096 "err={err:?}"
1097 );
1098
1099 use futures::TryStreamExt;
1100 let err = client
1101 .read_object("projects/_/buckets/test-bucket", "test-object")
1102 .send()
1103 .await?
1104 .into_stream()
1105 .try_collect::<Vec<bytes::Bytes>>()
1106 .await
1107 .expect_err("expect error on incorrect crc32c");
1108 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1109 assert!(
1110 matches!(
1111 source,
1112 Some(&ReadError::ChecksumMismatch(
1113 ChecksumMismatch::Crc32c { .. }
1114 ))
1115 ),
1116 "err={err:?}"
1117 );
1118 Ok(())
1119 }
1120
1121 #[tokio::test]
1122 async fn read_object_incorrect_md5_check() -> Result {
1123 let digest = md5::compute("goodbye world".as_bytes());
1125 let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1126
1127 let server = Server::run();
1128 server.expect(
1129 Expectation::matching(all_of![
1130 request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1131 request::query(url_decoded(contains(("alt", "media")))),
1132 ])
1133 .times(1)
1134 .respond_with(
1135 status_code(200)
1136 .body("hello world")
1137 .append_header("x-goog-hash", format!("md5={value}"))
1138 .append_header("x-goog-generation", 123456),
1139 ),
1140 );
1141
1142 let client = Storage::builder()
1143 .with_endpoint(format!("http://{}", server.addr()))
1144 .with_credentials(Anonymous::new().build())
1145 .build()
1146 .await?;
1147 let mut response = client
1148 .read_object("projects/_/buckets/test-bucket", "test-object")
1149 .compute_md5()
1150 .send()
1151 .await?;
1152 let mut partial = Vec::new();
1153 let mut err = None;
1154 while let Some(r) = response.next().await {
1155 match r {
1156 Ok(b) => partial.extend_from_slice(&b),
1157 Err(e) => err = Some(e),
1158 };
1159 }
1160 assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
1161 let err = err.expect("expect error on incorrect md5");
1162 let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1163 assert!(
1164 matches!(
1165 source,
1166 Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1167 ),
1168 "err={err:?}"
1169 );
1170
1171 Ok(())
1172 }
1173
1174 #[tokio::test]
1175 async fn read_object() -> Result {
1176 let inner = test_inner_client(test_builder());
1177 let stub = crate::storage::transport::Storage::new(inner.clone());
1178 let builder = ReadObject::new(
1179 stub,
1180 "projects/_/buckets/bucket",
1181 "object",
1182 inner.options.clone(),
1183 );
1184 let request = http_request_builder(inner, builder).await?.build()?;
1185
1186 assert_eq!(request.method(), reqwest::Method::GET);
1187 assert_eq!(
1188 request.url().as_str(),
1189 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1190 );
1191 Ok(())
1192 }
1193
1194 #[tokio::test]
1195 async fn read_object_error_credentials() -> Result {
1196 let inner = test_inner_client(
1197 test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
1198 );
1199 let stub = crate::storage::transport::Storage::new(inner.clone());
1200 let builder = ReadObject::new(
1201 stub,
1202 "projects/_/buckets/bucket",
1203 "object",
1204 inner.options.clone(),
1205 );
1206 let _ = http_request_builder(inner, builder)
1207 .await
1208 .inspect_err(|e| assert!(e.is_authentication()))
1209 .expect_err("invalid credentials should err");
1210 Ok(())
1211 }
1212
1213 #[tokio::test]
1214 async fn read_object_bad_bucket() -> Result {
1215 let inner = test_inner_client(test_builder());
1216 let stub = crate::storage::transport::Storage::new(inner.clone());
1217 let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1218 let _ = http_request_builder(inner, builder)
1219 .await
1220 .expect_err("malformed bucket string should error");
1221 Ok(())
1222 }
1223
1224 #[tokio::test]
1225 async fn read_object_query_params() -> Result {
1226 let inner = test_inner_client(test_builder());
1227 let stub = crate::storage::transport::Storage::new(inner.clone());
1228 let builder = ReadObject::new(
1229 stub,
1230 "projects/_/buckets/bucket",
1231 "object",
1232 inner.options.clone(),
1233 )
1234 .set_generation(5)
1235 .set_if_generation_match(10)
1236 .set_if_generation_not_match(20)
1237 .set_if_metageneration_match(30)
1238 .set_if_metageneration_not_match(40);
1239 let request = http_request_builder(inner, builder).await?.build()?;
1240
1241 assert_eq!(request.method(), reqwest::Method::GET);
1242 let want_pairs: HashMap<String, String> = [
1243 ("alt", "media"),
1244 ("generation", "5"),
1245 ("ifGenerationMatch", "10"),
1246 ("ifGenerationNotMatch", "20"),
1247 ("ifMetagenerationMatch", "30"),
1248 ("ifMetagenerationNotMatch", "40"),
1249 ]
1250 .iter()
1251 .map(|(k, v)| (k.to_string(), v.to_string()))
1252 .collect();
1253 let query_pairs: HashMap<String, String> = request
1254 .url()
1255 .query_pairs()
1256 .map(|param| (param.0.to_string(), param.1.to_string()))
1257 .collect();
1258 assert_eq!(query_pairs.len(), want_pairs.len());
1259 assert_eq!(query_pairs, want_pairs);
1260 Ok(())
1261 }
1262
1263 #[tokio::test]
1264 async fn read_object_headers() -> Result {
1265 let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1267
1268 let inner = test_inner_client(test_builder());
1270 let stub = crate::storage::transport::Storage::new(inner.clone());
1271 let builder = ReadObject::new(
1272 stub,
1273 "projects/_/buckets/bucket",
1274 "object",
1275 inner.options.clone(),
1276 )
1277 .set_key(KeyAes256::new(&key)?);
1278 let request = http_request_builder(inner, builder).await?.build()?;
1279
1280 assert_eq!(request.method(), reqwest::Method::GET);
1281 assert_eq!(
1282 request.url().as_str(),
1283 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1284 );
1285
1286 let want = vec![
1287 ("x-goog-encryption-algorithm", "AES256".to_string()),
1288 ("x-goog-encryption-key", key_base64),
1289 ("x-goog-encryption-key-sha256", key_sha256_base64),
1290 ];
1291
1292 for (name, value) in want {
1293 assert_eq!(
1294 request.headers().get(name).unwrap().as_bytes(),
1295 bytes::Bytes::from(value)
1296 );
1297 }
1298 Ok(())
1299 }
1300
1301 #[test_case(ReadRange::all(), None; "no headers needed")]
1302 #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1303 #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1304 #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1305 #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1306 #[tokio::test]
1307 async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1308 let inner = test_inner_client(test_builder());
1309 let stub = crate::storage::transport::Storage::new(inner.clone());
1310 let builder = ReadObject::new(
1311 stub,
1312 "projects/_/buckets/bucket",
1313 "object",
1314 inner.options.clone(),
1315 )
1316 .set_read_range(input.clone());
1317 let request = http_request_builder(inner, builder).await?.build()?;
1318
1319 assert_eq!(request.method(), reqwest::Method::GET);
1320 assert_eq!(
1321 request.url().as_str(),
1322 "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1323 );
1324
1325 assert_eq!(request.headers().get("range"), want);
1326 Ok(())
1327 }
1328
1329 #[test_case("projects/p", "projects%2Fp")]
1330 #[test_case("kebab-case", "kebab-case")]
1331 #[test_case("dot.name", "dot.name")]
1332 #[test_case("under_score", "under_score")]
1333 #[test_case("tilde~123", "tilde~123")]
1334 #[test_case("exclamation!point!", "exclamation%21point%21")]
1335 #[test_case("spaces spaces", "spaces%20%20%20spaces")]
1336 #[test_case("preserve%percent%21", "preserve%percent%21")]
1337 #[test_case(
1338 "testall !#$&'()*+,/:;=?@[]",
1339 "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1340 )]
1341 #[tokio::test]
1342 async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1343 let inner = test_inner_client(test_builder());
1344 let stub = crate::storage::transport::Storage::new(inner.clone());
1345 let builder = ReadObject::new(
1346 stub,
1347 "projects/_/buckets/bucket",
1348 name,
1349 inner.options.clone(),
1350 );
1351 let request = http_request_builder(inner, builder).await?.build()?;
1352 let got = request.url().path_segments().unwrap().next_back().unwrap();
1353 assert_eq!(got, want);
1354 Ok(())
1355 }
1356
1357 #[test]
1358 fn document_crc32c_values() {
1359 let bytes = (1234567890_u32).to_be_bytes();
1360 let base64 = base64::prelude::BASE64_STANDARD.encode(bytes);
1361 assert_eq!(base64, "SZYC0g==", "{bytes:?}");
1362 }
1363
1364 #[test_case("", None; "no header")]
1365 #[test_case("crc32c=hello", None; "invalid value")]
1366 #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
1367 #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
1368 #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
1369 #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
1370 fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
1371 let mut headers = http::HeaderMap::new();
1372 if !val.is_empty() {
1373 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1374 }
1375 let got = headers_to_crc32c(&headers);
1376 assert_eq!(got, want);
1377 Ok(())
1378 }
1379
1380 #[test_case("", None; "no header")]
1381 #[test_case("md5=invalid", None; "invalid value")]
1382 #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
1383 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
1384 #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
1385 #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
1386 fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
1387 let mut headers = http::HeaderMap::new();
1388 if !val.is_empty() {
1389 headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1390 }
1391 let got = headers_to_md5_hash(&headers);
1392 match want {
1393 Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
1394 None => assert!(got.is_empty()),
1395 }
1396 Ok(())
1397 }
1398
1399 #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
1400 #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
1401 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
1402 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
1403 #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
1404 fn test_checksums_validation_enabled(
1405 full_content_requested: bool,
1406 headers: Vec<(&str, &str)>,
1407 status: http::StatusCode,
1408 want_crc32c: Option<u32>,
1409 want_md5: &str,
1410 ) -> Result {
1411 let mut header_map = http::HeaderMap::new();
1412 for (key, value) in headers {
1413 header_map.insert(
1414 http::HeaderName::from_bytes(key.as_bytes())?,
1415 http::HeaderValue::from_bytes(value.as_bytes())?,
1416 );
1417 }
1418
1419 let got = checksums_from_response(full_content_requested, status, &header_map);
1420 assert_eq!(got.crc32c, want_crc32c);
1421 assert_eq!(
1422 got.md5_hash,
1423 base64::prelude::BASE64_STANDARD.decode(want_md5)?
1424 );
1425 Ok(())
1426 }
1427
1428 #[test_case(0)]
1429 #[test_case(1024)]
1430 fn response_range_success(limit: u64) -> Result {
1431 let response = http::Response::builder()
1432 .status(200)
1433 .header("content-length", limit)
1434 .body(Vec::new())?;
1435 let response = reqwest::Response::from(response);
1436 let range = response_range(&response)?;
1437 assert_eq!(range, super::ReadRange { start: 0, limit });
1438 Ok(())
1439 }
1440
1441 #[test]
1442 fn response_range_missing() -> Result {
1443 let response = http::Response::builder().status(200).body(Vec::new())?;
1444 let response = reqwest::Response::from(response);
1445 let err = response_range(&response).expect_err("missing header should result in an error");
1446 assert!(
1447 matches!(err, ReadError::MissingHeader(h) if h == "content-length"),
1448 "{err:?}"
1449 );
1450 Ok(())
1451 }
1452
1453 #[test_case("")]
1454 #[test_case("abc")]
1455 #[test_case("-123")]
1456 fn response_range_format(value: &'static str) -> Result {
1457 let response = http::Response::builder()
1458 .status(200)
1459 .header("content-length", value)
1460 .body(Vec::new())?;
1461 let response = reqwest::Response::from(response);
1462 let err = response_range(&response).expect_err("header value should result in an error");
1463 assert!(
1464 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
1465 "{err:?}"
1466 );
1467 assert!(err.source().is_some(), "{err:?}");
1468 Ok(())
1469 }
1470
1471 #[test_case(0, 123)]
1472 #[test_case(123, 456)]
1473 fn response_range_partial_success(start: u64, end: u64) -> Result {
1474 let response = http::Response::builder()
1475 .status(206)
1476 .header(
1477 "content-range",
1478 format!("bytes {}-{}/{}", start, end, end + 1),
1479 )
1480 .body(Vec::new())?;
1481 let response = reqwest::Response::from(response);
1482 let range = response_range(&response)?;
1483 assert_eq!(
1484 range,
1485 super::ReadRange {
1486 start,
1487 limit: (end + 1 - start)
1488 }
1489 );
1490 Ok(())
1491 }
1492
1493 #[test]
1494 fn response_range_partial_missing() -> Result {
1495 let response = http::Response::builder().status(206).body(Vec::new())?;
1496 let response = reqwest::Response::from(response);
1497 let err = response_range(&response).expect_err("missing header should result in an error");
1498 assert!(
1499 matches!(err, ReadError::MissingHeader(h) if h == "content-range"),
1500 "{err:?}"
1501 );
1502 Ok(())
1503 }
1504
1505 #[test_case("")]
1506 #[test_case("123-456/457"; "bad prefix")]
1507 #[test_case("bytes 123-456 457"; "bad separator")]
1508 #[test_case("bytes 123+456/457"; "bad separator [2]")]
1509 #[test_case("bytes abc-456/457"; "start is not numbers")]
1510 #[test_case("bytes 123-cde/457"; "end is not numbers")]
1511 #[test_case("bytes 123-0/457"; "invalid range")]
1512 fn response_range_partial_format(value: &'static str) -> Result {
1513 let response = http::Response::builder()
1514 .status(206)
1515 .header("content-range", value)
1516 .body(Vec::new())?;
1517 let response = reqwest::Response::from(response);
1518 let err = response_range(&response).expect_err("header value should result in an error");
1519 assert!(
1520 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
1521 "{err:?}"
1522 );
1523 assert!(err.source().is_some(), "{err:?}");
1524 Ok(())
1525 }
1526
1527 #[test]
1528 fn response_range_bad_response() -> Result {
1529 let code = reqwest::StatusCode::CREATED;
1530 let response = http::Response::builder().status(code).body(Vec::new())?;
1531 let response = reqwest::Response::from(response);
1532 let err = response_range(&response).expect_err("unexpected status creates error");
1533 assert!(
1534 matches!(err, ReadError::UnexpectedSuccessCode(c) if c == code),
1535 "{err:?}"
1536 );
1537 Ok(())
1538 }
1539
1540 #[test_case(0)]
1541 #[test_case(1024)]
1542 fn response_generation_success(value: i64) -> Result {
1543 let response = http::Response::builder()
1544 .status(200)
1545 .header("x-goog-generation", value)
1546 .body(Vec::new())?;
1547 let response = reqwest::Response::from(response);
1548 let got = response_generation(&response)?;
1549 assert_eq!(got, value);
1550 Ok(())
1551 }
1552
1553 #[test]
1554 fn response_generation_missing() -> Result {
1555 let response = http::Response::builder().status(200).body(Vec::new())?;
1556 let response = reqwest::Response::from(response);
1557 let err =
1558 response_generation(&response).expect_err("missing header should result in an error");
1559 assert!(
1560 matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation"),
1561 "{err:?}"
1562 );
1563 Ok(())
1564 }
1565
1566 #[test_case("")]
1567 #[test_case("abc")]
1568 fn response_generation_format(value: &'static str) -> Result {
1569 let response = http::Response::builder()
1570 .status(200)
1571 .header("x-goog-generation", value)
1572 .body(Vec::new())?;
1573 let response = reqwest::Response::from(response);
1574 let err =
1575 response_generation(&response).expect_err("header value should result in an error");
1576 assert!(
1577 matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
1578 "{err:?}"
1579 );
1580 assert!(err.source().is_some(), "{err:?}");
1581 Ok(())
1582 }
1583
1584 #[test]
1585 fn required_header_not_str() -> Result {
1586 let name = "x-goog-test";
1587 let response = http::Response::builder()
1588 .status(200)
1589 .header(name, http::HeaderValue::from_bytes(b"invalid\xfa")?)
1590 .body(Vec::new())?;
1591 let response = reqwest::Response::from(response);
1592 let err =
1593 required_header(&response, name).expect_err("header value should result in an error");
1594 assert!(
1595 matches!(err, ReadError::BadHeaderFormat(h, _) if h == name),
1596 "{err:?}"
1597 );
1598 assert!(err.source().is_some(), "{err:?}");
1599 Ok(())
1600 }
1601}