Skip to main content

google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::attempt_info::AttemptInfo;
27use gaxi::http::HttpRequestBuilder;
28use gaxi::http::reqwest::{HeaderValue, Method, Response};
29use google_cloud_gax::options::internal::{PathTemplate, RequestOptionsExt, ResourceName};
30
31/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
32///
33/// # Example: accumulate the contents of an object into a vector
34/// ```
35/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
36/// async fn sample(client: &Storage) -> anyhow::Result<()> {
37///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
38///     let mut reader = builder.send().await?;
39///     let mut contents = Vec::new();
40///     while let Some(chunk) = reader.next().await.transpose()? {
41///         contents.extend_from_slice(&chunk);
42///     }
43///     println!("object contents={:?}", contents);
44///     Ok(())
45/// }
46/// ```
47///
48/// # Example: read part of an object
49/// ```
50/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
51/// use google_cloud_storage::model_ext::ReadRange;
52/// async fn sample(client: &Storage) -> anyhow::Result<()> {
53///     const MIB: u64 = 1024 * 1024;
54///     let mut contents = Vec::new();
55///     let mut reader = client
56///         .read_object("projects/_/buckets/my-bucket", "my-object")
57///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
58///         .send()
59///         .await?;
60///     while let Some(chunk) = reader.next().await.transpose()? {
61///         contents.extend_from_slice(&chunk);
62///     }
63///     println!("range contents={:?}", contents);
64///     Ok(())
65/// }
66/// ```
67#[derive(Debug)]
68pub struct ReadObject<S = crate::storage::transport::Storage>
69where
70    S: crate::storage::stub::Storage + 'static,
71{
72    stub: std::sync::Arc<S>,
73    request: crate::model::ReadObjectRequest,
74    options: RequestOptions,
75}
76
77impl<S> Clone for ReadObject<S>
78where
79    S: crate::storage::stub::Storage + 'static,
80{
81    fn clone(&self) -> Self {
82        Self {
83            stub: self.stub.clone(),
84            request: self.request.clone(),
85            options: self.options.clone(),
86        }
87    }
88}
89
90impl<S> ReadObject<S>
91where
92    S: crate::storage::stub::Storage + 'static,
93{
94    pub(crate) fn new<B, O>(
95        stub: std::sync::Arc<S>,
96        bucket: B,
97        object: O,
98        options: RequestOptions,
99    ) -> Self
100    where
101        B: Into<String>,
102        O: Into<String>,
103    {
104        ReadObject {
105            stub,
106            request: crate::model::ReadObjectRequest::new()
107                .set_bucket(bucket)
108                .set_object(object),
109            options,
110        }
111    }
112
113    /// Enables computation of MD5 hashes.
114    ///
115    /// Crc32c hashes are checked by default.
116    ///
117    /// Checksum validation is supported iff:
118    /// 1. The full content is requested.
119    /// 2. All of the content is returned (status != PartialContent).
120    /// 3. The server sent a checksum header.
121    /// 4. The http stack did not uncompress the file.
122    /// 5. The server did not uncompress data on read.
123    ///
124    /// # Example
125    /// ```
126    /// # use google_cloud_storage::client::Storage;
127    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
128    /// let builder =  client
129    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
130    ///     .compute_md5();
131    /// let mut reader = builder
132    ///     .send()
133    ///     .await?;
134    /// let mut contents = Vec::new();
135    /// while let Some(chunk) = reader.next().await.transpose()? {
136    ///     contents.extend_from_slice(&chunk);
137    /// }
138    /// println!("object contents={:?}", contents);
139    /// # Ok(()) }
140    /// ```
141    pub fn compute_md5(self) -> Self {
142        let mut this = self;
143        this.options.checksum.md5_hash = Some(Md5::default());
144        this
145    }
146
147    /// Enables computation of CRC32C checksums.
148    ///
149    /// Note that the library computes and verifies (if available) CRC32C checksums at the end of
150    /// the download. Use `compute_crc32c(false)` to disable the computation, but note
151    /// that this reduces the data integrity guarantees. Data *can* be corrupted even when
152    /// downloaded over HTTPS or other encrypted channels.
153    ///
154    /// # Example
155    /// ```
156    /// # use google_cloud_storage::client::Storage;
157    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
158    /// let builder =  client
159    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
160    ///     .compute_crc32c(false);
161    /// let mut reader = builder
162    ///     .send()
163    ///     .await?;
164    /// let mut contents = Vec::new();
165    /// while let Some(chunk) = reader.next().await.transpose()? {
166    ///     contents.extend_from_slice(&chunk);
167    /// }
168    /// println!("object contents={:?}", contents);
169    /// # Ok(()) }
170    /// ```
171    pub fn compute_crc32c(mut self, enable: bool) -> Self {
172        if enable {
173            self.options
174                .checksum
175                .crc32c
176                .get_or_insert_with(Default::default);
177        } else {
178            self.options.checksum.crc32c = None;
179        }
180        self
181    }
182
183    /// If present, selects a specific revision of this object (as
184    /// opposed to the latest version, the default).
185    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
186        self.request.generation = v.into();
187        self
188    }
189
190    /// Makes the operation conditional on whether the object's current generation
191    /// matches the given value. Setting to 0 makes the operation succeed only if
192    /// there are no live versions of the object.
193    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
194    where
195        T: Into<i64>,
196    {
197        self.request.if_generation_match = Some(v.into());
198        self
199    }
200
201    /// Makes the operation conditional on whether the object's live generation
202    /// does not match the given value. If no live object exists, the precondition
203    /// fails. Setting to 0 makes the operation succeed only if there is a live
204    /// version of the object.
205    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
206    where
207        T: Into<i64>,
208    {
209        self.request.if_generation_not_match = Some(v.into());
210        self
211    }
212
213    /// Makes the operation conditional on whether the object's current
214    /// metageneration matches the given value.
215    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
216    where
217        T: Into<i64>,
218    {
219        self.request.if_metageneration_match = Some(v.into());
220        self
221    }
222
223    /// Makes the operation conditional on whether the object's current
224    /// metageneration does not match the given value.
225    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
226    where
227        T: Into<i64>,
228    {
229        self.request.if_metageneration_not_match = Some(v.into());
230        self
231    }
232
233    /// The range of bytes to return in the read.
234    ///
235    /// This can be all the bytes starting at a given offset
236    /// (`ReadRange::offset()`), all the bytes in an explicit range
237    /// (`Range::segment`), or the last N bytes of the object
238    /// (`ReadRange::tail`).
239    ///
240    /// # Examples
241    ///
242    /// Read starting at 100 bytes to end of file.
243    /// ```
244    /// # use google_cloud_storage::client::Storage;
245    /// # use google_cloud_storage::model_ext::ReadRange;
246    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
247    /// let response = client
248    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
249    ///     .set_read_range(ReadRange::offset(100))
250    ///     .send()
251    ///     .await?;
252    /// println!("response details={response:?}");
253    /// # Ok(()) }
254    /// ```
255    ///
256    /// Read last 100 bytes of file:
257    /// ```
258    /// # use google_cloud_storage::client::Storage;
259    /// # use google_cloud_storage::model_ext::ReadRange;
260    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
261    /// let response = client
262    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
263    ///     .set_read_range(ReadRange::tail(100))
264    ///     .send()
265    ///     .await?;
266    /// println!("response details={response:?}");
267    /// # Ok(()) }
268    /// ```
269    ///
270    /// Read bytes 1000 to 1099.
271    /// ```
272    /// # use google_cloud_storage::client::Storage;
273    /// # use google_cloud_storage::model_ext::ReadRange;
274    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
275    /// let response = client
276    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
277    ///     .set_read_range(ReadRange::segment(1000, 100))
278    ///     .send()
279    ///     .await?;
280    /// println!("response details={response:?}");
281    /// # Ok(()) }
282    /// ```
283    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
284        self.request.with_range(range);
285        self
286    }
287
288    /// The encryption key used with the Customer-Supplied Encryption Keys
289    /// feature. In raw bytes format (not base64-encoded).
290    ///
291    /// Example:
292    /// ```
293    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
294    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
295    /// let key: &[u8] = &[97; 32];
296    /// let response = client
297    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
298    ///     .set_key(KeyAes256::new(key)?)
299    ///     .send()
300    ///     .await?;
301    /// println!("response details={response:?}");
302    /// # Ok(()) }
303    /// ```
304    pub fn set_key(mut self, v: KeyAes256) -> Self {
305        self.request.common_object_request_params = Some(v.into());
306        self
307    }
308
309    /// The retry policy used for this request.
310    ///
311    /// # Example
312    /// ```
313    /// # use google_cloud_storage::client::Storage;
314    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
315    /// use google_cloud_storage::retry_policy::RetryableErrors;
316    /// use std::time::Duration;
317    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
318    /// let response = client
319    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
320    ///     .with_retry_policy(
321    ///         RetryableErrors
322    ///             .with_attempt_limit(5)
323    ///             .with_time_limit(Duration::from_secs(10)),
324    ///     )
325    ///     .send()
326    ///     .await?;
327    /// println!("response details={response:?}");
328    /// # Ok(()) }
329    /// ```
330    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
331        mut self,
332        v: V,
333    ) -> Self {
334        self.options.retry_policy = v.into().into();
335        self
336    }
337
338    /// The backoff policy used for this request.
339    ///
340    /// # Example
341    /// ```
342    /// # use google_cloud_storage::client::Storage;
343    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
344    /// use std::time::Duration;
345    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
346    /// let response = client
347    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
348    ///     .with_backoff_policy(ExponentialBackoff::default())
349    ///     .send()
350    ///     .await?;
351    /// println!("response details={response:?}");
352    /// # Ok(()) }
353    /// ```
354    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
355        mut self,
356        v: V,
357    ) -> Self {
358        self.options.backoff_policy = v.into().into();
359        self
360    }
361
362    /// The retry throttler used for this request.
363    ///
364    /// Most of the time you want to use the same throttler for all the requests
365    /// in a client, and even the same throttler for many clients. Rarely it
366    /// may be necessary to use an custom throttler for some subset of the
367    /// requests.
368    ///
369    /// # Example
370    /// ```
371    /// # use google_cloud_storage::client::Storage;
372    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
373    /// let response = client
374    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
375    ///     .with_retry_throttler(adhoc_throttler())
376    ///     .send()
377    ///     .await?;
378    /// println!("response details={response:?}");
379    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
380    ///     # panic!();
381    /// }
382    /// # Ok(()) }
383    /// ```
384    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
385        mut self,
386        v: V,
387    ) -> Self {
388        self.options.retry_throttler = v.into().into();
389        self
390    }
391
392    /// Configure the resume policy for read requests.
393    ///
394    /// The Cloud Storage client library can automatically resume a read that is
395    /// interrupted by a transient error. Applications may want to limit the
396    /// number of read attempts, or may wish to expand the type of errors
397    /// treated as retryable.
398    ///
399    /// # Example
400    /// ```
401    /// # use google_cloud_storage::client::Storage;
402    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
403    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
404    /// let response = client
405    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
406    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
407    ///     .send()
408    ///     .await?;
409    /// # Ok(()) }
410    /// ```
411    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
412    where
413        V: ReadResumePolicy + 'static,
414    {
415        self.options.set_read_resume_policy(std::sync::Arc::new(v));
416        self
417    }
418
419    /// Enables automatic decompression.
420    ///
421    /// The Cloud Storage service [automatically decompresses] objects
422    /// with `content_encoding == "gzip"` during reads. The client library
423    /// disables this behavior by default, as it is not possible to
424    /// perform ranged reads or to resume interrupted downloads if automatic
425    /// decompression is enabled.
426    ///
427    /// Use this option to enable automatic decompression.
428    ///
429    /// # Example
430    /// ```
431    /// # use google_cloud_storage::client::Storage;
432    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
433    /// let response = client
434    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
435    ///     .with_automatic_decompression(true)
436    ///     .send()
437    ///     .await?;
438    /// println!("response details={response:?}");
439    /// # Ok(()) }
440    /// ```
441    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
442        self.options.automatic_decompression = v;
443        self
444    }
445
446    /// Sets the `User-Agent` header for this request.
447    ///
448    /// # Example
449    /// ```
450    /// # use google_cloud_storage::client::Storage;
451    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
452    /// let response = client
453    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
454    ///     .with_user_agent("my-app/1.0.0")
455    ///     .send()
456    ///     .await?;
457    /// println!("response details={response:?}");
458    /// # Ok(()) }
459    /// ```
460    pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
461        self.options.user_agent = Some(user_agent.into());
462        self
463    }
464
465    /// Sets the project that will be billed for this request.
466    ///
467    /// Required for [Requester Pays] buckets. The value overrides any
468    /// `quota_project_id` configured on the credentials; the credential-level
469    /// header is suppressed for this RPC.
470    ///
471    /// # Example
472    /// ```
473    /// # use google_cloud_storage::client::Storage;
474    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
475    /// let response = client
476    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
477    ///     .with_quota_project("my-billing-project")
478    ///     .send()
479    ///     .await?;
480    /// # Ok(()) }
481    /// ```
482    ///
483    /// [Requester Pays]: https://cloud.google.com/storage/docs/requester-pays
484    pub fn with_quota_project(mut self, project: impl Into<String>) -> Self {
485        self.options.set_quota_project(project);
486        self
487    }
488
489    /// Sends the request.
490    pub async fn send(self) -> Result<ReadObjectResponse> {
491        self.stub.read_object(self.request, self.options).await
492    }
493}
494
495// A convenience struct that saves the request conditions and performs the read.
496#[derive(Clone, Debug)]
497pub(crate) struct Reader {
498    pub inner: std::sync::Arc<StorageInner>,
499    pub request: crate::model::ReadObjectRequest,
500    pub options: RequestOptions,
501}
502
503impl Reader {
504    async fn read(self) -> Result<Response> {
505        let throttler = self.options.retry_throttler.clone();
506        let retry = self.options.retry_policy.clone();
507        let backoff = self.options.backoff_policy.clone();
508        let mut count = 0;
509        let inner = async move |_| {
510            let current = count;
511            count += 1;
512            self.read_attempt(current).await
513        };
514
515        google_cloud_gax::retry_loop_internal::retry_loop(
516            inner,
517            async |duration| tokio::time::sleep(duration).await,
518            true,
519            throttler,
520            retry,
521            backoff,
522        )
523        .await
524    }
525
526    async fn read_attempt(&self, attempt_count: u32) -> Result<Response> {
527        let builder = self.http_request_builder().await?;
528        let options = self
529            .options
530            .gax()
531            .insert_extension(PathTemplate("/storage/v1/b/{bucket}/o/{object}"))
532            .insert_extension(ResourceName(format!(
533                "//storage.googleapis.com/{}",
534                self.request.bucket
535            )));
536        let response = builder
537            .send(options, AttemptInfo::new(attempt_count))
538            .await?;
539        if !response.status().is_success() {
540            return gaxi::http::to_http_error(response).await;
541        }
542        Ok(response)
543    }
544
545    async fn http_request_builder(&self) -> Result<HttpRequestBuilder> {
546        // Collect the required bucket and object parameters.
547        let bucket = &self.request.bucket;
548        let bucket_id = bucket
549            .as_str()
550            .strip_prefix("projects/_/buckets/")
551            .ok_or_else(|| {
552                Error::binding(format!(
553                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
554                ))
555            })?;
556        let object = &self.request.object;
557
558        // Build the request.
559        let builder = self
560            .inner
561            .client
562            .http_builder(
563                Method::GET,
564                &format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
565            )
566            .query("alt", "media")
567            .header(
568                "x-goog-api-client",
569                HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
570            );
571
572        let builder = if self.options.automatic_decompression {
573            builder
574        } else {
575            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
576            //
577            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
578            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
579            // with a different `contentEncoding` value.
580            builder.header("accept-encoding", HeaderValue::from_static("gzip"))
581        };
582
583        // Add the optional query parameters.
584        let builder = if self.request.generation != 0 {
585            builder.query("generation", self.request.generation)
586        } else {
587            builder
588        };
589        let builder = self
590            .request
591            .if_generation_match
592            .iter()
593            .fold(builder, |b, v| b.query("ifGenerationMatch", v));
594        let builder = self
595            .request
596            .if_generation_not_match
597            .iter()
598            .fold(builder, |b, v| b.query("ifGenerationNotMatch", v));
599        let builder = self
600            .request
601            .if_metageneration_match
602            .iter()
603            .fold(builder, |b, v| b.query("ifMetagenerationMatch", v));
604        let builder = self
605            .request
606            .if_metageneration_not_match
607            .iter()
608            .fold(builder, |b, v| b.query("ifMetagenerationNotMatch", v));
609
610        let builder = apply_customer_supplied_encryption_headers(
611            builder,
612            &self.request.common_object_request_params,
613        );
614
615        // Apply "range" header for read limits and offsets.
616        let builder = match (self.request.read_offset, self.request.read_limit) {
617            // read_limit can't be negative.
618            (_, l) if l < 0 => {
619                unreachable!("ReadObject build never sets a negative read_limit value")
620            }
621            // negative offset can't also have a read_limit.
622            (o, l) if o < 0 && l > 0 => unreachable!(
623                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
624            ),
625            // If both are zero, we use default implementation (no range header).
626            (0, 0) => builder,
627            // negative offset with no limit means the last N bytes.
628            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
629            // read_limit is zero, means no limit. Read from offset to end of file.
630            // This handles cases like (5, 0) -> "bytes=5-"
631            (o, 0) => builder.header("range", format!("bytes={o}-")),
632            // General case: non-negative offset and positive limit.
633            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
634            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
635        };
636
637        Ok(builder)
638    }
639
640    fn is_gunzipped(response: &Response) -> bool {
641        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
642        // objects. Reading such objects comes with a number of restrictions:
643        // - Ranged reads do not work.
644        // - The size of the decompressed data is not known.
645        // - Checksums do not work because the object checksums correspond to the
646        //   compressed data and the client library receives the decompressed data.
647        //
648        // Because ranged reads do not work, resuming a read does not work. Consequently,
649        // the implementation of `ReadObjectResponse` is substantially different for
650        // objects that are gunzipped.
651        //
652        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
653        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
654        use http::header::WARNING;
655        if response
656            .headers()
657            .get(TRANSFORMATION)
658            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
659        {
660            return true;
661        }
662        response
663            .headers()
664            .get(WARNING)
665            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
666    }
667
668    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
669        let response = self.clone().read().await?;
670        if Self::is_gunzipped(&response) {
671            return Ok(ReadObjectResponse::new(Box::new(
672                non_resumable::NonResumableResponse::new(response)?,
673            )));
674        }
675        Ok(ReadObjectResponse::new(Box::new(
676            resumable::ResumableResponse::new(self, response)?,
677        )))
678    }
679}
680
681#[cfg(test)]
682mod resume_tests;
683
684#[cfg(test)]
685mod tests {
686    use super::client::tests::{test_builder, test_inner_client};
687    use super::*;
688    use crate::error::{ChecksumMismatch, ReadError};
689    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
690    use base64::Engine;
691    use futures::TryStreamExt;
692    use google_cloud_auth::credentials::{
693        CacheableResource, Credentials, EntityTag, anonymous::Builder as Anonymous,
694        testing::error_credentials,
695    };
696    use httptest::{Expectation, Server, all_of, cycle, matchers::*, responders::status_code};
697    use std::collections::HashMap;
698    use std::error::Error;
699    use std::sync::Arc;
700    use test_case::test_case;
701
702    type Result = anyhow::Result<()>;
703
704    mockall::mock! {
705        #[derive(Debug)]
706        Credentials {}
707        impl google_cloud_auth::credentials::CredentialsProvider for Credentials {
708            async fn headers(
709                &self,
710                extensions: http::Extensions,
711            ) -> std::result::Result<
712                google_cloud_auth::credentials::CacheableResource<http::HeaderMap>,
713                google_cloud_gax::error::CredentialsError,
714            >;
715            async fn universe_domain(&self) -> Option<String>;
716        }
717    }
718
719    async fn http_request_builder(
720        inner: Arc<StorageInner>,
721        builder: ReadObject,
722    ) -> crate::Result<HttpRequestBuilder> {
723        let reader = Reader {
724            inner,
725            request: builder.request,
726            options: builder.options,
727        };
728        reader.http_request_builder().await
729    }
730
731    #[tokio::test]
732    async fn test_clone() {
733        let inner = test_inner_client(test_builder()).await;
734        let stub = crate::storage::transport::Storage::new_test(inner.clone());
735        let options = {
736            let mut o = RequestOptions::new();
737            o.set_resumable_upload_threshold(12345_usize);
738            o
739        };
740        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
741
742        let clone = builder.clone();
743        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
744        assert_eq!(clone.request, builder.request);
745        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
746    }
747
748    // Verify `read_object()` meets normal Send, Sync, requirements.
749    #[tokio::test]
750    async fn test_read_is_send_and_static() -> Result {
751        let client = Storage::builder()
752            .with_credentials(Anonymous::new().build())
753            .build()
754            .await?;
755
756        fn need_send<T: Send>(_val: &T) {}
757        fn need_sync<T: Sync>(_val: &T) {}
758        fn need_static<T: 'static>(_val: &T) {}
759
760        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
761        need_send(&read);
762        need_sync(&read);
763        need_static(&read);
764
765        let read = client
766            .read_object("projects/_/buckets/test-bucket", "test-object")
767            .send();
768        need_send(&read);
769        need_static(&read);
770
771        Ok(())
772    }
773
774    #[tokio::test]
775    async fn read_object_normal() -> Result {
776        let server = Server::run();
777        server.expect(
778            Expectation::matching(all_of![
779                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
780                request::headers(contains(("accept-encoding", "gzip"))),
781                request::query(url_decoded(contains(("alt", "media")))),
782            ])
783            .respond_with(
784                status_code(200)
785                    .body("hello world")
786                    .append_header("x-goog-generation", 123456),
787            ),
788        );
789
790        let client = Storage::builder()
791            .with_endpoint(format!("http://{}", server.addr()))
792            .with_credentials(Anonymous::new().build())
793            .build()
794            .await?;
795        let mut reader = client
796            .read_object("projects/_/buckets/test-bucket", "test-object")
797            .send()
798            .await?;
799        let mut got = Vec::new();
800        while let Some(b) = reader.next().await.transpose()? {
801            got.extend_from_slice(&b);
802        }
803        assert_eq!(got, b"hello world");
804
805        Ok(())
806    }
807
808    #[tokio::test]
809    async fn read_object_stream() -> Result {
810        let server = Server::run();
811        server.expect(
812            Expectation::matching(all_of![
813                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
814                request::query(url_decoded(contains(("alt", "media")))),
815            ])
816            .respond_with(
817                status_code(200)
818                    .append_header("x-goog-generation", 123456)
819                    .body("hello world"),
820            ),
821        );
822
823        let client = Storage::builder()
824            .with_endpoint(format!("http://{}", server.addr()))
825            .with_credentials(Anonymous::new().build())
826            .build()
827            .await?;
828        let response = client
829            .read_object("projects/_/buckets/test-bucket", "test-object")
830            .send()
831            .await?;
832        let result: Vec<_> = response.into_stream().try_collect().await?;
833        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
834
835        Ok(())
836    }
837
838    #[tokio::test]
839    async fn read_object_next_then_consume_response() -> Result {
840        // Create a large enough file that will require multiple chunks to read.
841        const BLOCK_SIZE: usize = 500;
842        let mut contents = Vec::new();
843        for i in 0..50 {
844            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
845        }
846
847        // Calculate and serialize the crc32c checksum
848        let u = crc32c::crc32c(&contents);
849        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
850
851        let server = Server::run();
852        server.expect(
853            Expectation::matching(all_of![
854                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
855                request::query(url_decoded(contains(("alt", "media")))),
856            ])
857            .times(1)
858            .respond_with(
859                status_code(200)
860                    .body(contents.clone())
861                    .append_header("x-goog-hash", format!("crc32c={value}"))
862                    .append_header("x-goog-generation", 123456),
863            ),
864        );
865
866        let client = Storage::builder()
867            .with_endpoint(format!("http://{}", server.addr()))
868            .with_credentials(Anonymous::new().build())
869            .build()
870            .await?;
871
872        // Read some bytes, then remainder with stream.
873        let mut response = client
874            .read_object("projects/_/buckets/test-bucket", "test-object")
875            .send()
876            .await?;
877
878        let mut all_bytes = bytes::BytesMut::new();
879        let chunk = response.next().await.transpose()?.unwrap();
880        assert!(!chunk.is_empty());
881        all_bytes.extend(chunk);
882        use futures::StreamExt;
883        let mut stream = response.into_stream();
884        while let Some(chunk) = stream.next().await.transpose()? {
885            all_bytes.extend(chunk);
886        }
887        assert_eq!(all_bytes, contents);
888
889        Ok(())
890    }
891
892    #[tokio::test]
893    async fn read_object_not_found() -> Result {
894        let server = Server::run();
895        server.expect(
896            Expectation::matching(all_of![
897                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
898                request::query(url_decoded(contains(("alt", "media")))),
899            ])
900            .respond_with(status_code(404).body("NOT FOUND")),
901        );
902
903        let client = Storage::builder()
904            .with_endpoint(format!("http://{}", server.addr()))
905            .with_credentials(Anonymous::new().build())
906            .build()
907            .await?;
908        let err = client
909            .read_object("projects/_/buckets/test-bucket", "test-object")
910            .send()
911            .await
912            .expect_err("expected a not found error");
913        assert_eq!(err.http_status_code(), Some(404));
914
915        Ok(())
916    }
917
918    #[tokio::test]
919    async fn read_object_incorrect_crc32c_check() -> Result {
920        // Calculate and serialize the crc32c checksum
921        let u = crc32c::crc32c("goodbye world".as_bytes());
922        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
923
924        let server = Server::run();
925        server.expect(
926            Expectation::matching(all_of![
927                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
928                request::query(url_decoded(contains(("alt", "media")))),
929            ])
930            .times(3)
931            .respond_with(
932                status_code(200)
933                    .body("hello world")
934                    .append_header("x-goog-hash", format!("crc32c={value}"))
935                    .append_header("x-goog-generation", 123456),
936            ),
937        );
938
939        let client = Storage::builder()
940            .with_endpoint(format!("http://{}", server.addr()))
941            .with_credentials(Anonymous::new().build())
942            .build()
943            .await?;
944        let mut response = client
945            .read_object("projects/_/buckets/test-bucket", "test-object")
946            .send()
947            .await?;
948        let mut partial = Vec::new();
949        let mut err = None;
950        while let Some(r) = response.next().await {
951            match r {
952                Ok(b) => partial.extend_from_slice(&b),
953                Err(e) => err = Some(e),
954            };
955        }
956        assert_eq!(partial, b"hello world");
957        let err = err.expect("expect error on incorrect crc32c");
958        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
959        assert!(
960            matches!(
961                source,
962                Some(&ReadError::ChecksumMismatch(
963                    ChecksumMismatch::Crc32c { .. }
964                ))
965            ),
966            "err={err:?}"
967        );
968
969        let mut response = client
970            .read_object("projects/_/buckets/test-bucket", "test-object")
971            .send()
972            .await?;
973        let err: crate::Error = async {
974            {
975                while (response.next().await.transpose()?).is_some() {}
976                Ok(())
977            }
978        }
979        .await
980        .expect_err("expect error on incorrect crc32c");
981        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
982        assert!(
983            matches!(
984                source,
985                Some(&ReadError::ChecksumMismatch(
986                    ChecksumMismatch::Crc32c { .. }
987                ))
988            ),
989            "err={err:?}"
990        );
991
992        use futures::TryStreamExt;
993        let err = client
994            .read_object("projects/_/buckets/test-bucket", "test-object")
995            .send()
996            .await?
997            .into_stream()
998            .try_collect::<Vec<bytes::Bytes>>()
999            .await
1000            .expect_err("expect error on incorrect crc32c");
1001        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1002        assert!(
1003            matches!(
1004                source,
1005                Some(&ReadError::ChecksumMismatch(
1006                    ChecksumMismatch::Crc32c { .. }
1007                ))
1008            ),
1009            "err={err:?}"
1010        );
1011        Ok(())
1012    }
1013
1014    #[tokio::test]
1015    async fn read_object_disable_crc32c() -> Result {
1016        let inner = test_inner_client(test_builder()).await;
1017        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1018
1019        let builder = ReadObject::new(
1020            stub,
1021            "projects/_/buckets/bucket",
1022            "object",
1023            inner.options.clone(),
1024        )
1025        .compute_crc32c(false);
1026
1027        assert!(builder.options.checksum.crc32c.is_none());
1028        Ok(())
1029    }
1030
1031    #[tokio::test]
1032    async fn read_object_enable_crc32c() -> Result {
1033        let inner = test_inner_client(test_builder()).await;
1034        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1035
1036        let builder = ReadObject::new(
1037            stub,
1038            "projects/_/buckets/bucket",
1039            "object",
1040            inner.options.clone(),
1041        )
1042        .compute_crc32c(false) // Disable it first, because by default it's enabled
1043        .compute_crc32c(true);
1044
1045        assert!(builder.options.checksum.crc32c.is_some());
1046        Ok(())
1047    }
1048
1049    #[tokio::test]
1050    async fn read_object_disable_crc32c_ignores_mismatch() -> Result {
1051        let server = Server::run();
1052        // Calculate and serialize the incorrect crc32c checksum
1053        let u = crc32c::crc32c("goodbye world".as_bytes());
1054        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
1055        server.expect(
1056            Expectation::matching(all_of![
1057                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1058                request::query(url_decoded(contains(("alt", "media")))),
1059            ])
1060            .respond_with(
1061                status_code(200)
1062                    .body("hello world")
1063                    .append_header("x-goog-hash", format!("crc32c={value}"))
1064                    .append_header("x-goog-generation", 123456),
1065            ),
1066        );
1067
1068        let client = Storage::builder()
1069            .with_endpoint(format!("http://{}", server.addr()))
1070            .with_credentials(Anonymous::new().build())
1071            .build()
1072            .await?;
1073        let mut response = client
1074            .read_object("projects/_/buckets/test-bucket", "test-object")
1075            .compute_crc32c(false)
1076            .send()
1077            .await?;
1078        let mut got = Vec::new();
1079        while let Some(b) = response.next().await.transpose()? {
1080            got.extend_from_slice(&b);
1081        }
1082
1083        assert_eq!(got, b"hello world");
1084        Ok(())
1085    }
1086
1087    #[tokio::test]
1088    async fn read_object_incorrect_md5_check() -> Result {
1089        // Calculate and serialize the md5 checksum
1090        let digest = md5::compute("goodbye world".as_bytes());
1091        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
1092
1093        let server = Server::run();
1094        server.expect(
1095            Expectation::matching(all_of![
1096                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1097                request::query(url_decoded(contains(("alt", "media")))),
1098            ])
1099            .times(1)
1100            .respond_with(
1101                status_code(200)
1102                    .body("hello world")
1103                    .append_header("x-goog-hash", format!("md5={value}"))
1104                    .append_header("x-goog-generation", 123456),
1105            ),
1106        );
1107
1108        let client = Storage::builder()
1109            .with_endpoint(format!("http://{}", server.addr()))
1110            .with_credentials(Anonymous::new().build())
1111            .build()
1112            .await?;
1113        let mut response = client
1114            .read_object("projects/_/buckets/test-bucket", "test-object")
1115            .compute_md5()
1116            .send()
1117            .await?;
1118        let mut partial = Vec::new();
1119        let mut err = None;
1120        while let Some(r) = response.next().await {
1121            match r {
1122                Ok(b) => partial.extend_from_slice(&b),
1123                Err(e) => err = Some(e),
1124            };
1125        }
1126        assert_eq!(partial, b"hello world");
1127        let err = err.expect("expect error on incorrect md5");
1128        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
1129        assert!(
1130            matches!(
1131                source,
1132                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
1133            ),
1134            "err={err:?}"
1135        );
1136
1137        Ok(())
1138    }
1139
1140    #[tokio::test]
1141    async fn read_object_with_user_agent() -> Result {
1142        use http::header::USER_AGENT;
1143
1144        let user_agent = "quick_fox_lazy_dog/1.2.3";
1145        let server = Server::run();
1146        server.expect(
1147            Expectation::matching(all_of![
1148                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1149                request::headers(contains(("accept-encoding", "gzip"))),
1150                request::headers(contains((USER_AGENT.as_str(), user_agent))),
1151                request::query(url_decoded(contains(("alt", "media")))),
1152            ])
1153            .respond_with(
1154                status_code(200)
1155                    .body("hello world")
1156                    .append_header("x-goog-generation", 123456),
1157            ),
1158        );
1159
1160        let client = Storage::builder()
1161            .with_endpoint(format!("http://{}", server.addr()))
1162            .with_credentials(Anonymous::new().build())
1163            .build()
1164            .await?;
1165        let mut reader = client
1166            .read_object("projects/_/buckets/test-bucket", "test-object")
1167            .with_user_agent(user_agent)
1168            .send()
1169            .await?;
1170        let mut got = Vec::new();
1171        while let Some(b) = reader.next().await.transpose()? {
1172            got.extend_from_slice(&b);
1173        }
1174        assert_eq!(got, b"hello world");
1175
1176        Ok(())
1177    }
1178
1179    #[tokio::test]
1180    async fn read_object_with_quota_project() -> Result {
1181        const PROJECT_NAME: &str = "project_lazy_dog";
1182        let server = Server::run();
1183        server.expect(
1184            Expectation::matching(all_of![
1185                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1186                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1187            ])
1188            .respond_with(
1189                status_code(200)
1190                    .body("hello world")
1191                    .append_header("x-goog-generation", 123456),
1192            ),
1193        );
1194
1195        let client = Storage::builder()
1196            .with_endpoint(format!("http://{}", server.addr()))
1197            .with_credentials(Anonymous::new().build())
1198            .build()
1199            .await?;
1200        let mut reader = client
1201            .read_object("projects/_/buckets/test-bucket", "test-object")
1202            .with_quota_project(PROJECT_NAME)
1203            .send()
1204            .await?;
1205        let mut got = Vec::new();
1206        while let Some(b) = reader.next().await.transpose()? {
1207            got.extend_from_slice(&b);
1208        }
1209        assert_eq!(got, b"hello world");
1210
1211        Ok(())
1212    }
1213
1214    #[tokio::test]
1215    async fn read_object_strips_credential_quota_project() -> Result {
1216        const PROJECT_NAME: &str = "project_lazy_dog";
1217        const CRED_QUOTA_PROJECT: &str = "cred_quota_project";
1218        let server = Server::run();
1219        server.expect(
1220            Expectation::matching(all_of![
1221                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1222                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1223                request::headers(not(contains(("x-goog-user-project", CRED_QUOTA_PROJECT)))),
1224            ])
1225            .times(1)
1226            .respond_with(
1227                status_code(200)
1228                    .body("hello world")
1229                    .append_header("x-goog-generation", 123456),
1230            ),
1231        );
1232
1233        let mut mock = MockCredentials::new();
1234        mock.expect_headers().returning(|_exts: http::Extensions| {
1235            let mut map = http::HeaderMap::new();
1236            map.insert(
1237                http::header::AUTHORIZATION,
1238                http::HeaderValue::from_static("Bearer test-token"),
1239            );
1240            map.insert(
1241                "x-goog-user-project",
1242                http::HeaderValue::from_static(CRED_QUOTA_PROJECT),
1243            );
1244            Ok(CacheableResource::New {
1245                data: map,
1246                entity_tag: EntityTag::default(),
1247            })
1248        });
1249        mock.expect_universe_domain().returning(|| None);
1250
1251        let client = Storage::builder()
1252            .with_endpoint(format!("http://{}", server.addr()))
1253            .with_credentials(Credentials::from(mock))
1254            .build()
1255            .await?;
1256        let mut reader = client
1257            .read_object("projects/_/buckets/test-bucket", "test-object")
1258            .with_quota_project(PROJECT_NAME)
1259            .send()
1260            .await?;
1261        let mut got = Vec::new();
1262        while let Some(b) = reader.next().await.transpose()? {
1263            got.extend_from_slice(&b);
1264        }
1265        assert_eq!(got, b"hello world");
1266
1267        Ok(())
1268    }
1269
1270    #[tokio::test]
1271    async fn read_object_retry_preserves_quota_project() -> Result {
1272        const PROJECT_NAME: &str = "project_lazy_dog";
1273        let server = Server::run();
1274        server.expect(
1275            Expectation::matching(all_of![
1276                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
1277                request::headers(contains(("x-goog-user-project", PROJECT_NAME))),
1278            ])
1279            .times(2)
1280            .respond_with(cycle![
1281                status_code(503),
1282                status_code(200)
1283                    .body("hello")
1284                    .append_header("x-goog-generation", 1)
1285            ]),
1286        );
1287
1288        let client = Storage::builder()
1289            .with_endpoint(format!("http://{}", server.addr()))
1290            .with_credentials(Anonymous::new().build())
1291            .build()
1292            .await?;
1293        let mut reader = client
1294            .read_object("projects/_/buckets/test-bucket", "test-object")
1295            .with_quota_project(PROJECT_NAME)
1296            .send()
1297            .await?;
1298        let mut got = Vec::new();
1299        while let Some(b) = reader.next().await.transpose()? {
1300            got.extend_from_slice(&b);
1301        }
1302        assert_eq!(got, b"hello");
1303
1304        Ok(())
1305    }
1306
1307    #[tokio::test]
1308    async fn read_object() -> Result {
1309        let inner = test_inner_client(test_builder()).await;
1310        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1311        let builder = ReadObject::new(
1312            stub,
1313            "projects/_/buckets/bucket",
1314            "object",
1315            inner.options.clone(),
1316        );
1317        let request = http_request_builder(inner, builder)
1318            .await?
1319            .build_for_tests()
1320            .await?;
1321
1322        assert_eq!(request.method(), Method::GET);
1323        assert_eq!(
1324            request.url().as_str(),
1325            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1326        );
1327        Ok(())
1328    }
1329
1330    #[tokio::test]
1331    async fn read_object_error_credentials() -> Result {
1332        let inner =
1333            test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
1334        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1335        let builder = ReadObject::new(
1336            stub,
1337            "projects/_/buckets/bucket",
1338            "object",
1339            inner.options.clone(),
1340        );
1341        let _ = http_request_builder(inner, builder)
1342            .await?
1343            .build_for_tests()
1344            .await
1345            .inspect_err(|e| assert!(e.is_authentication()))
1346            .expect_err("invalid credentials should err");
1347        Ok(())
1348    }
1349
1350    #[tokio::test]
1351    async fn read_object_bad_bucket() -> Result {
1352        let inner = test_inner_client(test_builder()).await;
1353        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1354        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1355        let _ = http_request_builder(inner, builder)
1356            .await
1357            .expect_err("malformed bucket string should error");
1358        Ok(())
1359    }
1360
1361    #[tokio::test]
1362    async fn read_object_query_params() -> Result {
1363        let inner = test_inner_client(test_builder()).await;
1364        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1365        let builder = ReadObject::new(
1366            stub,
1367            "projects/_/buckets/bucket",
1368            "object",
1369            inner.options.clone(),
1370        )
1371        .set_generation(5)
1372        .set_if_generation_match(10)
1373        .set_if_generation_not_match(20)
1374        .set_if_metageneration_match(30)
1375        .set_if_metageneration_not_match(40);
1376        let request = http_request_builder(inner, builder)
1377            .await?
1378            .build_for_tests()
1379            .await?;
1380
1381        assert_eq!(request.method(), Method::GET);
1382        let want_pairs: HashMap<String, String> = [
1383            ("alt", "media"),
1384            ("generation", "5"),
1385            ("ifGenerationMatch", "10"),
1386            ("ifGenerationNotMatch", "20"),
1387            ("ifMetagenerationMatch", "30"),
1388            ("ifMetagenerationNotMatch", "40"),
1389        ]
1390        .iter()
1391        .map(|(k, v)| (k.to_string(), v.to_string()))
1392        .collect();
1393        let query_pairs: HashMap<String, String> = request
1394            .url()
1395            .query_pairs()
1396            .map(|param| (param.0.to_string(), param.1.to_string()))
1397            .collect();
1398        assert_eq!(query_pairs.len(), want_pairs.len());
1399        assert_eq!(query_pairs, want_pairs);
1400        Ok(())
1401    }
1402
1403    #[tokio::test]
1404    async fn read_object_default_headers() -> Result {
1405        // The API takes the unencoded byte array.
1406        let inner = test_inner_client(test_builder()).await;
1407        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1408        let builder = ReadObject::new(
1409            stub,
1410            "projects/_/buckets/bucket",
1411            "object",
1412            inner.options.clone(),
1413        );
1414        let request = http_request_builder(inner, builder)
1415            .await?
1416            .build_for_tests()
1417            .await?;
1418
1419        assert_eq!(request.method(), Method::GET);
1420        assert_eq!(
1421            request.url().as_str(),
1422            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1423        );
1424
1425        let want = [("accept-encoding", "gzip")];
1426        let headers = request.headers();
1427        for (name, value) in want {
1428            assert_eq!(
1429                headers.get(name).and_then(|h| h.to_str().ok()),
1430                Some(value),
1431                "{request:?}"
1432            );
1433        }
1434        Ok(())
1435    }
1436
1437    #[tokio::test]
1438    async fn read_object_automatic_decompression_headers() -> Result {
1439        // The API takes the unencoded byte array.
1440        let inner = test_inner_client(test_builder()).await;
1441        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1442        let builder = ReadObject::new(
1443            stub,
1444            "projects/_/buckets/bucket",
1445            "object",
1446            inner.options.clone(),
1447        )
1448        .with_automatic_decompression(true);
1449        let request = http_request_builder(inner, builder)
1450            .await?
1451            .build_for_tests()
1452            .await?;
1453
1454        assert_eq!(request.method(), Method::GET);
1455        assert_eq!(
1456            request.url().as_str(),
1457            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1458        );
1459
1460        let headers = request.headers();
1461        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1462        Ok(())
1463    }
1464
1465    #[tokio::test]
1466    async fn read_object_encryption_headers() -> Result {
1467        // Make a 32-byte key.
1468        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1469
1470        // The API takes the unencoded byte array.
1471        let inner = test_inner_client(test_builder()).await;
1472        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1473        let builder = ReadObject::new(
1474            stub,
1475            "projects/_/buckets/bucket",
1476            "object",
1477            inner.options.clone(),
1478        )
1479        .set_key(KeyAes256::new(&key)?);
1480        let request = http_request_builder(inner, builder)
1481            .await?
1482            .build_for_tests()
1483            .await?;
1484
1485        assert_eq!(request.method(), Method::GET);
1486        assert_eq!(
1487            request.url().as_str(),
1488            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1489        );
1490
1491        let want = [
1492            ("x-goog-encryption-algorithm", "AES256".to_string()),
1493            ("x-goog-encryption-key", key_base64),
1494            ("x-goog-encryption-key-sha256", key_sha256_base64),
1495        ];
1496
1497        let headers = request.headers();
1498        for (name, value) in want {
1499            assert_eq!(
1500                headers.get(name).and_then(|h| h.to_str().ok()),
1501                Some(value.as_str())
1502            );
1503        }
1504        Ok(())
1505    }
1506
1507    #[test_case(ReadRange::all(), None; "no headers needed")]
1508    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1509    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1510    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1511    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1512    #[tokio::test]
1513    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1514        let inner = test_inner_client(test_builder()).await;
1515        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1516        let builder = ReadObject::new(
1517            stub,
1518            "projects/_/buckets/bucket",
1519            "object",
1520            inner.options.clone(),
1521        )
1522        .set_read_range(input.clone());
1523        let request = http_request_builder(inner, builder)
1524            .await?
1525            .build_for_tests()
1526            .await?;
1527
1528        assert_eq!(request.method(), Method::GET);
1529        assert_eq!(
1530            request.url().as_str(),
1531            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1532        );
1533
1534        assert_eq!(request.headers().get("range"), want);
1535        Ok(())
1536    }
1537
1538    #[test_case("projects/p", "projects%2Fp")]
1539    #[test_case("kebab-case", "kebab-case")]
1540    #[test_case("dot.name", "dot.name")]
1541    #[test_case("under_score", "under_score")]
1542    #[test_case("tilde~123", "tilde~123")]
1543    #[test_case("exclamation!point!", "exclamation%21point%21")]
1544    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1545    #[test_case("preserve%percent%21", "preserve%percent%21")]
1546    #[test_case(
1547        "testall !#$&'()*+,/:;=?@[]",
1548        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1549    )]
1550    #[tokio::test]
1551    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1552        let inner = test_inner_client(test_builder()).await;
1553        let stub = crate::storage::transport::Storage::new_test(inner.clone());
1554        let builder = ReadObject::new(
1555            stub,
1556            "projects/_/buckets/bucket",
1557            name,
1558            inner.options.clone(),
1559        );
1560        let request = http_request_builder(inner, builder)
1561            .await?
1562            .build_for_tests()
1563            .await?;
1564        let got = request.url().path_segments().unwrap().next_back().unwrap();
1565        assert_eq!(got, want);
1566        Ok(())
1567    }
1568
1569    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1570    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1571    #[test_case("warning", "214 UploadServer gunzipped", true)]
1572    #[test_case("warning", "no match", false)]
1573    #[test_case("unused", "unused", false)]
1574    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1575        let response = http::Response::builder()
1576            .status(200)
1577            .header(name, value)
1578            .body(Vec::new())?;
1579        let response = Response::from(response);
1580        let got = Reader::is_gunzipped(&response);
1581        assert_eq!(got, want, "{response:?}");
1582        Ok(())
1583    }
1584}