Skip to main content

google_cloud_storage/storage/
read_object.rs

// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15mod non_resumable;
16mod parse_http_response;
17mod resumable;
18
19use super::client::*;
20use super::*;
21use crate::model_ext::KeyAes256;
22use crate::read_object::ReadObjectResponse;
23use crate::read_resume_policy::ReadResumePolicy;
24use crate::storage::checksum::details::Md5;
25use crate::storage::request_options::RequestOptions;
26use gaxi::http::{
27    map_send_error,
28    reqwest::{HeaderValue, Method, RequestBuilder, Response},
29};
30
31/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
32///
33/// # Example: accumulate the contents of an object into a vector
34/// ```
35/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
36/// async fn sample(client: &Storage) -> anyhow::Result<()> {
37///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
38///     let mut reader = builder.send().await?;
39///     let mut contents = Vec::new();
40///     while let Some(chunk) = reader.next().await.transpose()? {
41///         contents.extend_from_slice(&chunk);
42///     }
43///     println!("object contents={:?}", contents);
44///     Ok(())
45/// }
46/// ```
47///
48/// # Example: read part of an object
49/// ```
50/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
51/// use google_cloud_storage::model_ext::ReadRange;
52/// async fn sample(client: &Storage) -> anyhow::Result<()> {
53///     const MIB: u64 = 1024 * 1024;
54///     let mut contents = Vec::new();
55///     let mut reader = client
56///         .read_object("projects/_/buckets/my-bucket", "my-object")
57///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
58///         .send()
59///         .await?;
60///     while let Some(chunk) = reader.next().await.transpose()? {
61///         contents.extend_from_slice(&chunk);
62///     }
63///     println!("range contents={:?}", contents);
64///     Ok(())
65/// }
66/// ```
67#[derive(Debug)]
68pub struct ReadObject<S = crate::storage::transport::Storage>
69where
70    S: crate::storage::stub::Storage + 'static,
71{
72    stub: std::sync::Arc<S>,
73    request: crate::model::ReadObjectRequest,
74    options: RequestOptions,
75}
76
77impl<S> Clone for ReadObject<S>
78where
79    S: crate::storage::stub::Storage + 'static,
80{
81    fn clone(&self) -> Self {
82        Self {
83            stub: self.stub.clone(),
84            request: self.request.clone(),
85            options: self.options.clone(),
86        }
87    }
88}
89
90impl<S> ReadObject<S>
91where
92    S: crate::storage::stub::Storage + 'static,
93{
94    pub(crate) fn new<B, O>(
95        stub: std::sync::Arc<S>,
96        bucket: B,
97        object: O,
98        options: RequestOptions,
99    ) -> Self
100    where
101        B: Into<String>,
102        O: Into<String>,
103    {
104        ReadObject {
105            stub,
106            request: crate::model::ReadObjectRequest::new()
107                .set_bucket(bucket)
108                .set_object(object),
109            options,
110        }
111    }
112
113    /// Enables computation of MD5 hashes.
114    ///
115    /// Crc32c hashes are checked by default.
116    ///
117    /// Checksum validation is supported iff:
118    /// 1. The full content is requested.
119    /// 2. All of the content is returned (status != PartialContent).
120    /// 3. The server sent a checksum header.
121    /// 4. The http stack did not uncompress the file.
122    /// 5. The server did not uncompress data on read.
123    ///
124    /// # Example
125    /// ```
126    /// # use google_cloud_storage::client::Storage;
127    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
128    /// let builder =  client
129    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
130    ///     .compute_md5();
131    /// let mut reader = builder
132    ///     .send()
133    ///     .await?;
134    /// let mut contents = Vec::new();
135    /// while let Some(chunk) = reader.next().await.transpose()? {
136    ///     contents.extend_from_slice(&chunk);
137    /// }
138    /// println!("object contents={:?}", contents);
139    /// # Ok(()) }
140    /// ```
141    pub fn compute_md5(self) -> Self {
142        let mut this = self;
143        this.options.checksum.md5_hash = Some(Md5::default());
144        this
145    }
146
147    /// If present, selects a specific revision of this object (as
148    /// opposed to the latest version, the default).
149    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
150        self.request.generation = v.into();
151        self
152    }
153
154    /// Makes the operation conditional on whether the object's current generation
155    /// matches the given value. Setting to 0 makes the operation succeed only if
156    /// there are no live versions of the object.
157    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
158    where
159        T: Into<i64>,
160    {
161        self.request.if_generation_match = Some(v.into());
162        self
163    }
164
165    /// Makes the operation conditional on whether the object's live generation
166    /// does not match the given value. If no live object exists, the precondition
167    /// fails. Setting to 0 makes the operation succeed only if there is a live
168    /// version of the object.
169    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
170    where
171        T: Into<i64>,
172    {
173        self.request.if_generation_not_match = Some(v.into());
174        self
175    }
176
177    /// Makes the operation conditional on whether the object's current
178    /// metageneration matches the given value.
179    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
180    where
181        T: Into<i64>,
182    {
183        self.request.if_metageneration_match = Some(v.into());
184        self
185    }
186
187    /// Makes the operation conditional on whether the object's current
188    /// metageneration does not match the given value.
189    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
190    where
191        T: Into<i64>,
192    {
193        self.request.if_metageneration_not_match = Some(v.into());
194        self
195    }
196
197    /// The range of bytes to return in the read.
198    ///
199    /// This can be all the bytes starting at a given offset
200    /// (`ReadRange::offset()`), all the bytes in an explicit range
201    /// (`Range::segment`), or the last N bytes of the object
202    /// (`ReadRange::tail`).
203    ///
204    /// # Examples
205    ///
206    /// Read starting at 100 bytes to end of file.
207    /// ```
208    /// # use google_cloud_storage::client::Storage;
209    /// # use google_cloud_storage::model_ext::ReadRange;
210    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
211    /// let response = client
212    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
213    ///     .set_read_range(ReadRange::offset(100))
214    ///     .send()
215    ///     .await?;
216    /// println!("response details={response:?}");
217    /// # Ok(()) }
218    /// ```
219    ///
220    /// Read last 100 bytes of file:
221    /// ```
222    /// # use google_cloud_storage::client::Storage;
223    /// # use google_cloud_storage::model_ext::ReadRange;
224    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
225    /// let response = client
226    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
227    ///     .set_read_range(ReadRange::tail(100))
228    ///     .send()
229    ///     .await?;
230    /// println!("response details={response:?}");
231    /// # Ok(()) }
232    /// ```
233    ///
234    /// Read bytes 1000 to 1099.
235    /// ```
236    /// # use google_cloud_storage::client::Storage;
237    /// # use google_cloud_storage::model_ext::ReadRange;
238    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
239    /// let response = client
240    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
241    ///     .set_read_range(ReadRange::segment(1000, 100))
242    ///     .send()
243    ///     .await?;
244    /// println!("response details={response:?}");
245    /// # Ok(()) }
246    /// ```
247    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
248        self.request.with_range(range);
249        self
250    }
251
252    /// The encryption key used with the Customer-Supplied Encryption Keys
253    /// feature. In raw bytes format (not base64-encoded).
254    ///
255    /// Example:
256    /// ```
257    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
258    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
259    /// let key: &[u8] = &[97; 32];
260    /// let response = client
261    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
262    ///     .set_key(KeyAes256::new(key)?)
263    ///     .send()
264    ///     .await?;
265    /// println!("response details={response:?}");
266    /// # Ok(()) }
267    /// ```
268    pub fn set_key(mut self, v: KeyAes256) -> Self {
269        self.request.common_object_request_params = Some(v.into());
270        self
271    }
272
273    /// The retry policy used for this request.
274    ///
275    /// # Example
276    /// ```
277    /// # use google_cloud_storage::client::Storage;
278    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
279    /// use google_cloud_storage::retry_policy::RetryableErrors;
280    /// use std::time::Duration;
281    /// use google_cloud_gax::retry_policy::RetryPolicyExt;
282    /// let response = client
283    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
284    ///     .with_retry_policy(
285    ///         RetryableErrors
286    ///             .with_attempt_limit(5)
287    ///             .with_time_limit(Duration::from_secs(10)),
288    ///     )
289    ///     .send()
290    ///     .await?;
291    /// println!("response details={response:?}");
292    /// # Ok(()) }
293    /// ```
294    pub fn with_retry_policy<V: Into<google_cloud_gax::retry_policy::RetryPolicyArg>>(
295        mut self,
296        v: V,
297    ) -> Self {
298        self.options.retry_policy = v.into().into();
299        self
300    }
301
302    /// The backoff policy used for this request.
303    ///
304    /// # Example
305    /// ```
306    /// # use google_cloud_storage::client::Storage;
307    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
308    /// use std::time::Duration;
309    /// use google_cloud_gax::exponential_backoff::ExponentialBackoff;
310    /// let response = client
311    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
312    ///     .with_backoff_policy(ExponentialBackoff::default())
313    ///     .send()
314    ///     .await?;
315    /// println!("response details={response:?}");
316    /// # Ok(()) }
317    /// ```
318    pub fn with_backoff_policy<V: Into<google_cloud_gax::backoff_policy::BackoffPolicyArg>>(
319        mut self,
320        v: V,
321    ) -> Self {
322        self.options.backoff_policy = v.into().into();
323        self
324    }
325
326    /// The retry throttler used for this request.
327    ///
328    /// Most of the time you want to use the same throttler for all the requests
329    /// in a client, and even the same throttler for many clients. Rarely it
330    /// may be necessary to use an custom throttler for some subset of the
331    /// requests.
332    ///
333    /// # Example
334    /// ```
335    /// # use google_cloud_storage::client::Storage;
336    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
337    /// let response = client
338    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
339    ///     .with_retry_throttler(adhoc_throttler())
340    ///     .send()
341    ///     .await?;
342    /// println!("response details={response:?}");
343    /// fn adhoc_throttler() -> google_cloud_gax::retry_throttler::SharedRetryThrottler {
344    ///     # panic!();
345    /// }
346    /// # Ok(()) }
347    /// ```
348    pub fn with_retry_throttler<V: Into<google_cloud_gax::retry_throttler::RetryThrottlerArg>>(
349        mut self,
350        v: V,
351    ) -> Self {
352        self.options.retry_throttler = v.into().into();
353        self
354    }
355
356    /// Configure the resume policy for read requests.
357    ///
358    /// The Cloud Storage client library can automatically resume a read that is
359    /// interrupted by a transient error. Applications may want to limit the
360    /// number of read attempts, or may wish to expand the type of errors
361    /// treated as retryable.
362    ///
363    /// # Example
364    /// ```
365    /// # use google_cloud_storage::client::Storage;
366    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
367    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
368    /// let response = client
369    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
370    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
371    ///     .send()
372    ///     .await?;
373    /// # Ok(()) }
374    /// ```
375    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
376    where
377        V: ReadResumePolicy + 'static,
378    {
379        self.options.set_read_resume_policy(std::sync::Arc::new(v));
380        self
381    }
382
383    /// Enables automatic decompression.
384    ///
385    /// The Cloud Storage service [automatically decompresses] objects
386    /// with `content_encoding == "gzip"` during reads. The client library
387    /// disables this behavior by default, as it is not possible to
388    /// perform ranged reads or to resume interrupted downloads if automatic
389    /// decompression is enabled.
390    ///
391    /// Use this option to enable automatic decompression.
392    ///
393    /// # Example
394    /// ```
395    /// # use google_cloud_storage::client::Storage;
396    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
397    /// let response = client
398    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
399    ///     .with_automatic_decompression(true)
400    ///     .send()
401    ///     .await?;
402    /// println!("response details={response:?}");
403    /// # Ok(()) }
404    /// ```
405    pub fn with_automatic_decompression(mut self, v: bool) -> Self {
406        self.options.automatic_decompression = v;
407        self
408    }
409
410    /// Sends the request.
411    pub async fn send(self) -> Result<ReadObjectResponse> {
412        self.stub.read_object(self.request, self.options).await
413    }
414}
415
416// A convenience struct that saves the request conditions and performs the read.
417#[derive(Clone, Debug)]
418pub(crate) struct Reader {
419    pub inner: std::sync::Arc<StorageInner>,
420    pub request: crate::model::ReadObjectRequest,
421    pub options: RequestOptions,
422}
423
424impl Reader {
425    async fn read(self) -> Result<Response> {
426        let throttler = self.options.retry_throttler.clone();
427        let retry = self.options.retry_policy.clone();
428        let backoff = self.options.backoff_policy.clone();
429
430        google_cloud_gax::retry_loop_internal::retry_loop(
431            async move |_| self.read_attempt().await,
432            async |duration| tokio::time::sleep(duration).await,
433            true,
434            throttler,
435            retry,
436            backoff,
437        )
438        .await
439    }
440
441    async fn read_attempt(&self) -> Result<Response> {
442        let builder = self.http_request_builder().await?;
443        let response = builder.send().await.map_err(map_send_error)?;
444        if !response.status().is_success() {
445            return gaxi::http::to_http_error(response).await;
446        }
447        Ok(response)
448    }
449
450    async fn http_request_builder(&self) -> Result<RequestBuilder> {
451        // Collect the required bucket and object parameters.
452        let bucket = &self.request.bucket;
453        let bucket_id = bucket
454            .as_str()
455            .strip_prefix("projects/_/buckets/")
456            .ok_or_else(|| {
457                Error::binding(format!(
458                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
459                ))
460            })?;
461        let object = &self.request.object;
462
463        // Build the request.
464        let builder = self
465            .inner
466            .client
467            .builder(
468                Method::GET,
469                format!("/storage/v1/b/{bucket_id}/o/{}", enc(object)),
470            )
471            .query(&[("alt", "media")])
472            .header(
473                "x-goog-api-client",
474                HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
475            );
476
477        let builder = if self.options.automatic_decompression {
478            builder
479        } else {
480            // Disable decompressive transcoding: https://cloud.google.com/storage/docs/transcoding
481            //
482            // The default is to decompress objects that have `contentEncoding == "gzip"`. This header
483            // tells Cloud Storage to disable automatic decompression. It has no effect on objects
484            // with a different `contentEncoding` value.
485            builder.header("accept-encoding", HeaderValue::from_static("gzip"))
486        };
487
488        // Add the optional query parameters.
489        let builder = if self.request.generation != 0 {
490            builder.query(&[("generation", self.request.generation)])
491        } else {
492            builder
493        };
494        let builder = self
495            .request
496            .if_generation_match
497            .iter()
498            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
499        let builder = self
500            .request
501            .if_generation_not_match
502            .iter()
503            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
504        let builder = self
505            .request
506            .if_metageneration_match
507            .iter()
508            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
509        let builder = self
510            .request
511            .if_metageneration_not_match
512            .iter()
513            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
514
515        let builder = apply_customer_supplied_encryption_headers(
516            builder,
517            &self.request.common_object_request_params,
518        );
519
520        // Apply "range" header for read limits and offsets.
521        let builder = match (self.request.read_offset, self.request.read_limit) {
522            // read_limit can't be negative.
523            (_, l) if l < 0 => {
524                unreachable!("ReadObject build never sets a negative read_limit value")
525            }
526            // negative offset can't also have a read_limit.
527            (o, l) if o < 0 && l > 0 => unreachable!(
528                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
529            ),
530            // If both are zero, we use default implementation (no range header).
531            (0, 0) => builder,
532            // negative offset with no limit means the last N bytes.
533            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
534            // read_limit is zero, means no limit. Read from offset to end of file.
535            // This handles cases like (5, 0) -> "bytes=5-"
536            (o, 0) => builder.header("range", format!("bytes={o}-")),
537            // General case: non-negative offset and positive limit.
538            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
539            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
540        };
541
542        self.inner.apply_auth_headers(builder).await
543    }
544
545    fn is_gunzipped(response: &Response) -> bool {
546        // Cloud Storage automatically [decompresses gzip-compressed][transcoding]
547        // objects. Reading such objects comes with a number of restrictions:
548        // - Ranged reads do not work.
549        // - The size of the decompressed data is not known.
550        // - Checksums do not work because the object checksums correspond to the
551        //   compressed data and the client library receives the decompressed data.
552        //
553        // Because ranged reads do not work, resuming a read does not work. Consequently,
554        // the implementation of `ReadObjectResponse` is substantially different for
555        // objects that are gunzipped.
556        //
557        // [transcoding]: https://cloud.google.com/storage/docs/transcoding
558        const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
559        use http::header::WARNING;
560        if response
561            .headers()
562            .get(TRANSFORMATION)
563            .is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
564        {
565            return true;
566        }
567        response
568            .headers()
569            .get(WARNING)
570            .is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
571    }
572
573    pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
574        let response = self.clone().read().await?;
575        if Self::is_gunzipped(&response) {
576            return Ok(ReadObjectResponse::new(Box::new(
577                non_resumable::NonResumableResponse::new(response)?,
578            )));
579        }
580        Ok(ReadObjectResponse::new(Box::new(
581            resumable::ResumableResponse::new(self, response)?,
582        )))
583    }
584}
585
586#[cfg(test)]
587mod resume_tests;
588
589#[cfg(test)]
590mod tests {
591    use super::client::tests::{test_builder, test_inner_client};
592    use super::*;
593    use crate::error::{ChecksumMismatch, ReadError};
594    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
595    use base64::Engine;
596    use futures::TryStreamExt;
597    use google_cloud_auth::credentials::{
598        anonymous::Builder as Anonymous, testing::error_credentials,
599    };
600    use httptest::{Expectation, Server, matchers::*, responders::status_code};
601    use std::collections::HashMap;
602    use std::error::Error;
603    use std::sync::Arc;
604    use test_case::test_case;
605
606    type Result = anyhow::Result<()>;
607
608    async fn http_request_builder(
609        inner: Arc<StorageInner>,
610        builder: ReadObject,
611    ) -> crate::Result<RequestBuilder> {
612        let reader = Reader {
613            inner,
614            request: builder.request,
615            options: builder.options,
616        };
617        reader.http_request_builder().await
618    }
619
620    #[tokio::test]
621    async fn test_clone() {
622        let inner = test_inner_client(test_builder()).await;
623        let stub = crate::storage::transport::Storage::new(inner.clone());
624        let options = {
625            let mut o = RequestOptions::new();
626            o.set_resumable_upload_threshold(12345_usize);
627            o
628        };
629        let builder = ReadObject::new(stub, "projects/_/buckets/bucket", "object", options);
630
631        let clone = builder.clone();
632        assert!(Arc::ptr_eq(&clone.stub, &builder.stub));
633        assert_eq!(clone.request, builder.request);
634        assert_eq!(clone.options.resumable_upload_threshold(), 12345_usize);
635    }
636
637    // Verify `read_object()` meets normal Send, Sync, requirements.
638    #[tokio::test]
639    async fn test_read_is_send_and_static() -> Result {
640        let client = Storage::builder()
641            .with_credentials(Anonymous::new().build())
642            .build()
643            .await?;
644
645        fn need_send<T: Send>(_val: &T) {}
646        fn need_sync<T: Sync>(_val: &T) {}
647        fn need_static<T: 'static>(_val: &T) {}
648
649        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
650        need_send(&read);
651        need_sync(&read);
652        need_static(&read);
653
654        let read = client
655            .read_object("projects/_/buckets/test-bucket", "test-object")
656            .send();
657        need_send(&read);
658        need_static(&read);
659
660        Ok(())
661    }
662
663    #[tokio::test]
664    async fn read_object_normal() -> Result {
665        let server = Server::run();
666        server.expect(
667            Expectation::matching(all_of![
668                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
669                request::headers(contains(("accept-encoding", "gzip"))),
670                request::query(url_decoded(contains(("alt", "media")))),
671            ])
672            .respond_with(
673                status_code(200)
674                    .body("hello world")
675                    .append_header("x-goog-generation", 123456),
676            ),
677        );
678
679        let client = Storage::builder()
680            .with_endpoint(format!("http://{}", server.addr()))
681            .with_credentials(Anonymous::new().build())
682            .build()
683            .await?;
684        let mut reader = client
685            .read_object("projects/_/buckets/test-bucket", "test-object")
686            .send()
687            .await?;
688        let mut got = Vec::new();
689        while let Some(b) = reader.next().await.transpose()? {
690            got.extend_from_slice(&b);
691        }
692        assert_eq!(bytes::Bytes::from_owner(got), "hello world");
693
694        Ok(())
695    }
696
697    #[tokio::test]
698    async fn read_object_stream() -> Result {
699        let server = Server::run();
700        server.expect(
701            Expectation::matching(all_of![
702                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
703                request::query(url_decoded(contains(("alt", "media")))),
704            ])
705            .respond_with(
706                status_code(200)
707                    .append_header("x-goog-generation", 123456)
708                    .body("hello world"),
709            ),
710        );
711
712        let client = Storage::builder()
713            .with_endpoint(format!("http://{}", server.addr()))
714            .with_credentials(Anonymous::new().build())
715            .build()
716            .await?;
717        let response = client
718            .read_object("projects/_/buckets/test-bucket", "test-object")
719            .send()
720            .await?;
721        let result: Vec<_> = response.into_stream().try_collect().await?;
722        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
723
724        Ok(())
725    }
726
727    #[tokio::test]
728    async fn read_object_next_then_consume_response() -> Result {
729        // Create a large enough file that will require multiple chunks to read.
730        const BLOCK_SIZE: usize = 500;
731        let mut contents = Vec::new();
732        for i in 0..50 {
733            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
734        }
735
736        // Calculate and serialize the crc32c checksum
737        let u = crc32c::crc32c(&contents);
738        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
739
740        let server = Server::run();
741        server.expect(
742            Expectation::matching(all_of![
743                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
744                request::query(url_decoded(contains(("alt", "media")))),
745            ])
746            .times(1)
747            .respond_with(
748                status_code(200)
749                    .body(contents.clone())
750                    .append_header("x-goog-hash", format!("crc32c={value}"))
751                    .append_header("x-goog-generation", 123456),
752            ),
753        );
754
755        let client = Storage::builder()
756            .with_endpoint(format!("http://{}", server.addr()))
757            .with_credentials(Anonymous::new().build())
758            .build()
759            .await?;
760
761        // Read some bytes, then remainder with stream.
762        let mut response = client
763            .read_object("projects/_/buckets/test-bucket", "test-object")
764            .send()
765            .await?;
766
767        let mut all_bytes = bytes::BytesMut::new();
768        let chunk = response.next().await.transpose()?.unwrap();
769        assert!(!chunk.is_empty());
770        all_bytes.extend(chunk);
771        use futures::StreamExt;
772        let mut stream = response.into_stream();
773        while let Some(chunk) = stream.next().await.transpose()? {
774            all_bytes.extend(chunk);
775        }
776        assert_eq!(all_bytes, contents);
777
778        Ok(())
779    }
780
781    #[tokio::test]
782    async fn read_object_not_found() -> Result {
783        let server = Server::run();
784        server.expect(
785            Expectation::matching(all_of![
786                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
787                request::query(url_decoded(contains(("alt", "media")))),
788            ])
789            .respond_with(status_code(404).body("NOT FOUND")),
790        );
791
792        let client = Storage::builder()
793            .with_endpoint(format!("http://{}", server.addr()))
794            .with_credentials(Anonymous::new().build())
795            .build()
796            .await?;
797        let err = client
798            .read_object("projects/_/buckets/test-bucket", "test-object")
799            .send()
800            .await
801            .expect_err("expected a not found error");
802        assert_eq!(err.http_status_code(), Some(404));
803
804        Ok(())
805    }
806
807    #[tokio::test]
808    async fn read_object_incorrect_crc32c_check() -> Result {
809        // Calculate and serialize the crc32c checksum
810        let u = crc32c::crc32c("goodbye world".as_bytes());
811        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());
812
813        let server = Server::run();
814        server.expect(
815            Expectation::matching(all_of![
816                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
817                request::query(url_decoded(contains(("alt", "media")))),
818            ])
819            .times(3)
820            .respond_with(
821                status_code(200)
822                    .body("hello world")
823                    .append_header("x-goog-hash", format!("crc32c={value}"))
824                    .append_header("x-goog-generation", 123456),
825            ),
826        );
827
828        let client = Storage::builder()
829            .with_endpoint(format!("http://{}", server.addr()))
830            .with_credentials(Anonymous::new().build())
831            .build()
832            .await?;
833        let mut response = client
834            .read_object("projects/_/buckets/test-bucket", "test-object")
835            .send()
836            .await?;
837        let mut partial = Vec::new();
838        let mut err = None;
839        while let Some(r) = response.next().await {
840            match r {
841                Ok(b) => partial.extend_from_slice(&b),
842                Err(e) => err = Some(e),
843            };
844        }
845        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
846        let err = err.expect("expect error on incorrect crc32c");
847        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
848        assert!(
849            matches!(
850                source,
851                Some(&ReadError::ChecksumMismatch(
852                    ChecksumMismatch::Crc32c { .. }
853                ))
854            ),
855            "err={err:?}"
856        );
857
858        let mut response = client
859            .read_object("projects/_/buckets/test-bucket", "test-object")
860            .send()
861            .await?;
862        let err: crate::Error = async {
863            {
864                while (response.next().await.transpose()?).is_some() {}
865                Ok(())
866            }
867        }
868        .await
869        .expect_err("expect error on incorrect crc32c");
870        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
871        assert!(
872            matches!(
873                source,
874                Some(&ReadError::ChecksumMismatch(
875                    ChecksumMismatch::Crc32c { .. }
876                ))
877            ),
878            "err={err:?}"
879        );
880
881        use futures::TryStreamExt;
882        let err = client
883            .read_object("projects/_/buckets/test-bucket", "test-object")
884            .send()
885            .await?
886            .into_stream()
887            .try_collect::<Vec<bytes::Bytes>>()
888            .await
889            .expect_err("expect error on incorrect crc32c");
890        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
891        assert!(
892            matches!(
893                source,
894                Some(&ReadError::ChecksumMismatch(
895                    ChecksumMismatch::Crc32c { .. }
896                ))
897            ),
898            "err={err:?}"
899        );
900        Ok(())
901    }
902
903    #[tokio::test]
904    async fn read_object_incorrect_md5_check() -> Result {
905        // Calculate and serialize the md5 checksum
906        let digest = md5::compute("goodbye world".as_bytes());
907        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());
908
909        let server = Server::run();
910        server.expect(
911            Expectation::matching(all_of![
912                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
913                request::query(url_decoded(contains(("alt", "media")))),
914            ])
915            .times(1)
916            .respond_with(
917                status_code(200)
918                    .body("hello world")
919                    .append_header("x-goog-hash", format!("md5={value}"))
920                    .append_header("x-goog-generation", 123456),
921            ),
922        );
923
924        let client = Storage::builder()
925            .with_endpoint(format!("http://{}", server.addr()))
926            .with_credentials(Anonymous::new().build())
927            .build()
928            .await?;
929        let mut response = client
930            .read_object("projects/_/buckets/test-bucket", "test-object")
931            .compute_md5()
932            .send()
933            .await?;
934        let mut partial = Vec::new();
935        let mut err = None;
936        while let Some(r) = response.next().await {
937            match r {
938                Ok(b) => partial.extend_from_slice(&b),
939                Err(e) => err = Some(e),
940            };
941        }
942        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
943        let err = err.expect("expect error on incorrect md5");
944        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
945        assert!(
946            matches!(
947                source,
948                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
949            ),
950            "err={err:?}"
951        );
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn read_object() -> Result {
958        let inner = test_inner_client(test_builder()).await;
959        let stub = crate::storage::transport::Storage::new(inner.clone());
960        let builder = ReadObject::new(
961            stub,
962            "projects/_/buckets/bucket",
963            "object",
964            inner.options.clone(),
965        );
966        let request = http_request_builder(inner, builder).await?.build()?;
967
968        assert_eq!(request.method(), Method::GET);
969        assert_eq!(
970            request.url().as_str(),
971            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
972        );
973        Ok(())
974    }
975
976    #[tokio::test]
977    async fn read_object_error_credentials() -> Result {
978        let inner =
979            test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
980        let stub = crate::storage::transport::Storage::new(inner.clone());
981        let builder = ReadObject::new(
982            stub,
983            "projects/_/buckets/bucket",
984            "object",
985            inner.options.clone(),
986        );
987        let _ = http_request_builder(inner, builder)
988            .await
989            .inspect_err(|e| assert!(e.is_authentication()))
990            .expect_err("invalid credentials should err");
991        Ok(())
992    }
993
994    #[tokio::test]
995    async fn read_object_bad_bucket() -> Result {
996        let inner = test_inner_client(test_builder()).await;
997        let stub = crate::storage::transport::Storage::new(inner.clone());
998        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
999        let _ = http_request_builder(inner, builder)
1000            .await
1001            .expect_err("malformed bucket string should error");
1002        Ok(())
1003    }
1004
1005    #[tokio::test]
1006    async fn read_object_query_params() -> Result {
1007        let inner = test_inner_client(test_builder()).await;
1008        let stub = crate::storage::transport::Storage::new(inner.clone());
1009        let builder = ReadObject::new(
1010            stub,
1011            "projects/_/buckets/bucket",
1012            "object",
1013            inner.options.clone(),
1014        )
1015        .set_generation(5)
1016        .set_if_generation_match(10)
1017        .set_if_generation_not_match(20)
1018        .set_if_metageneration_match(30)
1019        .set_if_metageneration_not_match(40);
1020        let request = http_request_builder(inner, builder).await?.build()?;
1021
1022        assert_eq!(request.method(), Method::GET);
1023        let want_pairs: HashMap<String, String> = [
1024            ("alt", "media"),
1025            ("generation", "5"),
1026            ("ifGenerationMatch", "10"),
1027            ("ifGenerationNotMatch", "20"),
1028            ("ifMetagenerationMatch", "30"),
1029            ("ifMetagenerationNotMatch", "40"),
1030        ]
1031        .iter()
1032        .map(|(k, v)| (k.to_string(), v.to_string()))
1033        .collect();
1034        let query_pairs: HashMap<String, String> = request
1035            .url()
1036            .query_pairs()
1037            .map(|param| (param.0.to_string(), param.1.to_string()))
1038            .collect();
1039        assert_eq!(query_pairs.len(), want_pairs.len());
1040        assert_eq!(query_pairs, want_pairs);
1041        Ok(())
1042    }
1043
1044    #[tokio::test]
1045    async fn read_object_default_headers() -> Result {
1046        // The API takes the unencoded byte array.
1047        let inner = test_inner_client(test_builder()).await;
1048        let stub = crate::storage::transport::Storage::new(inner.clone());
1049        let builder = ReadObject::new(
1050            stub,
1051            "projects/_/buckets/bucket",
1052            "object",
1053            inner.options.clone(),
1054        );
1055        let request = http_request_builder(inner, builder).await?.build()?;
1056
1057        assert_eq!(request.method(), Method::GET);
1058        assert_eq!(
1059            request.url().as_str(),
1060            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1061        );
1062
1063        let want = [("accept-encoding", "gzip")];
1064        let headers = request.headers();
1065        for (name, value) in want {
1066            assert_eq!(
1067                headers.get(name).and_then(|h| h.to_str().ok()),
1068                Some(value),
1069                "{request:?}"
1070            );
1071        }
1072        Ok(())
1073    }
1074
1075    #[tokio::test]
1076    async fn read_object_automatic_decompression_headers() -> Result {
1077        // The API takes the unencoded byte array.
1078        let inner = test_inner_client(test_builder()).await;
1079        let stub = crate::storage::transport::Storage::new(inner.clone());
1080        let builder = ReadObject::new(
1081            stub,
1082            "projects/_/buckets/bucket",
1083            "object",
1084            inner.options.clone(),
1085        )
1086        .with_automatic_decompression(true);
1087        let request = http_request_builder(inner, builder).await?.build()?;
1088
1089        assert_eq!(request.method(), Method::GET);
1090        assert_eq!(
1091            request.url().as_str(),
1092            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1093        );
1094
1095        let headers = request.headers();
1096        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
1097        Ok(())
1098    }
1099
1100    #[tokio::test]
1101    async fn read_object_encryption_headers() -> Result {
1102        // Make a 32-byte key.
1103        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
1104
1105        // The API takes the unencoded byte array.
1106        let inner = test_inner_client(test_builder()).await;
1107        let stub = crate::storage::transport::Storage::new(inner.clone());
1108        let builder = ReadObject::new(
1109            stub,
1110            "projects/_/buckets/bucket",
1111            "object",
1112            inner.options.clone(),
1113        )
1114        .set_key(KeyAes256::new(&key)?);
1115        let request = http_request_builder(inner, builder).await?.build()?;
1116
1117        assert_eq!(request.method(), Method::GET);
1118        assert_eq!(
1119            request.url().as_str(),
1120            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1121        );
1122
1123        let want = [
1124            ("x-goog-encryption-algorithm", "AES256".to_string()),
1125            ("x-goog-encryption-key", key_base64),
1126            ("x-goog-encryption-key-sha256", key_sha256_base64),
1127        ];
1128
1129        let headers = request.headers();
1130        for (name, value) in want {
1131            assert_eq!(
1132                headers.get(name).and_then(|h| h.to_str().ok()),
1133                Some(value.as_str())
1134            );
1135        }
1136        Ok(())
1137    }
1138
1139    #[test_case(ReadRange::all(), None; "no headers needed")]
1140    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
1141    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
1142    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
1143    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
1144    #[tokio::test]
1145    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
1146        let inner = test_inner_client(test_builder()).await;
1147        let stub = crate::storage::transport::Storage::new(inner.clone());
1148        let builder = ReadObject::new(
1149            stub,
1150            "projects/_/buckets/bucket",
1151            "object",
1152            inner.options.clone(),
1153        )
1154        .set_read_range(input.clone());
1155        let request = http_request_builder(inner, builder).await?.build()?;
1156
1157        assert_eq!(request.method(), Method::GET);
1158        assert_eq!(
1159            request.url().as_str(),
1160            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
1161        );
1162
1163        assert_eq!(request.headers().get("range"), want);
1164        Ok(())
1165    }
1166
1167    #[test_case("projects/p", "projects%2Fp")]
1168    #[test_case("kebab-case", "kebab-case")]
1169    #[test_case("dot.name", "dot.name")]
1170    #[test_case("under_score", "under_score")]
1171    #[test_case("tilde~123", "tilde~123")]
1172    #[test_case("exclamation!point!", "exclamation%21point%21")]
1173    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
1174    #[test_case("preserve%percent%21", "preserve%percent%21")]
1175    #[test_case(
1176        "testall !#$&'()*+,/:;=?@[]",
1177        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
1178    )]
1179    #[tokio::test]
1180    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
1181        let inner = test_inner_client(test_builder()).await;
1182        let stub = crate::storage::transport::Storage::new(inner.clone());
1183        let builder = ReadObject::new(
1184            stub,
1185            "projects/_/buckets/bucket",
1186            name,
1187            inner.options.clone(),
1188        );
1189        let request = http_request_builder(inner, builder).await?.build()?;
1190        let got = request.url().path_segments().unwrap().next_back().unwrap();
1191        assert_eq!(got, want);
1192        Ok(())
1193    }
1194
1195    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
1196    #[test_case("x-guploader-response-body-transformations", "no match", false)]
1197    #[test_case("warning", "214 UploadServer gunzipped", true)]
1198    #[test_case("warning", "no match", false)]
1199    #[test_case("unused", "unused", false)]
1200    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
1201        let response = http::Response::builder()
1202            .status(200)
1203            .header(name, value)
1204            .body(Vec::new())?;
1205        let response = Response::from(response);
1206        let got = Reader::is_gunzipped(&response);
1207        assert_eq!(got, want, "{response:?}");
1208        Ok(())
1209    }
1210}