google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::http::map_send_error;
27
28/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
29///
30/// # Example: accumulate the contents of an object into a vector
31/// ```
32/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
35///     let mut reader = builder.send().await?;
36///     let mut contents = Vec::new();
37///     while let Some(chunk) = reader.next().await.transpose()? {
38///         contents.extend_from_slice(&chunk);
39///     }
40///     println!("object contents={:?}", contents);
41///     Ok(())
42/// }
43/// ```
44///
45/// # Example: read part of an object
46/// ```
47/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
48/// use google_cloud_storage::model_ext::ReadRange;
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: u64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
55///         .send()
56///         .await?;
57///     while let Some(chunk) = reader.next().await.transpose()? {
58///         contents.extend_from_slice(&chunk);
59///     }
60///     println!("range contents={:?}", contents);
61///     Ok(())
62/// }
63/// ```
#[derive(Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    // Shared handle to the stub; `send()` forwards the read to
    // `stub.read_object(...)`.
    stub: std::sync::Arc<S>,
    // The request under construction. `new()` fills in the bucket and object,
    // the `set_*` methods fill in the remaining fields.
    request: crate::model::ReadObjectRequest,
    // Per-request options, adjusted by `compute_md5()` and the `with_*`
    // methods.
    options: RequestOptions,
}
73
74impl<S> Clone for ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    fn clone(&self) -> Self {
79        Self {
80            stub: self.stub.clone(),
81            request: self.request.clone(),
82            options: self.options.clone(),
83        }
84    }
85}
86
87impl<S> ReadObject<S>
88where
89    S: crate::storage::stub::Storage + 'static,
90{
91    pub(crate) fn new<B, O>(
92        stub: std::sync::Arc<S>,
93        bucket: B,
94        object: O,
95        options: RequestOptions,
96    ) -> Self
97    where
98        B: Into<String>,
99        O: Into<String>,
100    {
101        ReadObject {
102            stub,
103            request: crate::model::ReadObjectRequest::new()
104                .set_bucket(bucket)
105                .set_object(object),
106            options,
107        }
108    }
109
110    /// Enables computation of MD5 hashes.
111    ///
112    /// Crc32c hashes are checked by default.
113    ///
114    /// Checksum validation is supported iff:
115    /// 1. The full content is requested.
116    /// 2. All of the content is returned (status != PartialContent).
117    /// 3. The server sent a checksum header.
118    /// 4. The http stack did not uncompress the file.
119    /// 5. The server did not uncompress data on read.
120    ///
121    /// # Example
122    /// ```
123    /// # use google_cloud_storage::client::Storage;
124    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
125    /// let builder =  client
126    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
127    ///     .compute_md5();
128    /// let mut reader = builder
129    ///     .send()
130    ///     .await?;
131    /// let mut contents = Vec::new();
132    /// while let Some(chunk) = reader.next().await.transpose()? {
133    ///     contents.extend_from_slice(&chunk);
134    /// }
135    /// println!("object contents={:?}", contents);
136    /// # Ok(()) }
137    /// ```
138    pub fn compute_md5(self) -> Self {
139        let mut this = self;
140        this.options.checksum.md5_hash = Some(Md5::default());
141        this
142    }
143
144    /// If present, selects a specific revision of this object (as
145    /// opposed to the latest version, the default).
146    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
147        self.request.generation = v.into();
148        self
149    }
150
151    /// Makes the operation conditional on whether the object's current generation
152    /// matches the given value. Setting to 0 makes the operation succeed only if
153    /// there are no live versions of the object.
154    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
155    where
156        T: Into<i64>,
157    {
158        self.request.if_generation_match = Some(v.into());
159        self
160    }
161
162    /// Makes the operation conditional on whether the object's live generation
163    /// does not match the given value. If no live object exists, the precondition
164    /// fails. Setting to 0 makes the operation succeed only if there is a live
165    /// version of the object.
166    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
167    where
168        T: Into<i64>,
169    {
170        self.request.if_generation_not_match = Some(v.into());
171        self
172    }
173
174    /// Makes the operation conditional on whether the object's current
175    /// metageneration matches the given value.
176    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
177    where
178        T: Into<i64>,
179    {
180        self.request.if_metageneration_match = Some(v.into());
181        self
182    }
183
184    /// Makes the operation conditional on whether the object's current
185    /// metageneration does not match the given value.
186    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
187    where
188        T: Into<i64>,
189    {
190        self.request.if_metageneration_not_match = Some(v.into());
191        self
192    }
193
194    /// The range of bytes to return in the read.
195    ///
196    /// This can be all the bytes starting at a given offset
197    /// (`ReadRange::offset()`), all the bytes in an explicit range
    /// (`ReadRange::segment`), or the last N bytes of the object
199    /// (`ReadRange::tail`).
200    ///
201    /// # Examples
202    ///
203    /// Read starting at 100 bytes to end of file.
204    /// ```
205    /// # use google_cloud_storage::client::Storage;
206    /// # use google_cloud_storage::model_ext::ReadRange;
207    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
208    /// let response = client
209    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
210    ///     .set_read_range(ReadRange::offset(100))
211    ///     .send()
212    ///     .await?;
213    /// println!("response details={response:?}");
214    /// # Ok(()) }
215    /// ```
216    ///
217    /// Read last 100 bytes of file:
218    /// ```
219    /// # use google_cloud_storage::client::Storage;
220    /// # use google_cloud_storage::model_ext::ReadRange;
221    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
222    /// let response = client
223    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
224    ///     .set_read_range(ReadRange::tail(100))
225    ///     .send()
226    ///     .await?;
227    /// println!("response details={response:?}");
228    /// # Ok(()) }
229    /// ```
230    ///
231    /// Read bytes 1000 to 1099.
232    /// ```
233    /// # use google_cloud_storage::client::Storage;
234    /// # use google_cloud_storage::model_ext::ReadRange;
235    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
236    /// let response = client
237    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
238    ///     .set_read_range(ReadRange::segment(1000, 100))
239    ///     .send()
240    ///     .await?;
241    /// println!("response details={response:?}");
242    /// # Ok(()) }
243    /// ```
244    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
245        self.request.with_range(range);
246        self
247    }
248
249    /// The encryption key used with the Customer-Supplied Encryption Keys
250    /// feature. In raw bytes format (not base64-encoded).
251    ///
252    /// Example:
253    /// ```
254    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
255    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
256    /// let key: &[u8] = &[97; 32];
257    /// let response = client
258    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
259    ///     .set_key(KeyAes256::new(key)?)
260    ///     .send()
261    ///     .await?;
262    /// println!("response details={response:?}");
263    /// # Ok(()) }
264    /// ```
265    pub fn set_key(mut self, v: KeyAes256) -> Self {
266        self.request.common_object_request_params = Some(v.into());
267        self
268    }
269
270    /// The retry policy used for this request.
271    ///
272    /// # Example
273    /// ```
274    /// # use google_cloud_storage::client::Storage;
275    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
276    /// use google_cloud_storage::retry_policy::RetryableErrors;
277    /// use std::time::Duration;
278    /// use gax::retry_policy::RetryPolicyExt;
279    /// let response = client
280    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
281    ///     .with_retry_policy(
282    ///         RetryableErrors
283    ///             .with_attempt_limit(5)
284    ///             .with_time_limit(Duration::from_secs(10)),
285    ///     )
286    ///     .send()
287    ///     .await?;
288    /// println!("response details={response:?}");
289    /// # Ok(()) }
290    /// ```
291    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
292        self.options.retry_policy = v.into().into();
293        self
294    }
295
296    /// The backoff policy used for this request.
297    ///
298    /// # Example
299    /// ```
300    /// # use google_cloud_storage::client::Storage;
301    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
302    /// use std::time::Duration;
303    /// use gax::exponential_backoff::ExponentialBackoff;
304    /// let response = client
305    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
306    ///     .with_backoff_policy(ExponentialBackoff::default())
307    ///     .send()
308    ///     .await?;
309    /// println!("response details={response:?}");
310    /// # Ok(()) }
311    /// ```
312    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
313        mut self,
314        v: V,
315    ) -> Self {
316        self.options.backoff_policy = v.into().into();
317        self
318    }
319
320    /// The retry throttler used for this request.
321    ///
322    /// Most of the time you want to use the same throttler for all the requests
323    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
325    /// requests.
326    ///
327    /// # Example
328    /// ```
329    /// # use google_cloud_storage::client::Storage;
330    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
331    /// let response = client
332    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
333    ///     .with_retry_throttler(adhoc_throttler())
334    ///     .send()
335    ///     .await?;
336    /// println!("response details={response:?}");
337    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
338    ///     # panic!();
339    /// }
340    /// # Ok(()) }
341    /// ```
342    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
343        mut self,
344        v: V,
345    ) -> Self {
346        self.options.retry_throttler = v.into().into();
347        self
348    }
349
350    /// Configure the resume policy for read requests.
351    ///
352    /// The Cloud Storage client library can automatically resume a read that is
353    /// interrupted by a transient error. Applications may want to limit the
354    /// number of read attempts, or may wish to expand the type of errors
355    /// treated as retryable.
356    ///
357    /// # Example
358    /// ```
359    /// # use google_cloud_storage::client::Storage;
360    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
361    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
362    /// let response = client
363    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
364    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
365    ///     .send()
366    ///     .await?;
367    /// # Ok(()) }
368    /// ```
369    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
370    where
371        V: ReadResumePolicy + 'static,
372    {
373        self.options.set_read_resume_policy(std::sync::Arc::new(v));
374        self
375    }
376
377    /// Enables automatic decompression.
378    ///
379    /// The Cloud Storage service [automatically decompresses] objects
380    /// with `content_encoding == "gzip"` during reads. The client library
381    /// disables this behavior by default, as it is not possible to
382    /// perform ranged reads or to resume interrupted downloads if automatic
383    /// decompression is enabled.
384    ///
385    /// Use this option to enable automatic decompression.
386    ///
387    /// # Example
388    /// ```
389    /// # use google_cloud_storage::client::Storage;
390    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
391    /// let response = client
392    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
393    ///     .with_automatic_decompression(true)
394    ///     .send()
395    ///     .await?;
396    /// println!("response details={response:?}");
397    /// # Ok(()) }
398    /// ```
399    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
400        self.options.automatic_decompression = v;
401        self
402    }
403
404    /// Sends the request.
405    pub async fn send(self) -> Result<ReadObjectResponse> {
406        self.stub.read_object(self.request, self.options).await
407    }
408}
409
/// A convenience struct that saves the request conditions and performs the read.
///
/// It bundles everything needed to issue the download request, so the same
/// request can be rebuilt for each attempt made by the retry loop.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    /// Shared client state: supplies the HTTP client, the endpoint, and the
    /// authentication headers used to build each request.
    pub inner: std::sync::Arc<StorageInner>,
    /// The read request: bucket, object, generation, preconditions,
    /// encryption parameters, and the read offset / limit.
    pub request: crate::model::ReadObjectRequest,
    /// Per-request options: the retry, backoff, and throttler policies and
    /// the automatic decompression flag.
    pub options: RequestOptions,
}
417
418impl Reader {
419    async fn read(self) -> Result<reqwest::Response> {
420        let throttler = self.options.retry_throttler.clone();
421        let retry = self.options.retry_policy.clone();
422        let backoff = self.options.backoff_policy.clone();
423
424        gax::retry_loop_internal::retry_loop(
425            async move |_| self.read_attempt().await,
426            async |duration| tokio::time::sleep(duration).await,
427            true,
428            throttler,
429            retry,
430            backoff,
431        )
432        .await
433    }
434
435    async fn read_attempt(&self) -> Result<reqwest::Response> {
436        let builder = self.http_request_builder().await?;
437        let response = builder.send().await.map_err(map_send_error)?;
438        if !response.status().is_success() {
439            return gaxi::http::to_http_error(response).await;
440        }
441        Ok(response)
442    }
443
444    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
445        // Collect the required bucket and object parameters.
446        let bucket = &self.request.bucket;
447        let bucket_id = bucket
448            .as_str()
449            .strip_prefix("projects/_/buckets/")
450            .ok_or_else(|| {
451                Error::binding(format!(
452                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
453                ))
454            })?;
455        let object = &self.request.object;
456
457        // Build the request.
458        let builder = self
459            .inner
460            .client
461            .request(
462                reqwest::Method::GET,
463                format!(
464                    "{}/storage/v1/b/{bucket_id}/o/{}",
465                    &self.inner.endpoint,
466                    enc(object)
467                ),
468            )
469            .query(&[("alt", "media")])
470            .header(
471                "x-goog-api-client",
472                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
473            );
474
475        let builder = if self.options.automatic_decompression {
476            builder
477        } else {
478            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
479            //
480            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
481            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
482            // with a different `contentEncoding` value.
483            builder.header(
484                "accept-encoding",
485                reqwest::header::HeaderValue::from_static("gzip"),
486            )
487        };
488
489        // Add the optional query parameters.
490        let builder = if self.request.generation != 0 {
491            builder.query(&[("generation", self.request.generation)])
492        } else {
493            builder
494        };
495        let builder = self
496            .request
497            .if_generation_match
498            .iter()
499            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
500        let builder = self
501            .request
502            .if_generation_not_match
503            .iter()
504            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
505        let builder = self
506            .request
507            .if_metageneration_match
508            .iter()
509            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
510        let builder = self
511            .request
512            .if_metageneration_not_match
513            .iter()
514            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
515
516        let builder = apply_customer_supplied_encryption_headers(
517            builder,
518            &self.request.common_object_request_params,
519        );
520
521        // Apply "range" header for read limits and offsets.
522        let builder = match (self.request.read_offset, self.request.read_limit) {
523            // read_limit can't be negative.
524            (_, l) if l < 0 => {
525                unreachable!("ReadObject build never sets a negative read_limit value")
526            }
527            // negative offset can't also have a read_limit.
528            (o, l) if o < 0 && l > 0 => unreachable!(
529                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
530            ),
531            // If both are zero, we use default implementation (no range header).
532            (0, 0) => builder,
533            // negative offset with no limit means the last N bytes.
534            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
535            // read_limit is zero, means no limit. Read from offset to end of file.
536            // This handles cases like (5, 0) -> "bytes=5-"
537            (o, 0) => builder.header("range", format!("bytes={o}-")),
538            // General case: non-negative offset and positive limit.
539            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
540            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
541        };
542
543        self.inner.apply_auth_headers(builder).await
544    }
545
546    fn is_gunzipped(response: &reqwest::Response) -> bool {
547        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
548        // objects. Reading such objects comes with a number of restrictions:
549        // - Ranged reads do not work.
550        // - The size of the decompressed data is not known.
551        // - Checksums do not work because the object checksums correspond to the
552        //   compressed data and the client library receives the decompressed data.
553        //
554        // Because ranged reads do not work, resuming a read does not work. Consequently,
555        // the implementation of `ReadObjectResponse` is substantially different for
556        // objects that are gunzipped.
557        //
558        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
559        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
560        use http::header::WARNING;
561        if response
562            .headers()
563            .get(TRANSFORMATION)
564            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
565        {
566            return true;
567        }
568        response
569            .headers()
570            .get(WARNING)
571            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
572    }
573
574    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
575        let response = self.clone().read().await?;
576        if Self::is_gunzipped(&response) {
577            return Ok(ReadObjectResponse::new(Box::new(
578                non_resumable::NonResumableResponse::new(response)?,
579            )));
580        }
581        Ok(ReadObjectResponse::new(Box::new(
582            resumable::ResumableResponse::new(self, response)?,
583        )))
584    }
585}
586
587#[cfg(test)]
588mod resume_tests;
589
590#[cfg(test)]
591mod tests {
592    use super::client::tests::{test_builder, test_inner_client};
593    use super::*;
594    use crate::error::{ChecksumMismatch, ReadError};
595    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
596    use base64::Engine;
597    use futures::TryStreamExt;
598    use google_cloud_auth::credentials::{
599        anonymous::Builder as Anonymous, testing::error_credentials,
600    };
601    use httptest::{Expectation, Server, matchers::*, responders::status_code};
602    use std::collections::HashMap;
603    use std::error::Error;
604    use std::sync::Arc;
605    use test_case::test_case;
606
607    type Result = anyhow::Result<()>;
608
609    async fn http_request_builder(
610        inner: Arc<StorageInner>,
611        builder: ReadObject,
612    ) -> crate::Result<reqwest::RequestBuilder> {
613        let reader = Reader {
614            inner,
615            request: builder.request,
616            options: builder.options,
617        };
618        reader.http_request_builder().await
619    }
620
621    #[tokio::test]
622    async fn test_clone() {
623        let inner = test_inner_client(test_builder()).await;
624        let stub = crate::storage::transport::Storage::new(inner.clone());
625        let options = {
626            let mut o = RequestOptions::new();
627            o.set_resumable_upload_threshold(12345_usize);
628            o
629        };
630        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
631
632        let clone = builder.clone();
633        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
634        assert_eq!(clone.request, builder.request);
635        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
636    }
637
638    // Verify `read_object()` meets normal Send, Sync, requirements.
639    #[tokio::test]
640    async fn test_read_is_send_and_static() -> Result {
641        let client = Storage::builder()
642            .with_credentials(Anonymous::new().build())
643            .build()
644            .await?;
645
646        fn need_send<T: Send>(_val: &T) {}
647        fn need_sync<T: Sync>(_val: &T) {}
648        fn need_static<T: 'static>(_val: &T) {}
649
650        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
651        need_send(&read);
652        need_sync(&read);
653        need_static(&read);
654
655        let read = client
656            .read_object("projects/_/buckets/test-bucket", "test-object")
657            .send();
658        need_send(&read);
659        need_static(&read);
660
661        Ok(())
662    }
663
664    #[tokio::test]
665    async fn read_object_normal() -> Result {
666        let server = Server::run();
667        server.expect(
668            Expectation::matching(all_of![
669                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
670                request::headers(contains(("accept-encoding", "gzip"))),
671                request::query(url_decoded(contains(("alt", "media")))),
672            ])
673            .respond_with(
674                status_code(200)
675                    .body("hello world")
676                    .append_header("x-goog-generation", 123456),
677            ),
678        );
679
680        let client = Storage::builder()
681            .with_endpoint(format!("http://{}", server.addr()))
682            .with_credentials(Anonymous::new().build())
683            .build()
684            .await?;
685        let mut reader = client
686            .read_object("projects/_/buckets/test-bucket", "test-object")
687            .send()
688            .await?;
689        let mut got = Vec::new();
690        while let Some(b) = reader.next().await.transpose()? {
691            got.extend_from_slice(&b);
692        }
693        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
694
695        Ok(())
696    }
697
698    #[tokio::test]
699    async fn read_object_stream() -> Result {
700        let server = Server::run();
701        server.expect(
702            Expectation::matching(all_of![
703                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
704                request::query(url_decoded(contains(("alt", "media")))),
705            ])
706            .respond_with(
707                status_code(200)
708                    .append_header("x-goog-generation", 123456)
709                    .body("hello world"),
710            ),
711        );
712
713        let client = Storage::builder()
714            .with_endpoint(format!("http://{}", server.addr()))
715            .with_credentials(Anonymous::new().build())
716            .build()
717            .await?;
718        let response = client
719            .read_object("projects/_/buckets/test-bucket", "test-object")
720            .send()
721            .await?;
722        let result: Vec<_> = response.into_stream().try_collect().await?;
723        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
724
725        Ok(())
726    }
727
728    #[tokio::test]
729    async fn read_object_next_then_consume_response() -> Result {
730        // Create a large enough file that will require multiple chunks to read.
731        const BLOCK_SIZE: usize = 500;
732        let mut contents = Vec::new();
733        for i in 0..50 {
734            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
735        }
736
737        // Calculate and serialize the crc32c checksum
738        let u = crc32c::crc32c(&contents);
739        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
740
741        let server = Server::run();
742        server.expect(
743            Expectation::matching(all_of![
744                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
745                request::query(url_decoded(contains(("alt", "media")))),
746            ])
747            .times(1)
748            .respond_with(
749                status_code(200)
750                    .body(contents.clone())
751                    .append_header("x-goog-hash", format!("crc32c={value}"))
752                    .append_header("x-goog-generation", 123456),
753            ),
754        );
755
756        let client = Storage::builder()
757            .with_endpoint(format!("http://{}", server.addr()))
758            .with_credentials(Anonymous::new().build())
759            .build()
760            .await?;
761
762        // Read some bytes, then remainder with stream.
763        let mut response = client
764            .read_object("projects/_/buckets/test-bucket", "test-object")
765            .send()
766            .await?;
767
768        let mut all_bytes = bytes::BytesMut::new();
769        let chunk = response.next().await.transpose()?.unwrap();
770        assert!(!chunk.is_empty());
771        all_bytes.extend(chunk);
772        use futures::StreamExt;
773        let mut stream = response.into_stream();
774        while let Some(chunk) = stream.next().await.transpose()? {
775            all_bytes.extend(chunk);
776        }
777        assert_eq!(all_bytes, contents);
778
779        Ok(())
780    }
781
782    #[tokio::test]
783    async fn read_object_not_found() -> Result {
784        let server = Server::run();
785        server.expect(
786            Expectation::matching(all_of![
787                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
788                request::query(url_decoded(contains(("alt", "media")))),
789            ])
790            .respond_with(status_code(404).body("NOT FOUND")),
791        );
792
793        let client = Storage::builder()
794            .with_endpoint(format!("http://{}", server.addr()))
795            .with_credentials(Anonymous::new().build())
796            .build()
797            .await?;
798        let err = client
799            .read_object("projects/_/buckets/test-bucket", "test-object")
800            .send()
801            .await
802            .expect_err("expected a not found error");
803        assert_eq!(err.http_status_code(), Some(404));
804
805        Ok(())
806    }
807
808    #[tokio::test]
809    async fn read_object_incorrect_crc32c_check() -> Result {
810        // Calculate and serialize the crc32c checksum
811        let u = crc32c::crc32c("goodbye world".as_bytes());
812        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
813
814        let server = Server::run();
815        server.expect(
816            Expectation::matching(all_of![
817                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
818                request::query(url_decoded(contains(("alt", "media")))),
819            ])
820            .times(3)
821            .respond_with(
822                status_code(200)
823                    .body("hello world")
824                    .append_header("x-goog-hash", format!("crc32c={value}"))
825                    .append_header("x-goog-generation", 123456),
826            ),
827        );
828
829        let client = Storage::builder()
830            .with_endpoint(format!("http://{}", server.addr()))
831            .with_credentials(Anonymous::new().build())
832            .build()
833            .await?;
834        let mut response = client
835            .read_object("projects/_/buckets/test-bucket", "test-object")
836            .send()
837            .await?;
838        let mut partial = Vec::new();
839        let mut err = None;
840        while let Some(r) = response.next().await {
841            match r {
842                Ok(b) => partial.extend_from_slice(&b),
843                Err(e) => err = Some(e),
844            };
845        }
846        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
847        let err = err.expect("expect error on incorrect crc32c");
848        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
849        assert!(
850            matches!(
851                source,
852                Some(&ReadError::ChecksumMismatch(
853                    ChecksumMismatch::Crc32c { .. }
854                ))
855            ),
856            "err={err:?}"
857        );
858
859        let mut response = client
860            .read_object("projects/_/buckets/test-bucket", "test-object")
861            .send()
862            .await?;
863        let err: crate::Error = async {
864            {
865                while (response.next().await.transpose()?).is_some() {}
866                Ok(())
867            }
868        }
869        .await
870        .expect_err("expect error on incorrect crc32c");
871        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
872        assert!(
873            matches!(
874                source,
875                Some(&ReadError::ChecksumMismatch(
876                    ChecksumMismatch::Crc32c { .. }
877                ))
878            ),
879            "err={err:?}"
880        );
881
882        use futures::TryStreamExt;
883        let err = client
884            .read_object("projects/_/buckets/test-bucket", "test-object")
885            .send()
886            .await?
887            .into_stream()
888            .try_collect::<Vec<bytes::Bytes>>()
889            .await
890            .expect_err("expect error on incorrect crc32c");
891        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
892        assert!(
893            matches!(
894                source,
895                Some(&ReadError::ChecksumMismatch(
896                    ChecksumMismatch::Crc32c { .. }
897                ))
898            ),
899            "err={err:?}"
900        );
901        Ok(())
902    }
903
904    #[tokio::test]
905    async fn read_object_incorrect_md5_check() -> Result {
906        // Calculate and serialize the md5 checksum
907        let digest = md5::compute("goodbye world".as_bytes());
908        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
909
910        let server = Server::run();
911        server.expect(
912            Expectation::matching(all_of![
913                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
914                request::query(url_decoded(contains(("alt", "media")))),
915            ])
916            .times(1)
917            .respond_with(
918                status_code(200)
919                    .body("hello world")
920                    .append_header("x-goog-hash", format!("md5={value}"))
921                    .append_header("x-goog-generation", 123456),
922            ),
923        );
924
925        let client = Storage::builder()
926            .with_endpoint(format!("http://{}", server.addr()))
927            .with_credentials(Anonymous::new().build())
928            .build()
929            .await?;
930        let mut response = client
931            .read_object("projects/_/buckets/test-bucket", "test-object")
932            .compute_md5()
933            .send()
934            .await?;
935        let mut partial = Vec::new();
936        let mut err = None;
937        while let Some(r) = response.next().await {
938            match r {
939                Ok(b) => partial.extend_from_slice(&b),
940                Err(e) => err = Some(e),
941            };
942        }
943        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
944        let err = err.expect("expect error on incorrect md5");
945        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
946        assert!(
947            matches!(
948                source,
949                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
950            ),
951            "err={err:?}"
952        );
953
954        Ok(())
955    }
956
957    #[tokio::test]
958    async fn read_object() -> Result {
959        let inner = test_inner_client(test_builder()).await;
960        let stub = crate::storage::transport::Storage::new(inner.clone());
961        let builder = ReadObject::new(
962            stub,
963            "projects/_/buckets/bucket",
964            "object",
965            inner.options.clone(),
966        );
967        let request = http_request_builder(inner, builder).await?.build()?;
968
969        assert_eq!(request.method(), reqwest::Method::GET);
970        assert_eq!(
971            request.url().as_str(),
972            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
973        );
974        Ok(())
975    }
976
977    #[tokio::test]
978    async fn read_object_error_credentials() -> Result {
979        let inner =
980            test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
981        let stub = crate::storage::transport::Storage::new(inner.clone());
982        let builder = ReadObject::new(
983            stub,
984            "projects/_/buckets/bucket",
985            "object",
986            inner.options.clone(),
987        );
988        let _ = http_request_builder(inner, builder)
989            .await
990            .inspect_err(|e| assert!(e.is_authentication()))
991            .expect_err("invalid credentials should err");
992        Ok(())
993    }
994
995    #[tokio::test]
996    async fn read_object_bad_bucket() -> Result {
997        let inner = test_inner_client(test_builder()).await;
998        let stub = crate::storage::transport::Storage::new(inner.clone());
999        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1000        let _ = http_request_builder(inner, builder)
1001            .await
1002            .expect_err("malformed bucket string should error");
1003        Ok(())
1004    }
1005
1006    #[tokio::test]
1007    async fn read_object_query_params() -> Result {
1008        let inner = test_inner_client(test_builder()).await;
1009        let stub = crate::storage::transport::Storage::new(inner.clone());
1010        let builder = ReadObject::new(
1011            stub,
1012            "projects/_/buckets/bucket",
1013            "object",
1014            inner.options.clone(),
1015        )
1016        .set_generation(5)
1017        .set_if_generation_match(10)
1018        .set_if_generation_not_match(20)
1019        .set_if_metageneration_match(30)
1020        .set_if_metageneration_not_match(40);
1021        let request = http_request_builder(inner, builder).await?.build()?;
1022
1023        assert_eq!(request.method(), reqwest::Method::GET);
1024        let want_pairs: HashMap<String, String> = [
1025            ("alt", "media"),
1026            ("generation", "5"),
1027            ("ifGenerationMatch", "10"),
1028            ("ifGenerationNotMatch", "20"),
1029            ("ifMetagenerationMatch", "30"),
1030            ("ifMetagenerationNotMatch", "40"),
1031        ]
1032        .iter()
1033        .map(|(k, v)| (k.to_string(), v.to_string()))
1034        .collect();
1035        let query_pairs: HashMap<String, String> = request
1036            .url()
1037            .query_pairs()
1038            .map(|param| (param.0.to_string(), param.1.to_string()))
1039            .collect();
1040        assert_eq!(query_pairs.len(), want_pairs.len());
1041        assert_eq!(query_pairs, want_pairs);
1042        Ok(())
1043    }
1044
1045    #[tokio::test]
1046    async fn read_object_default_headers() -> Result {
1047        // The API takes the unencoded byte array.
1048        let inner = test_inner_client(test_builder()).await;
1049        let stub = crate::storage::transport::Storage::new(inner.clone());
1050        let builder = ReadObject::new(
1051            stub,
1052            "projects/_/buckets/bucket",
1053            "object",
1054            inner.options.clone(),
1055        );
1056        let request = http_request_builder(inner, builder).await?.build()?;
1057
1058        assert_eq!(request.method(), reqwest::Method::GET);
1059        assert_eq!(
1060            request.url().as_str(),
1061            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1062        );
1063
1064        let want = [("accept-encoding", "gzip")];
1065        let headers = request.headers();
1066        for (name, value) in want {
1067            assert_eq!(
1068                headers.get(name).and_then(|h| h.to_str().ok()),
1069                Some(value),
1070                "{request:?}"
1071            );
1072        }
1073        Ok(())
1074    }
1075
1076    #[tokio::test]
1077    async fn read_object_automatic_decompression_headers() -> Result {
1078        // The API takes the unencoded byte array.
1079        let inner = test_inner_client(test_builder()).await;
1080        let stub = crate::storage::transport::Storage::new(inner.clone());
1081        let builder = ReadObject::new(
1082            stub,
1083            "projects/_/buckets/bucket",
1084            "object",
1085            inner.options.clone(),
1086        )
1087        .with_automatic_decompression(true);
1088        let request = http_request_builder(inner, builder).await?.build()?;
1089
1090        assert_eq!(request.method(), reqwest::Method::GET);
1091        assert_eq!(
1092            request.url().as_str(),
1093            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1094        );
1095
1096        let headers = request.headers();
1097        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1098        Ok(())
1099    }
1100
1101    #[tokio::test]
1102    async fn read_object_encryption_headers() -> Result {
1103        // Make a 32-byte key.
1104        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1105
1106        // The API takes the unencoded byte array.
1107        let inner = test_inner_client(test_builder()).await;
1108        let stub = crate::storage::transport::Storage::new(inner.clone());
1109        let builder = ReadObject::new(
1110            stub,
1111            "projects/_/buckets/bucket",
1112            "object",
1113            inner.options.clone(),
1114        )
1115        .set_key(KeyAes256::new(&key)?);
1116        let request = http_request_builder(inner, builder).await?.build()?;
1117
1118        assert_eq!(request.method(), reqwest::Method::GET);
1119        assert_eq!(
1120            request.url().as_str(),
1121            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1122        );
1123
1124        let want = [
1125            ("x-goog-encryption-algorithm", "AES256".to_string()),
1126            ("x-goog-encryption-key", key_base64),
1127            ("x-goog-encryption-key-sha256", key_sha256_base64),
1128        ];
1129
1130        let headers = request.headers();
1131        for (name, value) in want {
1132            assert_eq!(
1133                headers.get(name).and_then(|h| h.to_str().ok()),
1134                Some(value.as_str())
1135            );
1136        }
1137        Ok(())
1138    }
1139
1140    #[test_case(ReadRange::all(), None; "no headers needed")]
1141    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1142    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1143    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1144    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1145    #[tokio::test]
1146    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1147        let inner = test_inner_client(test_builder()).await;
1148        let stub = crate::storage::transport::Storage::new(inner.clone());
1149        let builder = ReadObject::new(
1150            stub,
1151            "projects/_/buckets/bucket",
1152            "object",
1153            inner.options.clone(),
1154        )
1155        .set_read_range(input.clone());
1156        let request = http_request_builder(inner, builder).await?.build()?;
1157
1158        assert_eq!(request.method(), reqwest::Method::GET);
1159        assert_eq!(
1160            request.url().as_str(),
1161            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1162        );
1163
1164        assert_eq!(request.headers().get("range"), want);
1165        Ok(())
1166    }
1167
1168    #[test_case("projects/p", "projects%2Fp")]
1169    #[test_case("kebab-case", "kebab-case")]
1170    #[test_case("dot.name", "dot.name")]
1171    #[test_case("under_score", "under_score")]
1172    #[test_case("tilde~123", "tilde~123")]
1173    #[test_case("exclamation!point!", "exclamation%21point%21")]
1174    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1175    #[test_case("preserve%percent%21", "preserve%percent%21")]
1176    #[test_case(
1177        "testall !#$&'()*+,/:;=?@[]",
1178        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1179    )]
1180    #[tokio::test]
1181    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1182        let inner = test_inner_client(test_builder()).await;
1183        let stub = crate::storage::transport::Storage::new(inner.clone());
1184        let builder = ReadObject::new(
1185            stub,
1186            "projects/_/buckets/bucket",
1187            name,
1188            inner.options.clone(),
1189        );
1190        let request = http_request_builder(inner, builder).await?.build()?;
1191        let got = request.url().path_segments().unwrap().next_back().unwrap();
1192        assert_eq!(got, want);
1193        Ok(())
1194    }
1195
1196    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1197    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1198    #[test_case("warning", "214 UploadServer gunzipped", true)]
1199    #[test_case("warning", "no match", false)]
1200    #[test_case("unused", "unused", false)]
1201    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1202        let response = http::Response::builder()
1203            .status(200)
1204            .header(name, value)
1205            .body(Vec::new())?;
1206        let response = reqwest::Response::from(response);
1207        let got = Reader::is_gunzipped(&response);
1208        assert_eq!(got, want, "{response:?}");
1209        Ok(())
1210    }
1211}