google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26
27/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
28///
29/// # Example: accumulate the contents of an object into a vector
30/// ```
31/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
32/// async fn sample(client: &Storage) -> anyhow::Result<()> {
33///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
34///     let mut reader = builder.send().await?;
35///     let mut contents = Vec::new();
36///     while let Some(chunk) = reader.next().await.transpose()? {
37///         contents.extend_from_slice(&chunk);
38///     }
39///     println!("object contents={:?}", contents);
40///     Ok(())
41/// }
42/// ```
43///
44/// # Example: read part of an object
45/// ```
46/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
47/// use google_cloud_storage::model_ext::ReadRange;
48/// async fn sample(client: &Storage) -> anyhow::Result<()> {
49///     const MIB: u64 = 1024 * 1024;
50///     let mut contents = Vec::new();
51///     let mut reader = client
52///         .read_object("projects/_/buckets/my-bucket", "my-object")
53///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
54///         .send()
55///         .await?;
56///     while let Some(chunk) = reader.next().await.transpose()? {
57///         contents.extend_from_slice(&chunk);
58///     }
59///     println!("range contents={:?}", contents);
60///     Ok(())
61/// }
62/// ```
#[derive(Clone, Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// The stub that executes the request; `send()` calls its `read_object()`.
    stub: std::sync::Arc<S>,
    /// The request under construction; the `set_*` methods update its fields.
    request: crate::model::ReadObjectRequest,
    /// Per-request options: the retry, backoff, throttler, read resume,
    /// checksum, and decompression settings changed by the builder methods.
    options: RequestOptions,
}
72
73impl<S> ReadObject<S>
74where
75    S: crate::storage::stub::Storage + 'static,
76{
77    pub(crate) fn new<B, O>(
78        stub: std::sync::Arc<S>,
79        bucket: B,
80        object: O,
81        options: RequestOptions,
82    ) -> Self
83    where
84        B: Into<String>,
85        O: Into<String>,
86    {
87        ReadObject {
88            stub,
89            request: crate::model::ReadObjectRequest::new()
90                .set_bucket(bucket)
91                .set_object(object),
92            options,
93        }
94    }
95
96    /// Enables computation of MD5 hashes.
97    ///
98    /// Crc32c hashes are checked by default.
99    ///
100    /// Checksum validation is supported iff:
101    /// 1. The full content is requested.
102    /// 2. All of the content is returned (status != PartialContent).
103    /// 3. The server sent a checksum header.
104    /// 4. The http stack did not uncompress the file.
105    /// 5. The server did not uncompress data on read.
106    ///
107    /// # Example
108    /// ```
109    /// # use google_cloud_storage::client::Storage;
110    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
111    /// let builder =  client
112    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
113    ///     .compute_md5();
114    /// let mut reader = builder
115    ///     .send()
116    ///     .await?;
117    /// let mut contents = Vec::new();
118    /// while let Some(chunk) = reader.next().await.transpose()? {
119    ///     contents.extend_from_slice(&chunk);
120    /// }
121    /// println!("object contents={:?}", contents);
122    /// # Ok(()) }
123    /// ```
124    pub fn compute_md5(self) -> Self {
125        let mut this = self;
126        this.options.checksum.md5_hash = Some(Md5::default());
127        this
128    }
129
130    /// If present, selects a specific revision of this object (as
131    /// opposed to the latest version, the default).
132    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
133        self.request.generation = v.into();
134        self
135    }
136
137    /// Makes the operation conditional on whether the object's current generation
138    /// matches the given value. Setting to 0 makes the operation succeed only if
139    /// there are no live versions of the object.
140    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
141    where
142        T: Into<i64>,
143    {
144        self.request.if_generation_match = Some(v.into());
145        self
146    }
147
148    /// Makes the operation conditional on whether the object's live generation
149    /// does not match the given value. If no live object exists, the precondition
150    /// fails. Setting to 0 makes the operation succeed only if there is a live
151    /// version of the object.
152    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
153    where
154        T: Into<i64>,
155    {
156        self.request.if_generation_not_match = Some(v.into());
157        self
158    }
159
160    /// Makes the operation conditional on whether the object's current
161    /// metageneration matches the given value.
162    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
163    where
164        T: Into<i64>,
165    {
166        self.request.if_metageneration_match = Some(v.into());
167        self
168    }
169
170    /// Makes the operation conditional on whether the object's current
171    /// metageneration does not match the given value.
172    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
173    where
174        T: Into<i64>,
175    {
176        self.request.if_metageneration_not_match = Some(v.into());
177        self
178    }
179
180    /// The range of bytes to return in the read.
181    ///
    /// This can be all the bytes starting at a given offset
    /// (`ReadRange::offset`), all the bytes in an explicit range
    /// (`ReadRange::segment`), or the last N bytes of the object
    /// (`ReadRange::tail`).
186    ///
187    /// # Examples
188    ///
189    /// Read starting at 100 bytes to end of file.
190    /// ```
191    /// # use google_cloud_storage::client::Storage;
192    /// # use google_cloud_storage::model_ext::ReadRange;
193    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
194    /// let response = client
195    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
196    ///     .set_read_range(ReadRange::offset(100))
197    ///     .send()
198    ///     .await?;
199    /// println!("response details={response:?}");
200    /// # Ok(()) }
201    /// ```
202    ///
203    /// Read last 100 bytes of file:
204    /// ```
205    /// # use google_cloud_storage::client::Storage;
206    /// # use google_cloud_storage::model_ext::ReadRange;
207    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
208    /// let response = client
209    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
210    ///     .set_read_range(ReadRange::tail(100))
211    ///     .send()
212    ///     .await?;
213    /// println!("response details={response:?}");
214    /// # Ok(()) }
215    /// ```
216    ///
217    /// Read bytes 1000 to 1099.
218    /// ```
219    /// # use google_cloud_storage::client::Storage;
220    /// # use google_cloud_storage::model_ext::ReadRange;
221    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
222    /// let response = client
223    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
224    ///     .set_read_range(ReadRange::segment(1000, 100))
225    ///     .send()
226    ///     .await?;
227    /// println!("response details={response:?}");
228    /// # Ok(()) }
229    /// ```
230    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
231        self.request.with_range(range);
232        self
233    }
234
235    /// The encryption key used with the Customer-Supplied Encryption Keys
236    /// feature. In raw bytes format (not base64-encoded).
237    ///
238    /// Example:
239    /// ```
240    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
241    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
242    /// let key: &[u8] = &[97; 32];
243    /// let response = client
244    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
245    ///     .set_key(KeyAes256::new(key)?)
246    ///     .send()
247    ///     .await?;
248    /// println!("response details={response:?}");
249    /// # Ok(()) }
250    /// ```
251    pub fn set_key(mut self, v: KeyAes256) -> Self {
252        self.request.common_object_request_params = Some(v.into());
253        self
254    }
255
256    /// The retry policy used for this request.
257    ///
258    /// # Example
259    /// ```
260    /// # use google_cloud_storage::client::Storage;
261    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
262    /// use google_cloud_storage::retry_policy::RetryableErrors;
263    /// use std::time::Duration;
264    /// use gax::retry_policy::RetryPolicyExt;
265    /// let response = client
266    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
267    ///     .with_retry_policy(
268    ///         RetryableErrors
269    ///             .with_attempt_limit(5)
270    ///             .with_time_limit(Duration::from_secs(10)),
271    ///     )
272    ///     .send()
273    ///     .await?;
274    /// println!("response details={response:?}");
275    /// # Ok(()) }
276    /// ```
277    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
278        self.options.retry_policy = v.into().into();
279        self
280    }
281
282    /// The backoff policy used for this request.
283    ///
284    /// # Example
285    /// ```
286    /// # use google_cloud_storage::client::Storage;
287    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
288    /// use std::time::Duration;
289    /// use gax::exponential_backoff::ExponentialBackoff;
290    /// let response = client
291    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
292    ///     .with_backoff_policy(ExponentialBackoff::default())
293    ///     .send()
294    ///     .await?;
295    /// println!("response details={response:?}");
296    /// # Ok(()) }
297    /// ```
298    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
299        mut self,
300        v: V,
301    ) -> Self {
302        self.options.backoff_policy = v.into().into();
303        self
304    }
305
306    /// The retry throttler used for this request.
307    ///
    /// Most of the time you want to use the same throttler for all the requests
    /// in a client, and even the same throttler for many clients. Rarely, it
    /// may be necessary to use a custom throttler for some subset of the
    /// requests.
312    ///
313    /// # Example
314    /// ```
315    /// # use google_cloud_storage::client::Storage;
316    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
317    /// let response = client
318    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
319    ///     .with_retry_throttler(adhoc_throttler())
320    ///     .send()
321    ///     .await?;
322    /// println!("response details={response:?}");
323    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
324    ///     # panic!();
325    /// }
326    /// # Ok(()) }
327    /// ```
328    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
329        mut self,
330        v: V,
331    ) -> Self {
332        self.options.retry_throttler = v.into().into();
333        self
334    }
335
336    /// Configure the resume policy for read requests.
337    ///
338    /// The Cloud Storage client library can automatically resume a read that is
339    /// interrupted by a transient error. Applications may want to limit the
340    /// number of read attempts, or may wish to expand the type of errors
341    /// treated as retryable.
342    ///
343    /// # Example
344    /// ```
345    /// # use google_cloud_storage::client::Storage;
346    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
347    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
348    /// let response = client
349    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
350    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
351    ///     .send()
352    ///     .await?;
353    /// # Ok(()) }
354    /// ```
355    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
356    where
357        V: ReadResumePolicy + 'static,
358    {
359        self.options.read_resume_policy = std::sync::Arc::new(v);
360        self
361    }
362
363    /// Enables automatic decompression.
364    ///
365    /// The Cloud Storage service [automatically decompresses] objects
366    /// with `content_encoding == "gzip"` during reads. The client library
367    /// disables this behavior by default, as it is not possible to
368    /// perform ranged reads or to resume interrupted downloads if automatic
369    /// decompression is enabled.
370    ///
371    /// Use this option to enable automatic decompression.
372    ///
373    /// # Example
374    /// ```
375    /// # use google_cloud_storage::client::Storage;
376    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
377    /// let response = client
378    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
379    ///     .with_automatic_decompression(true)
380    ///     .send()
381    ///     .await?;
382    /// println!("response details={response:?}");
383    /// # Ok(()) }
384    /// ```
385    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
386        self.options.automatic_decompression = v;
387        self
388    }
389
390    /// Sends the request.
391    pub async fn send(self) -> Result<ReadObjectResponse> {
392        self.stub.read_object(self.request, self.options).await
393    }
394}
395
/// A convenience struct that saves the request conditions and performs the read.
///
/// It is `Clone` so `response()` can issue the initial read on a copy and
/// still hand the original to the resumable response.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    /// Shared client state: supplies the HTTP client, the endpoint, and the
    /// authentication headers used to build each request.
    pub inner: std::sync::Arc<StorageInner>,
    /// The read request: bucket, object, preconditions, and read range.
    pub request: crate::model::ReadObjectRequest,
    /// The per-request options, including the retry, backoff, and throttler
    /// settings used by the retry loop.
    pub options: RequestOptions,
}
403
404impl Reader {
405    async fn read(self) -> Result<reqwest::Response> {
406        let throttler = self.options.retry_throttler.clone();
407        let retry = self.options.retry_policy.clone();
408        let backoff = self.options.backoff_policy.clone();
409
410        gax::retry_loop_internal::retry_loop(
411            async move |_| self.read_attempt().await,
412            async |duration| tokio::time::sleep(duration).await,
413            true,
414            throttler,
415            retry,
416            backoff,
417        )
418        .await
419    }
420
421    async fn read_attempt(&self) -> Result<reqwest::Response> {
422        let builder = self.http_request_builder().await?;
423        let response = builder.send().await.map_err(Error::io)?;
424        if !response.status().is_success() {
425            return gaxi::http::to_http_error(response).await;
426        }
427        Ok(response)
428    }
429
430    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
431        // Collect the required bucket and object parameters.
432        let bucket = &self.request.bucket;
433        let bucket_id = bucket
434            .as_str()
435            .strip_prefix("projects/_/buckets/")
436            .ok_or_else(|| {
437                Error::binding(format!(
438                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
439                ))
440            })?;
441        let object = &self.request.object;
442
443        // Build the request.
444        let builder = self
445            .inner
446            .client
447            .request(
448                reqwest::Method::GET,
449                format!(
450                    "{}/storage/v1/b/{bucket_id}/o/{}",
451                    &self.inner.endpoint,
452                    enc(object)
453                ),
454            )
455            .query(&[("alt", "media")])
456            .header(
457                "x-goog-api-client",
458                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
459            );
460
461        let builder = if self.options.automatic_decompression {
462            builder
463        } else {
464            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
465            //
466            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
467            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
468            // with a different `contentEncoding` value.
469            builder.header(
470                "accept-encoding",
471                reqwest::header::HeaderValue::from_static("gzip"),
472            )
473        };
474
475        // Add the optional query parameters.
476        let builder = if self.request.generation != 0 {
477            builder.query(&[("generation", self.request.generation)])
478        } else {
479            builder
480        };
481        let builder = self
482            .request
483            .if_generation_match
484            .iter()
485            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
486        let builder = self
487            .request
488            .if_generation_not_match
489            .iter()
490            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
491        let builder = self
492            .request
493            .if_metageneration_match
494            .iter()
495            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
496        let builder = self
497            .request
498            .if_metageneration_not_match
499            .iter()
500            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
501
502        let builder = apply_customer_supplied_encryption_headers(
503            builder,
504            &self.request.common_object_request_params,
505        );
506
507        // Apply "range" header for read limits and offsets.
508        let builder = match (self.request.read_offset, self.request.read_limit) {
509            // read_limit can't be negative.
510            (_, l) if l < 0 => {
511                unreachable!("ReadObject build never sets a negative read_limit value")
512            }
513            // negative offset can't also have a read_limit.
514            (o, l) if o < 0 && l > 0 => unreachable!(
515                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
516            ),
517            // If both are zero, we use default implementation (no range header).
518            (0, 0) => builder,
519            // negative offset with no limit means the last N bytes.
520            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
521            // read_limit is zero, means no limit. Read from offset to end of file.
522            // This handles cases like (5, 0) -> "bytes=5-"
523            (o, 0) => builder.header("range", format!("bytes={o}-")),
524            // General case: non-negative offset and positive limit.
525            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
526            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
527        };
528
529        self.inner.apply_auth_headers(builder).await
530    }
531
532    fn is_gunzipped(response: &reqwest::Response) -> bool {
533        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
534        // objects. Reading such objects comes with a number of restrictions:
535        // - Ranged reads do not work.
536        // - The size of the decompressed data is not known.
537        // - Checksums do not work because the object checksums correspond to the
538        //   compressed data and the client library receives the decompressed data.
539        //
540        // Because ranged reads do not work, resuming a read does not work. Consequently,
541        // the implementation of `ReadObjectResponse` is substantially different for
542        // objects that are gunzipped.
543        //
544        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
545        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
546        use http::header::WARNING;
547        if response
548            .headers()
549            .get(TRANSFORMATION)
550            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
551        {
552            return true;
553        }
554        response
555            .headers()
556            .get(WARNING)
557            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
558    }
559
560    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
561        let response = self.clone().read().await?;
562        if Self::is_gunzipped(&response) {
563            return Ok(ReadObjectResponse::new(Box::new(
564                non_resumable::NonResumableResponse::new(response)?,
565            )));
566        }
567        Ok(ReadObjectResponse::new(Box::new(
568            resumable::ResumableResponse::new(self, response)?,
569        )))
570    }
571}
572
573#[cfg(test)]
574mod resume_tests;
575
576#[cfg(test)]
577mod tests {
578    use super::client::tests::{test_builder, test_inner_client};
579    use super::*;
580    use crate::error::{ChecksumMismatch, ReadError};
581    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
582    use auth::credentials::anonymous::Builder as Anonymous;
583    use base64::Engine;
584    use futures::TryStreamExt;
585    use httptest::{Expectation, Server, matchers::*, responders::status_code};
586    use std::collections::HashMap;
587    use std::error::Error;
588    use std::sync::Arc;
589    use test_case::test_case;
590
    // Return type shared by the tests in this module, so they can use `?` on
    // any error that converts into `anyhow::Error`.
    type Result = anyhow::Result<()>;
592
593    async fn http_request_builder(
594        inner: Arc<StorageInner>,
595        builder: ReadObject,
596    ) -> crate::Result<reqwest::RequestBuilder> {
597        let reader = Reader {
598            inner,
599            request: builder.request,
600            options: builder.options,
601        };
602        reader.http_request_builder().await
603    }
604
605    // Verify `read_object()` meets normal Send, Sync, requirements.
606    #[tokio::test]
607    async fn test_read_is_send_and_static() -> Result {
608        let client = Storage::builder()
609            .with_credentials(Anonymous::new().build())
610            .build()
611            .await?;
612
613        fn need_send<T: Send>(_val: &T) {}
614        fn need_sync<T: Sync>(_val: &T) {}
615        fn need_static<T: 'static>(_val: &T) {}
616
617        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
618        need_send(&read);
619        need_sync(&read);
620        need_static(&read);
621
622        let read = client
623            .read_object("projects/_/buckets/test-bucket", "test-object")
624            .send();
625        need_send(&read);
626        need_static(&read);
627
628        Ok(())
629    }
630
631    #[tokio::test]
632    async fn read_object_normal() -> Result {
633        let server = Server::run();
634        server.expect(
635            Expectation::matching(all_of![
636                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
637                request::headers(contains(("accept-encoding", "gzip"))),
638                request::query(url_decoded(contains(("alt", "media")))),
639            ])
640            .respond_with(
641                status_code(200)
642                    .body("hello world")
643                    .append_header("x-goog-generation", 123456),
644            ),
645        );
646
647        let client = Storage::builder()
648            .with_endpoint(format!("http://{}", server.addr()))
649            .with_credentials(Anonymous::new().build())
650            .build()
651            .await?;
652        let mut reader = client
653            .read_object("projects/_/buckets/test-bucket", "test-object")
654            .send()
655            .await?;
656        let mut got = Vec::new();
657        while let Some(b) = reader.next().await.transpose()? {
658            got.extend_from_slice(&b);
659        }
660        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
661
662        Ok(())
663    }
664
665    #[tokio::test]
666    async fn read_object_stream() -> Result {
667        let server = Server::run();
668        server.expect(
669            Expectation::matching(all_of![
670                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
671                request::query(url_decoded(contains(("alt", "media")))),
672            ])
673            .respond_with(
674                status_code(200)
675                    .append_header("x-goog-generation", 123456)
676                    .body("hello world"),
677            ),
678        );
679
680        let client = Storage::builder()
681            .with_endpoint(format!("http://{}", server.addr()))
682            .with_credentials(Anonymous::new().build())
683            .build()
684            .await?;
685        let response = client
686            .read_object("projects/_/buckets/test-bucket", "test-object")
687            .send()
688            .await?;
689        let result: Vec<_> = response.into_stream().try_collect().await?;
690        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
691
692        Ok(())
693    }
694
695    #[tokio::test]
696    async fn read_object_next_then_consume_response() -> Result {
697        // Create a large enough file that will require multiple chunks to read.
698        const BLOCK_SIZE: usize = 500;
699        let mut contents = Vec::new();
700        for i in 0..50 {
701            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
702        }
703
704        // Calculate and serialize the crc32c checksum
705        let u = crc32c::crc32c(&contents);
706        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
707
708        let server = Server::run();
709        server.expect(
710            Expectation::matching(all_of![
711                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
712                request::query(url_decoded(contains(("alt", "media")))),
713            ])
714            .times(1)
715            .respond_with(
716                status_code(200)
717                    .body(contents.clone())
718                    .append_header("x-goog-hash", format!("crc32c={value}"))
719                    .append_header("x-goog-generation", 123456),
720            ),
721        );
722
723        let client = Storage::builder()
724            .with_endpoint(format!("http://{}", server.addr()))
725            .with_credentials(Anonymous::new().build())
726            .build()
727            .await?;
728
729        // Read some bytes, then remainder with stream.
730        let mut response = client
731            .read_object("projects/_/buckets/test-bucket", "test-object")
732            .send()
733            .await?;
734
735        let mut all_bytes = bytes::BytesMut::new();
736        let chunk = response.next().await.transpose()?.unwrap();
737        assert!(!chunk.is_empty());
738        all_bytes.extend(chunk);
739        use futures::StreamExt;
740        let mut stream = response.into_stream();
741        while let Some(chunk) = stream.next().await.transpose()? {
742            all_bytes.extend(chunk);
743        }
744        assert_eq!(all_bytes, contents);
745
746        Ok(())
747    }
748
749    #[tokio::test]
750    async fn read_object_not_found() -> Result {
751        let server = Server::run();
752        server.expect(
753            Expectation::matching(all_of![
754                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
755                request::query(url_decoded(contains(("alt", "media")))),
756            ])
757            .respond_with(status_code(404).body("NOT FOUND")),
758        );
759
760        let client = Storage::builder()
761            .with_endpoint(format!("http://{}", server.addr()))
762            .with_credentials(Anonymous::new().build())
763            .build()
764            .await?;
765        let err = client
766            .read_object("projects/_/buckets/test-bucket", "test-object")
767            .send()
768            .await
769            .expect_err("expected a not found error");
770        assert_eq!(err.http_status_code(), Some(404));
771
772        Ok(())
773    }
774
775    #[tokio::test]
776    async fn read_object_incorrect_crc32c_check() -> Result {
777        // Calculate and serialize the crc32c checksum
778        let u = crc32c::crc32c("goodbye world".as_bytes());
779        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
780
781        let server = Server::run();
782        server.expect(
783            Expectation::matching(all_of![
784                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
785                request::query(url_decoded(contains(("alt", "media")))),
786            ])
787            .times(3)
788            .respond_with(
789                status_code(200)
790                    .body("hello world")
791                    .append_header("x-goog-hash", format!("crc32c={value}"))
792                    .append_header("x-goog-generation", 123456),
793            ),
794        );
795
796        let client = Storage::builder()
797            .with_endpoint(format!("http://{}", server.addr()))
798            .with_credentials(Anonymous::new().build())
799            .build()
800            .await?;
801        let mut response = client
802            .read_object("projects/_/buckets/test-bucket", "test-object")
803            .send()
804            .await?;
805        let mut partial = Vec::new();
806        let mut err = None;
807        while let Some(r) = response.next().await {
808            match r {
809                Ok(b) => partial.extend_from_slice(&b),
810                Err(e) => err = Some(e),
811            };
812        }
813        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
814        let err = err.expect("expect error on incorrect crc32c");
815        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
816        assert!(
817            matches!(
818                source,
819                Some(&ReadError::ChecksumMismatch(
820                    ChecksumMismatch::Crc32c { .. }
821                ))
822            ),
823            "err={err:?}"
824        );
825
826        let mut response = client
827            .read_object("projects/_/buckets/test-bucket", "test-object")
828            .send()
829            .await?;
830        let err: crate::Error = async {
831            {
832                while (response.next().await.transpose()?).is_some() {}
833                Ok(())
834            }
835        }
836        .await
837        .expect_err("expect error on incorrect crc32c");
838        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
839        assert!(
840            matches!(
841                source,
842                Some(&ReadError::ChecksumMismatch(
843                    ChecksumMismatch::Crc32c { .. }
844                ))
845            ),
846            "err={err:?}"
847        );
848
849        use futures::TryStreamExt;
850        let err = client
851            .read_object("projects/_/buckets/test-bucket", "test-object")
852            .send()
853            .await?
854            .into_stream()
855            .try_collect::<Vec<bytes::Bytes>>()
856            .await
857            .expect_err("expect error on incorrect crc32c");
858        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
859        assert!(
860            matches!(
861                source,
862                Some(&ReadError::ChecksumMismatch(
863                    ChecksumMismatch::Crc32c { .. }
864                ))
865            ),
866            "err={err:?}"
867        );
868        Ok(())
869    }
870
871    #[tokio::test]
872    async fn read_object_incorrect_md5_check() -> Result {
873        // Calculate and serialize the md5 checksum
874        let digest = md5::compute("goodbye world".as_bytes());
875        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
876
877        let server = Server::run();
878        server.expect(
879            Expectation::matching(all_of![
880                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
881                request::query(url_decoded(contains(("alt", "media")))),
882            ])
883            .times(1)
884            .respond_with(
885                status_code(200)
886                    .body("hello world")
887                    .append_header("x-goog-hash", format!("md5={value}"))
888                    .append_header("x-goog-generation", 123456),
889            ),
890        );
891
892        let client = Storage::builder()
893            .with_endpoint(format!("http://{}", server.addr()))
894            .with_credentials(Anonymous::new().build())
895            .build()
896            .await?;
897        let mut response = client
898            .read_object("projects/_/buckets/test-bucket", "test-object")
899            .compute_md5()
900            .send()
901            .await?;
902        let mut partial = Vec::new();
903        let mut err = None;
904        while let Some(r) = response.next().await {
905            match r {
906                Ok(b) => partial.extend_from_slice(&b),
907                Err(e) => err = Some(e),
908            };
909        }
910        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
911        let err = err.expect("expect error on incorrect md5");
912        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
913        assert!(
914            matches!(
915                source,
916                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
917            ),
918            "err={err:?}"
919        );
920
921        Ok(())
922    }
923
924    #[tokio::test]
925    async fn read_object() -> Result {
926        let inner = test_inner_client(test_builder());
927        let stub = crate::storage::transport::Storage::new(inner.clone());
928        let builder = ReadObject::new(
929            stub,
930            "projects/_/buckets/bucket",
931            "object",
932            inner.options.clone(),
933        );
934        let request = http_request_builder(inner, builder).await?.build()?;
935
936        assert_eq!(request.method(), reqwest::Method::GET);
937        assert_eq!(
938            request.url().as_str(),
939            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
940        );
941        Ok(())
942    }
943
944    #[tokio::test]
945    async fn read_object_error_credentials() -> Result {
946        let inner = test_inner_client(
947            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
948        );
949        let stub = crate::storage::transport::Storage::new(inner.clone());
950        let builder = ReadObject::new(
951            stub,
952            "projects/_/buckets/bucket",
953            "object",
954            inner.options.clone(),
955        );
956        let _ = http_request_builder(inner, builder)
957            .await
958            .inspect_err(|e| assert!(e.is_authentication()))
959            .expect_err("invalid credentials should err");
960        Ok(())
961    }
962
963    #[tokio::test]
964    async fn read_object_bad_bucket() -> Result {
965        let inner = test_inner_client(test_builder());
966        let stub = crate::storage::transport::Storage::new(inner.clone());
967        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
968        let _ = http_request_builder(inner, builder)
969            .await
970            .expect_err("malformed bucket string should error");
971        Ok(())
972    }
973
974    #[tokio::test]
975    async fn read_object_query_params() -> Result {
976        let inner = test_inner_client(test_builder());
977        let stub = crate::storage::transport::Storage::new(inner.clone());
978        let builder = ReadObject::new(
979            stub,
980            "projects/_/buckets/bucket",
981            "object",
982            inner.options.clone(),
983        )
984        .set_generation(5)
985        .set_if_generation_match(10)
986        .set_if_generation_not_match(20)
987        .set_if_metageneration_match(30)
988        .set_if_metageneration_not_match(40);
989        let request = http_request_builder(inner, builder).await?.build()?;
990
991        assert_eq!(request.method(), reqwest::Method::GET);
992        let want_pairs: HashMap<String, String> = [
993            ("alt", "media"),
994            ("generation", "5"),
995            ("ifGenerationMatch", "10"),
996            ("ifGenerationNotMatch", "20"),
997            ("ifMetagenerationMatch", "30"),
998            ("ifMetagenerationNotMatch", "40"),
999        ]
1000        .iter()
1001        .map(|(k, v)| (k.to_string(), v.to_string()))
1002        .collect();
1003        let query_pairs: HashMap<String, String> = request
1004            .url()
1005            .query_pairs()
1006            .map(|param| (param.0.to_string(), param.1.to_string()))
1007            .collect();
1008        assert_eq!(query_pairs.len(), want_pairs.len());
1009        assert_eq!(query_pairs, want_pairs);
1010        Ok(())
1011    }
1012
1013    #[tokio::test]
1014    async fn read_object_default_headers() -> Result {
1015        // The API takes the unencoded byte array.
1016        let inner = test_inner_client(test_builder());
1017        let stub = crate::storage::transport::Storage::new(inner.clone());
1018        let builder = ReadObject::new(
1019            stub,
1020            "projects/_/buckets/bucket",
1021            "object",
1022            inner.options.clone(),
1023        );
1024        let request = http_request_builder(inner, builder).await?.build()?;
1025
1026        assert_eq!(request.method(), reqwest::Method::GET);
1027        assert_eq!(
1028            request.url().as_str(),
1029            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1030        );
1031
1032        let want = [("accept-encoding", "gzip")];
1033        let headers = request.headers();
1034        for (name, value) in want {
1035            assert_eq!(
1036                headers.get(name).and_then(|h| h.to_str().ok()),
1037                Some(value),
1038                "{request:?}"
1039            );
1040        }
1041        Ok(())
1042    }
1043
1044    #[tokio::test]
1045    async fn read_object_automatic_decompression_headers() -> Result {
1046        // The API takes the unencoded byte array.
1047        let inner = test_inner_client(test_builder());
1048        let stub = crate::storage::transport::Storage::new(inner.clone());
1049        let builder = ReadObject::new(
1050            stub,
1051            "projects/_/buckets/bucket",
1052            "object",
1053            inner.options.clone(),
1054        )
1055        .with_automatic_decompression(true);
1056        let request = http_request_builder(inner, builder).await?.build()?;
1057
1058        assert_eq!(request.method(), reqwest::Method::GET);
1059        assert_eq!(
1060            request.url().as_str(),
1061            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1062        );
1063
1064        let headers = request.headers();
1065        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1066        Ok(())
1067    }
1068
1069    #[tokio::test]
1070    async fn read_object_encryption_headers() -> Result {
1071        // Make a 32-byte key.
1072        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1073
1074        // The API takes the unencoded byte array.
1075        let inner = test_inner_client(test_builder());
1076        let stub = crate::storage::transport::Storage::new(inner.clone());
1077        let builder = ReadObject::new(
1078            stub,
1079            "projects/_/buckets/bucket",
1080            "object",
1081            inner.options.clone(),
1082        )
1083        .set_key(KeyAes256::new(&key)?);
1084        let request = http_request_builder(inner, builder).await?.build()?;
1085
1086        assert_eq!(request.method(), reqwest::Method::GET);
1087        assert_eq!(
1088            request.url().as_str(),
1089            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1090        );
1091
1092        let want = [
1093            ("x-goog-encryption-algorithm", "AES256".to_string()),
1094            ("x-goog-encryption-key", key_base64),
1095            ("x-goog-encryption-key-sha256", key_sha256_base64),
1096        ];
1097
1098        let headers = request.headers();
1099        for (name, value) in want {
1100            assert_eq!(
1101                headers.get(name).and_then(|h| h.to_str().ok()),
1102                Some(value.as_str())
1103            );
1104        }
1105        Ok(())
1106    }
1107
1108    #[test_case(ReadRange::all(), None; "no headers needed")]
1109    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1110    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1111    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1112    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1113    #[tokio::test]
1114    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1115        let inner = test_inner_client(test_builder());
1116        let stub = crate::storage::transport::Storage::new(inner.clone());
1117        let builder = ReadObject::new(
1118            stub,
1119            "projects/_/buckets/bucket",
1120            "object",
1121            inner.options.clone(),
1122        )
1123        .set_read_range(input.clone());
1124        let request = http_request_builder(inner, builder).await?.build()?;
1125
1126        assert_eq!(request.method(), reqwest::Method::GET);
1127        assert_eq!(
1128            request.url().as_str(),
1129            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1130        );
1131
1132        assert_eq!(request.headers().get("range"), want);
1133        Ok(())
1134    }
1135
1136    #[test_case("projects/p", "projects%2Fp")]
1137    #[test_case("kebab-case", "kebab-case")]
1138    #[test_case("dot.name", "dot.name")]
1139    #[test_case("under_score", "under_score")]
1140    #[test_case("tilde~123", "tilde~123")]
1141    #[test_case("exclamation!point!", "exclamation%21point%21")]
1142    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1143    #[test_case("preserve%percent%21", "preserve%percent%21")]
1144    #[test_case(
1145        "testall !#$&'()*+,/:;=?@[]",
1146        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1147    )]
1148    #[tokio::test]
1149    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1150        let inner = test_inner_client(test_builder());
1151        let stub = crate::storage::transport::Storage::new(inner.clone());
1152        let builder = ReadObject::new(
1153            stub,
1154            "projects/_/buckets/bucket",
1155            name,
1156            inner.options.clone(),
1157        );
1158        let request = http_request_builder(inner, builder).await?.build()?;
1159        let got = request.url().path_segments().unwrap().next_back().unwrap();
1160        assert_eq!(got, want);
1161        Ok(())
1162    }
1163
1164    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1165    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1166    #[test_case("warning", "214 UploadServer gunzipped", true)]
1167    #[test_case("warning", "no match", false)]
1168    #[test_case("unused", "unused", false)]
1169    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1170        let response = http::Response::builder()
1171            .status(200)
1172            .header(name, value)
1173            .body(Vec::new())?;
1174        let response = reqwest::Response::from(response);
1175        let got = Reader::is_gunzipped(&response);
1176        assert_eq!(got, want, "{response:?}");
1177        Ok(())
1178    }
1179}