google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::http::map_send_error;
27
28/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
29///
30/// # Example: accumulate the contents of an object into a vector
31/// ```
32/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
35///     let mut reader = builder.send().await?;
36///     let mut contents = Vec::new();
37///     while let Some(chunk) = reader.next().await.transpose()? {
38///         contents.extend_from_slice(&chunk);
39///     }
40///     println!("object contents={:?}", contents);
41///     Ok(())
42/// }
43/// ```
44///
45/// # Example: read part of an object
46/// ```
47/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
48/// use google_cloud_storage::model_ext::ReadRange;
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: u64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
55///         .send()
56///         .await?;
57///     while let Some(chunk) = reader.next().await.transpose()? {
58///         contents.extend_from_slice(&chunk);
59///     }
60///     println!("range contents={:?}", contents);
61///     Ok(())
62/// }
63/// ```
64#[derive(Clone, Debug)]
65pub struct ReadObject<S = crate::storage::transport::Storage>
66where
67    S: crate::storage::stub::Storage + 'static,
68{
69    stub: std::sync::Arc<S>,
70    request: crate::model::ReadObjectRequest,
71    options: RequestOptions,
72}
73
74impl<S> ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    pub(crate) fn new<B, O>(
79        stub: std::sync::Arc<S>,
80        bucket: B,
81        object: O,
82        options: RequestOptions,
83    ) -> Self
84    where
85        B: Into<String>,
86        O: Into<String>,
87    {
88        ReadObject {
89            stub,
90            request: crate::model::ReadObjectRequest::new()
91                .set_bucket(bucket)
92                .set_object(object),
93            options,
94        }
95    }
96
97    /// Enables computation of MD5 hashes.
98    ///
99    /// Crc32c hashes are checked by default.
100    ///
101    /// Checksum validation is supported iff:
102    /// 1. The full content is requested.
103    /// 2. All of the content is returned (status != PartialContent).
104    /// 3. The server sent a checksum header.
105    /// 4. The http stack did not uncompress the file.
106    /// 5. The server did not uncompress data on read.
107    ///
108    /// # Example
109    /// ```
110    /// # use google_cloud_storage::client::Storage;
111    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
112    /// let builder =  client
113    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
114    ///     .compute_md5();
115    /// let mut reader = builder
116    ///     .send()
117    ///     .await?;
118    /// let mut contents = Vec::new();
119    /// while let Some(chunk) = reader.next().await.transpose()? {
120    ///     contents.extend_from_slice(&chunk);
121    /// }
122    /// println!("object contents={:?}", contents);
123    /// # Ok(()) }
124    /// ```
125    pub fn compute_md5(self) -> Self {
126        let mut this = self;
127        this.options.checksum.md5_hash = Some(Md5::default());
128        this
129    }
130
131    /// If present, selects a specific revision of this object (as
132    /// opposed to the latest version, the default).
133    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
134        self.request.generation = v.into();
135        self
136    }
137
138    /// Makes the operation conditional on whether the object's current generation
139    /// matches the given value. Setting to 0 makes the operation succeed only if
140    /// there are no live versions of the object.
141    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
142    where
143        T: Into<i64>,
144    {
145        self.request.if_generation_match = Some(v.into());
146        self
147    }
148
149    /// Makes the operation conditional on whether the object's live generation
150    /// does not match the given value. If no live object exists, the precondition
151    /// fails. Setting to 0 makes the operation succeed only if there is a live
152    /// version of the object.
153    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
154    where
155        T: Into<i64>,
156    {
157        self.request.if_generation_not_match = Some(v.into());
158        self
159    }
160
161    /// Makes the operation conditional on whether the object's current
162    /// metageneration matches the given value.
163    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
164    where
165        T: Into<i64>,
166    {
167        self.request.if_metageneration_match = Some(v.into());
168        self
169    }
170
171    /// Makes the operation conditional on whether the object's current
172    /// metageneration does not match the given value.
173    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
174    where
175        T: Into<i64>,
176    {
177        self.request.if_metageneration_not_match = Some(v.into());
178        self
179    }
180
181    /// The range of bytes to return in the read.
182    ///
183    /// This can be all the bytes starting at a given offset
184    /// (`ReadRange::offset()`), all the bytes in an explicit range
185    /// (`Range::segment`), or the last N bytes of the object
186    /// (`ReadRange::tail`).
187    ///
188    /// # Examples
189    ///
190    /// Read starting at 100 bytes to end of file.
191    /// ```
192    /// # use google_cloud_storage::client::Storage;
193    /// # use google_cloud_storage::model_ext::ReadRange;
194    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
195    /// let response = client
196    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
197    ///     .set_read_range(ReadRange::offset(100))
198    ///     .send()
199    ///     .await?;
200    /// println!("response details={response:?}");
201    /// # Ok(()) }
202    /// ```
203    ///
204    /// Read last 100 bytes of file:
205    /// ```
206    /// # use google_cloud_storage::client::Storage;
207    /// # use google_cloud_storage::model_ext::ReadRange;
208    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
209    /// let response = client
210    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
211    ///     .set_read_range(ReadRange::tail(100))
212    ///     .send()
213    ///     .await?;
214    /// println!("response details={response:?}");
215    /// # Ok(()) }
216    /// ```
217    ///
218    /// Read bytes 1000 to 1099.
219    /// ```
220    /// # use google_cloud_storage::client::Storage;
221    /// # use google_cloud_storage::model_ext::ReadRange;
222    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
223    /// let response = client
224    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
225    ///     .set_read_range(ReadRange::segment(1000, 100))
226    ///     .send()
227    ///     .await?;
228    /// println!("response details={response:?}");
229    /// # Ok(()) }
230    /// ```
231    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
232        self.request.with_range(range);
233        self
234    }
235
236    /// The encryption key used with the Customer-Supplied Encryption Keys
237    /// feature. In raw bytes format (not base64-encoded).
238    ///
239    /// Example:
240    /// ```
241    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243    /// let key: &[u8] = &[97; 32];
244    /// let response = client
245    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
246    ///     .set_key(KeyAes256::new(key)?)
247    ///     .send()
248    ///     .await?;
249    /// println!("response details={response:?}");
250    /// # Ok(()) }
251    /// ```
252    pub fn set_key(mut self, v: KeyAes256) -> Self {
253        self.request.common_object_request_params = Some(v.into());
254        self
255    }
256
257    /// The retry policy used for this request.
258    ///
259    /// # Example
260    /// ```
261    /// # use google_cloud_storage::client::Storage;
262    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
263    /// use google_cloud_storage::retry_policy::RetryableErrors;
264    /// use std::time::Duration;
265    /// use gax::retry_policy::RetryPolicyExt;
266    /// let response = client
267    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
268    ///     .with_retry_policy(
269    ///         RetryableErrors
270    ///             .with_attempt_limit(5)
271    ///             .with_time_limit(Duration::from_secs(10)),
272    ///     )
273    ///     .send()
274    ///     .await?;
275    /// println!("response details={response:?}");
276    /// # Ok(()) }
277    /// ```
278    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
279        self.options.retry_policy = v.into().into();
280        self
281    }
282
283    /// The backoff policy used for this request.
284    ///
285    /// # Example
286    /// ```
287    /// # use google_cloud_storage::client::Storage;
288    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
289    /// use std::time::Duration;
290    /// use gax::exponential_backoff::ExponentialBackoff;
291    /// let response = client
292    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
293    ///     .with_backoff_policy(ExponentialBackoff::default())
294    ///     .send()
295    ///     .await?;
296    /// println!("response details={response:?}");
297    /// # Ok(()) }
298    /// ```
299    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
300        mut self,
301        v: V,
302    ) -> Self {
303        self.options.backoff_policy = v.into().into();
304        self
305    }
306
307    /// The retry throttler used for this request.
308    ///
309    /// Most of the time you want to use the same throttler for all the requests
310    /// in a client, and even the same throttler for many clients. Rarely it
311    /// may be necessary to use an custom throttler for some subset of the
312    /// requests.
313    ///
314    /// # Example
315    /// ```
316    /// # use google_cloud_storage::client::Storage;
317    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
318    /// let response = client
319    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
320    ///     .with_retry_throttler(adhoc_throttler())
321    ///     .send()
322    ///     .await?;
323    /// println!("response details={response:?}");
324    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
325    ///     # panic!();
326    /// }
327    /// # Ok(()) }
328    /// ```
329    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
330        mut self,
331        v: V,
332    ) -> Self {
333        self.options.retry_throttler = v.into().into();
334        self
335    }
336
337    /// Configure the resume policy for read requests.
338    ///
339    /// The Cloud Storage client library can automatically resume a read that is
340    /// interrupted by a transient error. Applications may want to limit the
341    /// number of read attempts, or may wish to expand the type of errors
342    /// treated as retryable.
343    ///
344    /// # Example
345    /// ```
346    /// # use google_cloud_storage::client::Storage;
347    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
348    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
349    /// let response = client
350    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
351    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
352    ///     .send()
353    ///     .await?;
354    /// # Ok(()) }
355    /// ```
356    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
357    where
358        V: ReadResumePolicy + 'static,
359    {
360        self.options.set_read_resume_policy(std::sync::Arc::new(v));
361        self
362    }
363
364    /// Enables automatic decompression.
365    ///
366    /// The Cloud Storage service [automatically decompresses] objects
367    /// with `content_encoding == "gzip"` during reads. The client library
368    /// disables this behavior by default, as it is not possible to
369    /// perform ranged reads or to resume interrupted downloads if automatic
370    /// decompression is enabled.
371    ///
372    /// Use this option to enable automatic decompression.
373    ///
374    /// # Example
375    /// ```
376    /// # use google_cloud_storage::client::Storage;
377    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
378    /// let response = client
379    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
380    ///     .with_automatic_decompression(true)
381    ///     .send()
382    ///     .await?;
383    /// println!("response details={response:?}");
384    /// # Ok(()) }
385    /// ```
386    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
387        self.options.automatic_decompression = v;
388        self
389    }
390
391    /// Sends the request.
392    pub async fn send(self) -> Result<ReadObjectResponse> {
393        self.stub.read_object(self.request, self.options).await
394    }
395}
396
397// A convenience struct that saves the request conditions and performs the read.
398#[derive(Clone, Debug)]
399pub(crate) struct Reader {
400    pub inner: std::sync::Arc<StorageInner>,
401    pub request: crate::model::ReadObjectRequest,
402    pub options: RequestOptions,
403}
404
405impl Reader {
406    async fn read(self) -> Result<reqwest::Response> {
407        let throttler = self.options.retry_throttler.clone();
408        let retry = self.options.retry_policy.clone();
409        let backoff = self.options.backoff_policy.clone();
410
411        gax::retry_loop_internal::retry_loop(
412            async move |_| self.read_attempt().await,
413            async |duration| tokio::time::sleep(duration).await,
414            true,
415            throttler,
416            retry,
417            backoff,
418        )
419        .await
420    }
421
422    async fn read_attempt(&self) -> Result<reqwest::Response> {
423        let builder = self.http_request_builder().await?;
424        let response = builder.send().await.map_err(map_send_error)?;
425        if !response.status().is_success() {
426            return gaxi::http::to_http_error(response).await;
427        }
428        Ok(response)
429    }
430
431    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
432        // Collect the required bucket and object parameters.
433        let bucket = &self.request.bucket;
434        let bucket_id = bucket
435            .as_str()
436            .strip_prefix("projects/_/buckets/")
437            .ok_or_else(|| {
438                Error::binding(format!(
439                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
440                ))
441            })?;
442        let object = &self.request.object;
443
444        // Build the request.
445        let builder = self
446            .inner
447            .client
448            .request(
449                reqwest::Method::GET,
450                format!(
451                    "{}/storage/v1/b/{bucket_id}/o/{}",
452                    &self.inner.endpoint,
453                    enc(object)
454                ),
455            )
456            .query(&[("alt", "media")])
457            .header(
458                "x-goog-api-client",
459                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
460            );
461
462        let builder = if self.options.automatic_decompression {
463            builder
464        } else {
465            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
466            //
467            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
468            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
469            // with a different `contentEncoding` value.
470            builder.header(
471                "accept-encoding",
472                reqwest::header::HeaderValue::from_static("gzip"),
473            )
474        };
475
476        // Add the optional query parameters.
477        let builder = if self.request.generation != 0 {
478            builder.query(&[("generation", self.request.generation)])
479        } else {
480            builder
481        };
482        let builder = self
483            .request
484            .if_generation_match
485            .iter()
486            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
487        let builder = self
488            .request
489            .if_generation_not_match
490            .iter()
491            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
492        let builder = self
493            .request
494            .if_metageneration_match
495            .iter()
496            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
497        let builder = self
498            .request
499            .if_metageneration_not_match
500            .iter()
501            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
502
503        let builder = apply_customer_supplied_encryption_headers(
504            builder,
505            &self.request.common_object_request_params,
506        );
507
508        // Apply "range" header for read limits and offsets.
509        let builder = match (self.request.read_offset, self.request.read_limit) {
510            // read_limit can't be negative.
511            (_, l) if l < 0 => {
512                unreachable!("ReadObject build never sets a negative read_limit value")
513            }
514            // negative offset can't also have a read_limit.
515            (o, l) if o < 0 && l > 0 => unreachable!(
516                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
517            ),
518            // If both are zero, we use default implementation (no range header).
519            (0, 0) => builder,
520            // negative offset with no limit means the last N bytes.
521            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
522            // read_limit is zero, means no limit. Read from offset to end of file.
523            // This handles cases like (5, 0) -> "bytes=5-"
524            (o, 0) => builder.header("range", format!("bytes={o}-")),
525            // General case: non-negative offset and positive limit.
526            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
527            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
528        };
529
530        self.inner.apply_auth_headers(builder).await
531    }
532
533    fn is_gunzipped(response: &reqwest::Response) -> bool {
534        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
535        // objects. Reading such objects comes with a number of restrictions:
536        // - Ranged reads do not work.
537        // - The size of the decompressed data is not known.
538        // - Checksums do not work because the object checksums correspond to the
539        //   compressed data and the client library receives the decompressed data.
540        //
541        // Because ranged reads do not work, resuming a read does not work. Consequently,
542        // the implementation of `ReadObjectResponse` is substantially different for
543        // objects that are gunzipped.
544        //
545        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
546        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
547        use http::header::WARNING;
548        if response
549            .headers()
550            .get(TRANSFORMATION)
551            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
552        {
553            return true;
554        }
555        response
556            .headers()
557            .get(WARNING)
558            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
559    }
560
561    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
562        let response = self.clone().read().await?;
563        if Self::is_gunzipped(&response) {
564            return Ok(ReadObjectResponse::new(Box::new(
565                non_resumable::NonResumableResponse::new(response)?,
566            )));
567        }
568        Ok(ReadObjectResponse::new(Box::new(
569            resumable::ResumableResponse::new(self, response)?,
570        )))
571    }
572}
573
574#[cfg(test)]
575mod resume_tests;
576
577#[cfg(test)]
578mod tests {
579    use super::client::tests::{test_builder, test_inner_client};
580    use super::*;
581    use crate::error::{ChecksumMismatch, ReadError};
582    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
583    use auth::credentials::anonymous::Builder as Anonymous;
584    use base64::Engine;
585    use futures::TryStreamExt;
586    use httptest::{Expectation, Server, matchers::*, responders::status_code};
587    use std::collections::HashMap;
588    use std::error::Error;
589    use std::sync::Arc;
590    use test_case::test_case;
591
592    type Result = anyhow::Result<()>;
593
594    async fn http_request_builder(
595        inner: Arc<StorageInner>,
596        builder: ReadObject,
597    ) -> crate::Result<reqwest::RequestBuilder> {
598        let reader = Reader {
599            inner,
600            request: builder.request,
601            options: builder.options,
602        };
603        reader.http_request_builder().await
604    }
605
606    // Verify `read_object()` meets normal Send, Sync, requirements.
607    #[tokio::test]
608    async fn test_read_is_send_and_static() -> Result {
609        let client = Storage::builder()
610            .with_credentials(Anonymous::new().build())
611            .build()
612            .await?;
613
614        fn need_send<T: Send>(_val: &T) {}
615        fn need_sync<T: Sync>(_val: &T) {}
616        fn need_static<T: 'static>(_val: &T) {}
617
618        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
619        need_send(&read);
620        need_sync(&read);
621        need_static(&read);
622
623        let read = client
624            .read_object("projects/_/buckets/test-bucket", "test-object")
625            .send();
626        need_send(&read);
627        need_static(&read);
628
629        Ok(())
630    }
631
632    #[tokio::test]
633    async fn read_object_normal() -> Result {
634        let server = Server::run();
635        server.expect(
636            Expectation::matching(all_of![
637                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
638                request::headers(contains(("accept-encoding", "gzip"))),
639                request::query(url_decoded(contains(("alt", "media")))),
640            ])
641            .respond_with(
642                status_code(200)
643                    .body("hello world")
644                    .append_header("x-goog-generation", 123456),
645            ),
646        );
647
648        let client = Storage::builder()
649            .with_endpoint(format!("http://{}", server.addr()))
650            .with_credentials(Anonymous::new().build())
651            .build()
652            .await?;
653        let mut reader = client
654            .read_object("projects/_/buckets/test-bucket", "test-object")
655            .send()
656            .await?;
657        let mut got = Vec::new();
658        while let Some(b) = reader.next().await.transpose()? {
659            got.extend_from_slice(&b);
660        }
661        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
662
663        Ok(())
664    }
665
666    #[tokio::test]
667    async fn read_object_stream() -> Result {
668        let server = Server::run();
669        server.expect(
670            Expectation::matching(all_of![
671                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
672                request::query(url_decoded(contains(("alt", "media")))),
673            ])
674            .respond_with(
675                status_code(200)
676                    .append_header("x-goog-generation", 123456)
677                    .body("hello world"),
678            ),
679        );
680
681        let client = Storage::builder()
682            .with_endpoint(format!("http://{}", server.addr()))
683            .with_credentials(Anonymous::new().build())
684            .build()
685            .await?;
686        let response = client
687            .read_object("projects/_/buckets/test-bucket", "test-object")
688            .send()
689            .await?;
690        let result: Vec<_> = response.into_stream().try_collect().await?;
691        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
692
693        Ok(())
694    }
695
696    #[tokio::test]
697    async fn read_object_next_then_consume_response() -> Result {
698        // Create a large enough file that will require multiple chunks to read.
699        const BLOCK_SIZE: usize = 500;
700        let mut contents = Vec::new();
701        for i in 0..50 {
702            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
703        }
704
705        // Calculate and serialize the crc32c checksum
706        let u = crc32c::crc32c(&contents);
707        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
708
709        let server = Server::run();
710        server.expect(
711            Expectation::matching(all_of![
712                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
713                request::query(url_decoded(contains(("alt", "media")))),
714            ])
715            .times(1)
716            .respond_with(
717                status_code(200)
718                    .body(contents.clone())
719                    .append_header("x-goog-hash", format!("crc32c={value}"))
720                    .append_header("x-goog-generation", 123456),
721            ),
722        );
723
724        let client = Storage::builder()
725            .with_endpoint(format!("http://{}", server.addr()))
726            .with_credentials(Anonymous::new().build())
727            .build()
728            .await?;
729
730        // Read some bytes, then remainder with stream.
731        let mut response = client
732            .read_object("projects/_/buckets/test-bucket", "test-object")
733            .send()
734            .await?;
735
736        let mut all_bytes = bytes::BytesMut::new();
737        let chunk = response.next().await.transpose()?.unwrap();
738        assert!(!chunk.is_empty());
739        all_bytes.extend(chunk);
740        use futures::StreamExt;
741        let mut stream = response.into_stream();
742        while let Some(chunk) = stream.next().await.transpose()? {
743            all_bytes.extend(chunk);
744        }
745        assert_eq!(all_bytes, contents);
746
747        Ok(())
748    }
749
750    #[tokio::test]
751    async fn read_object_not_found() -> Result {
752        let server = Server::run();
753        server.expect(
754            Expectation::matching(all_of![
755                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
756                request::query(url_decoded(contains(("alt", "media")))),
757            ])
758            .respond_with(status_code(404).body("NOT FOUND")),
759        );
760
761        let client = Storage::builder()
762            .with_endpoint(format!("http://{}", server.addr()))
763            .with_credentials(Anonymous::new().build())
764            .build()
765            .await?;
766        let err = client
767            .read_object("projects/_/buckets/test-bucket", "test-object")
768            .send()
769            .await
770            .expect_err("expected a not found error");
771        assert_eq!(err.http_status_code(), Some(404));
772
773        Ok(())
774    }
775
776    #[tokio::test]
777    async fn read_object_incorrect_crc32c_check() -> Result {
778        // Calculate and serialize the crc32c checksum
779        let u = crc32c::crc32c("goodbye world".as_bytes());
780        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
781
782        let server = Server::run();
783        server.expect(
784            Expectation::matching(all_of![
785                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
786                request::query(url_decoded(contains(("alt", "media")))),
787            ])
788            .times(3)
789            .respond_with(
790                status_code(200)
791                    .body("hello world")
792                    .append_header("x-goog-hash", format!("crc32c={value}"))
793                    .append_header("x-goog-generation", 123456),
794            ),
795        );
796
797        let client = Storage::builder()
798            .with_endpoint(format!("http://{}", server.addr()))
799            .with_credentials(Anonymous::new().build())
800            .build()
801            .await?;
802        let mut response = client
803            .read_object("projects/_/buckets/test-bucket", "test-object")
804            .send()
805            .await?;
806        let mut partial = Vec::new();
807        let mut err = None;
808        while let Some(r) = response.next().await {
809            match r {
810                Ok(b) => partial.extend_from_slice(&b),
811                Err(e) => err = Some(e),
812            };
813        }
814        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
815        let err = err.expect("expect error on incorrect crc32c");
816        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
817        assert!(
818            matches!(
819                source,
820                Some(&ReadError::ChecksumMismatch(
821                    ChecksumMismatch::Crc32c { .. }
822                ))
823            ),
824            "err={err:?}"
825        );
826
827        let mut response = client
828            .read_object("projects/_/buckets/test-bucket", "test-object")
829            .send()
830            .await?;
831        let err: crate::Error = async {
832            {
833                while (response.next().await.transpose()?).is_some() {}
834                Ok(())
835            }
836        }
837        .await
838        .expect_err("expect error on incorrect crc32c");
839        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
840        assert!(
841            matches!(
842                source,
843                Some(&ReadError::ChecksumMismatch(
844                    ChecksumMismatch::Crc32c { .. }
845                ))
846            ),
847            "err={err:?}"
848        );
849
850        use futures::TryStreamExt;
851        let err = client
852            .read_object("projects/_/buckets/test-bucket", "test-object")
853            .send()
854            .await?
855            .into_stream()
856            .try_collect::<Vec<bytes::Bytes>>()
857            .await
858            .expect_err("expect error on incorrect crc32c");
859        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
860        assert!(
861            matches!(
862                source,
863                Some(&ReadError::ChecksumMismatch(
864                    ChecksumMismatch::Crc32c { .. }
865                ))
866            ),
867            "err={err:?}"
868        );
869        Ok(())
870    }
871
872    #[tokio::test]
873    async fn read_object_incorrect_md5_check() -> Result {
874        // Calculate and serialize the md5 checksum
875        let digest = md5::compute("goodbye world".as_bytes());
876        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
877
878        let server = Server::run();
879        server.expect(
880            Expectation::matching(all_of![
881                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
882                request::query(url_decoded(contains(("alt", "media")))),
883            ])
884            .times(1)
885            .respond_with(
886                status_code(200)
887                    .body("hello world")
888                    .append_header("x-goog-hash", format!("md5={value}"))
889                    .append_header("x-goog-generation", 123456),
890            ),
891        );
892
893        let client = Storage::builder()
894            .with_endpoint(format!("http://{}", server.addr()))
895            .with_credentials(Anonymous::new().build())
896            .build()
897            .await?;
898        let mut response = client
899            .read_object("projects/_/buckets/test-bucket", "test-object")
900            .compute_md5()
901            .send()
902            .await?;
903        let mut partial = Vec::new();
904        let mut err = None;
905        while let Some(r) = response.next().await {
906            match r {
907                Ok(b) => partial.extend_from_slice(&b),
908                Err(e) => err = Some(e),
909            };
910        }
911        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
912        let err = err.expect("expect error on incorrect md5");
913        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
914        assert!(
915            matches!(
916                source,
917                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
918            ),
919            "err={err:?}"
920        );
921
922        Ok(())
923    }
924
925    #[tokio::test]
926    async fn read_object() -> Result {
927        let inner = test_inner_client(test_builder());
928        let stub = crate::storage::transport::Storage::new(inner.clone());
929        let builder = ReadObject::new(
930            stub,
931            "projects/_/buckets/bucket",
932            "object",
933            inner.options.clone(),
934        );
935        let request = http_request_builder(inner, builder).await?.build()?;
936
937        assert_eq!(request.method(), reqwest::Method::GET);
938        assert_eq!(
939            request.url().as_str(),
940            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
941        );
942        Ok(())
943    }
944
945    #[tokio::test]
946    async fn read_object_error_credentials() -> Result {
947        let inner = test_inner_client(
948            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
949        );
950        let stub = crate::storage::transport::Storage::new(inner.clone());
951        let builder = ReadObject::new(
952            stub,
953            "projects/_/buckets/bucket",
954            "object",
955            inner.options.clone(),
956        );
957        let _ = http_request_builder(inner, builder)
958            .await
959            .inspect_err(|e| assert!(e.is_authentication()))
960            .expect_err("invalid credentials should err");
961        Ok(())
962    }
963
964    #[tokio::test]
965    async fn read_object_bad_bucket() -> Result {
966        let inner = test_inner_client(test_builder());
967        let stub = crate::storage::transport::Storage::new(inner.clone());
968        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
969        let _ = http_request_builder(inner, builder)
970            .await
971            .expect_err("malformed bucket string should error");
972        Ok(())
973    }
974
975    #[tokio::test]
976    async fn read_object_query_params() -> Result {
977        let inner = test_inner_client(test_builder());
978        let stub = crate::storage::transport::Storage::new(inner.clone());
979        let builder = ReadObject::new(
980            stub,
981            "projects/_/buckets/bucket",
982            "object",
983            inner.options.clone(),
984        )
985        .set_generation(5)
986        .set_if_generation_match(10)
987        .set_if_generation_not_match(20)
988        .set_if_metageneration_match(30)
989        .set_if_metageneration_not_match(40);
990        let request = http_request_builder(inner, builder).await?.build()?;
991
992        assert_eq!(request.method(), reqwest::Method::GET);
993        let want_pairs: HashMap<String, String> = [
994            ("alt", "media"),
995            ("generation", "5"),
996            ("ifGenerationMatch", "10"),
997            ("ifGenerationNotMatch", "20"),
998            ("ifMetagenerationMatch", "30"),
999            ("ifMetagenerationNotMatch", "40"),
1000        ]
1001        .iter()
1002        .map(|(k, v)| (k.to_string(), v.to_string()))
1003        .collect();
1004        let query_pairs: HashMap<String, String> = request
1005            .url()
1006            .query_pairs()
1007            .map(|param| (param.0.to_string(), param.1.to_string()))
1008            .collect();
1009        assert_eq!(query_pairs.len(), want_pairs.len());
1010        assert_eq!(query_pairs, want_pairs);
1011        Ok(())
1012    }
1013
1014    #[tokio::test]
1015    async fn read_object_default_headers() -> Result {
1016        // The API takes the unencoded byte array.
1017        let inner = test_inner_client(test_builder());
1018        let stub = crate::storage::transport::Storage::new(inner.clone());
1019        let builder = ReadObject::new(
1020            stub,
1021            "projects/_/buckets/bucket",
1022            "object",
1023            inner.options.clone(),
1024        );
1025        let request = http_request_builder(inner, builder).await?.build()?;
1026
1027        assert_eq!(request.method(), reqwest::Method::GET);
1028        assert_eq!(
1029            request.url().as_str(),
1030            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1031        );
1032
1033        let want = [("accept-encoding", "gzip")];
1034        let headers = request.headers();
1035        for (name, value) in want {
1036            assert_eq!(
1037                headers.get(name).and_then(|h| h.to_str().ok()),
1038                Some(value),
1039                "{request:?}"
1040            );
1041        }
1042        Ok(())
1043    }
1044
1045    #[tokio::test]
1046    async fn read_object_automatic_decompression_headers() -> Result {
1047        // The API takes the unencoded byte array.
1048        let inner = test_inner_client(test_builder());
1049        let stub = crate::storage::transport::Storage::new(inner.clone());
1050        let builder = ReadObject::new(
1051            stub,
1052            "projects/_/buckets/bucket",
1053            "object",
1054            inner.options.clone(),
1055        )
1056        .with_automatic_decompression(true);
1057        let request = http_request_builder(inner, builder).await?.build()?;
1058
1059        assert_eq!(request.method(), reqwest::Method::GET);
1060        assert_eq!(
1061            request.url().as_str(),
1062            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1063        );
1064
1065        let headers = request.headers();
1066        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1067        Ok(())
1068    }
1069
1070    #[tokio::test]
1071    async fn read_object_encryption_headers() -> Result {
1072        // Make a 32-byte key.
1073        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1074
1075        // The API takes the unencoded byte array.
1076        let inner = test_inner_client(test_builder());
1077        let stub = crate::storage::transport::Storage::new(inner.clone());
1078        let builder = ReadObject::new(
1079            stub,
1080            "projects/_/buckets/bucket",
1081            "object",
1082            inner.options.clone(),
1083        )
1084        .set_key(KeyAes256::new(&key)?);
1085        let request = http_request_builder(inner, builder).await?.build()?;
1086
1087        assert_eq!(request.method(), reqwest::Method::GET);
1088        assert_eq!(
1089            request.url().as_str(),
1090            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1091        );
1092
1093        let want = [
1094            ("x-goog-encryption-algorithm", "AES256".to_string()),
1095            ("x-goog-encryption-key", key_base64),
1096            ("x-goog-encryption-key-sha256", key_sha256_base64),
1097        ];
1098
1099        let headers = request.headers();
1100        for (name, value) in want {
1101            assert_eq!(
1102                headers.get(name).and_then(|h| h.to_str().ok()),
1103                Some(value.as_str())
1104            );
1105        }
1106        Ok(())
1107    }
1108
1109    #[test_case(ReadRange::all(), None; "no headers needed")]
1110    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1111    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1112    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1113    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1114    #[tokio::test]
1115    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1116        let inner = test_inner_client(test_builder());
1117        let stub = crate::storage::transport::Storage::new(inner.clone());
1118        let builder = ReadObject::new(
1119            stub,
1120            "projects/_/buckets/bucket",
1121            "object",
1122            inner.options.clone(),
1123        )
1124        .set_read_range(input.clone());
1125        let request = http_request_builder(inner, builder).await?.build()?;
1126
1127        assert_eq!(request.method(), reqwest::Method::GET);
1128        assert_eq!(
1129            request.url().as_str(),
1130            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1131        );
1132
1133        assert_eq!(request.headers().get("range"), want);
1134        Ok(())
1135    }
1136
1137    #[test_case("projects/p", "projects%2Fp")]
1138    #[test_case("kebab-case", "kebab-case")]
1139    #[test_case("dot.name", "dot.name")]
1140    #[test_case("under_score", "under_score")]
1141    #[test_case("tilde~123", "tilde~123")]
1142    #[test_case("exclamation!point!", "exclamation%21point%21")]
1143    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1144    #[test_case("preserve%percent%21", "preserve%percent%21")]
1145    #[test_case(
1146        "testall !#$&'()*+,/:;=?@[]",
1147        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1148    )]
1149    #[tokio::test]
1150    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1151        let inner = test_inner_client(test_builder());
1152        let stub = crate::storage::transport::Storage::new(inner.clone());
1153        let builder = ReadObject::new(
1154            stub,
1155            "projects/_/buckets/bucket",
1156            name,
1157            inner.options.clone(),
1158        );
1159        let request = http_request_builder(inner, builder).await?.build()?;
1160        let got = request.url().path_segments().unwrap().next_back().unwrap();
1161        assert_eq!(got, want);
1162        Ok(())
1163    }
1164
1165    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1166    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1167    #[test_case("warning", "214 UploadServer gunzipped", true)]
1168    #[test_case("warning", "no match", false)]
1169    #[test_case("unused", "unused", false)]
1170    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1171        let response = http::Response::builder()
1172            .status(200)
1173            .header(name, value)
1174            .body(Vec::new())?;
1175        let response = reqwest::Response::from(response);
1176        let got = Reader::is_gunzipped(&response);
1177        assert_eq!(got, want, "{response:?}");
1178        Ok(())
1179    }
1180}