Skip to main content

google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::attempt_info::AttemptInfo;
27use gaxi::http::HttpRequestBuilder;
28use gaxi::http::reqwest::{HeaderValue, Method, Response};
29use google_cloud_gax::options::internal::{PathTemplate, RequestOptionsExt, ResourceName};
30
31/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
32///
33/// # Example: accumulate the contents of an object into a vector
34/// ```
35/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
36/// async fn sample(client: &Storage) -> anyhow::Result<()> {
37///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
38///     let mut reader = builder.send().await?;
39///     let mut contents = Vec::new();
40///     while let Some(chunk) = reader.next().await.transpose()? {
41///         contents.extend_from_slice(&chunk);
42///     }
43///     println!("object contents={:?}", contents);
44///     Ok(())
45/// }
46/// ```
47///
48/// # Example: read part of an object
49/// ```
50/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
51/// use google_cloud_storage::model_ext::ReadRange;
52/// async fn sample(client: &Storage) -> anyhow::Result<()> {
53///     const MIB: u64 = 1024 * 1024;
54///     let mut contents = Vec::new();
55///     let mut reader = client
56///         .read_object("projects/_/buckets/my-bucket", "my-object")
57///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
58///         .send()
59///         .await?;
60///     while let Some(chunk) = reader.next().await.transpose()? {
61///         contents.extend_from_slice(&chunk);
62///     }
63///     println!("range contents={:?}", contents);
64///     Ok(())
65/// }
66/// ```
67#[derive(Debug)]
68pub struct ReadObject<S = crate::storage::transport::Storage>
69where
70    S: crate::storage::stub::Storage + 'static,
71{
72    stub: std::sync::Arc<S>,
73    request: crate::model::ReadObjectRequest,
74    options: RequestOptions,
75}
76
77impl<S> Clone for ReadObject<S>
78where
79    S: crate::storage::stub::Storage + 'static,
80{
81    fn clone(&self) -> Self {
82        Self {
83            stub: self.stub.clone(),
84            request: self.request.clone(),
85            options: self.options.clone(),
86        }
87    }
88}
89
90impl<S> ReadObject<S>
91where
92    S: crate::storage::stub::Storage + 'static,
93{
94    pub(crate) fn new<B, O>(
95        stub: std::sync::Arc<S>,
96        bucket: B,
97        object: O,
98        options: RequestOptions,
99    ) -> Self
100    where
101        B: Into<String>,
102        O: Into<String>,
103    {
104        ReadObject {
105            stub,
106            request: crate::model::ReadObjectRequest::new()
107                .set_bucket(bucket)
108                .set_object(object),
109            options,
110        }
111    }
112
113    /// Enables computation of MD5 hashes.
114    ///
115    /// Crc32c hashes are checked by default.
116    ///
117    /// Checksum validation is supported iff:
118    /// 1. The full content is requested.
119    /// 2. All of the content is returned (status != PartialContent).
120    /// 3. The server sent a checksum header.
121    /// 4. The http stack did not uncompress the file.
122    /// 5. The server did not uncompress data on read.
123    ///
124    /// # Example
125    /// ```
126    /// # use google_cloud_storage::client::Storage;
127    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
128    /// let builder =  client
129    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
130    ///     .compute_md5();
131    /// let mut reader = builder
132    ///     .send()
133    ///     .await?;
134    /// let mut contents = Vec::new();
135    /// while let Some(chunk) = reader.next().await.transpose()? {
136    ///     contents.extend_from_slice(&chunk);
137    /// }
138    /// println!("object contents={:?}", contents);
139    /// # Ok(()) }
140    /// ```
141    pub fn compute_md5(self) -> Self {
142        let mut this = self;
143        this.options.checksum.md5_hash = Some(Md5::default());
144        this
145    }
146
147    /// If present, selects a specific revision of this object (as
148    /// opposed to the latest version, the default).
149    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
150        self.request.generation = v.into();
151        self
152    }
153
154    /// Makes the operation conditional on whether the object's current generation
155    /// matches the given value. Setting to 0 makes the operation succeed only if
156    /// there are no live versions of the object.
157    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
158    where
159        T: Into<i64>,
160    {
161        self.request.if_generation_match = Some(v.into());
162        self
163    }
164
165    /// Makes the operation conditional on whether the object's live generation
166    /// does not match the given value. If no live object exists, the precondition
167    /// fails. Setting to 0 makes the operation succeed only if there is a live
168    /// version of the object.
169    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
170    where
171        T: Into<i64>,
172    {
173        self.request.if_generation_not_match = Some(v.into());
174        self
175    }
176
177    /// Makes the operation conditional on whether the object's current
178    /// metageneration matches the given value.
179    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
180    where
181        T: Into<i64>,
182    {
183        self.request.if_metageneration_match = Some(v.into());
184        self
185    }
186
187    /// Makes the operation conditional on whether the object's current
188    /// metageneration does not match the given value.
189    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
190    where
191        T: Into<i64>,
192    {
193        self.request.if_metageneration_not_match = Some(v.into());
194        self
195    }
196
197    /// The range of bytes to return in the read.
198    ///
199    /// This can be all the bytes starting at a given offset
200    /// (`ReadRange::offset()`), all the bytes in an explicit range
201    /// (`ReadRange::segment`), or the last N bytes of the object
202    /// (`ReadRange::tail`).
203    ///
204    /// # Examples
205    ///
206    /// Read starting at 100 bytes to end of file.
207    /// ```
208    /// # use google_cloud_storage::client::Storage;
209    /// # use google_cloud_storage::model_ext::ReadRange;
210    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
211    /// let response = client
212    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
213    ///     .set_read_range(ReadRange::offset(100))
214    ///     .send()
215    ///     .await?;
216    /// println!("response details={response:?}");
217    /// # Ok(()) }
218    /// ```
219    ///
220    /// Read last 100 bytes of file:
221    /// ```
222    /// # use google_cloud_storage::client::Storage;
223    /// # use google_cloud_storage::model_ext::ReadRange;
224    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
225    /// let response = client
226    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
227    ///     .set_read_range(ReadRange::tail(100))
228    ///     .send()
229    ///     .await?;
230    /// println!("response details={response:?}");
231    /// # Ok(()) }
232    /// ```
233    ///
234    /// Read bytes 1000 to 1099.
235    /// ```
236    /// # use google_cloud_storage::client::Storage;
237    /// # use google_cloud_storage::model_ext::ReadRange;
238    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
239    /// let response = client
240    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
241    ///     .set_read_range(ReadRange::segment(1000, 100))
242    ///     .send()
243    ///     .await?;
244    /// println!("response details={response:?}");
245    /// # Ok(()) }
246    /// ```
247    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
248        self.request.with_range(range);
249        self
250    }
251
252    /// The encryption key used with the Customer-Supplied Encryption Keys
253    /// feature. In raw bytes format (not base64-encoded).
254    ///
255    /// Example:
256    /// ```
257    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
258    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
259    /// let key: &[u8] = &[97; 32];
260    /// let response = client
261    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
262    ///     .set_key(KeyAes256::new(key)?)
263    ///     .send()
264    ///     .await?;
265    /// println!("response details={response:?}");
266    /// # Ok(()) }
267    /// ```
268    pub fn set_key(mut self, v: KeyAes256) -> Self {
269        self.request.common_object_request_params = Some(v.into());
270        self
271    }
272
273    /// The retry policy used for this request.
274    ///
275    /// # Example
276    /// ```
277    /// # use google_cloud_storage::client::Storage;
278    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
279    /// use google_cloud_storage::retry_policy::RetryableErrors;
280    /// use std::time::Duration;
281    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
282    /// let response = client
283    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
284    ///     .with_retry_policy(
285    ///         RetryableErrors
286    ///             .with_attempt_limit(5)
287    ///             .with_time_limit(Duration::from_secs(10)),
288    ///     )
289    ///     .send()
290    ///     .await?;
291    /// println!("response details={response:?}");
292    /// # Ok(()) }
293    /// ```
294    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
295        mut self,
296        v: V,
297    ) -> Self {
298        self.options.retry_policy = v.into().into();
299        self
300    }
301
302    /// The backoff policy used for this request.
303    ///
304    /// # Example
305    /// ```
306    /// # use google_cloud_storage::client::Storage;
307    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
308    /// use std::time::Duration;
309    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
310    /// let response = client
311    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
312    ///     .with_backoff_policy(ExponentialBackoff::default())
313    ///     .send()
314    ///     .await?;
315    /// println!("response details={response:?}");
316    /// # Ok(()) }
317    /// ```
318    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
319        mut self,
320        v: V,
321    ) -> Self {
322        self.options.backoff_policy = v.into().into();
323        self
324    }
325
326    /// The retry throttler used for this request.
327    ///
328    /// Most of the time you want to use the same throttler for all the requests
329    /// in a client, and even the same throttler for many clients. Rarely it
330    /// may be necessary to use a custom throttler for some subset of the
331    /// requests.
332    ///
333    /// # Example
334    /// ```
335    /// # use google_cloud_storage::client::Storage;
336    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
337    /// let response = client
338    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
339    ///     .with_retry_throttler(adhoc_throttler())
340    ///     .send()
341    ///     .await?;
342    /// println!("response details={response:?}");
343    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
344    ///     # panic!();
345    /// }
346    /// # Ok(()) }
347    /// ```
348    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
349        mut self,
350        v: V,
351    ) -> Self {
352        self.options.retry_throttler = v.into().into();
353        self
354    }
355
356    /// Configure the resume policy for read requests.
357    ///
358    /// The Cloud Storage client library can automatically resume a read that is
359    /// interrupted by a transient error. Applications may want to limit the
360    /// number of read attempts, or may wish to expand the type of errors
361    /// treated as retryable.
362    ///
363    /// # Example
364    /// ```
365    /// # use google_cloud_storage::client::Storage;
366    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
367    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
368    /// let response = client
369    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
370    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
371    ///     .send()
372    ///     .await?;
373    /// # Ok(()) }
374    /// ```
375    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
376    where
377        V: ReadResumePolicy + 'static,
378    {
379        self.options.set_read_resume_policy(std::sync::Arc::new(v));
380        self
381    }
382
383    /// Enables automatic decompression.
384    ///
385    /// The Cloud Storage service [automatically decompresses] objects
386    /// with `content_encoding == "gzip"` during reads. The client library
387    /// disables this behavior by default, as it is not possible to
388    /// perform ranged reads or to resume interrupted downloads if automatic
389    /// decompression is enabled.
390    ///
391    /// Use this option to enable automatic decompression.
392    ///
393    /// # Example
394    /// ```
395    /// # use google_cloud_storage::client::Storage;
396    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
397    /// let response = client
398    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
399    ///     .with_automatic_decompression(true)
400    ///     .send()
401    ///     .await?;
402    /// println!("response details={response:?}");
403    /// # Ok(()) }
404    /// ```
405    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
406        self.options.automatic_decompression = v;
407        self
408    }
409
410    /// Sets the `User-Agent` header for this request.
411    ///
412    /// # Example
413    /// ```
414    /// # use google_cloud_storage::client::Storage;
415    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
416    /// let response = client
417    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
418    ///     .with_user_agent("my-app/1.0.0")
419    ///     .send()
420    ///     .await?;
421    /// println!("response details={response:?}");
422    /// # Ok(()) }
423    /// ```
424    pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
425        self.options.user_agent = Some(user_agent.into());
426        self
427    }
428
429    /// Sends the request.
430    pub async fn send(self) -> Result<ReadObjectResponse> {
431        self.stub.read_object(self.request, self.options).await
432    }
433}
434
435// A convenience struct that saves the request conditions and performs the read.
436#[derive(Clone, Debug)]
437pub(crate) struct Reader {
438    pub inner: std::sync::Arc<StorageInner>,
439    pub request: crate::model::ReadObjectRequest,
440    pub options: RequestOptions,
441}
442
443impl Reader {
444    async fn read(self) -> Result<Response> {
445        let throttler = self.options.retry_throttler.clone();
446        let retry = self.options.retry_policy.clone();
447        let backoff = self.options.backoff_policy.clone();
448        let mut count = 0;
449        let inner = async move |_| {
450            let current = count;
451            count += 1;
452            self.read_attempt(current).await
453        };
454
455        google_cloud_gax::retry_loop_internal::retry_loop(
456            inner,
457            async |duration| tokio::time::sleep(duration).await,
458            true,
459            throttler,
460            retry,
461            backoff,
462        )
463        .await
464    }
465
466    async fn read_attempt(&self, attempt_count: u32) -> Result<Response> {
467        let builder = self.http_request_builder().await?;
468        let options = self
469            .options
470            .gax()
471            .insert_extension(PathTemplate("/storage/v1/b/{bucket}/o/{object}"))
472            .insert_extension(ResourceName(format!(
473                "//storage.googleapis.com/{}",
474                self.request.bucket
475            )));
476        let response = builder
477            .send(options, AttemptInfo::new(attempt_count))
478            .await?;
479        if !response.status().is_success() {
480            return gaxi::http::to_http_error(response).await;
481        }
482        Ok(response)
483    }
484
485    async fn http_request_builder(&self) -> Result<HttpRequestBuilder> {
486        // Collect the required bucket and object parameters.
487        let bucket = &self.request.bucket;
488        let bucket_id = bucket
489            .as_str()
490            .strip_prefix("projects/_/buckets/")
491            .ok_or_else(|| {
492                Error::binding(format!(
493                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
494                ))
495            })?;
496        let object = &self.request.object;
497
498        // Build the request.
499        let builder = self
500            .inner
501            .client
502            .http_builder(
503                Method::GET,
504                &format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
505            )
506            .query("alt", "media")
507            .header(
508                "x-goog-api-client",
509                HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
510            );
511
512        let builder = if self.options.automatic_decompression {
513            builder
514        } else {
515            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
516            //
517            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
518            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
519            // with a different `contentEncoding` value.
520            builder.header("accept-encoding", HeaderValue::from_static("gzip"))
521        };
522
523        // Add the optional query parameters.
524        let builder = if self.request.generation != 0 {
525            builder.query("generation", self.request.generation)
526        } else {
527            builder
528        };
529        let builder = self
530            .request
531            .if_generation_match
532            .iter()
533            .fold(builder, |b, v| b.query("ifGenerationMatch", v));
534        let builder = self
535            .request
536            .if_generation_not_match
537            .iter()
538            .fold(builder, |b, v| b.query("ifGenerationNotMatch", v));
539        let builder = self
540            .request
541            .if_metageneration_match
542            .iter()
543            .fold(builder, |b, v| b.query("ifMetagenerationMatch", v));
544        let builder = self
545            .request
546            .if_metageneration_not_match
547            .iter()
548            .fold(builder, |b, v| b.query("ifMetagenerationNotMatch", v));
549
550        let builder = apply_customer_supplied_encryption_headers(
551            builder,
552            &self.request.common_object_request_params,
553        );
554
555        // Apply "range" header for read limits and offsets.
556        let builder = match (self.request.read_offset, self.request.read_limit) {
557            // read_limit can't be negative.
558            (_, l) if l < 0 => {
559                unreachable!("ReadObject build never sets a negative read_limit value")
560            }
561            // negative offset can't also have a read_limit.
562            (o, l) if o < 0 && l > 0 => unreachable!(
563                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
564            ),
565            // If both are zero, we use default implementation (no range header).
566            (0, 0) => builder,
567            // negative offset with no limit means the last N bytes.
568            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
569            // read_limit is zero, means no limit. Read from offset to end of file.
570            // This handles cases like (5, 0) -> "bytes=5-"
571            (o, 0) => builder.header("range", format!("bytes={o}-")),
572            // General case: non-negative offset and positive limit.
573            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
574            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
575        };
576
577        Ok(builder)
578    }
579
580    fn is_gunzipped(response: &Response) -> bool {
581        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
582        // objects. Reading such objects comes with a number of restrictions:
583        // - Ranged reads do not work.
584        // - The size of the decompressed data is not known.
585        // - Checksums do not work because the object checksums correspond to the
586        //   compressed data and the client library receives the decompressed data.
587        //
588        // Because ranged reads do not work, resuming a read does not work. Consequently,
589        // the implementation of `ReadObjectResponse` is substantially different for
590        // objects that are gunzipped.
591        //
592        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
593        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
594        use http::header::WARNING;
595        if response
596            .headers()
597            .get(TRANSFORMATION)
598            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
599        {
600            return true;
601        }
602        response
603            .headers()
604            .get(WARNING)
605            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
606    }
607
608    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
609        let response = self.clone().read().await?;
610        if Self::is_gunzipped(&response) {
611            return Ok(ReadObjectResponse::new(Box::new(
612                non_resumable::NonResumableResponse::new(response)?,
613            )));
614        }
615        Ok(ReadObjectResponse::new(Box::new(
616            resumable::ResumableResponse::new(self, response)?,
617        )))
618    }
619}
620
621#[cfg(test)]
622mod resume_tests;
623
624#[cfg(test)]
625mod tests {
626    use super::client::tests::{test_builder, test_inner_client};
627    use super::*;
628    use crate::error::{ChecksumMismatch, ReadError};
629    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
630    use base64::Engine;
631    use futures::TryStreamExt;
632    use google_cloud_auth::credentials::{
633        anonymous::Builder as Anonymous, testing::error_credentials,
634    };
635    use httptest::{Expectation, Server, matchers::*, responders::status_code};
636    use std::collections::HashMap;
637    use std::error::Error;
638    use std::sync::Arc;
639    use test_case::test_case;
640
641    type Result = anyhow::Result<()>;
642
643    async fn http_request_builder(
644        inner: Arc<StorageInner>,
645        builder: ReadObject,
646    ) -> crate::Result<HttpRequestBuilder> {
647        let reader = Reader {
648            inner,
649            request: builder.request,
650            options: builder.options,
651        };
652        reader.http_request_builder().await
653    }
654
655    #[tokio::test]
656    async fn test_clone() {
657        let inner = test_inner_client(test_builder()).await;
658        let stub = crate::storage::transport::Storage::new_test(inner.clone());
659        let options = {
660            let mut o = RequestOptions::new();
661            o.set_resumable_upload_threshold(12345_usize);
662            o
663        };
664        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
665
666        let clone = builder.clone();
667        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
668        assert_eq!(clone.request, builder.request);
669        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
670    }
671
672    // Verify `read_object()` meets normal Send, Sync, requirements.
673    #[tokio::test]
674    async fn test_read_is_send_and_static() -> Result {
675        let client = Storage::builder()
676            .with_credentials(Anonymous::new().build())
677            .build()
678            .await?;
679
680        fn need_send<T: Send>(_val: &T) {}
681        fn need_sync<T: Sync>(_val: &T) {}
682        fn need_static<T: 'static>(_val: &T) {}
683
684        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
685        need_send(&read);
686        need_sync(&read);
687        need_static(&read);
688
689        let read = client
690            .read_object("projects/_/buckets/test-bucket", "test-object")
691            .send();
692        need_send(&read);
693        need_static(&read);
694
695        Ok(())
696    }
697
698    #[tokio::test]
699    async fn read_object_normal() -> Result {
700        let server = Server::run();
701        server.expect(
702            Expectation::matching(all_of![
703                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
704                request::headers(contains(("accept-encoding", "gzip"))),
705                request::query(url_decoded(contains(("alt", "media")))),
706            ])
707            .respond_with(
708                status_code(200)
709                    .body("hello world")
710                    .append_header("x-goog-generation", 123456),
711            ),
712        );
713
714        let client = Storage::builder()
715            .with_endpoint(format!("http://{}", server.addr()))
716            .with_credentials(Anonymous::new().build())
717            .build()
718            .await?;
719        let mut reader = client
720            .read_object("projects/_/buckets/test-bucket", "test-object")
721            .send()
722            .await?;
723        let mut got = Vec::new();
724        while let Some(b) = reader.next().await.transpose()? {
725            got.extend_from_slice(&b);
726        }
727        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
728
729        Ok(())
730    }
731
732    #[tokio::test]
733    async fn read_object_stream() -> Result {
734        let server = Server::run();
735        server.expect(
736            Expectation::matching(all_of![
737                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
738                request::query(url_decoded(contains(("alt", "media")))),
739            ])
740            .respond_with(
741                status_code(200)
742                    .append_header("x-goog-generation", 123456)
743                    .body("hello world"),
744            ),
745        );
746
747        let client = Storage::builder()
748            .with_endpoint(format!("http://{}", server.addr()))
749            .with_credentials(Anonymous::new().build())
750            .build()
751            .await?;
752        let response = client
753            .read_object("projects/_/buckets/test-bucket", "test-object")
754            .send()
755            .await?;
756        let result: Vec<_> = response.into_stream().try_collect().await?;
757        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
758
759        Ok(())
760    }
761
762    #[tokio::test]
763    async fn read_object_next_then_consume_response() -> Result {
764        // Create a large enough file that will require multiple chunks to read.
765        const BLOCK_SIZE: usize = 500;
766        let mut contents = Vec::new();
767        for i in 0..50 {
768            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
769        }
770
771        // Calculate and serialize the crc32c checksum
772        let u = crc32c::crc32c(&contents);
773        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
774
775        let server = Server::run();
776        server.expect(
777            Expectation::matching(all_of![
778                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
779                request::query(url_decoded(contains(("alt", "media")))),
780            ])
781            .times(1)
782            .respond_with(
783                status_code(200)
784                    .body(contents.clone())
785                    .append_header("x-goog-hash", format!("crc32c={value}"))
786                    .append_header("x-goog-generation", 123456),
787            ),
788        );
789
790        let client = Storage::builder()
791            .with_endpoint(format!("http://{}", server.addr()))
792            .with_credentials(Anonymous::new().build())
793            .build()
794            .await?;
795
796        // Read some bytes, then remainder with stream.
797        let mut response = client
798            .read_object("projects/_/buckets/test-bucket", "test-object")
799            .send()
800            .await?;
801
802        let mut all_bytes = bytes::BytesMut::new();
803        let chunk = response.next().await.transpose()?.unwrap();
804        assert!(!chunk.is_empty());
805        all_bytes.extend(chunk);
806        use futures::StreamExt;
807        let mut stream = response.into_stream();
808        while let Some(chunk) = stream.next().await.transpose()? {
809            all_bytes.extend(chunk);
810        }
811        assert_eq!(all_bytes, contents);
812
813        Ok(())
814    }
815
816    #[tokio::test]
817    async fn read_object_not_found() -> Result {
818        let server = Server::run();
819        server.expect(
820            Expectation::matching(all_of![
821                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
822                request::query(url_decoded(contains(("alt", "media")))),
823            ])
824            .respond_with(status_code(404).body("NOT FOUND")),
825        );
826
827        let client = Storage::builder()
828            .with_endpoint(format!("http://{}", server.addr()))
829            .with_credentials(Anonymous::new().build())
830            .build()
831            .await?;
832        let err = client
833            .read_object("projects/_/buckets/test-bucket", "test-object")
834            .send()
835            .await
836            .expect_err("expected a not found error");
837        assert_eq!(err.http_status_code(), Some(404));
838
839        Ok(())
840    }
841
842    #[tokio::test]
843    async fn read_object_incorrect_crc32c_check() -> Result {
844        // Calculate and serialize the crc32c checksum
845        let u = crc32c::crc32c("goodbye world".as_bytes());
846        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
847
848        let server = Server::run();
849        server.expect(
850            Expectation::matching(all_of![
851                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
852                request::query(url_decoded(contains(("alt", "media")))),
853            ])
854            .times(3)
855            .respond_with(
856                status_code(200)
857                    .body("hello world")
858                    .append_header("x-goog-hash", format!("crc32c={value}"))
859                    .append_header("x-goog-generation", 123456),
860            ),
861        );
862
863        let client = Storage::builder()
864            .with_endpoint(format!("http://{}", server.addr()))
865            .with_credentials(Anonymous::new().build())
866            .build()
867            .await?;
868        let mut response = client
869            .read_object("projects/_/buckets/test-bucket", "test-object")
870            .send()
871            .await?;
872        let mut partial = Vec::new();
873        let mut err = None;
874        while let Some(r) = response.next().await {
875            match r {
876                Ok(b) => partial.extend_from_slice(&b),
877                Err(e) => err = Some(e),
878            };
879        }
880        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
881        let err = err.expect("expect error on incorrect crc32c");
882        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
883        assert!(
884            matches!(
885                source,
886                Some(&ReadError::ChecksumMismatch(
887                    ChecksumMismatch::Crc32c { .. }
888                ))
889            ),
890            "err={err:?}"
891        );
892
893        let mut response = client
894            .read_object("projects/_/buckets/test-bucket", "test-object")
895            .send()
896            .await?;
897        let err: crate::Error = async {
898            {
899                while (response.next().await.transpose()?).is_some() {}
900                Ok(())
901            }
902        }
903        .await
904        .expect_err("expect error on incorrect crc32c");
905        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
906        assert!(
907            matches!(
908                source,
909                Some(&ReadError::ChecksumMismatch(
910                    ChecksumMismatch::Crc32c { .. }
911                ))
912            ),
913            "err={err:?}"
914        );
915
916        use futures::TryStreamExt;
917        let err = client
918            .read_object("projects/_/buckets/test-bucket", "test-object")
919            .send()
920            .await?
921            .into_stream()
922            .try_collect::<Vec<bytes::Bytes>>()
923            .await
924            .expect_err("expect error on incorrect crc32c");
925        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
926        assert!(
927            matches!(
928                source,
929                Some(&ReadError::ChecksumMismatch(
930                    ChecksumMismatch::Crc32c { .. }
931                ))
932            ),
933            "err={err:?}"
934        );
935        Ok(())
936    }
937
938    #[tokio::test]
939    async fn read_object_incorrect_md5_check() -> Result {
940        // Calculate and serialize the md5 checksum
941        let digest = md5::compute("goodbye world".as_bytes());
942        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
943
944        let server = Server::run();
945        server.expect(
946            Expectation::matching(all_of![
947                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
948                request::query(url_decoded(contains(("alt", "media")))),
949            ])
950            .times(1)
951            .respond_with(
952                status_code(200)
953                    .body("hello world")
954                    .append_header("x-goog-hash", format!("md5={value}"))
955                    .append_header("x-goog-generation", 123456),
956            ),
957        );
958
959        let client = Storage::builder()
960            .with_endpoint(format!("http://{}", server.addr()))
961            .with_credentials(Anonymous::new().build())
962            .build()
963            .await?;
964        let mut response = client
965            .read_object("projects/_/buckets/test-bucket", "test-object")
966            .compute_md5()
967            .send()
968            .await?;
969        let mut partial = Vec::new();
970        let mut err = None;
971        while let Some(r) = response.next().await {
972            match r {
973                Ok(b) => partial.extend_from_slice(&b),
974                Err(e) => err = Some(e),
975            };
976        }
977        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
978        let err = err.expect("expect error on incorrect md5");
979        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
980        assert!(
981            matches!(
982                source,
983                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
984            ),
985            "err={err:?}"
986        );
987
988        Ok(())
989    }
990
991    #[tokio::test]
992    async fn read_object_with_user_agent() -> Result {
993        use http::header::USER_AGENT;
994
995        let user_agent = "quick_fox_lazy_dog/1.2.3";
996        let server = Server::run();
997        server.expect(
998            Expectation::matching(all_of![
999                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1000                request::headers(contains(("accept-encoding", "gzip"))),
1001                request::headers(contains((USER_AGENT.as_str(), user_agent))),
1002                request::query(url_decoded(contains(("alt", "media")))),
1003            ])
1004            .respond_with(
1005                status_code(200)
1006                    .body("hello world")
1007                    .append_header("x-goog-generation", 123456),
1008            ),
1009        );
1010
1011        let client = Storage::builder()
1012            .with_endpoint(format!("http://{}", server.addr()))
1013            .with_credentials(Anonymous::new().build())
1014            .build()
1015            .await?;
1016        let mut reader = client
1017            .read_object("projects/_/buckets/test-bucket", "test-object")
1018            .with_user_agent(user_agent)
1019            .send()
1020            .await?;
1021        let mut got = Vec::new();
1022        while let Some(b) = reader.next().await.transpose()? {
1023            got.extend_from_slice(&b);
1024        }
1025        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
1026
1027        Ok(())
1028    }
1029
1030    #[tokio::test]
1031    async fn read_object() -> Result {
1032        let inner = test_inner_client(test_builder()).await;
1033        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1034        let builder = ReadObject::new(
1035            stub,
1036            "projects/_/buckets/bucket",
1037            "object",
1038            inner.options.clone(),
1039        );
1040        let request = http_request_builder(inner, builder)
1041            .await?
1042            .build_for_tests()
1043            .await?;
1044
1045        assert_eq!(request.method(), Method::GET);
1046        assert_eq!(
1047            request.url().as_str(),
1048            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1049        );
1050        Ok(())
1051    }
1052
1053    #[tokio::test]
1054    async fn read_object_error_credentials() -> Result {
1055        let inner =
1056            test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
1057        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1058        let builder = ReadObject::new(
1059            stub,
1060            "projects/_/buckets/bucket",
1061            "object",
1062            inner.options.clone(),
1063        );
1064        let _ = http_request_builder(inner, builder)
1065            .await?
1066            .build_for_tests()
1067            .await
1068            .inspect_err(|e| assert!(e.is_authentication()))
1069            .expect_err("invalid credentials should err");
1070        Ok(())
1071    }
1072
1073    #[tokio::test]
1074    async fn read_object_bad_bucket() -> Result {
1075        let inner = test_inner_client(test_builder()).await;
1076        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1077        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1078        let _ = http_request_builder(inner, builder)
1079            .await
1080            .expect_err("malformed bucket string should error");
1081        Ok(())
1082    }
1083
1084    #[tokio::test]
1085    async fn read_object_query_params() -> Result {
1086        let inner = test_inner_client(test_builder()).await;
1087        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1088        let builder = ReadObject::new(
1089            stub,
1090            "projects/_/buckets/bucket",
1091            "object",
1092            inner.options.clone(),
1093        )
1094        .set_generation(5)
1095        .set_if_generation_match(10)
1096        .set_if_generation_not_match(20)
1097        .set_if_metageneration_match(30)
1098        .set_if_metageneration_not_match(40);
1099        let request = http_request_builder(inner, builder)
1100            .await?
1101            .build_for_tests()
1102            .await?;
1103
1104        assert_eq!(request.method(), Method::GET);
1105        let want_pairs: HashMap<String, String> = [
1106            ("alt", "media"),
1107            ("generation", "5"),
1108            ("ifGenerationMatch", "10"),
1109            ("ifGenerationNotMatch", "20"),
1110            ("ifMetagenerationMatch", "30"),
1111            ("ifMetagenerationNotMatch", "40"),
1112        ]
1113        .iter()
1114        .map(|(k, v)| (k.to_string(), v.to_string()))
1115        .collect();
1116        let query_pairs: HashMap<String, String> = request
1117            .url()
1118            .query_pairs()
1119            .map(|param| (param.0.to_string(), param.1.to_string()))
1120            .collect();
1121        assert_eq!(query_pairs.len(), want_pairs.len());
1122        assert_eq!(query_pairs, want_pairs);
1123        Ok(())
1124    }
1125
1126    #[tokio::test]
1127    async fn read_object_default_headers() -> Result {
1128        // The API takes the unencoded byte array.
1129        let inner = test_inner_client(test_builder()).await;
1130        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1131        let builder = ReadObject::new(
1132            stub,
1133            "projects/_/buckets/bucket",
1134            "object",
1135            inner.options.clone(),
1136        );
1137        let request = http_request_builder(inner, builder)
1138            .await?
1139            .build_for_tests()
1140            .await?;
1141
1142        assert_eq!(request.method(), Method::GET);
1143        assert_eq!(
1144            request.url().as_str(),
1145            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1146        );
1147
1148        let want = [("accept-encoding", "gzip")];
1149        let headers = request.headers();
1150        for (name, value) in want {
1151            assert_eq!(
1152                headers.get(name).and_then(|h| h.to_str().ok()),
1153                Some(value),
1154                "{request:?}"
1155            );
1156        }
1157        Ok(())
1158    }
1159
1160    #[tokio::test]
1161    async fn read_object_automatic_decompression_headers() -> Result {
1162        // The API takes the unencoded byte array.
1163        let inner = test_inner_client(test_builder()).await;
1164        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1165        let builder = ReadObject::new(
1166            stub,
1167            "projects/_/buckets/bucket",
1168            "object",
1169            inner.options.clone(),
1170        )
1171        .with_automatic_decompression(true);
1172        let request = http_request_builder(inner, builder)
1173            .await?
1174            .build_for_tests()
1175            .await?;
1176
1177        assert_eq!(request.method(), Method::GET);
1178        assert_eq!(
1179            request.url().as_str(),
1180            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1181        );
1182
1183        let headers = request.headers();
1184        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1185        Ok(())
1186    }
1187
1188    #[tokio::test]
1189    async fn read_object_encryption_headers() -> Result {
1190        // Make a 32-byte key.
1191        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1192
1193        // The API takes the unencoded byte array.
1194        let inner = test_inner_client(test_builder()).await;
1195        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1196        let builder = ReadObject::new(
1197            stub,
1198            "projects/_/buckets/bucket",
1199            "object",
1200            inner.options.clone(),
1201        )
1202        .set_key(KeyAes256::new(&key)?);
1203        let request = http_request_builder(inner, builder)
1204            .await?
1205            .build_for_tests()
1206            .await?;
1207
1208        assert_eq!(request.method(), Method::GET);
1209        assert_eq!(
1210            request.url().as_str(),
1211            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1212        );
1213
1214        let want = [
1215            ("x-goog-encryption-algorithm", "AES256".to_string()),
1216            ("x-goog-encryption-key", key_base64),
1217            ("x-goog-encryption-key-sha256", key_sha256_base64),
1218        ];
1219
1220        let headers = request.headers();
1221        for (name, value) in want {
1222            assert_eq!(
1223                headers.get(name).and_then(|h| h.to_str().ok()),
1224                Some(value.as_str())
1225            );
1226        }
1227        Ok(())
1228    }
1229
1230    #[test_case(ReadRange::all(), None; "no headers needed")]
1231    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1232    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1233    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1234    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1235    #[tokio::test]
1236    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1237        let inner = test_inner_client(test_builder()).await;
1238        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1239        let builder = ReadObject::new(
1240            stub,
1241            "projects/_/buckets/bucket",
1242            "object",
1243            inner.options.clone(),
1244        )
1245        .set_read_range(input.clone());
1246        let request = http_request_builder(inner, builder)
1247            .await?
1248            .build_for_tests()
1249            .await?;
1250
1251        assert_eq!(request.method(), Method::GET);
1252        assert_eq!(
1253            request.url().as_str(),
1254            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1255        );
1256
1257        assert_eq!(request.headers().get("range"), want);
1258        Ok(())
1259    }
1260
1261    #[test_case("projects/p", "projects%2Fp")]
1262    #[test_case("kebab-case", "kebab-case")]
1263    #[test_case("dot.name", "dot.name")]
1264    #[test_case("under_score", "under_score")]
1265    #[test_case("tilde~123", "tilde~123")]
1266    #[test_case("exclamation!point!", "exclamation%21point%21")]
1267    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1268    #[test_case("preserve%percent%21", "preserve%percent%21")]
1269    #[test_case(
1270        "testall !#$&'()*+,/:;=?@[]",
1271        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1272    )]
1273    #[tokio::test]
1274    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1275        let inner = test_inner_client(test_builder()).await;
1276        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1277        let builder = ReadObject::new(
1278            stub,
1279            "projects/_/buckets/bucket",
1280            name,
1281            inner.options.clone(),
1282        );
1283        let request = http_request_builder(inner, builder)
1284            .await?
1285            .build_for_tests()
1286            .await?;
1287        let got = request.url().path_segments().unwrap().next_back().unwrap();
1288        assert_eq!(got, want);
1289        Ok(())
1290    }
1291
1292    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1293    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1294    #[test_case("warning", "214 UploadServer gunzipped", true)]
1295    #[test_case("warning", "no match", false)]
1296    #[test_case("unused", "unused", false)]
1297    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1298        let response = http::Response::builder()
1299            .status(200)
1300            .header(name, value)
1301            .body(Vec::new())?;
1302        let response = Response::from(response);
1303        let got = Reader::is_gunzipped(&response);
1304        assert_eq!(got, want, "{response:?}");
1305        Ok(())
1306    }
1307}