Skip to main content

google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::http::map_send_error;
27
/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
///
/// The builder accumulates the request parameters and per-request options.
/// Nothing is sent until `send()` is called.
///
/// # Example: accumulate the contents of an object into a vector
/// ```
/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
///     let mut reader = builder.send().await?;
///     let mut contents = Vec::new();
///     while let Some(chunk) = reader.next().await.transpose()? {
///         contents.extend_from_slice(&chunk);
///     }
///     println!("object contents={:?}", contents);
///     Ok(())
/// }
/// ```
///
/// # Example: read part of an object
/// ```
/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
/// use google_cloud_storage::model_ext::ReadRange;
/// async fn sample(client: &Storage) -> anyhow::Result<()> {
///     const MIB: u64 = 1024 * 1024;
///     let mut contents = Vec::new();
///     let mut reader = client
///         .read_object("projects/_/buckets/my-bucket", "my-object")
///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
///         .send()
///         .await?;
///     while let Some(chunk) = reader.next().await.transpose()? {
///         contents.extend_from_slice(&chunk);
///     }
///     println!("range contents={:?}", contents);
///     Ok(())
/// }
/// ```
#[derive(Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// The stub that executes the request when `send()` is called.
    stub: std::sync::Arc<S>,
    /// The bucket, object, and read conditions accumulated by the setters.
    request: crate::model::ReadObjectRequest,
    /// Per-request options: checksums, retry, backoff, throttling, resume
    /// policy, and automatic decompression.
    options: RequestOptions,
}
73
74impl<S> Clone for ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    fn clone(&self) -> Self {
79        Self {
80            stub: self.stub.clone(),
81            request: self.request.clone(),
82            options: self.options.clone(),
83        }
84    }
85}
86
87impl<S> ReadObject<S>
88where
89    S: crate::storage::stub::Storage + 'static,
90{
91    pub(crate) fn new<B, O>(
92        stub: std::sync::Arc<S>,
93        bucket: B,
94        object: O,
95        options: RequestOptions,
96    ) -> Self
97    where
98        B: Into<String>,
99        O: Into<String>,
100    {
101        ReadObject {
102            stub,
103            request: crate::model::ReadObjectRequest::new()
104                .set_bucket(bucket)
105                .set_object(object),
106            options,
107        }
108    }
109
110    /// Enables computation of MD5 hashes.
111    ///
112    /// Crc32c hashes are checked by default.
113    ///
114    /// Checksum validation is supported iff:
115    /// 1. The full content is requested.
116    /// 2. All of the content is returned (status != PartialContent).
117    /// 3. The server sent a checksum header.
118    /// 4. The http stack did not uncompress the file.
119    /// 5. The server did not uncompress data on read.
120    ///
121    /// # Example
122    /// ```
123    /// # use google_cloud_storage::client::Storage;
124    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
125    /// let builder =  client
126    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
127    ///     .compute_md5();
128    /// let mut reader = builder
129    ///     .send()
130    ///     .await?;
131    /// let mut contents = Vec::new();
132    /// while let Some(chunk) = reader.next().await.transpose()? {
133    ///     contents.extend_from_slice(&chunk);
134    /// }
135    /// println!("object contents={:?}", contents);
136    /// # Ok(()) }
137    /// ```
138    pub fn compute_md5(self) -> Self {
139        let mut this = self;
140        this.options.checksum.md5_hash = Some(Md5::default());
141        this
142    }
143
144    /// If present, selects a specific revision of this object (as
145    /// opposed to the latest version, the default).
146    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
147        self.request.generation = v.into();
148        self
149    }
150
151    /// Makes the operation conditional on whether the object's current generation
152    /// matches the given value. Setting to 0 makes the operation succeed only if
153    /// there are no live versions of the object.
154    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
155    where
156        T: Into<i64>,
157    {
158        self.request.if_generation_match = Some(v.into());
159        self
160    }
161
162    /// Makes the operation conditional on whether the object's live generation
163    /// does not match the given value. If no live object exists, the precondition
164    /// fails. Setting to 0 makes the operation succeed only if there is a live
165    /// version of the object.
166    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
167    where
168        T: Into<i64>,
169    {
170        self.request.if_generation_not_match = Some(v.into());
171        self
172    }
173
174    /// Makes the operation conditional on whether the object's current
175    /// metageneration matches the given value.
176    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
177    where
178        T: Into<i64>,
179    {
180        self.request.if_metageneration_match = Some(v.into());
181        self
182    }
183
184    /// Makes the operation conditional on whether the object's current
185    /// metageneration does not match the given value.
186    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
187    where
188        T: Into<i64>,
189    {
190        self.request.if_metageneration_not_match = Some(v.into());
191        self
192    }
193
194    /// The range of bytes to return in the read.
195    ///
196    /// This can be all the bytes starting at a given offset
197    /// (`ReadRange::offset()`), all the bytes in an explicit range
198    /// (`Range::segment`), or the last N bytes of the object
199    /// (`ReadRange::tail`).
200    ///
201    /// # Examples
202    ///
203    /// Read starting at 100 bytes to end of file.
204    /// ```
205    /// # use google_cloud_storage::client::Storage;
206    /// # use google_cloud_storage::model_ext::ReadRange;
207    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
208    /// let response = client
209    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
210    ///     .set_read_range(ReadRange::offset(100))
211    ///     .send()
212    ///     .await?;
213    /// println!("response details={response:?}");
214    /// # Ok(()) }
215    /// ```
216    ///
217    /// Read last 100 bytes of file:
218    /// ```
219    /// # use google_cloud_storage::client::Storage;
220    /// # use google_cloud_storage::model_ext::ReadRange;
221    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
222    /// let response = client
223    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
224    ///     .set_read_range(ReadRange::tail(100))
225    ///     .send()
226    ///     .await?;
227    /// println!("response details={response:?}");
228    /// # Ok(()) }
229    /// ```
230    ///
231    /// Read bytes 1000 to 1099.
232    /// ```
233    /// # use google_cloud_storage::client::Storage;
234    /// # use google_cloud_storage::model_ext::ReadRange;
235    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
236    /// let response = client
237    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
238    ///     .set_read_range(ReadRange::segment(1000, 100))
239    ///     .send()
240    ///     .await?;
241    /// println!("response details={response:?}");
242    /// # Ok(()) }
243    /// ```
244    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
245        self.request.with_range(range);
246        self
247    }
248
249    /// The encryption key used with the Customer-Supplied Encryption Keys
250    /// feature. In raw bytes format (not base64-encoded).
251    ///
252    /// Example:
253    /// ```
254    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
255    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
256    /// let key: &[u8] = &[97; 32];
257    /// let response = client
258    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
259    ///     .set_key(KeyAes256::new(key)?)
260    ///     .send()
261    ///     .await?;
262    /// println!("response details={response:?}");
263    /// # Ok(()) }
264    /// ```
265    pub fn set_key(mut self, v: KeyAes256) -> Self {
266        self.request.common_object_request_params = Some(v.into());
267        self
268    }
269
270    /// The retry policy used for this request.
271    ///
272    /// # Example
273    /// ```
274    /// # use google_cloud_storage::client::Storage;
275    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
276    /// use google_cloud_storage::retry_policy::RetryableErrors;
277    /// use std::time::Duration;
278    /// use gax::retry_policy::RetryPolicyExt;
279    /// let response = client
280    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
281    ///     .with_retry_policy(
282    ///         RetryableErrors
283    ///             .with_attempt_limit(5)
284    ///             .with_time_limit(Duration::from_secs(10)),
285    ///     )
286    ///     .send()
287    ///     .await?;
288    /// println!("response details={response:?}");
289    /// # Ok(()) }
290    /// ```
291    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
292        self.options.retry_policy = v.into().into();
293        self
294    }
295
296    /// The backoff policy used for this request.
297    ///
298    /// # Example
299    /// ```
300    /// # use google_cloud_storage::client::Storage;
301    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
302    /// use std::time::Duration;
303    /// use gax::exponential_backoff::ExponentialBackoff;
304    /// let response = client
305    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
306    ///     .with_backoff_policy(ExponentialBackoff::default())
307    ///     .send()
308    ///     .await?;
309    /// println!("response details={response:?}");
310    /// # Ok(()) }
311    /// ```
312    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
313        mut self,
314        v: V,
315    ) -> Self {
316        self.options.backoff_policy = v.into().into();
317        self
318    }
319
320    /// The retry throttler used for this request.
321    ///
322    /// Most of the time you want to use the same throttler for all the requests
323    /// in a client, and even the same throttler for many clients. Rarely it
324    /// may be necessary to use an custom throttler for some subset of the
325    /// requests.
326    ///
327    /// # Example
328    /// ```
329    /// # use google_cloud_storage::client::Storage;
330    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
331    /// let response = client
332    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
333    ///     .with_retry_throttler(adhoc_throttler())
334    ///     .send()
335    ///     .await?;
336    /// println!("response details={response:?}");
337    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
338    ///     # panic!();
339    /// }
340    /// # Ok(()) }
341    /// ```
342    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
343        mut self,
344        v: V,
345    ) -> Self {
346        self.options.retry_throttler = v.into().into();
347        self
348    }
349
350    /// Configure the resume policy for read requests.
351    ///
352    /// The Cloud Storage client library can automatically resume a read that is
353    /// interrupted by a transient error. Applications may want to limit the
354    /// number of read attempts, or may wish to expand the type of errors
355    /// treated as retryable.
356    ///
357    /// # Example
358    /// ```
359    /// # use google_cloud_storage::client::Storage;
360    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
361    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
362    /// let response = client
363    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
364    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
365    ///     .send()
366    ///     .await?;
367    /// # Ok(()) }
368    /// ```
369    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
370    where
371        V: ReadResumePolicy + 'static,
372    {
373        self.options.set_read_resume_policy(std::sync::Arc::new(v));
374        self
375    }
376
377    /// Enables automatic decompression.
378    ///
379    /// The Cloud Storage service [automatically decompresses] objects
380    /// with `content_encoding == "gzip"` during reads. The client library
381    /// disables this behavior by default, as it is not possible to
382    /// perform ranged reads or to resume interrupted downloads if automatic
383    /// decompression is enabled.
384    ///
385    /// Use this option to enable automatic decompression.
386    ///
387    /// # Example
388    /// ```
389    /// # use google_cloud_storage::client::Storage;
390    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
391    /// let response = client
392    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
393    ///     .with_automatic_decompression(true)
394    ///     .send()
395    ///     .await?;
396    /// println!("response details={response:?}");
397    /// # Ok(()) }
398    /// ```
399    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
400        self.options.automatic_decompression = v;
401        self
402    }
403
404    /// Sends the request.
405    pub async fn send(self) -> Result<ReadObjectResponse> {
406        self.stub.read_object(self.request, self.options).await
407    }
408}
409
// A convenience struct that saves the request conditions and performs the read.
//
// It is `Clone` because `response()` sends the initial request from a clone and
// then hands the original to the resumable response.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    // Client internals used to create the HTTP request and apply the
    // authentication headers.
    pub inner: std::sync::Arc<StorageInner>,
    // The bucket, object, preconditions, and byte range to read.
    pub request: crate::model::ReadObjectRequest,
    // Retry, backoff, throttling, and decompression settings for the read.
    pub options: RequestOptions,
}
417
418impl Reader {
419    async fn read(self) -> Result<reqwest::Response> {
420        let throttler = self.options.retry_throttler.clone();
421        let retry = self.options.retry_policy.clone();
422        let backoff = self.options.backoff_policy.clone();
423
424        gax::retry_loop_internal::retry_loop(
425            async move |_| self.read_attempt().await,
426            async |duration| tokio::time::sleep(duration).await,
427            true,
428            throttler,
429            retry,
430            backoff,
431        )
432        .await
433    }
434
435    async fn read_attempt(&self) -> Result<reqwest::Response> {
436        let builder = self.http_request_builder().await?;
437        let response = builder.send().await.map_err(map_send_error)?;
438        if !response.status().is_success() {
439            return gaxi::http::to_http_error(response).await;
440        }
441        Ok(response)
442    }
443
444    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
445        // Collect the required bucket and object parameters.
446        let bucket = &self.request.bucket;
447        let bucket_id = bucket
448            .as_str()
449            .strip_prefix("projects/_/buckets/")
450            .ok_or_else(|| {
451                Error::binding(format!(
452                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
453                ))
454            })?;
455        let object = &self.request.object;
456
457        // Build the request.
458        let builder = self
459            .inner
460            .client
461            .builder(
462                reqwest::Method::GET,
463                format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
464            )
465            .query(&[("alt", "media")])
466            .header(
467                "x-goog-api-client",
468                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
469            );
470
471        let builder = if self.options.automatic_decompression {
472            builder
473        } else {
474            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
475            //
476            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
477            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
478            // with a different `contentEncoding` value.
479            builder.header(
480                "accept-encoding",
481                reqwest::header::HeaderValue::from_static("gzip"),
482            )
483        };
484
485        // Add the optional query parameters.
486        let builder = if self.request.generation != 0 {
487            builder.query(&[("generation", self.request.generation)])
488        } else {
489            builder
490        };
491        let builder = self
492            .request
493            .if_generation_match
494            .iter()
495            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
496        let builder = self
497            .request
498            .if_generation_not_match
499            .iter()
500            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
501        let builder = self
502            .request
503            .if_metageneration_match
504            .iter()
505            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
506        let builder = self
507            .request
508            .if_metageneration_not_match
509            .iter()
510            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
511
512        let builder = apply_customer_supplied_encryption_headers(
513            builder,
514            &self.request.common_object_request_params,
515        );
516
517        // Apply "range" header for read limits and offsets.
518        let builder = match (self.request.read_offset, self.request.read_limit) {
519            // read_limit can't be negative.
520            (_, l) if l < 0 => {
521                unreachable!("ReadObject build never sets a negative read_limit value")
522            }
523            // negative offset can't also have a read_limit.
524            (o, l) if o < 0 && l > 0 => unreachable!(
525                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
526            ),
527            // If both are zero, we use default implementation (no range header).
528            (0, 0) => builder,
529            // negative offset with no limit means the last N bytes.
530            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
531            // read_limit is zero, means no limit. Read from offset to end of file.
532            // This handles cases like (5, 0) -> "bytes=5-"
533            (o, 0) => builder.header("range", format!("bytes={o}-")),
534            // General case: non-negative offset and positive limit.
535            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
536            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
537        };
538
539        self.inner.apply_auth_headers(builder).await
540    }
541
542    fn is_gunzipped(response: &reqwest::Response) -> bool {
543        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
544        // objects. Reading such objects comes with a number of restrictions:
545        // - Ranged reads do not work.
546        // - The size of the decompressed data is not known.
547        // - Checksums do not work because the object checksums correspond to the
548        //   compressed data and the client library receives the decompressed data.
549        //
550        // Because ranged reads do not work, resuming a read does not work. Consequently,
551        // the implementation of `ReadObjectResponse` is substantially different for
552        // objects that are gunzipped.
553        //
554        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
555        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
556        use http::header::WARNING;
557        if response
558            .headers()
559            .get(TRANSFORMATION)
560            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
561        {
562            return true;
563        }
564        response
565            .headers()
566            .get(WARNING)
567            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
568    }
569
570    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
571        let response = self.clone().read().await?;
572        if Self::is_gunzipped(&response) {
573            return Ok(ReadObjectResponse::new(Box::new(
574                non_resumable::NonResumableResponse::new(response)?,
575            )));
576        }
577        Ok(ReadObjectResponse::new(Box::new(
578            resumable::ResumableResponse::new(self, response)?,
579        )))
580    }
581}
582
583#[cfg(test)]
584mod resume_tests;
585
586#[cfg(test)]
587mod tests {
588    use super::client::tests::{test_builder, test_inner_client};
589    use super::*;
590    use crate::error::{ChecksumMismatch, ReadError};
591    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
592    use base64::Engine;
593    use futures::TryStreamExt;
594    use google_cloud_auth::credentials::{
595        anonymous::Builder as Anonymous, testing::error_credentials,
596    };
597    use httptest::{Expectation, Server, matchers::*, responders::status_code};
598    use std::collections::HashMap;
599    use std::error::Error;
600    use std::sync::Arc;
601    use test_case::test_case;
602
603    type Result = anyhow::Result<()>;
604
605    async fn http_request_builder(
606        inner: Arc<StorageInner>,
607        builder: ReadObject,
608    ) -> crate::Result<reqwest::RequestBuilder> {
609        let reader = Reader {
610            inner,
611            request: builder.request,
612            options: builder.options,
613        };
614        reader.http_request_builder().await
615    }
616
617    #[tokio::test]
618    async fn test_clone() {
619        let inner = test_inner_client(test_builder()).await;
620        let stub = crate::storage::transport::Storage::new(inner.clone());
621        let options = {
622            let mut o = RequestOptions::new();
623            o.set_resumable_upload_threshold(12345_usize);
624            o
625        };
626        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
627
628        let clone = builder.clone();
629        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
630        assert_eq!(clone.request, builder.request);
631        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
632    }
633
634    // Verify `read_object()` meets normal Send, Sync, requirements.
635    #[tokio::test]
636    async fn test_read_is_send_and_static() -> Result {
637        let client = Storage::builder()
638            .with_credentials(Anonymous::new().build())
639            .build()
640            .await?;
641
642        fn need_send<T: Send>(_val: &T) {}
643        fn need_sync<T: Sync>(_val: &T) {}
644        fn need_static<T: 'static>(_val: &T) {}
645
646        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
647        need_send(&read);
648        need_sync(&read);
649        need_static(&read);
650
651        let read = client
652            .read_object("projects/_/buckets/test-bucket", "test-object")
653            .send();
654        need_send(&read);
655        need_static(&read);
656
657        Ok(())
658    }
659
660    #[tokio::test]
661    async fn read_object_normal() -> Result {
662        let server = Server::run();
663        server.expect(
664            Expectation::matching(all_of![
665                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
666                request::headers(contains(("accept-encoding", "gzip"))),
667                request::query(url_decoded(contains(("alt", "media")))),
668            ])
669            .respond_with(
670                status_code(200)
671                    .body("hello world")
672                    .append_header("x-goog-generation", 123456),
673            ),
674        );
675
676        let client = Storage::builder()
677            .with_endpoint(format!("http://{}", server.addr()))
678            .with_credentials(Anonymous::new().build())
679            .build()
680            .await?;
681        let mut reader = client
682            .read_object("projects/_/buckets/test-bucket", "test-object")
683            .send()
684            .await?;
685        let mut got = Vec::new();
686        while let Some(b) = reader.next().await.transpose()? {
687            got.extend_from_slice(&b);
688        }
689        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
690
691        Ok(())
692    }
693
694    #[tokio::test]
695    async fn read_object_stream() -> Result {
696        let server = Server::run();
697        server.expect(
698            Expectation::matching(all_of![
699                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
700                request::query(url_decoded(contains(("alt", "media")))),
701            ])
702            .respond_with(
703                status_code(200)
704                    .append_header("x-goog-generation", 123456)
705                    .body("hello world"),
706            ),
707        );
708
709        let client = Storage::builder()
710            .with_endpoint(format!("http://{}", server.addr()))
711            .with_credentials(Anonymous::new().build())
712            .build()
713            .await?;
714        let response = client
715            .read_object("projects/_/buckets/test-bucket", "test-object")
716            .send()
717            .await?;
718        let result: Vec<_> = response.into_stream().try_collect().await?;
719        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
720
721        Ok(())
722    }
723
724    #[tokio::test]
725    async fn read_object_next_then_consume_response() -> Result {
726        // Create a large enough file that will require multiple chunks to read.
727        const BLOCK_SIZE: usize = 500;
728        let mut contents = Vec::new();
729        for i in 0..50 {
730            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
731        }
732
733        // Calculate and serialize the crc32c checksum
734        let u = crc32c::crc32c(&contents);
735        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
736
737        let server = Server::run();
738        server.expect(
739            Expectation::matching(all_of![
740                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
741                request::query(url_decoded(contains(("alt", "media")))),
742            ])
743            .times(1)
744            .respond_with(
745                status_code(200)
746                    .body(contents.clone())
747                    .append_header("x-goog-hash", format!("crc32c={value}"))
748                    .append_header("x-goog-generation", 123456),
749            ),
750        );
751
752        let client = Storage::builder()
753            .with_endpoint(format!("http://{}", server.addr()))
754            .with_credentials(Anonymous::new().build())
755            .build()
756            .await?;
757
758        // Read some bytes, then remainder with stream.
759        let mut response = client
760            .read_object("projects/_/buckets/test-bucket", "test-object")
761            .send()
762            .await?;
763
764        let mut all_bytes = bytes::BytesMut::new();
765        let chunk = response.next().await.transpose()?.unwrap();
766        assert!(!chunk.is_empty());
767        all_bytes.extend(chunk);
768        use futures::StreamExt;
769        let mut stream = response.into_stream();
770        while let Some(chunk) = stream.next().await.transpose()? {
771            all_bytes.extend(chunk);
772        }
773        assert_eq!(all_bytes, contents);
774
775        Ok(())
776    }
777
778    #[tokio::test]
779    async fn read_object_not_found() -> Result {
780        let server = Server::run();
781        server.expect(
782            Expectation::matching(all_of![
783                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
784                request::query(url_decoded(contains(("alt", "media")))),
785            ])
786            .respond_with(status_code(404).body("NOT FOUND")),
787        );
788
789        let client = Storage::builder()
790            .with_endpoint(format!("http://{}", server.addr()))
791            .with_credentials(Anonymous::new().build())
792            .build()
793            .await?;
794        let err = client
795            .read_object("projects/_/buckets/test-bucket", "test-object")
796            .send()
797            .await
798            .expect_err("expected a not found error");
799        assert_eq!(err.http_status_code(), Some(404));
800
801        Ok(())
802    }
803
804    #[tokio::test]
805    async fn read_object_incorrect_crc32c_check() -> Result {
806        // Calculate and serialize the crc32c checksum
807        let u = crc32c::crc32c("goodbye world".as_bytes());
808        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
809
810        let server = Server::run();
811        server.expect(
812            Expectation::matching(all_of![
813                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
814                request::query(url_decoded(contains(("alt", "media")))),
815            ])
816            .times(3)
817            .respond_with(
818                status_code(200)
819                    .body("hello world")
820                    .append_header("x-goog-hash", format!("crc32c={value}"))
821                    .append_header("x-goog-generation", 123456),
822            ),
823        );
824
825        let client = Storage::builder()
826            .with_endpoint(format!("http://{}", server.addr()))
827            .with_credentials(Anonymous::new().build())
828            .build()
829            .await?;
830        let mut response = client
831            .read_object("projects/_/buckets/test-bucket", "test-object")
832            .send()
833            .await?;
834        let mut partial = Vec::new();
835        let mut err = None;
836        while let Some(r) = response.next().await {
837            match r {
838                Ok(b) => partial.extend_from_slice(&b),
839                Err(e) => err = Some(e),
840            };
841        }
842        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
843        let err = err.expect("expect error on incorrect crc32c");
844        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
845        assert!(
846            matches!(
847                source,
848                Some(&ReadError::ChecksumMismatch(
849                    ChecksumMismatch::Crc32c { .. }
850                ))
851            ),
852            "err={err:?}"
853        );
854
855        let mut response = client
856            .read_object("projects/_/buckets/test-bucket", "test-object")
857            .send()
858            .await?;
859        let err: crate::Error = async {
860            {
861                while (response.next().await.transpose()?).is_some() {}
862                Ok(())
863            }
864        }
865        .await
866        .expect_err("expect error on incorrect crc32c");
867        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
868        assert!(
869            matches!(
870                source,
871                Some(&ReadError::ChecksumMismatch(
872                    ChecksumMismatch::Crc32c { .. }
873                ))
874            ),
875            "err={err:?}"
876        );
877
878        use futures::TryStreamExt;
879        let err = client
880            .read_object("projects/_/buckets/test-bucket", "test-object")
881            .send()
882            .await?
883            .into_stream()
884            .try_collect::<Vec<bytes::Bytes>>()
885            .await
886            .expect_err("expect error on incorrect crc32c");
887        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
888        assert!(
889            matches!(
890                source,
891                Some(&ReadError::ChecksumMismatch(
892                    ChecksumMismatch::Crc32c { .. }
893                ))
894            ),
895            "err={err:?}"
896        );
897        Ok(())
898    }
899
900    #[tokio::test]
901    async fn read_object_incorrect_md5_check() -> Result {
902        // Calculate and serialize the md5 checksum
903        let digest = md5::compute("goodbye world".as_bytes());
904        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
905
906        let server = Server::run();
907        server.expect(
908            Expectation::matching(all_of![
909                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
910                request::query(url_decoded(contains(("alt", "media")))),
911            ])
912            .times(1)
913            .respond_with(
914                status_code(200)
915                    .body("hello world")
916                    .append_header("x-goog-hash", format!("md5={value}"))
917                    .append_header("x-goog-generation", 123456),
918            ),
919        );
920
921        let client = Storage::builder()
922            .with_endpoint(format!("http://{}", server.addr()))
923            .with_credentials(Anonymous::new().build())
924            .build()
925            .await?;
926        let mut response = client
927            .read_object("projects/_/buckets/test-bucket", "test-object")
928            .compute_md5()
929            .send()
930            .await?;
931        let mut partial = Vec::new();
932        let mut err = None;
933        while let Some(r) = response.next().await {
934            match r {
935                Ok(b) => partial.extend_from_slice(&b),
936                Err(e) => err = Some(e),
937            };
938        }
939        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
940        let err = err.expect("expect error on incorrect md5");
941        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
942        assert!(
943            matches!(
944                source,
945                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
946            ),
947            "err={err:?}"
948        );
949
950        Ok(())
951    }
952
953    #[tokio::test]
954    async fn read_object() -> Result {
955        let inner = test_inner_client(test_builder()).await;
956        let stub = crate::storage::transport::Storage::new(inner.clone());
957        let builder = ReadObject::new(
958            stub,
959            "projects/_/buckets/bucket",
960            "object",
961            inner.options.clone(),
962        );
963        let request = http_request_builder(inner, builder).await?.build()?;
964
965        assert_eq!(request.method(), reqwest::Method::GET);
966        assert_eq!(
967            request.url().as_str(),
968            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
969        );
970        Ok(())
971    }
972
973    #[tokio::test]
974    async fn read_object_error_credentials() -> Result {
975        let inner =
976            test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
977        let stub = crate::storage::transport::Storage::new(inner.clone());
978        let builder = ReadObject::new(
979            stub,
980            "projects/_/buckets/bucket",
981            "object",
982            inner.options.clone(),
983        );
984        let _ = http_request_builder(inner, builder)
985            .await
986            .inspect_err(|e| assert!(e.is_authentication()))
987            .expect_err("invalid credentials should err");
988        Ok(())
989    }
990
991    #[tokio::test]
992    async fn read_object_bad_bucket() -> Result {
993        let inner = test_inner_client(test_builder()).await;
994        let stub = crate::storage::transport::Storage::new(inner.clone());
995        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
996        let _ = http_request_builder(inner, builder)
997            .await
998            .expect_err("malformed bucket string should error");
999        Ok(())
1000    }
1001
1002    #[tokio::test]
1003    async fn read_object_query_params() -> Result {
1004        let inner = test_inner_client(test_builder()).await;
1005        let stub = crate::storage::transport::Storage::new(inner.clone());
1006        let builder = ReadObject::new(
1007            stub,
1008            "projects/_/buckets/bucket",
1009            "object",
1010            inner.options.clone(),
1011        )
1012        .set_generation(5)
1013        .set_if_generation_match(10)
1014        .set_if_generation_not_match(20)
1015        .set_if_metageneration_match(30)
1016        .set_if_metageneration_not_match(40);
1017        let request = http_request_builder(inner, builder).await?.build()?;
1018
1019        assert_eq!(request.method(), reqwest::Method::GET);
1020        let want_pairs: HashMap<String, String> = [
1021            ("alt", "media"),
1022            ("generation", "5"),
1023            ("ifGenerationMatch", "10"),
1024            ("ifGenerationNotMatch", "20"),
1025            ("ifMetagenerationMatch", "30"),
1026            ("ifMetagenerationNotMatch", "40"),
1027        ]
1028        .iter()
1029        .map(|(k, v)| (k.to_string(), v.to_string()))
1030        .collect();
1031        let query_pairs: HashMap<String, String> = request
1032            .url()
1033            .query_pairs()
1034            .map(|param| (param.0.to_string(), param.1.to_string()))
1035            .collect();
1036        assert_eq!(query_pairs.len(), want_pairs.len());
1037        assert_eq!(query_pairs, want_pairs);
1038        Ok(())
1039    }
1040
1041    #[tokio::test]
1042    async fn read_object_default_headers() -> Result {
1043        // The API takes the unencoded byte array.
1044        let inner = test_inner_client(test_builder()).await;
1045        let stub = crate::storage::transport::Storage::new(inner.clone());
1046        let builder = ReadObject::new(
1047            stub,
1048            "projects/_/buckets/bucket",
1049            "object",
1050            inner.options.clone(),
1051        );
1052        let request = http_request_builder(inner, builder).await?.build()?;
1053
1054        assert_eq!(request.method(), reqwest::Method::GET);
1055        assert_eq!(
1056            request.url().as_str(),
1057            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1058        );
1059
1060        let want = [("accept-encoding", "gzip")];
1061        let headers = request.headers();
1062        for (name, value) in want {
1063            assert_eq!(
1064                headers.get(name).and_then(|h| h.to_str().ok()),
1065                Some(value),
1066                "{request:?}"
1067            );
1068        }
1069        Ok(())
1070    }
1071
1072    #[tokio::test]
1073    async fn read_object_automatic_decompression_headers() -> Result {
1074        // The API takes the unencoded byte array.
1075        let inner = test_inner_client(test_builder()).await;
1076        let stub = crate::storage::transport::Storage::new(inner.clone());
1077        let builder = ReadObject::new(
1078            stub,
1079            "projects/_/buckets/bucket",
1080            "object",
1081            inner.options.clone(),
1082        )
1083        .with_automatic_decompression(true);
1084        let request = http_request_builder(inner, builder).await?.build()?;
1085
1086        assert_eq!(request.method(), reqwest::Method::GET);
1087        assert_eq!(
1088            request.url().as_str(),
1089            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1090        );
1091
1092        let headers = request.headers();
1093        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1094        Ok(())
1095    }
1096
1097    #[tokio::test]
1098    async fn read_object_encryption_headers() -> Result {
1099        // Make a 32-byte key.
1100        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1101
1102        // The API takes the unencoded byte array.
1103        let inner = test_inner_client(test_builder()).await;
1104        let stub = crate::storage::transport::Storage::new(inner.clone());
1105        let builder = ReadObject::new(
1106            stub,
1107            "projects/_/buckets/bucket",
1108            "object",
1109            inner.options.clone(),
1110        )
1111        .set_key(KeyAes256::new(&key)?);
1112        let request = http_request_builder(inner, builder).await?.build()?;
1113
1114        assert_eq!(request.method(), reqwest::Method::GET);
1115        assert_eq!(
1116            request.url().as_str(),
1117            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1118        );
1119
1120        let want = [
1121            ("x-goog-encryption-algorithm", "AES256".to_string()),
1122            ("x-goog-encryption-key", key_base64),
1123            ("x-goog-encryption-key-sha256", key_sha256_base64),
1124        ];
1125
1126        let headers = request.headers();
1127        for (name, value) in want {
1128            assert_eq!(
1129                headers.get(name).and_then(|h| h.to_str().ok()),
1130                Some(value.as_str())
1131            );
1132        }
1133        Ok(())
1134    }
1135
1136    #[test_case(ReadRange::all(), None; "no headers needed")]
1137    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1138    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1139    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1140    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1141    #[tokio::test]
1142    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1143        let inner = test_inner_client(test_builder()).await;
1144        let stub = crate::storage::transport::Storage::new(inner.clone());
1145        let builder = ReadObject::new(
1146            stub,
1147            "projects/_/buckets/bucket",
1148            "object",
1149            inner.options.clone(),
1150        )
1151        .set_read_range(input.clone());
1152        let request = http_request_builder(inner, builder).await?.build()?;
1153
1154        assert_eq!(request.method(), reqwest::Method::GET);
1155        assert_eq!(
1156            request.url().as_str(),
1157            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1158        );
1159
1160        assert_eq!(request.headers().get("range"), want);
1161        Ok(())
1162    }
1163
1164    #[test_case("projects/p", "projects%2Fp")]
1165    #[test_case("kebab-case", "kebab-case")]
1166    #[test_case("dot.name", "dot.name")]
1167    #[test_case("under_score", "under_score")]
1168    #[test_case("tilde~123", "tilde~123")]
1169    #[test_case("exclamation!point!", "exclamation%21point%21")]
1170    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1171    #[test_case("preserve%percent%21", "preserve%percent%21")]
1172    #[test_case(
1173        "testall !#$&'()*+,/:;=?@[]",
1174        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1175    )]
1176    #[tokio::test]
1177    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1178        let inner = test_inner_client(test_builder()).await;
1179        let stub = crate::storage::transport::Storage::new(inner.clone());
1180        let builder = ReadObject::new(
1181            stub,
1182            "projects/_/buckets/bucket",
1183            name,
1184            inner.options.clone(),
1185        );
1186        let request = http_request_builder(inner, builder).await?.build()?;
1187        let got = request.url().path_segments().unwrap().next_back().unwrap();
1188        assert_eq!(got, want);
1189        Ok(())
1190    }
1191
1192    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1193    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1194    #[test_case("warning", "214 UploadServer gunzipped", true)]
1195    #[test_case("warning", "no match", false)]
1196    #[test_case("unused", "unused", false)]
1197    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1198        let response = http::Response::builder()
1199            .status(200)
1200            .header(name, value)
1201            .body(Vec::new())?;
1202        let response = reqwest::Response::from(response);
1203        let got = Reader::is_gunzipped(&response);
1204        assert_eq!(got, want, "{response:?}");
1205        Ok(())
1206    }
1207}