google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::*;
16use super::*;
17use crate::download_resume_policy::DownloadResumePolicy;
18use crate::error::{RangeError, ReadError};
19use crate::model::ObjectChecksums;
20use crate::storage::checksum::{
21    ChecksumEngine,
22    details::{Crc32c, Md5, validate},
23};
24use base64::Engine;
25#[cfg(feature = "unstable-stream")]
26use futures::Stream;
27use serde_with::DeserializeAs;
28
29/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
30///
31/// # Example: accumulate the contents of an object into a vector
32/// ```
33/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
34/// async fn sample(client: &Storage) -> anyhow::Result<()> {
35///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
36///     let mut reader = builder.send().await?;
37///     let mut contents = Vec::new();
38///     while let Some(chunk) = reader.next().await.transpose()? {
39///         contents.extend_from_slice(&chunk);
40///     }
41///     println!("object contents={:?}", contents);
42///     Ok(())
43/// }
44/// ```
45///
46/// # Example: read part of an object
47/// ```
48/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: i64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .with_read_offset(4 * MIB)
55///         .with_read_limit(2 * MIB)
56///         .send()
57///         .await?;
58///     while let Some(chunk) = reader.next().await.transpose()? {
59///         contents.extend_from_slice(&chunk);
60///     }
61///     println!("range contents={:?}", contents);
62///     Ok(())
63/// }
64/// ```
65#[derive(Clone, Debug)]
66pub struct ReadObject<C = Crc32c> {
67    inner: std::sync::Arc<StorageInner>,
68    request: crate::model::ReadObjectRequest,
69    options: super::request_options::RequestOptions,
70    checksum: C,
71}
72
73impl ReadObject<Crc32c> {
74    pub(crate) fn new<B, O>(inner: std::sync::Arc<StorageInner>, bucket: B, object: O) -> Self
75    where
76        B: Into<String>,
77        O: Into<String>,
78    {
79        let options = inner.options.clone();
80        ReadObject {
81            inner,
82            request: crate::model::ReadObjectRequest::new()
83                .set_bucket(bucket)
84                .set_object(object),
85            options,
86            checksum: Crc32c::default(),
87        }
88    }
89
90    /// Enables computation of MD5 hashes.
91    ///
92    /// Crc32c hashes are checked by default.
93    ///
94    /// Checksum validation is supported iff:
95    /// 1. The full content is requested.
96    /// 2. All of the content is returned (status != PartialContent).
97    /// 3. The server sent a checksum header.
98    /// 4. The http stack did not uncompress the file.
99    /// 5. The server did not uncompress data on download.
100    ///
101    /// # Example
102    /// ```
103    /// # use google_cloud_storage::client::Storage;
104    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
105    /// let builder =  client
106    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
107    ///     .compute_md5();
108    /// let mut reader = builder
109    ///     .send()
110    ///     .await?;
111    /// let mut contents = Vec::new();
112    /// while let Some(chunk) = reader.next().await.transpose()? {
113    ///     contents.extend_from_slice(&chunk);
114    /// }
115    /// println!("object contents={:?}", contents);
116    /// # Ok(()) }
117    /// ```
118    pub fn compute_md5(self) -> ReadObject<Md5<Crc32c>> {
119        self.switch_checksum(Md5::from_inner)
120    }
121}
122
123impl<C> ReadObject<C>
124where
125    C: Clone + ChecksumEngine + Send,
126{
127    fn switch_checksum<F, U>(self, new: F) -> ReadObject<U>
128    where
129        F: FnOnce(C) -> U,
130    {
131        ReadObject {
132            inner: self.inner,
133            request: self.request,
134            options: self.options,
135            checksum: new(self.checksum),
136        }
137    }
138
139    /// If present, selects a specific revision of this object (as
140    /// opposed to the latest version, the default).
141    pub fn with_generation<T: Into<i64>>(mut self, v: T) -> Self {
142        self.request.generation = v.into();
143        self
144    }
145
146    /// Makes the operation conditional on whether the object's current generation
147    /// matches the given value. Setting to 0 makes the operation succeed only if
148    /// there are no live versions of the object.
149    pub fn with_if_generation_match<T>(mut self, v: T) -> Self
150    where
151        T: Into<i64>,
152    {
153        self.request.if_generation_match = Some(v.into());
154        self
155    }
156
157    /// Makes the operation conditional on whether the object's live generation
158    /// does not match the given value. If no live object exists, the precondition
159    /// fails. Setting to 0 makes the operation succeed only if there is a live
160    /// version of the object.
161    pub fn with_if_generation_not_match<T>(mut self, v: T) -> Self
162    where
163        T: Into<i64>,
164    {
165        self.request.if_generation_not_match = Some(v.into());
166        self
167    }
168
169    /// Makes the operation conditional on whether the object's current
170    /// metageneration matches the given value.
171    pub fn with_if_metageneration_match<T>(mut self, v: T) -> Self
172    where
173        T: Into<i64>,
174    {
175        self.request.if_metageneration_match = Some(v.into());
176        self
177    }
178
179    /// Makes the operation conditional on whether the object's current
180    /// metageneration does not match the given value.
181    pub fn with_if_metageneration_not_match<T>(mut self, v: T) -> Self
182    where
183        T: Into<i64>,
184    {
185        self.request.if_metageneration_not_match = Some(v.into());
186        self
187    }
188
189    /// The offset for the first byte to return in the read, relative to
190    /// the start of the object.
191    ///
192    /// A negative `read_offset` value will be interpreted as the number of bytes
193    /// back from the end of the object to be returned.
194    ///
195    /// # Examples
196    ///
197    /// Read starting at 100 bytes to end of file.
198    /// ```
199    /// # use google_cloud_storage::client::Storage;
200    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
201    /// let response = client
202    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
203    ///     .with_read_offset(100)
204    ///     .send()
205    ///     .await?;
206    /// println!("response details={response:?}");
207    /// # Ok(()) }
208    /// ```
209    ///
210    /// Read last 100 bytes of file:
211    /// ```
212    /// # use google_cloud_storage::client::Storage;
213    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
214    /// let response = client
215    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
216    ///     .with_read_offset(-100)
217    ///     .send()
218    ///     .await?;
219    /// println!("response details={response:?}");
220    /// # Ok(()) }
221    /// ```
222    ///
223    /// Read bytes 1000 to 1099.
224    /// ```
225    /// # use google_cloud_storage::client::Storage;
226    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
227    /// let response = client
228    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
229    ///     .with_read_offset(1000)
230    ///     .with_read_limit(100)
231    ///     .send()
232    ///     .await?;
233    /// println!("response details={response:?}");
234    /// # Ok(()) }
235    /// ```
236    pub fn with_read_offset<T>(mut self, v: T) -> Self
237    where
238        T: Into<i64>,
239    {
240        self.request.read_offset = v.into();
241        self
242    }
243
244    /// The maximum number of `data` bytes the server is allowed to
245    /// return.
246    ///
247    /// A `read_limit` of zero indicates that there is no limit,
248    /// and a negative `read_limit` will cause an error.
249    ///
250    /// # Examples:
251    ///
252    /// Read first 100 bytes.
253    /// ```
254    /// # use google_cloud_storage::client::Storage;
255    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
256    /// let response = client
257    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
258    ///     .with_read_limit(100)
259    ///     .send()
260    ///     .await?;
261    /// println!("response details={response:?}");
262    /// # Ok(()) }
263    /// ```
264    ///
265    /// Read bytes 1000 to 1099.
266    /// ```
267    /// # use google_cloud_storage::client::Storage;
268    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
269    /// let response = client
270    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
271    ///     .with_read_offset(1000)
272    ///     .with_read_limit(100)
273    ///     .send()
274    ///     .await?;
275    /// println!("response details={response:?}");
276    /// # Ok(()) }
277    /// ```
278    pub fn with_read_limit<T>(mut self, v: T) -> Self
279    where
280        T: Into<i64>,
281    {
282        self.request.read_limit = v.into();
283        self
284    }
285
286    /// The encryption key used with the Customer-Supplied Encryption Keys
287    /// feature. In raw bytes format (not base64-encoded).
288    ///
289    /// Example:
290    /// ```
291    /// # use google_cloud_storage::{builder::storage::KeyAes256, client::Storage};
292    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
293    /// let key: &[u8] = &[97; 32];
294    /// let response = client
295    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
296    ///     .with_key(KeyAes256::new(key)?)
297    ///     .send()
298    ///     .await?;
299    /// println!("response details={response:?}");
300    /// # Ok(()) }
301    /// ```
302    pub fn with_key(mut self, v: KeyAes256) -> Self {
303        self.request.common_object_request_params = Some(v.into());
304        self
305    }
306
307    /// The retry policy used for this request.
308    ///
309    /// # Example
310    /// ```
311    /// # use google_cloud_storage::client::Storage;
312    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
313    /// use google_cloud_storage::retry_policy::RecommendedPolicy;
314    /// use std::time::Duration;
315    /// use gax::retry_policy::RetryPolicyExt;
316    /// let response = client
317    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
318    ///     .with_retry_policy(RecommendedPolicy
319    ///         .with_attempt_limit(5)
320    ///         .with_time_limit(Duration::from_secs(10)),
321    ///     )
322    ///     .send()
323    ///     .await?;
324    /// println!("response details={response:?}");
325    /// # Ok(()) }
326    /// ```
327    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
328        self.options.retry_policy = v.into().into();
329        self
330    }
331
332    /// The backoff policy used for this request.
333    ///
334    /// # Example
335    /// ```
336    /// # use google_cloud_storage::client::Storage;
337    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
338    /// use std::time::Duration;
339    /// use gax::exponential_backoff::ExponentialBackoff;
340    /// let response = client
341    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
342    ///     .with_backoff_policy(ExponentialBackoff::default())
343    ///     .send()
344    ///     .await?;
345    /// println!("response details={response:?}");
346    /// # Ok(()) }
347    /// ```
348    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
349        mut self,
350        v: V,
351    ) -> Self {
352        self.options.backoff_policy = v.into().into();
353        self
354    }
355
356    /// The retry throttler used for this request.
357    ///
358    /// Most of the time you want to use the same throttler for all the requests
359    /// in a client, and even the same throttler for many clients. Rarely it
360    /// may be necessary to use an custom throttler for some subset of the
361    /// requests.
362    ///
363    /// # Example
364    /// ```
365    /// # use google_cloud_storage::client::Storage;
366    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
367    /// let response = client
368    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
369    ///     .with_retry_throttler(adhoc_throttler())
370    ///     .send()
371    ///     .await?;
372    /// println!("response details={response:?}");
373    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
374    ///     # panic!();
375    /// }
376    /// # Ok(()) }
377    /// ```
378    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
379        mut self,
380        v: V,
381    ) -> Self {
382        self.options.retry_throttler = v.into().into();
383        self
384    }
385
386    /// Configure the resume policy for downloads.
387    ///
388    /// The Cloud Storage client library can automatically resume a download
389    /// that is interrupted by a transient error. Applications may want to
390    /// limit the number of download attempts, or may wish to expand the type
391    /// of errors treated as retryable.
392    ///
393    /// # Example
394    /// ```
395    /// # use google_cloud_storage::client::Storage;
396    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
397    /// use google_cloud_storage::download_resume_policy::{AlwaysResume, DownloadResumePolicyExt};
398    /// let response = client
399    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
400    ///     .with_download_resume_policy(AlwaysResume.with_attempt_limit(3))
401    ///     .send()
402    ///     .await?;
403    /// # Ok(()) }
404    /// ```
405    pub fn with_download_resume_policy<V>(mut self, v: V) -> Self
406    where
407        V: DownloadResumePolicy + 'static,
408    {
409        self.options.download_resume_policy = std::sync::Arc::new(v);
410        self
411    }
412
413    /// Sends the request.
414    pub async fn send(self) -> Result<ReadObjectResponse<C>> {
415        let download = self.clone().download().await?;
416        ReadObjectResponse::new(self, download)
417    }
418
419    async fn download(self) -> Result<reqwest::Response> {
420        let throttler = self.options.retry_throttler.clone();
421        let retry = self.options.retry_policy.clone();
422        let backoff = self.options.backoff_policy.clone();
423
424        gax::retry_loop_internal::retry_loop(
425            async move |_| self.download_attempt().await,
426            async |duration| tokio::time::sleep(duration).await,
427            true,
428            throttler,
429            retry,
430            backoff,
431        )
432        .await
433    }
434
435    async fn download_attempt(&self) -> Result<reqwest::Response> {
436        let builder = self.http_request_builder().await?;
437        let response = builder.send().await.map_err(Error::io)?;
438        if !response.status().is_success() {
439            return gaxi::http::to_http_error(response).await;
440        }
441        Ok(response)
442    }
443
444    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
445        // Collect the required bucket and object parameters.
446        let bucket = &self.request.bucket;
447        let bucket_id = bucket
448            .as_str()
449            .strip_prefix("projects/_/buckets/")
450            .ok_or_else(|| {
451                Error::binding(format!(
452                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
453                ))
454            })?;
455        let object = &self.request.object;
456
457        // Build the request.
458        let builder = self
459            .inner
460            .client
461            .request(
462                reqwest::Method::GET,
463                format!(
464                    "{}/storage/v1/b/{bucket_id}/o/{}",
465                    &self.inner.endpoint,
466                    enc(object)
467                ),
468            )
469            .query(&[("alt", "media")])
470            .header(
471                "x-goog-api-client",
472                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
473            );
474
475        // Add the optional query parameters.
476        let builder = if self.request.generation != 0 {
477            builder.query(&[("generation", self.request.generation)])
478        } else {
479            builder
480        };
481        let builder = self
482            .request
483            .if_generation_match
484            .iter()
485            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
486        let builder = self
487            .request
488            .if_generation_not_match
489            .iter()
490            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
491        let builder = self
492            .request
493            .if_metageneration_match
494            .iter()
495            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
496        let builder = self
497            .request
498            .if_metageneration_not_match
499            .iter()
500            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
501
502        let builder = apply_customer_supplied_encryption_headers(
503            builder,
504            &self.request.common_object_request_params,
505        );
506
507        // Apply "range" header for read limits and offsets.
508        let builder = match (self.request.read_offset, self.request.read_limit) {
509            // read_limit can't be negative.
510            (_, l) if l < 0 => Err(RangeError::NegativeLimit),
511            // negative offset can't also have a read_limit.
512            (o, l) if o < 0 && l > 0 => Err(RangeError::NegativeOffsetWithLimit),
513            // If both are zero, we use default implementation (no range header).
514            (0, 0) => Ok(builder),
515            // read_limit is zero, means no limit. Read from offset to end of file.
516            // This handles cases like (5, 0) -> "bytes=5-"
517            (o, 0) => Ok(builder.header("range", format!("bytes={o}-"))),
518            // General case: non-negative offset and positive limit.
519            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
520            (o, l) => Ok(builder.header("range", format!("bytes={o}-{}", o + l - 1))),
521        }
522        .map_err(Error::ser)?;
523
524        self.inner.apply_auth_headers(builder).await
525    }
526}
527
528fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
529    headers
530        .get("x-goog-hash")
531        .and_then(|hash| hash.to_str().ok())
532        .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
533        .and_then(|hash| {
534            let hash = hash.trim_start_matches("crc32c=");
535            v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
536        })
537}
538
539fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
540    headers
541        .get("x-goog-hash")
542        .and_then(|hash| hash.to_str().ok())
543        .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
544        .and_then(|hash| {
545            let hash = hash.trim_start_matches("md5=");
546            base64::prelude::BASE64_STANDARD.decode(hash).ok()
547        })
548        .unwrap_or_default()
549}
550
551/// A response to a [Storage::read_object] request.
552#[derive(Debug)]
553pub struct ReadObjectResponse<C> {
554    inner: Option<reqwest::Response>,
555    highlights: ObjectHighlights,
556    // Fields for tracking the crc checksum checks.
557    response_checksums: ObjectChecksums,
558    // Fields for resuming download.
559    range: ReadRange,
560    generation: i64,
561    builder: ReadObject<C>,
562    resume_count: u32,
563}
564
565impl<C> ReadObjectResponse<C>
566where
567    C: ChecksumEngine + Clone + Send,
568{
569    fn new(builder: ReadObject<C>, inner: reqwest::Response) -> Result<Self> {
570        let full = builder.request.read_offset == 0 && builder.request.read_limit == 0;
571        let response_checksums = checksums_from_response(full, inner.status(), inner.headers());
572        let range = response_range(&inner).map_err(Error::deser)?;
573        let generation = response_generation(&inner).map_err(Error::deser)?;
574
575        let headers = inner.headers();
576        let get_as_i64 = |header_name: &str| -> i64 {
577            headers
578                .get(header_name)
579                .and_then(|s| s.to_str().ok())
580                .and_then(|s| s.parse::<i64>().ok())
581                .unwrap_or_default()
582        };
583        let get_as_string = |header_name: &str| -> String {
584            headers
585                .get(header_name)
586                .and_then(|sc| sc.to_str().ok())
587                .map(|sc| sc.to_string())
588                .unwrap_or_default()
589        };
590        let highlights = ObjectHighlights {
591            generation,
592            metageneration: get_as_i64("x-goog-metageneration"),
593            size: get_as_i64("x-goog-stored-content-length"),
594            content_encoding: get_as_string("x-goog-stored-content-encoding"),
595            storage_class: get_as_string("x-goog-storage-class"),
596            content_type: get_as_string("content-type"),
597            content_language: get_as_string("content-language"),
598            content_disposition: get_as_string("content-disposition"),
599            etag: get_as_string("etag"),
600            checksums: headers.get("x-goog-hash").map(|_| {
601                crate::model::ObjectChecksums::new()
602                    .set_or_clear_crc32c(headers_to_crc32c(headers))
603                    .set_md5_hash(headers_to_md5_hash(headers))
604            }),
605        };
606
607        Ok(Self {
608            inner: Some(inner),
609            highlights,
610            // Fields for computing checksums.
611            response_checksums,
612            // Fields for resuming download.
613            range,
614            generation,
615            builder,
616            resume_count: 0,
617        })
618    }
619}
620
621impl<C> ReadObjectResponse<C>
622where
623    C: ChecksumEngine + Clone + Send,
624{
625    /// Get the highlights of the object metadata included in the
626    /// response.
627    ///
628    /// To get full metadata about this object, use [crate::client::StorageControl::get_object].
629    ///
630    /// # Example
631    /// ```
632    /// # tokio_test::block_on(async {
633    /// # use google_cloud_storage::client::Storage;
634    /// # let client = Storage::builder().build().await?;
635    /// let object = client
636    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
637    ///     .send()
638    ///     .await?
639    ///     .object();
640    /// println!("object generation={}", object.generation);
641    /// println!("object metageneration={}", object.metageneration);
642    /// println!("object size={}", object.size);
643    /// println!("object content encoding={}", object.content_encoding);
644    /// # Ok::<(), anyhow::Error>(()) });
645    /// ```
646    pub fn object(&self) -> ObjectHighlights {
647        self.highlights.clone()
648    }
649
650    /// Stream the next bytes of the object.
651    ///
652    /// When the response has been exhausted, this will return None.
653    ///
654    /// # Example
655    /// ```
656    /// # tokio_test::block_on(async {
657    /// # use google_cloud_storage::client::Storage;
658    /// # let client = Storage::builder().build().await?;
659    /// let mut resp = client
660    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
661    ///     .send()
662    ///     .await?;
663    ///
664    /// while let Some(next) = resp.next().await {
665    ///     println!("next={:?}", next?);
666    /// }
667    /// # Ok::<(), anyhow::Error>(()) });
668    /// ```
669    pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
670        match self.next_attempt().await {
671            None => None,
672            Some(Ok(b)) => Some(Ok(b)),
673            // Recursive async requires pin:
674            //     https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
675            Some(Err(e)) => Box::pin(self.resume(e)).await,
676        }
677    }
678
679    async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
680        let inner = self.inner.as_mut()?;
681        let res = inner.chunk().await.map_err(Error::io);
682        match res {
683            Ok(Some(chunk)) => {
684                self.builder.checksum.update(self.range.start, &chunk);
685                let len = chunk.len() as u64;
686                if self.range.limit < len {
687                    return Some(Err(Error::deser(ReadError::LongRead {
688                        expected: self.range.limit,
689                        got: len,
690                    })));
691                }
692                self.range.limit -= len;
693                self.range.start += len;
694                Some(Ok(chunk))
695            }
696            Ok(None) => {
697                if self.range.limit != 0 {
698                    return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
699                }
700                let computed = self.builder.checksum.finalize();
701                let res = validate(&self.response_checksums, &Some(computed));
702                match res {
703                    Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
704                    Ok(()) => None,
705                }
706            }
707            Err(e) => Some(Err(e)),
708        }
709    }
710
711    async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
712        use crate::download_resume_policy::{ResumeQuery, ResumeResult};
713
714        // The existing download is no longer valid.
715        self.inner = None;
716        self.resume_count += 1;
717        let query = ResumeQuery::new(self.resume_count);
718        match self
719            .builder
720            .options
721            .download_resume_policy
722            .on_error(&query, error)
723        {
724            ResumeResult::Continue(_) => {}
725            ResumeResult::Permanent(e) => return Some(Err(e)),
726            ResumeResult::Exhausted(e) => return Some(Err(e)),
727        };
728        self.builder.request.read_offset = self.range.start as i64;
729        self.builder.request.read_limit = self.range.limit as i64;
730        self.builder.request.generation = self.generation;
731        self.inner = match self.builder.clone().download().await {
732            Ok(r) => Some(r),
733            Err(e) => return Some(Err(e)),
734        };
735        self.next().await
736    }
737
738    #[cfg(feature = "unstable-stream")]
739    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
740    /// Convert the response to a [Stream].
741    pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
742        use futures::stream::unfold;
743        Box::pin(unfold(Some(self), move |state| async move {
744            if let Some(mut this) = state {
745                if let Some(chunk) = this.next().await {
746                    return Some((chunk, Some(this)));
747                }
748            };
749            None
750        }))
751    }
752}
753
754/// ObjectHighlights contains select metadata from a [crate::model::Object].
755#[derive(Clone, Debug, PartialEq)]
756#[non_exhaustive]
757pub struct ObjectHighlights {
758    /// The content generation of this object. Used for object versioning.
759    pub generation: i64,
760
761    /// The version of the metadata for this generation of this
762    /// object. Used for preconditions and for detecting changes in metadata. A
763    /// metageneration number is only meaningful in the context of a particular
764    /// generation of a particular object.
765    pub metageneration: i64,
766
767    /// Content-Length of the object data in bytes, matching [RFC 7230 §3.3.2].
768    ///
769    /// [rfc 7230 §3.3.2]: https://tools.ietf.org/html/rfc7230#section-3.3.2
770    pub size: i64,
771
772    /// Content-Encoding of the object data, matching [RFC 7231 §3.1.2.2].
773    ///
774    /// [rfc 7231 §3.1.2.2]: https://tools.ietf.org/html/rfc7231#section-3.1.2.2
775    pub content_encoding: String,
776
777    /// Hashes for the data part of this object. The checksums of the complete
778    /// object regardless of data range. If the object is downloaded in full,
779    /// the client should compute one of these checksums over the downloaded
780    /// object and compare it against the value provided here.
781    pub checksums: std::option::Option<crate::model::ObjectChecksums>,
782
783    /// Storage class of the object.
784    pub storage_class: String,
785
786    /// Content-Language of the object data, matching [RFC 7231 §3.1.3.2].
787    ///
788    /// [rfc 7231 §3.1.3.2]: https://tools.ietf.org/html/rfc7231#section-3.1.3.2
789    pub content_language: String,
790
791    /// Content-Type of the object data, matching [RFC 7231 §3.1.1.5]. If an
792    /// object is stored without a Content-Type, it is served as
793    /// `application/octet-stream`.
794    ///
795    /// [rfc 7231 §3.1.1.5]: https://tools.ietf.org/html/rfc7231#section-3.1.1.5
796    pub content_type: String,
797
798    /// Content-Disposition of the object data, matching [RFC 6266].
799    ///
800    /// [rfc 6266]: https://tools.ietf.org/html/rfc6266
801    pub content_disposition: String,
802
803    /// The etag of the object.
804    pub etag: String,
805}
806
807/// checksums_from_response returns the object checksums to validate against,
808/// if it is valid to check them.
809///
810/// Checksum validation is supported iff:
811/// 1. We requested the full content (request.read_limit = 0, request.read_offset = 0).
812/// 2. We got all the content (status != PartialContent).
813/// 3. The server sent a CRC header.
814/// 4. The http stack did not uncompress the file.
815/// 5. We were not served compressed data that was uncompressed on download.
816///
817/// For 4, we turn off automatic decompression in reqwest::Client when we create it,
818fn checksums_from_response(
819    full_content_requested: bool,
820    status: http::StatusCode,
821    headers: &http::HeaderMap,
822) -> ObjectChecksums {
823    let checksums = ObjectChecksums::new();
824    if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
825        return checksums;
826    }
827    let stored_encoding = headers
828        .get("x-goog-stored-content-encoding")
829        .and_then(|e| e.to_str().ok())
830        .map_or("", |e| e);
831    let content_encoding = headers
832        .get("content-encoding")
833        .and_then(|e| e.to_str().ok())
834        .map_or("", |e| e);
835    if stored_encoding == "gzip" && content_encoding != "gzip" {
836        return checksums;
837    }
838    checksums
839        .set_or_clear_crc32c(headers_to_crc32c(headers))
840        .set_md5_hash(headers_to_md5_hash(headers))
841}
842
843fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
844    match response.status() {
845        reqwest::StatusCode::OK => {
846            let header = required_header(response, "content-length")?;
847            let limit = header
848                .parse::<u64>()
849                .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
850            Ok(ReadRange { start: 0, limit })
851        }
852        reqwest::StatusCode::PARTIAL_CONTENT => {
853            let header = required_header(response, "content-range")?;
854            let header = header.strip_prefix("bytes ").ok_or_else(|| {
855                ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
856            })?;
857            let (range, _) = header.split_once('/').ok_or_else(|| {
858                ReadError::BadHeaderFormat("content-range", "missing / separator".into())
859            })?;
860            let (start, end) = range.split_once('-').ok_or_else(|| {
861                ReadError::BadHeaderFormat("content-range", "missing - separator".into())
862            })?;
863            let start = start
864                .parse::<u64>()
865                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
866            let end = end
867                .parse::<u64>()
868                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
869            // HTTP ranges are inclusive, we need to compute the number of bytes
870            // in the range:
871            let end = end + 1;
872            let limit = end
873                .checked_sub(start)
874                .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
875            Ok(ReadRange { start, limit })
876        }
877        s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
878    }
879}
880
881fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
882    let header = required_header(response, "x-goog-generation")?;
883    header
884        .parse::<i64>()
885        .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
886}
887
888fn required_header<'a>(
889    response: &'a reqwest::Response,
890    name: &'static str,
891) -> std::result::Result<&'a str, ReadError> {
892    let header = response
893        .headers()
894        .get(name)
895        .ok_or_else(|| ReadError::MissingHeader(name))?;
896    header
897        .to_str()
898        .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
899}
900
/// The portion of an object carried by a single download response.
///
/// `start` is the offset of the first byte and `limit` is the number of bytes,
/// as derived from the response headers by `response_range`.
#[derive(PartialEq, Debug)]
struct ReadRange {
    start: u64,
    limit: u64,
}
906
907#[cfg(test)]
908mod resume_tests;
909
910#[cfg(test)]
911mod tests {
912    use super::client::tests::{create_key_helper, test_builder, test_inner_client};
913    use super::*;
914    use crate::error::ChecksumMismatch;
915    use futures::TryStreamExt;
916    use httptest::{Expectation, Server, matchers::*, responders::status_code};
917    use std::collections::HashMap;
918    use std::error::Error;
919    use test_case::test_case;
920
921    type Result = std::result::Result<(), Box<dyn std::error::Error>>;
922
923    // Verify `read_object()` meets normal Send, Sync, requirements.
924    #[tokio::test]
925    async fn test_read_is_send_and_static() -> Result {
926        let client = Storage::builder()
927            .with_credentials(auth::credentials::testing::test_credentials())
928            .build()
929            .await?;
930
931        fn need_send<T: Send>(_val: &T) {}
932        fn need_sync<T: Sync>(_val: &T) {}
933        fn need_static<T: 'static>(_val: &T) {}
934
935        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
936        need_send(&read);
937        need_sync(&read);
938        need_static(&read);
939
940        let read = client
941            .read_object("projects/_/buckets/test-bucket", "test-object")
942            .send();
943        need_send(&read);
944        need_static(&read);
945
946        Ok(())
947    }
    // Happy path: a 200 response without checksum headers is returned intact
    // through repeated calls to `next()`.
    #[tokio::test]
    async fn read_object_normal() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body("hello world")
                    // Presumably required: `response_generation` treats this
                    // header as mandatory.
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let mut reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        // Drain the reader chunk by chunk, stopping at the first error.
        let mut got = Vec::new();
        while let Some(b) = reader.next().await.transpose()? {
            got.extend_from_slice(&b);
        }
        assert_eq!(bytes::Bytes::from_owner(got), "hello world");

        Ok(())
    }
980
    // The object metadata exposed by `reader.object()` is populated from the
    // response headers, before any of the body is read.
    #[tokio::test]
    async fn read_object_metadata() -> Result {
        const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                // The expected path starts with `//`. The endpoint below is
                // `server.url("")`, which presumably ends in `/`, and the
                // client appends `/storage/v1/...` to it.
                // NOTE(review): confirm the `//` is intended behavior.
                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body(CONTENTS)
                    // Both checksums match CONTENTS (asserted below).
                    .append_header(
                        "x-goog-hash",
                        "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
                    )
                    .append_header("x-goog-generation", 500)
                    .append_header("x-goog-metageneration", "1")
                    // Deliberately differs from the body length (43 bytes).
                    // The test expects `size` to come from this header.
                    .append_header("x-goog-stored-content-length", 30)
                    .append_header("x-goog-stored-content-encoding", "identity")
                    // NOTE(review): the headers below are sent but not asserted.
                    .append_header("x-goog-storage-class", "STANDARD")
                    .append_header("content-language", "en")
                    .append_header("content-type", "text/plain")
                    .append_header("content-disposition", "inline")
                    .append_header("etag", "etagval"),
            ),
        );

        let endpoint = server.url("");
        let client = Storage::builder()
            .with_endpoint(endpoint.to_string())
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let object = reader.object();
        assert_eq!(object.generation, 500);
        assert_eq!(object.metageneration, 1);
        assert_eq!(object.size, 30);
        assert_eq!(object.content_encoding, "identity");
        assert_eq!(
            object.checksums.as_ref().unwrap().crc32c.unwrap(),
            crc32c::crc32c(CONTENTS.as_bytes())
        );
        assert_eq!(
            object.checksums.as_ref().unwrap().md5_hash,
            base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
        );

        Ok(())
    }
1035
    // The response can be consumed as a `futures::Stream` via `into_stream()`.
    // The small body arrives as a single chunk.
    #[tokio::test]
    async fn read_object_stream() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .append_header("x-goog-generation", 123456)
                    .body("hello world"),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let result: Vec<_> = response.into_stream().try_collect().await?;
        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);

        Ok(())
    }
1065
    // Mixing consumption styles: read the first chunk with `next()`, then turn
    // the same response into a stream. No bytes may be lost or duplicated, and
    // the server expects exactly one request (`.times(1)`), so the switch must
    // not start a new download. The crc32c header also makes the full
    // download go through checksum validation.
    #[tokio::test]
    async fn read_object_next_then_consume_response() -> Result {
        // Create a large enough file that will require multiple chunks to download.
        const BLOCK_SIZE: usize = 500;
        let mut contents = Vec::new();
        for i in 0..50 {
            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
        }

        // Calculate and serialize the crc32c checksum
        let u = crc32c::crc32c(&contents);
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body(contents.clone())
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;

        // Read some bytes, then remainder with stream.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;

        let mut all_bytes = bytes::BytesMut::new();
        let chunk = response.next().await.transpose()?.unwrap();
        assert!(!chunk.is_empty());
        all_bytes.extend(chunk);
        use futures::StreamExt;
        let mut stream = response.into_stream();
        while let Some(chunk) = stream.next().await.transpose()? {
            all_bytes.extend(chunk);
        }
        assert_eq!(all_bytes, contents);

        Ok(())
    }
1119
    // A 404 from the service is returned as an error from `send()`, and that
    // error reports the HTTP status code.
    #[tokio::test]
    async fn read_object_not_found() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404).body("NOT FOUND")),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await
            .expect_err("expected a not found error");
        assert_eq!(err.http_status_code(), Some(404));

        Ok(())
    }
1145
    // The server advertises the crc32c of "goodbye world" but sends
    // "hello world", so every download must fail crc32c validation. The same
    // mismatch is checked through three consumption styles, hence
    // `.times(3)`. In each case the error's source must be a
    // `ReadError::ChecksumMismatch(ChecksumMismatch::Crc32c { .. })`.
    #[tokio::test]
    async fn read_object_incorrect_crc32c_check() -> Result {
        // Calculate and serialize the crc32c checksum
        let u = crc32c::crc32c("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(3)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        // 1. Drain with `next()`, keeping both data and errors: the full body
        //    is still delivered, and the mismatch is reported as an error.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // 2. Propagate the first error with `?`.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let err: crate::Error = async {
            {
                while (response.next().await.transpose()?).is_some() {}
                Ok(())
            }
        }
        .await
        .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // 3. Collect the whole stream produced by `into_stream()`.
        use futures::TryStreamExt;
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await
            .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );
        Ok(())
    }
1241
    // The server advertises the md5 of "goodbye world" but sends
    // "hello world". With `compute_md5()` enabled, the body is still delivered
    // and the download reports a `ChecksumMismatch::Md5` error.
    // NOTE(review): presumably MD5 is only validated when requested via
    // `compute_md5()`; confirm.
    #[tokio::test]
    async fn read_object_incorrect_md5_check() -> Result {
        // Calculate and serialize the md5 checksum
        let digest = md5::compute("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("md5={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .compute_md5()
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect md5");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
            ),
            "err={err:?}"
        );

        Ok(())
    }
1294
    // Without any options the request is a plain `GET ...?alt=media` on the
    // test client's endpoint, with the bucket extracted from
    // `projects/_/buckets/{bucket}`.
    #[tokio::test]
    async fn read_object() -> Result {
        let inner = test_inner_client(test_builder());
        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .http_request_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );
        Ok(())
    }
1310
    // A failure to obtain credentials surfaces from `http_request_builder()`
    // as an authentication error.
    #[tokio::test]
    async fn read_object_error_credentials() -> Result {
        let inner = test_inner_client(
            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
        );
        let _ = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .http_request_builder()
            .await
            .inspect_err(|e| assert!(e.is_authentication()))
            .expect_err("invalid credentials should err");
        Ok(())
    }
1323
    // A bucket name not in the `projects/_/buckets/{bucket}` form is rejected
    // while building the HTTP request.
    #[tokio::test]
    async fn read_object_bad_bucket() -> Result {
        let inner = test_inner_client(test_builder());
        ReadObject::new(inner, "malformed", "object")
            .http_request_builder()
            .await
            .expect_err("malformed bucket string should error");
        Ok(())
    }
1333
    // The generation and precondition options each map to their own query
    // parameter, and no parameters other than these and `alt=media` are sent.
    #[tokio::test]
    async fn read_object_query_params() -> Result {
        let inner = test_inner_client(test_builder());
        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .with_generation(5)
            .with_if_generation_match(10)
            .with_if_generation_not_match(20)
            .with_if_metageneration_match(30)
            .with_if_metageneration_not_match(40)
            .http_request_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        let want_pairs: HashMap<String, String> = [
            ("alt", "media"),
            ("generation", "5"),
            ("ifGenerationMatch", "10"),
            ("ifGenerationNotMatch", "20"),
            ("ifMetagenerationMatch", "30"),
            ("ifMetagenerationNotMatch", "40"),
        ]
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let query_pairs: HashMap<String, String> = request
            .url()
            .query_pairs()
            .map(|param| (param.0.to_string(), param.1.to_string()))
            .collect();
        assert_eq!(query_pairs.len(), want_pairs.len());
        assert_eq!(query_pairs, want_pairs);
        Ok(())
    }
1368
    // A customer-supplied encryption key is sent as the three
    // `x-goog-encryption-*` headers, and the URL is left unchanged.
    #[tokio::test]
    async fn read_object_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        // The API takes the unencoded byte array.
        let inner = test_inner_client(test_builder());
        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .with_key(KeyAes256::new(&key)?)
            .http_request_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        let want = vec![
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];

        for (name, value) in want {
            assert_eq!(
                request.headers().get(name).unwrap().as_bytes(),
                bytes::Bytes::from(value)
            );
        }
        Ok(())
    }
1402
    // Maps (read_offset, read_limit) to the expected `Range` header. A limit
    // of 0 means "to the end of the object", and no header is sent for the
    // whole object. Limits translate into inclusive end positions.
    // NOTE(review): RFC 9110 §14.1.1 spells a suffix range as `bytes=-2000`
    // (no trailing `-`). Confirm the service accepts `bytes=-2000-` as
    // expected below, or whether the builder should emit the suffix form.
    #[test_case(0, 0, None; "no headers needed")]
    #[test_case(10, 0, Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
    #[test_case(-2000, 0, Some(&http::HeaderValue::from_static("bytes=-2000-")); "negative offset")]
    #[test_case(0, 100, Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
    #[test_case(1000, 100, Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
    #[tokio::test]
    async fn range_header(offset: i64, limit: i64, want: Option<&http::HeaderValue>) -> Result {
        let inner = test_inner_client(test_builder());
        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .with_read_offset(offset)
            .with_read_limit(limit)
            .http_request_builder()
            .await?
            .build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        assert_eq!(request.headers().get("range"), want);
        Ok(())
    }
1427
    // A negative read limit is rejected before sending, with a
    // `RangeError::NegativeLimit` as the error source.
    #[tokio::test]
    async fn range_header_negative_limit() -> Result {
        let inner = test_inner_client(test_builder());
        let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .with_read_limit(-100)
            .http_request_builder()
            .await
            .unwrap_err();

        assert!(
            matches!(
                err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
                RangeError::NegativeLimit
            ),
            "{err:?}"
        );
        Ok(())
    }
1446
    // A negative offset (reading from the end of the object) cannot be
    // combined with a limit. The error source is
    // `RangeError::NegativeOffsetWithLimit`.
    #[tokio::test]
    async fn range_header_negative_offset_with_limit() -> Result {
        let inner = test_inner_client(test_builder());
        let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
            .with_read_offset(-100)
            .with_read_limit(100)
            .http_request_builder()
            .await
            .unwrap_err();

        assert!(
            matches!(
                err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
                RangeError::NegativeOffsetWithLimit
            ),
            "{err:?}"
        );
        Ok(())
    }
1466
    // The object name is percent-encoded into a single path segment.
    // Unreserved characters (`-`, `.`, `_`, `~`) pass through, while `/`, `!`,
    // spaces and other reserved characters are escaped.
    // NOTE(review): a literal `%` is expected to pass through unescaped
    // ("preserve%percent%21"). An object actually named that way would then
    // presumably be resolved by the service as `preserve%percent!`. Confirm
    // this is intended rather than escaping `%` as `%25`.
    #[test_case("projects/p", "projects%2Fp")]
    #[test_case("kebab-case", "kebab-case")]
    #[test_case("dot.name", "dot.name")]
    #[test_case("under_score", "under_score")]
    #[test_case("tilde~123", "tilde~123")]
    #[test_case("exclamation!point!", "exclamation%21point%21")]
    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
    #[test_case("preserve%percent%21", "preserve%percent%21")]
    #[test_case(
        "testall !#$&'()*+,/:;=?@[]",
        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
    )]
    #[tokio::test]
    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
        let inner = test_inner_client(test_builder());
        let request = ReadObject::new(inner, "projects/_/buckets/bucket", name)
            .http_request_builder()
            .await?
            .build()?;
        // The last path segment of `.../o/{object}` is the encoded name.
        let got = request.url().path_segments().unwrap().next_back().unwrap();
        assert_eq!(got, want);
        Ok(())
    }
1490
1491    #[test]
1492    fn document_crc32c_values() {
1493        let bytes = (1234567890_u32).to_be_bytes();
1494        let base64 = base64::prelude::BASE64_STANDARD.encode(bytes);
1495        assert_eq!(base64, "SZYC0g==", "{bytes:?}");
1496    }
1497
1498    #[test_case("", None; "no header")]
1499    #[test_case("crc32c=hello", None; "invalid value")]
1500    #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
1501    #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
1502    #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
1503    #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
1504    fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
1505        let mut headers = http::HeaderMap::new();
1506        if !val.is_empty() {
1507            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1508        }
1509        let got = headers_to_crc32c(&headers);
1510        assert_eq!(got, want);
1511        Ok(())
1512    }
1513
1514    #[test_case("", None; "no header")]
1515    #[test_case("md5=invalid", None; "invalid value")]
1516    #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
1517    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
1518    #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
1519    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
1520    fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
1521        let mut headers = http::HeaderMap::new();
1522        if !val.is_empty() {
1523            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
1524        }
1525        let got = headers_to_md5_hash(&headers);
1526        match want {
1527            Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
1528            None => assert!(got.is_empty()),
1529        }
1530        Ok(())
1531    }
1532
1533    #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
1534    #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
1535    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
1536    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
1537    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
1538    fn test_checksums_validation_enabled(
1539        full_content_requested: bool,
1540        headers: Vec<(&str, &str)>,
1541        status: http::StatusCode,
1542        want_crc32c: Option<u32>,
1543        want_md5: &str,
1544    ) -> Result {
1545        let mut header_map = http::HeaderMap::new();
1546        for (key, value) in headers {
1547            header_map.insert(
1548                http::HeaderName::from_bytes(key.as_bytes())?,
1549                http::HeaderValue::from_bytes(value.as_bytes())?,
1550            );
1551        }
1552
1553        let got = checksums_from_response(full_content_requested, status, &header_map);
1554        assert_eq!(got.crc32c, want_crc32c);
1555        assert_eq!(
1556            got.md5_hash,
1557            base64::prelude::BASE64_STANDARD.decode(want_md5)?
1558        );
1559        Ok(())
1560    }
1561
1562    #[test_case(0)]
1563    #[test_case(1024)]
1564    fn response_range_success(limit: u64) -> Result {
1565        let response = http::Response::builder()
1566            .status(200)
1567            .header("content-length", limit)
1568            .body(Vec::new())?;
1569        let response = reqwest::Response::from(response);
1570        let range = response_range(&response)?;
1571        assert_eq!(range, ReadRange { start: 0, limit });
1572        Ok(())
1573    }
1574
1575    #[test]
1576    fn response_range_missing() -> Result {
1577        let response = http::Response::builder().status(200).body(Vec::new())?;
1578        let response = reqwest::Response::from(response);
1579        let err = response_range(&response).expect_err("missing header should result in an error");
1580        assert!(
1581            matches!(err, ReadError::MissingHeader(h) if h == "content-length"),
1582            "{err:?}"
1583        );
1584        Ok(())
1585    }
1586
1587    #[test_case("")]
1588    #[test_case("abc")]
1589    #[test_case("-123")]
1590    fn response_range_format(value: &'static str) -> Result {
1591        let response = http::Response::builder()
1592            .status(200)
1593            .header("content-length", value)
1594            .body(Vec::new())?;
1595        let response = reqwest::Response::from(response);
1596        let err = response_range(&response).expect_err("header value should result in an error");
1597        assert!(
1598            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
1599            "{err:?}"
1600        );
1601        assert!(err.source().is_some(), "{err:?}");
1602        Ok(())
1603    }
1604
1605    #[test_case(0, 123)]
1606    #[test_case(123, 456)]
1607    fn response_range_partial_success(start: u64, end: u64) -> Result {
1608        let response = http::Response::builder()
1609            .status(206)
1610            .header(
1611                "content-range",
1612                format!("bytes {}-{}/{}", start, end, end + 1),
1613            )
1614            .body(Vec::new())?;
1615        let response = reqwest::Response::from(response);
1616        let range = response_range(&response)?;
1617        assert_eq!(
1618            range,
1619            ReadRange {
1620                start,
1621                limit: (end + 1 - start)
1622            }
1623        );
1624        Ok(())
1625    }
1626
1627    #[test]
1628    fn response_range_partial_missing() -> Result {
1629        let response = http::Response::builder().status(206).body(Vec::new())?;
1630        let response = reqwest::Response::from(response);
1631        let err = response_range(&response).expect_err("missing header should result in an error");
1632        assert!(
1633            matches!(err, ReadError::MissingHeader(h) if h == "content-range"),
1634            "{err:?}"
1635        );
1636        Ok(())
1637    }
1638
1639    #[test_case("")]
1640    #[test_case("123-456/457"; "bad prefix")]
1641    #[test_case("bytes 123-456 457"; "bad separator")]
1642    #[test_case("bytes 123+456/457"; "bad separator [2]")]
1643    #[test_case("bytes abc-456/457"; "start is not numbers")]
1644    #[test_case("bytes 123-cde/457"; "end is not numbers")]
1645    #[test_case("bytes 123-0/457"; "invalid range")]
1646    fn response_range_partial_format(value: &'static str) -> Result {
1647        let response = http::Response::builder()
1648            .status(206)
1649            .header("content-range", value)
1650            .body(Vec::new())?;
1651        let response = reqwest::Response::from(response);
1652        let err = response_range(&response).expect_err("header value should result in an error");
1653        assert!(
1654            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
1655            "{err:?}"
1656        );
1657        assert!(err.source().is_some(), "{err:?}");
1658        Ok(())
1659    }
1660
1661    #[test]
1662    fn response_range_bad_response() -> Result {
1663        let code = reqwest::StatusCode::CREATED;
1664        let response = http::Response::builder().status(code).body(Vec::new())?;
1665        let response = reqwest::Response::from(response);
1666        let err = response_range(&response).expect_err("unexpected status creates error");
1667        assert!(
1668            matches!(err, ReadError::UnexpectedSuccessCode(c) if c == code),
1669            "{err:?}"
1670        );
1671        Ok(())
1672    }
1673
1674    #[test_case(0)]
1675    #[test_case(1024)]
1676    fn response_generation_success(value: i64) -> Result {
1677        let response = http::Response::builder()
1678            .status(200)
1679            .header("x-goog-generation", value)
1680            .body(Vec::new())?;
1681        let response = reqwest::Response::from(response);
1682        let got = response_generation(&response)?;
1683        assert_eq!(got, value);
1684        Ok(())
1685    }
1686
1687    #[test]
1688    fn response_generation_missing() -> Result {
1689        let response = http::Response::builder().status(200).body(Vec::new())?;
1690        let response = reqwest::Response::from(response);
1691        let err =
1692            response_generation(&response).expect_err("missing header should result in an error");
1693        assert!(
1694            matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation"),
1695            "{err:?}"
1696        );
1697        Ok(())
1698    }
1699
1700    #[test_case("")]
1701    #[test_case("abc")]
1702    fn response_generation_format(value: &'static str) -> Result {
1703        let response = http::Response::builder()
1704            .status(200)
1705            .header("x-goog-generation", value)
1706            .body(Vec::new())?;
1707        let response = reqwest::Response::from(response);
1708        let err =
1709            response_generation(&response).expect_err("header value should result in an error");
1710        assert!(
1711            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
1712            "{err:?}"
1713        );
1714        assert!(err.source().is_some(), "{err:?}");
1715        Ok(())
1716    }
1717
1718    #[test]
1719    fn required_header_not_str() -> Result {
1720        let name = "x-goog-test";
1721        let response = http::Response::builder()
1722            .status(200)
1723            .header(name, http::HeaderValue::from_bytes(b"invalid\xfa")?)
1724            .body(Vec::new())?;
1725        let response = reqwest::Response::from(response);
1726        let err =
1727            required_header(&response, name).expect_err("header value should result in an error");
1728        assert!(
1729            matches!(err, ReadError::BadHeaderFormat(h, _) if h == name),
1730            "{err:?}"
1731        );
1732        assert!(err.source().is_some(), "{err:?}");
1733        Ok(())
1734    }
1735}