google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::http::map_send_error;
27
28/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
29///
30/// # Example: accumulate the contents of an object into a vector
31/// ```
32/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
35///     let mut reader = builder.send().await?;
36///     let mut contents = Vec::new();
37///     while let Some(chunk) = reader.next().await.transpose()? {
38///         contents.extend_from_slice(&chunk);
39///     }
40///     println!("object contents={:?}", contents);
41///     Ok(())
42/// }
43/// ```
44///
45/// # Example: read part of an object
46/// ```
47/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
48/// use google_cloud_storage::model_ext::ReadRange;
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: u64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
55///         .send()
56///         .await?;
57///     while let Some(chunk) = reader.next().await.transpose()? {
58///         contents.extend_from_slice(&chunk);
59///     }
60///     println!("range contents={:?}", contents);
61///     Ok(())
62/// }
63/// ```
64#[derive(Debug)]
65pub struct ReadObject<S = crate::storage::transport::Storage>
66where
67    S: crate::storage::stub::Storage + 'static,
68{
69    stub: std::sync::Arc<S>,
70    request: crate::model::ReadObjectRequest,
71    options: RequestOptions,
72}
73
74impl<S> Clone for ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    fn clone(&self) -> Self {
79        Self {
80            stub: self.stub.clone(),
81            request: self.request.clone(),
82            options: self.options.clone(),
83        }
84    }
85}
86
87impl<S> ReadObject<S>
88where
89    S: crate::storage::stub::Storage + 'static,
90{
91    pub(crate) fn new<B, O>(
92        stub: std::sync::Arc<S>,
93        bucket: B,
94        object: O,
95        options: RequestOptions,
96    ) -> Self
97    where
98        B: Into<String>,
99        O: Into<String>,
100    {
101        ReadObject {
102            stub,
103            request: crate::model::ReadObjectRequest::new()
104                .set_bucket(bucket)
105                .set_object(object),
106            options,
107        }
108    }
109
110    /// Enables computation of MD5 hashes.
111    ///
112    /// Crc32c hashes are checked by default.
113    ///
114    /// Checksum validation is supported iff:
115    /// 1. The full content is requested.
116    /// 2. All of the content is returned (status != PartialContent).
117    /// 3. The server sent a checksum header.
118    /// 4. The http stack did not uncompress the file.
119    /// 5. The server did not uncompress data on read.
120    ///
121    /// # Example
122    /// ```
123    /// # use google_cloud_storage::client::Storage;
124    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
125    /// let builder =  client
126    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
127    ///     .compute_md5();
128    /// let mut reader = builder
129    ///     .send()
130    ///     .await?;
131    /// let mut contents = Vec::new();
132    /// while let Some(chunk) = reader.next().await.transpose()? {
133    ///     contents.extend_from_slice(&chunk);
134    /// }
135    /// println!("object contents={:?}", contents);
136    /// # Ok(()) }
137    /// ```
138    pub fn compute_md5(self) -> Self {
139        let mut this = self;
140        this.options.checksum.md5_hash = Some(Md5::default());
141        this
142    }
143
144    /// If present, selects a specific revision of this object (as
145    /// opposed to the latest version, the default).
146    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
147        self.request.generation = v.into();
148        self
149    }
150
151    /// Makes the operation conditional on whether the object's current generation
152    /// matches the given value. Setting to 0 makes the operation succeed only if
153    /// there are no live versions of the object.
154    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
155    where
156        T: Into<i64>,
157    {
158        self.request.if_generation_match = Some(v.into());
159        self
160    }
161
162    /// Makes the operation conditional on whether the object's live generation
163    /// does not match the given value. If no live object exists, the precondition
164    /// fails. Setting to 0 makes the operation succeed only if there is a live
165    /// version of the object.
166    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
167    where
168        T: Into<i64>,
169    {
170        self.request.if_generation_not_match = Some(v.into());
171        self
172    }
173
174    /// Makes the operation conditional on whether the object's current
175    /// metageneration matches the given value.
176    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
177    where
178        T: Into<i64>,
179    {
180        self.request.if_metageneration_match = Some(v.into());
181        self
182    }
183
184    /// Makes the operation conditional on whether the object's current
185    /// metageneration does not match the given value.
186    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
187    where
188        T: Into<i64>,
189    {
190        self.request.if_metageneration_not_match = Some(v.into());
191        self
192    }
193
194    /// The range of bytes to return in the read.
195    ///
196    /// This can be all the bytes starting at a given offset
197    /// (`ReadRange::offset()`), all the bytes in an explicit range
    /// (`ReadRange::segment`), or the last N bytes of the object
199    /// (`ReadRange::tail`).
200    ///
201    /// # Examples
202    ///
203    /// Read starting at 100 bytes to end of file.
204    /// ```
205    /// # use google_cloud_storage::client::Storage;
206    /// # use google_cloud_storage::model_ext::ReadRange;
207    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
208    /// let response = client
209    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
210    ///     .set_read_range(ReadRange::offset(100))
211    ///     .send()
212    ///     .await?;
213    /// println!("response details={response:?}");
214    /// # Ok(()) }
215    /// ```
216    ///
217    /// Read last 100 bytes of file:
218    /// ```
219    /// # use google_cloud_storage::client::Storage;
220    /// # use google_cloud_storage::model_ext::ReadRange;
221    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
222    /// let response = client
223    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
224    ///     .set_read_range(ReadRange::tail(100))
225    ///     .send()
226    ///     .await?;
227    /// println!("response details={response:?}");
228    /// # Ok(()) }
229    /// ```
230    ///
231    /// Read bytes 1000 to 1099.
232    /// ```
233    /// # use google_cloud_storage::client::Storage;
234    /// # use google_cloud_storage::model_ext::ReadRange;
235    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
236    /// let response = client
237    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
238    ///     .set_read_range(ReadRange::segment(1000, 100))
239    ///     .send()
240    ///     .await?;
241    /// println!("response details={response:?}");
242    /// # Ok(()) }
243    /// ```
244    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
245        self.request.with_range(range);
246        self
247    }
248
249    /// The encryption key used with the Customer-Supplied Encryption Keys
250    /// feature. In raw bytes format (not base64-encoded).
251    ///
252    /// Example:
253    /// ```
254    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
255    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
256    /// let key: &[u8] = &[97; 32];
257    /// let response = client
258    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
259    ///     .set_key(KeyAes256::new(key)?)
260    ///     .send()
261    ///     .await?;
262    /// println!("response details={response:?}");
263    /// # Ok(()) }
264    /// ```
265    pub fn set_key(mut self, v: KeyAes256) -> Self {
266        self.request.common_object_request_params = Some(v.into());
267        self
268    }
269
270    /// The retry policy used for this request.
271    ///
272    /// # Example
273    /// ```
274    /// # use google_cloud_storage::client::Storage;
275    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
276    /// use google_cloud_storage::retry_policy::RetryableErrors;
277    /// use std::time::Duration;
278    /// use gax::retry_policy::RetryPolicyExt;
279    /// let response = client
280    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
281    ///     .with_retry_policy(
282    ///         RetryableErrors
283    ///             .with_attempt_limit(5)
284    ///             .with_time_limit(Duration::from_secs(10)),
285    ///     )
286    ///     .send()
287    ///     .await?;
288    /// println!("response details={response:?}");
289    /// # Ok(()) }
290    /// ```
291    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
292        self.options.retry_policy = v.into().into();
293        self
294    }
295
296    /// The backoff policy used for this request.
297    ///
298    /// # Example
299    /// ```
300    /// # use google_cloud_storage::client::Storage;
301    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
302    /// use std::time::Duration;
303    /// use gax::exponential_backoff::ExponentialBackoff;
304    /// let response = client
305    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
306    ///     .with_backoff_policy(ExponentialBackoff::default())
307    ///     .send()
308    ///     .await?;
309    /// println!("response details={response:?}");
310    /// # Ok(()) }
311    /// ```
312    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
313        mut self,
314        v: V,
315    ) -> Self {
316        self.options.backoff_policy = v.into().into();
317        self
318    }
319
320    /// The retry throttler used for this request.
321    ///
322    /// Most of the time you want to use the same throttler for all the requests
323    /// in a client, and even the same throttler for many clients. Rarely it
    /// may be necessary to use a custom throttler for some subset of the
325    /// requests.
326    ///
327    /// # Example
328    /// ```
329    /// # use google_cloud_storage::client::Storage;
330    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
331    /// let response = client
332    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
333    ///     .with_retry_throttler(adhoc_throttler())
334    ///     .send()
335    ///     .await?;
336    /// println!("response details={response:?}");
337    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
338    ///     # panic!();
339    /// }
340    /// # Ok(()) }
341    /// ```
342    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
343        mut self,
344        v: V,
345    ) -> Self {
346        self.options.retry_throttler = v.into().into();
347        self
348    }
349
350    /// Configure the resume policy for read requests.
351    ///
352    /// The Cloud Storage client library can automatically resume a read that is
353    /// interrupted by a transient error. Applications may want to limit the
354    /// number of read attempts, or may wish to expand the type of errors
355    /// treated as retryable.
356    ///
357    /// # Example
358    /// ```
359    /// # use google_cloud_storage::client::Storage;
360    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
361    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
362    /// let response = client
363    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
364    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
365    ///     .send()
366    ///     .await?;
367    /// # Ok(()) }
368    /// ```
369    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
370    where
371        V: ReadResumePolicy + 'static,
372    {
373        self.options.set_read_resume_policy(std::sync::Arc::new(v));
374        self
375    }
376
377    /// Enables automatic decompression.
378    ///
379    /// The Cloud Storage service [automatically decompresses] objects
380    /// with `content_encoding == "gzip"` during reads. The client library
381    /// disables this behavior by default, as it is not possible to
382    /// perform ranged reads or to resume interrupted downloads if automatic
383    /// decompression is enabled.
384    ///
385    /// Use this option to enable automatic decompression.
386    ///
387    /// # Example
388    /// ```
389    /// # use google_cloud_storage::client::Storage;
390    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
391    /// let response = client
392    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
393    ///     .with_automatic_decompression(true)
394    ///     .send()
395    ///     .await?;
396    /// println!("response details={response:?}");
397    /// # Ok(()) }
398    /// ```
399    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
400        self.options.automatic_decompression = v;
401        self
402    }
403
404    /// Sends the request.
405    pub async fn send(self) -> Result<ReadObjectResponse> {
406        self.stub.read_object(self.request, self.options).await
407    }
408}
409
410// A convenience struct that saves the request conditions and performs the read.
411#[derive(Clone, Debug)]
412pub(crate) struct Reader {
413    pub inner: std::sync::Arc<StorageInner>,
414    pub request: crate::model::ReadObjectRequest,
415    pub options: RequestOptions,
416}
417
418impl Reader {
419    async fn read(self) -> Result<reqwest::Response> {
420        let throttler = self.options.retry_throttler.clone();
421        let retry = self.options.retry_policy.clone();
422        let backoff = self.options.backoff_policy.clone();
423
424        gax::retry_loop_internal::retry_loop(
425            async move |_| self.read_attempt().await,
426            async |duration| tokio::time::sleep(duration).await,
427            true,
428            throttler,
429            retry,
430            backoff,
431        )
432        .await
433    }
434
435    async fn read_attempt(&self) -> Result<reqwest::Response> {
436        let builder = self.http_request_builder().await?;
437        let response = builder.send().await.map_err(map_send_error)?;
438        if !response.status().is_success() {
439            return gaxi::http::to_http_error(response).await;
440        }
441        Ok(response)
442    }
443
444    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
445        // Collect the required bucket and object parameters.
446        let bucket = &self.request.bucket;
447        let bucket_id = bucket
448            .as_str()
449            .strip_prefix("projects/_/buckets/")
450            .ok_or_else(|| {
451                Error::binding(format!(
452                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
453                ))
454            })?;
455        let object = &self.request.object;
456
457        // Build the request.
458        let builder = self
459            .inner
460            .client
461            .request(
462                reqwest::Method::GET,
463                format!(
464                    "{}/storage/v1/b/{bucket_id}/o/{}",
465                    &self.inner.endpoint,
466                    enc(object)
467                ),
468            )
469            .query(&[("alt", "media")])
470            .header(
471                "x-goog-api-client",
472                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
473            );
474
475        let builder = if self.options.automatic_decompression {
476            builder
477        } else {
478            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
479            //
480            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
481            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
482            // with a different `contentEncoding` value.
483            builder.header(
484                "accept-encoding",
485                reqwest::header::HeaderValue::from_static("gzip"),
486            )
487        };
488
489        // Add the optional query parameters.
490        let builder = if self.request.generation != 0 {
491            builder.query(&[("generation", self.request.generation)])
492        } else {
493            builder
494        };
495        let builder = self
496            .request
497            .if_generation_match
498            .iter()
499            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
500        let builder = self
501            .request
502            .if_generation_not_match
503            .iter()
504            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
505        let builder = self
506            .request
507            .if_metageneration_match
508            .iter()
509            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
510        let builder = self
511            .request
512            .if_metageneration_not_match
513            .iter()
514            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
515
516        let builder = apply_customer_supplied_encryption_headers(
517            builder,
518            &self.request.common_object_request_params,
519        );
520
521        // Apply "range" header for read limits and offsets.
522        let builder = match (self.request.read_offset, self.request.read_limit) {
523            // read_limit can't be negative.
524            (_, l) if l < 0 => {
525                unreachable!("ReadObject build never sets a negative read_limit value")
526            }
527            // negative offset can't also have a read_limit.
528            (o, l) if o < 0 && l > 0 => unreachable!(
529                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
530            ),
531            // If both are zero, we use default implementation (no range header).
532            (0, 0) => builder,
533            // negative offset with no limit means the last N bytes.
534            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
535            // read_limit is zero, means no limit. Read from offset to end of file.
536            // This handles cases like (5, 0) -> "bytes=5-"
537            (o, 0) => builder.header("range", format!("bytes={o}-")),
538            // General case: non-negative offset and positive limit.
539            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
540            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
541        };
542
543        self.inner.apply_auth_headers(builder).await
544    }
545
546    fn is_gunzipped(response: &reqwest::Response) -> bool {
547        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
548        // objects. Reading such objects comes with a number of restrictions:
549        // - Ranged reads do not work.
550        // - The size of the decompressed data is not known.
551        // - Checksums do not work because the object checksums correspond to the
552        //   compressed data and the client library receives the decompressed data.
553        //
554        // Because ranged reads do not work, resuming a read does not work. Consequently,
555        // the implementation of `ReadObjectResponse` is substantially different for
556        // objects that are gunzipped.
557        //
558        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
559        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
560        use http::header::WARNING;
561        if response
562            .headers()
563            .get(TRANSFORMATION)
564            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
565        {
566            return true;
567        }
568        response
569            .headers()
570            .get(WARNING)
571            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
572    }
573
574    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
575        let response = self.clone().read().await?;
576        if Self::is_gunzipped(&response) {
577            return Ok(ReadObjectResponse::new(Box::new(
578                non_resumable::NonResumableResponse::new(response)?,
579            )));
580        }
581        Ok(ReadObjectResponse::new(Box::new(
582            resumable::ResumableResponse::new(self, response)?,
583        )))
584    }
585}
586
587#[cfg(test)]
588mod resume_tests;
589
590#[cfg(test)]
591mod tests {
592    use super::client::tests::{test_builder, test_inner_client};
593    use super::*;
594    use crate::error::{ChecksumMismatch, ReadError};
595    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
596    use auth::credentials::anonymous::Builder as Anonymous;
597    use base64::Engine;
598    use futures::TryStreamExt;
599    use httptest::{Expectation, Server, matchers::*, responders::status_code};
600    use std::collections::HashMap;
601    use std::error::Error;
602    use std::sync::Arc;
603    use test_case::test_case;
604
605    type Result = anyhow::Result<()>;
606
607    async fn http_request_builder(
608        inner: Arc<StorageInner>,
609        builder: ReadObject,
610    ) -> crate::Result<reqwest::RequestBuilder> {
611        let reader = Reader {
612            inner,
613            request: builder.request,
614            options: builder.options,
615        };
616        reader.http_request_builder().await
617    }
618
619    #[tokio::test]
620    async fn test_clone() {
621        let inner = test_inner_client(test_builder()).await;
622        let stub = crate::storage::transport::Storage::new(inner.clone());
623        let options = {
624            let mut o = RequestOptions::new();
625            o.set_resumable_upload_threshold(12345_usize);
626            o
627        };
628        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
629
630        let clone = builder.clone();
631        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
632        assert_eq!(clone.request, builder.request);
633        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
634    }
635
636    // Verify `read_object()` meets normal Send, Sync, requirements.
637    #[tokio::test]
638    async fn test_read_is_send_and_static() -> Result {
639        let client = Storage::builder()
640            .with_credentials(Anonymous::new().build())
641            .build()
642            .await?;
643
644        fn need_send<T: Send>(_val: &T) {}
645        fn need_sync<T: Sync>(_val: &T) {}
646        fn need_static<T: 'static>(_val: &T) {}
647
648        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
649        need_send(&read);
650        need_sync(&read);
651        need_static(&read);
652
653        let read = client
654            .read_object("projects/_/buckets/test-bucket", "test-object")
655            .send();
656        need_send(&read);
657        need_static(&read);
658
659        Ok(())
660    }
661
662    #[tokio::test]
663    async fn read_object_normal() -> Result {
664        let server = Server::run();
665        server.expect(
666            Expectation::matching(all_of![
667                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
668                request::headers(contains(("accept-encoding", "gzip"))),
669                request::query(url_decoded(contains(("alt", "media")))),
670            ])
671            .respond_with(
672                status_code(200)
673                    .body("hello world")
674                    .append_header("x-goog-generation", 123456),
675            ),
676        );
677
678        let client = Storage::builder()
679            .with_endpoint(format!("http://{}", server.addr()))
680            .with_credentials(Anonymous::new().build())
681            .build()
682            .await?;
683        let mut reader = client
684            .read_object("projects/_/buckets/test-bucket", "test-object")
685            .send()
686            .await?;
687        let mut got = Vec::new();
688        while let Some(b) = reader.next().await.transpose()? {
689            got.extend_from_slice(&b);
690        }
691        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
692
693        Ok(())
694    }
695
696    #[tokio::test]
697    async fn read_object_stream() -> Result {
698        let server = Server::run();
699        server.expect(
700            Expectation::matching(all_of![
701                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
702                request::query(url_decoded(contains(("alt", "media")))),
703            ])
704            .respond_with(
705                status_code(200)
706                    .append_header("x-goog-generation", 123456)
707                    .body("hello world"),
708            ),
709        );
710
711        let client = Storage::builder()
712            .with_endpoint(format!("http://{}", server.addr()))
713            .with_credentials(Anonymous::new().build())
714            .build()
715            .await?;
716        let response = client
717            .read_object("projects/_/buckets/test-bucket", "test-object")
718            .send()
719            .await?;
720        let result: Vec<_> = response.into_stream().try_collect().await?;
721        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
722
723        Ok(())
724    }
725
726    #[tokio::test]
727    async fn read_object_next_then_consume_response() -> Result {
728        // Create a large enough file that will require multiple chunks to read.
729        const BLOCK_SIZE: usize = 500;
730        let mut contents = Vec::new();
731        for i in 0..50 {
732            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
733        }
734
735        // Calculate and serialize the crc32c checksum
736        let u = crc32c::crc32c(&contents);
737        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
738
739        let server = Server::run();
740        server.expect(
741            Expectation::matching(all_of![
742                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
743                request::query(url_decoded(contains(("alt", "media")))),
744            ])
745            .times(1)
746            .respond_with(
747                status_code(200)
748                    .body(contents.clone())
749                    .append_header("x-goog-hash", format!("crc32c={value}"))
750                    .append_header("x-goog-generation", 123456),
751            ),
752        );
753
754        let client = Storage::builder()
755            .with_endpoint(format!("http://{}", server.addr()))
756            .with_credentials(Anonymous::new().build())
757            .build()
758            .await?;
759
760        // Read some bytes, then remainder with stream.
761        let mut response = client
762            .read_object("projects/_/buckets/test-bucket", "test-object")
763            .send()
764            .await?;
765
766        let mut all_bytes = bytes::BytesMut::new();
767        let chunk = response.next().await.transpose()?.unwrap();
768        assert!(!chunk.is_empty());
769        all_bytes.extend(chunk);
770        use futures::StreamExt;
771        let mut stream = response.into_stream();
772        while let Some(chunk) = stream.next().await.transpose()? {
773            all_bytes.extend(chunk);
774        }
775        assert_eq!(all_bytes, contents);
776
777        Ok(())
778    }
779
780    #[tokio::test]
781    async fn read_object_not_found() -> Result {
782        let server = Server::run();
783        server.expect(
784            Expectation::matching(all_of![
785                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
786                request::query(url_decoded(contains(("alt", "media")))),
787            ])
788            .respond_with(status_code(404).body("NOT FOUND")),
789        );
790
791        let client = Storage::builder()
792            .with_endpoint(format!("http://{}", server.addr()))
793            .with_credentials(Anonymous::new().build())
794            .build()
795            .await?;
796        let err = client
797            .read_object("projects/_/buckets/test-bucket", "test-object")
798            .send()
799            .await
800            .expect_err("expected a not found error");
801        assert_eq!(err.http_status_code(), Some(404));
802
803        Ok(())
804    }
805
806    #[tokio::test]
807    async fn read_object_incorrect_crc32c_check() -> Result {
808        // Calculate and serialize the crc32c checksum
809        let u = crc32c::crc32c("goodbye world".as_bytes());
810        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
811
812        let server = Server::run();
813        server.expect(
814            Expectation::matching(all_of![
815                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
816                request::query(url_decoded(contains(("alt", "media")))),
817            ])
818            .times(3)
819            .respond_with(
820                status_code(200)
821                    .body("hello world")
822                    .append_header("x-goog-hash", format!("crc32c={value}"))
823                    .append_header("x-goog-generation", 123456),
824            ),
825        );
826
827        let client = Storage::builder()
828            .with_endpoint(format!("http://{}", server.addr()))
829            .with_credentials(Anonymous::new().build())
830            .build()
831            .await?;
832        let mut response = client
833            .read_object("projects/_/buckets/test-bucket", "test-object")
834            .send()
835            .await?;
836        let mut partial = Vec::new();
837        let mut err = None;
838        while let Some(r) = response.next().await {
839            match r {
840                Ok(b) => partial.extend_from_slice(&b),
841                Err(e) => err = Some(e),
842            };
843        }
844        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
845        let err = err.expect("expect error on incorrect crc32c");
846        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
847        assert!(
848            matches!(
849                source,
850                Some(&ReadError::ChecksumMismatch(
851                    ChecksumMismatch::Crc32c { .. }
852                ))
853            ),
854            "err={err:?}"
855        );
856
857        let mut response = client
858            .read_object("projects/_/buckets/test-bucket", "test-object")
859            .send()
860            .await?;
861        let err: crate::Error = async {
862            {
863                while (response.next().await.transpose()?).is_some() {}
864                Ok(())
865            }
866        }
867        .await
868        .expect_err("expect error on incorrect crc32c");
869        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
870        assert!(
871            matches!(
872                source,
873                Some(&ReadError::ChecksumMismatch(
874                    ChecksumMismatch::Crc32c { .. }
875                ))
876            ),
877            "err={err:?}"
878        );
879
880        use futures::TryStreamExt;
881        let err = client
882            .read_object("projects/_/buckets/test-bucket", "test-object")
883            .send()
884            .await?
885            .into_stream()
886            .try_collect::<Vec<bytes::Bytes>>()
887            .await
888            .expect_err("expect error on incorrect crc32c");
889        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
890        assert!(
891            matches!(
892                source,
893                Some(&ReadError::ChecksumMismatch(
894                    ChecksumMismatch::Crc32c { .. }
895                ))
896            ),
897            "err={err:?}"
898        );
899        Ok(())
900    }
901
902    #[tokio::test]
903    async fn read_object_incorrect_md5_check() -> Result {
904        // Calculate and serialize the md5 checksum
905        let digest = md5::compute("goodbye world".as_bytes());
906        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
907
908        let server = Server::run();
909        server.expect(
910            Expectation::matching(all_of![
911                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
912                request::query(url_decoded(contains(("alt", "media")))),
913            ])
914            .times(1)
915            .respond_with(
916                status_code(200)
917                    .body("hello world")
918                    .append_header("x-goog-hash", format!("md5={value}"))
919                    .append_header("x-goog-generation", 123456),
920            ),
921        );
922
923        let client = Storage::builder()
924            .with_endpoint(format!("http://{}", server.addr()))
925            .with_credentials(Anonymous::new().build())
926            .build()
927            .await?;
928        let mut response = client
929            .read_object("projects/_/buckets/test-bucket", "test-object")
930            .compute_md5()
931            .send()
932            .await?;
933        let mut partial = Vec::new();
934        let mut err = None;
935        while let Some(r) = response.next().await {
936            match r {
937                Ok(b) => partial.extend_from_slice(&b),
938                Err(e) => err = Some(e),
939            };
940        }
941        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
942        let err = err.expect("expect error on incorrect md5");
943        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
944        assert!(
945            matches!(
946                source,
947                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
948            ),
949            "err={err:?}"
950        );
951
952        Ok(())
953    }
954
955    #[tokio::test]
956    async fn read_object() -> Result {
957        let inner = test_inner_client(test_builder()).await;
958        let stub = crate::storage::transport::Storage::new(inner.clone());
959        let builder = ReadObject::new(
960            stub,
961            "projects/_/buckets/bucket",
962            "object",
963            inner.options.clone(),
964        );
965        let request = http_request_builder(inner, builder).await?.build()?;
966
967        assert_eq!(request.method(), reqwest::Method::GET);
968        assert_eq!(
969            request.url().as_str(),
970            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
971        );
972        Ok(())
973    }
974
975    #[tokio::test]
976    async fn read_object_error_credentials() -> Result {
977        let inner = test_inner_client(
978            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
979        )
980        .await;
981        let stub = crate::storage::transport::Storage::new(inner.clone());
982        let builder = ReadObject::new(
983            stub,
984            "projects/_/buckets/bucket",
985            "object",
986            inner.options.clone(),
987        );
988        let _ = http_request_builder(inner, builder)
989            .await
990            .inspect_err(|e| assert!(e.is_authentication()))
991            .expect_err("invalid credentials should err");
992        Ok(())
993    }
994
995    #[tokio::test]
996    async fn read_object_bad_bucket() -> Result {
997        let inner = test_inner_client(test_builder()).await;
998        let stub = crate::storage::transport::Storage::new(inner.clone());
999        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
1000        let _ = http_request_builder(inner, builder)
1001            .await
1002            .expect_err("malformed bucket string should error");
1003        Ok(())
1004    }
1005
1006    #[tokio::test]
1007    async fn read_object_query_params() -> Result {
1008        let inner = test_inner_client(test_builder()).await;
1009        let stub = crate::storage::transport::Storage::new(inner.clone());
1010        let builder = ReadObject::new(
1011            stub,
1012            "projects/_/buckets/bucket",
1013            "object",
1014            inner.options.clone(),
1015        )
1016        .set_generation(5)
1017        .set_if_generation_match(10)
1018        .set_if_generation_not_match(20)
1019        .set_if_metageneration_match(30)
1020        .set_if_metageneration_not_match(40);
1021        let request = http_request_builder(inner, builder).await?.build()?;
1022
1023        assert_eq!(request.method(), reqwest::Method::GET);
1024        let want_pairs: HashMap<String, String> = [
1025            ("alt", "media"),
1026            ("generation", "5"),
1027            ("ifGenerationMatch", "10"),
1028            ("ifGenerationNotMatch", "20"),
1029            ("ifMetagenerationMatch", "30"),
1030            ("ifMetagenerationNotMatch", "40"),
1031        ]
1032        .iter()
1033        .map(|(k, v)| (k.to_string(), v.to_string()))
1034        .collect();
1035        let query_pairs: HashMap<String, String> = request
1036            .url()
1037            .query_pairs()
1038            .map(|param| (param.0.to_string(), param.1.to_string()))
1039            .collect();
1040        assert_eq!(query_pairs.len(), want_pairs.len());
1041        assert_eq!(query_pairs, want_pairs);
1042        Ok(())
1043    }
1044
1045    #[tokio::test]
1046    async fn read_object_default_headers() -> Result {
1047        // The API takes the unencoded byte array.
1048        let inner = test_inner_client(test_builder()).await;
1049        let stub = crate::storage::transport::Storage::new(inner.clone());
1050        let builder = ReadObject::new(
1051            stub,
1052            "projects/_/buckets/bucket",
1053            "object",
1054            inner.options.clone(),
1055        );
1056        let request = http_request_builder(inner, builder).await?.build()?;
1057
1058        assert_eq!(request.method(), reqwest::Method::GET);
1059        assert_eq!(
1060            request.url().as_str(),
1061            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1062        );
1063
1064        let want = [("accept-encoding", "gzip")];
1065        let headers = request.headers();
1066        for (name, value) in want {
1067            assert_eq!(
1068                headers.get(name).and_then(|h| h.to_str().ok()),
1069                Some(value),
1070                "{request:?}"
1071            );
1072        }
1073        Ok(())
1074    }
1075
1076    #[tokio::test]
1077    async fn read_object_automatic_decompression_headers() -> Result {
1078        // The API takes the unencoded byte array.
1079        let inner = test_inner_client(test_builder()).await;
1080        let stub = crate::storage::transport::Storage::new(inner.clone());
1081        let builder = ReadObject::new(
1082            stub,
1083            "projects/_/buckets/bucket",
1084            "object",
1085            inner.options.clone(),
1086        )
1087        .with_automatic_decompression(true);
1088        let request = http_request_builder(inner, builder).await?.build()?;
1089
1090        assert_eq!(request.method(), reqwest::Method::GET);
1091        assert_eq!(
1092            request.url().as_str(),
1093            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1094        );
1095
1096        let headers = request.headers();
1097        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1098        Ok(())
1099    }
1100
1101    #[tokio::test]
1102    async fn read_object_encryption_headers() -> Result {
1103        // Make a 32-byte key.
1104        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1105
1106        // The API takes the unencoded byte array.
1107        let inner = test_inner_client(test_builder()).await;
1108        let stub = crate::storage::transport::Storage::new(inner.clone());
1109        let builder = ReadObject::new(
1110            stub,
1111            "projects/_/buckets/bucket",
1112            "object",
1113            inner.options.clone(),
1114        )
1115        .set_key(KeyAes256::new(&key)?);
1116        let request = http_request_builder(inner, builder).await?.build()?;
1117
1118        assert_eq!(request.method(), reqwest::Method::GET);
1119        assert_eq!(
1120            request.url().as_str(),
1121            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1122        );
1123
1124        let want = [
1125            ("x-goog-encryption-algorithm", "AES256".to_string()),
1126            ("x-goog-encryption-key", key_base64),
1127            ("x-goog-encryption-key-sha256", key_sha256_base64),
1128        ];
1129
1130        let headers = request.headers();
1131        for (name, value) in want {
1132            assert_eq!(
1133                headers.get(name).and_then(|h| h.to_str().ok()),
1134                Some(value.as_str())
1135            );
1136        }
1137        Ok(())
1138    }
1139
1140    #[test_case(ReadRange::all(), None; "no headers needed")]
1141    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1142    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1143    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1144    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1145    #[tokio::test]
1146    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1147        let inner = test_inner_client(test_builder()).await;
1148        let stub = crate::storage::transport::Storage::new(inner.clone());
1149        let builder = ReadObject::new(
1150            stub,
1151            "projects/_/buckets/bucket",
1152            "object",
1153            inner.options.clone(),
1154        )
1155        .set_read_range(input.clone());
1156        let request = http_request_builder(inner, builder).await?.build()?;
1157
1158        assert_eq!(request.method(), reqwest::Method::GET);
1159        assert_eq!(
1160            request.url().as_str(),
1161            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1162        );
1163
1164        assert_eq!(request.headers().get("range"), want);
1165        Ok(())
1166    }
1167
1168    #[test_case("projects/p", "projects%2Fp")]
1169    #[test_case("kebab-case", "kebab-case")]
1170    #[test_case("dot.name", "dot.name")]
1171    #[test_case("under_score", "under_score")]
1172    #[test_case("tilde~123", "tilde~123")]
1173    #[test_case("exclamation!point!", "exclamation%21point%21")]
1174    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1175    #[test_case("preserve%percent%21", "preserve%percent%21")]
1176    #[test_case(
1177        "testall !#$&'()*+,/:;=?@[]",
1178        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1179    )]
1180    #[tokio::test]
1181    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1182        let inner = test_inner_client(test_builder()).await;
1183        let stub = crate::storage::transport::Storage::new(inner.clone());
1184        let builder = ReadObject::new(
1185            stub,
1186            "projects/_/buckets/bucket",
1187            name,
1188            inner.options.clone(),
1189        );
1190        let request = http_request_builder(inner, builder).await?.build()?;
1191        let got = request.url().path_segments().unwrap().next_back().unwrap();
1192        assert_eq!(got, want);
1193        Ok(())
1194    }
1195
1196    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1197    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1198    #[test_case("warning", "214 UploadServer gunzipped", true)]
1199    #[test_case("warning", "no match", false)]
1200    #[test_case("unused", "unused", false)]
1201    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1202        let response = http::Response::builder()
1203            .status(200)
1204            .header(name, value)
1205            .body(Vec::new())?;
1206        let response = reqwest::Response::from(response);
1207        let got = Reader::is_gunzipped(&response);
1208        assert_eq!(got, want, "{response:?}");
1209        Ok(())
1210    }
1211}