google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::*;
16use super::*;
17use crate::error::ReadError;
18use crate::model::ObjectChecksums;
19use crate::model_ext::KeyAes256;
20use crate::model_ext::ObjectHighlights;
21use crate::read_object::ReadObjectResponse;
22use crate::read_resume_policy::ReadResumePolicy;
23use crate::storage::checksum::details::{Md5, validate};
24use crate::storage::request_options::RequestOptions;
25use base64::Engine;
26use serde_with::DeserializeAs;
27
28/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
29///
30/// # Example: accumulate the contents of an object into a vector
31/// ```
32/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
35///     let mut reader = builder.send().await?;
36///     let mut contents = Vec::new();
37///     while let Some(chunk) = reader.next().await.transpose()? {
38///         contents.extend_from_slice(&chunk);
39///     }
40///     println!("object contents={:?}", contents);
41///     Ok(())
42/// }
43/// ```
44///
45/// # Example: read part of an object
46/// ```
47/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
48/// use google_cloud_storage::model_ext::ReadRange;
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: u64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
55///         .send()
56///         .await?;
57///     while let Some(chunk) = reader.next().await.transpose()? {
58///         contents.extend_from_slice(&chunk);
59///     }
60///     println!("range contents={:?}", contents);
61///     Ok(())
62/// }
63/// ```
64#[derive(Clone, Debug)]
65pub struct ReadObject<S = crate::storage::transport::Storage>
66where
67    S: crate::storage::stub::Storage + 'static,
68{
69    stub: std::sync::Arc<S>,
70    request: crate::model::ReadObjectRequest,
71    options: RequestOptions,
72}
73
74impl<S> ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    pub(crate) fn new<B, O>(
79        stub: std::sync::Arc<S>,
80        bucket: B,
81        object: O,
82        options: RequestOptions,
83    ) -> Self
84    where
85        B: Into<String>,
86        O: Into<String>,
87    {
88        ReadObject {
89            stub,
90            request: crate::model::ReadObjectRequest::new()
91                .set_bucket(bucket)
92                .set_object(object),
93            options,
94        }
95    }
96
97    /// Enables computation of MD5 hashes.
98    ///
99    /// Crc32c hashes are checked by default.
100    ///
101    /// Checksum validation is supported iff:
102    /// 1. The full content is requested.
103    /// 2. All of the content is returned (status != PartialContent).
104    /// 3. The server sent a checksum header.
105    /// 4. The http stack did not uncompress the file.
106    /// 5. The server did not uncompress data on read.
107    ///
108    /// # Example
109    /// ```
110    /// # use google_cloud_storage::client::Storage;
111    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
112    /// let builder =  client
113    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
114    ///     .compute_md5();
115    /// let mut reader = builder
116    ///     .send()
117    ///     .await?;
118    /// let mut contents = Vec::new();
119    /// while let Some(chunk) = reader.next().await.transpose()? {
120    ///     contents.extend_from_slice(&chunk);
121    /// }
122    /// println!("object contents={:?}", contents);
123    /// # Ok(()) }
124    /// ```
125    pub fn compute_md5(self) -> Self {
126        let mut this = self;
127        this.options.checksum.md5_hash = Some(Md5::default());
128        this
129    }
130
131    /// If present, selects a specific revision of this object (as
132    /// opposed to the latest version, the default).
133    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
134        self.request.generation = v.into();
135        self
136    }
137
138    /// Makes the operation conditional on whether the object's current generation
139    /// matches the given value. Setting to 0 makes the operation succeed only if
140    /// there are no live versions of the object.
141    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
142    where
143        T: Into<i64>,
144    {
145        self.request.if_generation_match = Some(v.into());
146        self
147    }
148
149    /// Makes the operation conditional on whether the object's live generation
150    /// does not match the given value. If no live object exists, the precondition
151    /// fails. Setting to 0 makes the operation succeed only if there is a live
152    /// version of the object.
153    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
154    where
155        T: Into<i64>,
156    {
157        self.request.if_generation_not_match = Some(v.into());
158        self
159    }
160
161    /// Makes the operation conditional on whether the object's current
162    /// metageneration matches the given value.
163    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
164    where
165        T: Into<i64>,
166    {
167        self.request.if_metageneration_match = Some(v.into());
168        self
169    }
170
171    /// Makes the operation conditional on whether the object's current
172    /// metageneration does not match the given value.
173    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
174    where
175        T: Into<i64>,
176    {
177        self.request.if_metageneration_not_match = Some(v.into());
178        self
179    }
180
181    /// The range of bytes to return in the read.
182    ///
183    /// This can be all the bytes starting at a given offset
184    /// (`ReadRange::offset()`), all the bytes in an explicit range
185    /// (`Range::segment`), or the last N bytes of the object
186    /// (`ReadRange::tail`).
187    ///
188    /// # Examples
189    ///
190    /// Read starting at 100 bytes to end of file.
191    /// ```
192    /// # use google_cloud_storage::client::Storage;
193    /// # use google_cloud_storage::model_ext::ReadRange;
194    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
195    /// let response = client
196    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
197    ///     .set_read_range(ReadRange::offset(100))
198    ///     .send()
199    ///     .await?;
200    /// println!("response details={response:?}");
201    /// # Ok(()) }
202    /// ```
203    ///
204    /// Read last 100 bytes of file:
205    /// ```
206    /// # use google_cloud_storage::client::Storage;
207    /// # use google_cloud_storage::model_ext::ReadRange;
208    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
209    /// let response = client
210    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
211    ///     .set_read_range(ReadRange::tail(100))
212    ///     .send()
213    ///     .await?;
214    /// println!("response details={response:?}");
215    /// # Ok(()) }
216    /// ```
217    ///
218    /// Read bytes 1000 to 1099.
219    /// ```
220    /// # use google_cloud_storage::client::Storage;
221    /// # use google_cloud_storage::model_ext::ReadRange;
222    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
223    /// let response = client
224    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
225    ///     .set_read_range(ReadRange::segment(1000, 100))
226    ///     .send()
227    ///     .await?;
228    /// println!("response details={response:?}");
229    /// # Ok(()) }
230    /// ```
231    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
232        self.request.with_range(range);
233        self
234    }
235
236    /// The encryption key used with the Customer-Supplied Encryption Keys
237    /// feature. In raw bytes format (not base64-encoded).
238    ///
239    /// Example:
240    /// ```
241    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243    /// let key: &[u8] = &[97; 32];
244    /// let response = client
245    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
246    ///     .set_key(KeyAes256::new(key)?)
247    ///     .send()
248    ///     .await?;
249    /// println!("response details={response:?}");
250    /// # Ok(()) }
251    /// ```
252    pub fn set_key(mut self, v: KeyAes256) -> Self {
253        self.request.common_object_request_params = Some(v.into());
254        self
255    }
256
257    /// The retry policy used for this request.
258    ///
259    /// # Example
260    /// ```
261    /// # use google_cloud_storage::client::Storage;
262    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
263    /// use google_cloud_storage::retry_policy::RetryableErrors;
264    /// use std::time::Duration;
265    /// use gax::retry_policy::RetryPolicyExt;
266    /// let response = client
267    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
268    ///     .with_retry_policy(
269    ///         RetryableErrors
270    ///             .with_attempt_limit(5)
271    ///             .with_time_limit(Duration::from_secs(10)),
272    ///     )
273    ///     .send()
274    ///     .await?;
275    /// println!("response details={response:?}");
276    /// # Ok(()) }
277    /// ```
278    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
279        self.options.retry_policy = v.into().into();
280        self
281    }
282
283    /// The backoff policy used for this request.
284    ///
285    /// # Example
286    /// ```
287    /// # use google_cloud_storage::client::Storage;
288    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
289    /// use std::time::Duration;
290    /// use gax::exponential_backoff::ExponentialBackoff;
291    /// let response = client
292    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
293    ///     .with_backoff_policy(ExponentialBackoff::default())
294    ///     .send()
295    ///     .await?;
296    /// println!("response details={response:?}");
297    /// # Ok(()) }
298    /// ```
299    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
300        mut self,
301        v: V,
302    ) -> Self {
303        self.options.backoff_policy = v.into().into();
304        self
305    }
306
307    /// The retry throttler used for this request.
308    ///
309    /// Most of the time you want to use the same throttler for all the requests
310    /// in a client, and even the same throttler for many clients. Rarely it
311    /// may be necessary to use an custom throttler for some subset of the
312    /// requests.
313    ///
314    /// # Example
315    /// ```
316    /// # use google_cloud_storage::client::Storage;
317    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
318    /// let response = client
319    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
320    ///     .with_retry_throttler(adhoc_throttler())
321    ///     .send()
322    ///     .await?;
323    /// println!("response details={response:?}");
324    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
325    ///     # panic!();
326    /// }
327    /// # Ok(()) }
328    /// ```
329    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
330        mut self,
331        v: V,
332    ) -> Self {
333        self.options.retry_throttler = v.into().into();
334        self
335    }
336
337    /// Configure the resume policy for read requests.
338    ///
339    /// The Cloud Storage client library can automatically resume a read that is
340    /// interrupted by a transient error. Applications may want to limit the
341    /// number of read attempts, or may wish to expand the type of errors
342    /// treated as retryable.
343    ///
344    /// # Example
345    /// ```
346    /// # use google_cloud_storage::client::Storage;
347    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
348    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
349    /// let response = client
350    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
351    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
352    ///     .send()
353    ///     .await?;
354    /// # Ok(()) }
355    /// ```
356    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
357    where
358        V: ReadResumePolicy + 'static,
359    {
360        self.options.read_resume_policy = std::sync::Arc::new(v);
361        self
362    }
363
364    /// Sends the request.
365    pub async fn send(self) -> Result<ReadObjectResponse> {
366        self.stub.read_object(self.request, self.options).await
367    }
368}
369
370// A convenience struct that saves the request conditions and performs the read.
371#[derive(Clone, Debug)]
372pub(crate) struct Reader {
373    pub inner: std::sync::Arc<StorageInner>,
374    pub request: crate::model::ReadObjectRequest,
375    pub options: RequestOptions,
376}
377
378impl Reader {
379    async fn read(self) -> Result<reqwest::Response> {
380        let throttler = self.options.retry_throttler.clone();
381        let retry = self.options.retry_policy.clone();
382        let backoff = self.options.backoff_policy.clone();
383
384        gax::retry_loop_internal::retry_loop(
385            async move |_| self.read_attempt().await,
386            async |duration| tokio::time::sleep(duration).await,
387            true,
388            throttler,
389            retry,
390            backoff,
391        )
392        .await
393    }
394
395    async fn read_attempt(&self) -> Result<reqwest::Response> {
396        let builder = self.http_request_builder().await?;
397        let response = builder.send().await.map_err(Error::io)?;
398        if !response.status().is_success() {
399            return gaxi::http::to_http_error(response).await;
400        }
401        Ok(response)
402    }
403
404    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
405        // Collect the required bucket and object parameters.
406        let bucket = &self.request.bucket;
407        let bucket_id = bucket
408            .as_str()
409            .strip_prefix("projects/_/buckets/")
410            .ok_or_else(|| {
411                Error::binding(format!(
412                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
413                ))
414            })?;
415        let object = &self.request.object;
416
417        // Build the request.
418        let builder = self
419            .inner
420            .client
421            .request(
422                reqwest::Method::GET,
423                format!(
424                    "{}/storage/v1/b/{bucket_id}/o/{}",
425                    &self.inner.endpoint,
426                    enc(object)
427                ),
428            )
429            .query(&[("alt", "media")])
430            .header(
431                "x-goog-api-client",
432                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
433            );
434
435        // Add the optional query parameters.
436        let builder = if self.request.generation != 0 {
437            builder.query(&[("generation", self.request.generation)])
438        } else {
439            builder
440        };
441        let builder = self
442            .request
443            .if_generation_match
444            .iter()
445            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
446        let builder = self
447            .request
448            .if_generation_not_match
449            .iter()
450            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
451        let builder = self
452            .request
453            .if_metageneration_match
454            .iter()
455            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
456        let builder = self
457            .request
458            .if_metageneration_not_match
459            .iter()
460            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
461
462        let builder = apply_customer_supplied_encryption_headers(
463            builder,
464            &self.request.common_object_request_params,
465        );
466
467        // Apply "range" header for read limits and offsets.
468        let builder = match (self.request.read_offset, self.request.read_limit) {
469            // read_limit can't be negative.
470            (_, l) if l < 0 => {
471                unreachable!("ReadObject build never sets a negative read_limit value")
472            }
473            // negative offset can't also have a read_limit.
474            (o, l) if o < 0 && l > 0 => unreachable!(
475                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
476            ),
477            // If both are zero, we use default implementation (no range header).
478            (0, 0) => builder,
479            // negative offset with no limit means the last N bytes.
480            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
481            // read_limit is zero, means no limit. Read from offset to end of file.
482            // This handles cases like (5, 0) -> "bytes=5-"
483            (o, 0) => builder.header("range", format!("bytes={o}-")),
484            // General case: non-negative offset and positive limit.
485            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
486            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
487        };
488
489        self.inner.apply_auth_headers(builder).await
490    }
491}
492
493fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
494    headers
495        .get("x-goog-hash")
496        .and_then(|hash| hash.to_str().ok())
497        .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
498        .and_then(|hash| {
499            let hash = hash.trim_start_matches("crc32c=");
500            v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
501        })
502}
503
504fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
505    headers
506        .get("x-goog-hash")
507        .and_then(|hash| hash.to_str().ok())
508        .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
509        .and_then(|hash| {
510            let hash = hash.trim_start_matches("md5=");
511            base64::prelude::BASE64_STANDARD.decode(hash).ok()
512        })
513        .unwrap_or_default()
514}
515
516/// A response to a [Storage::read_object] request.
517#[derive(Debug)]
518pub(crate) struct ReadObjectResponseImpl {
519    reader: Reader,
520    response: Option<reqwest::Response>,
521    highlights: ObjectHighlights,
522    // Fields for tracking the crc checksum checks.
523    response_checksums: ObjectChecksums,
524    // Fields for resuming a read request.
525    range: ReadRange,
526    generation: i64,
527    resume_count: u32,
528}
529
530impl ReadObjectResponseImpl {
531    pub(crate) async fn new(reader: Reader) -> Result<Self> {
532        let response = reader.clone().read().await?;
533
534        let full = reader.request.read_offset == 0 && reader.request.read_limit == 0;
535        let response_checksums =
536            checksums_from_response(full, response.status(), response.headers());
537        let range = response_range(&response).map_err(Error::deser)?;
538        let generation = response_generation(&response).map_err(Error::deser)?;
539
540        let headers = response.headers();
541        let get_as_i64 = |header_name: &str| -> i64 {
542            headers
543                .get(header_name)
544                .and_then(|s| s.to_str().ok())
545                .and_then(|s| s.parse::<i64>().ok())
546                .unwrap_or_default()
547        };
548        let get_as_string = |header_name: &str| -> String {
549            headers
550                .get(header_name)
551                .and_then(|sc| sc.to_str().ok())
552                .map(|sc| sc.to_string())
553                .unwrap_or_default()
554        };
555        let highlights = ObjectHighlights {
556            generation,
557            metageneration: get_as_i64("x-goog-metageneration"),
558            size: get_as_i64("x-goog-stored-content-length"),
559            content_encoding: get_as_string("x-goog-stored-content-encoding"),
560            storage_class: get_as_string("x-goog-storage-class"),
561            content_type: get_as_string("content-type"),
562            content_language: get_as_string("content-language"),
563            content_disposition: get_as_string("content-disposition"),
564            etag: get_as_string("etag"),
565            checksums: headers.get("x-goog-hash").map(|_| {
566                crate::model::ObjectChecksums::new()
567                    .set_or_clear_crc32c(headers_to_crc32c(headers))
568                    .set_md5_hash(headers_to_md5_hash(headers))
569            }),
570        };
571
572        Ok(Self {
573            reader,
574            response: Some(response),
575            highlights,
576            // Fields for computing checksums.
577            response_checksums,
578            // Fields for resuming a read request.
579            range,
580            generation,
581            resume_count: 0,
582        })
583    }
584}
585
586#[async_trait::async_trait]
587impl crate::read_object::dynamic::ReadObjectResponse for ReadObjectResponseImpl {
588    fn object(&self) -> ObjectHighlights {
589        self.highlights.clone()
590    }
591
592    async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
593        match self.next_attempt().await {
594            None => None,
595            Some(Ok(b)) => Some(Ok(b)),
596            // Recursive async requires pin:
597            //     https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
598            Some(Err(e)) => Box::pin(self.resume(e)).await,
599        }
600    }
601}
602
603impl ReadObjectResponseImpl {
604    async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
605        let response = self.response.as_mut()?;
606        let res = response.chunk().await.map_err(Error::io);
607        match res {
608            Ok(Some(chunk)) => {
609                self.reader
610                    .options
611                    .checksum
612                    .update(self.range.start, &chunk);
613                let len = chunk.len() as u64;
614                if self.range.limit < len {
615                    return Some(Err(Error::deser(ReadError::LongRead {
616                        expected: self.range.limit,
617                        got: len,
618                    })));
619                }
620                self.range.limit -= len;
621                self.range.start += len;
622                Some(Ok(chunk))
623            }
624            Ok(None) => {
625                if self.range.limit != 0 {
626                    return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
627                }
628                let computed = self.reader.options.checksum.finalize();
629                let res = validate(&self.response_checksums, &Some(computed));
630                match res {
631                    Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
632                    Ok(()) => None,
633                }
634            }
635            Err(e) => Some(Err(e)),
636        }
637    }
638
639    async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
640        use crate::read_object::dynamic::ReadObjectResponse;
641        use crate::read_resume_policy::{ResumeQuery, ResumeResult};
642
643        // The existing read is no longer valid.
644        self.response = None;
645        self.resume_count += 1;
646        let query = ResumeQuery::new(self.resume_count);
647        match self
648            .reader
649            .options
650            .read_resume_policy
651            .on_error(&query, error)
652        {
653            ResumeResult::Continue(_) => {}
654            ResumeResult::Permanent(e) => return Some(Err(e)),
655            ResumeResult::Exhausted(e) => return Some(Err(e)),
656        };
657        self.reader.request.read_offset = self.range.start as i64;
658        self.reader.request.read_limit = self.range.limit as i64;
659        self.reader.request.generation = self.generation;
660        self.response = match self.reader.clone().read().await {
661            Ok(r) => Some(r),
662            Err(e) => return Some(Err(e)),
663        };
664        self.next().await
665    }
666}
667
668/// Returns the object checksums to validate against.
669///
670/// For some responses, the checksums are not expected to match the data.
671/// The function returns an empty `ObjectChecksums` in such a case.
672///
673/// Checksum validation is supported iff:
674/// 1. We requested the full content.
675/// 2. We got all the content (status != PartialContent).
676/// 3. The server sent a CRC header.
677/// 4. The http stack did not uncompress the file.
678/// 5. We were not served compressed data that was uncompressed on read.
679///
680/// For 4, we turn off automatic decompression in reqwest::Client when we
681/// create it,
682fn checksums_from_response(
683    full_content_requested: bool,
684    status: http::StatusCode,
685    headers: &http::HeaderMap,
686) -> ObjectChecksums {
687    let checksums = ObjectChecksums::new();
688    if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
689        return checksums;
690    }
691    let stored_encoding = headers
692        .get("x-goog-stored-content-encoding")
693        .and_then(|e| e.to_str().ok())
694        .map_or("", |e| e);
695    let content_encoding = headers
696        .get("content-encoding")
697        .and_then(|e| e.to_str().ok())
698        .map_or("", |e| e);
699    if stored_encoding == "gzip" && content_encoding != "gzip" {
700        return checksums;
701    }
702    checksums
703        .set_or_clear_crc32c(headers_to_crc32c(headers))
704        .set_md5_hash(headers_to_md5_hash(headers))
705}
706
707fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
708    match response.status() {
709        reqwest::StatusCode::OK => {
710            let header = required_header(response, "content-length")?;
711            let limit = header
712                .parse::<u64>()
713                .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
714            Ok(ReadRange { start: 0, limit })
715        }
716        reqwest::StatusCode::PARTIAL_CONTENT => {
717            let header = required_header(response, "content-range")?;
718            let header = header.strip_prefix("bytes ").ok_or_else(|| {
719                ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
720            })?;
721            let (range, _) = header.split_once('/').ok_or_else(|| {
722                ReadError::BadHeaderFormat("content-range", "missing / separator".into())
723            })?;
724            let (start, end) = range.split_once('-').ok_or_else(|| {
725                ReadError::BadHeaderFormat("content-range", "missing - separator".into())
726            })?;
727            let start = start
728                .parse::<u64>()
729                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
730            let end = end
731                .parse::<u64>()
732                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
733            // HTTP ranges are inclusive, we need to compute the number of bytes
734            // in the range:
735            let end = end + 1;
736            let limit = end
737                .checked_sub(start)
738                .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
739            Ok(ReadRange { start, limit })
740        }
741        s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
742    }
743}
744
745fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
746    let header = required_header(response, "x-goog-generation")?;
747    header
748        .parse::<i64>()
749        .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
750}
751
752fn required_header<'a>(
753    response: &'a reqwest::Response,
754    name: &'static str,
755) -> std::result::Result<&'a str, ReadError> {
756    let header = response
757        .headers()
758        .get(name)
759        .ok_or_else(|| ReadError::MissingHeader(name))?;
760    header
761        .to_str()
762        .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
763}
764
/// The portion of the object still to be received in a read.
#[derive(Debug, PartialEq)]
struct ReadRange {
    /// The offset of the next byte expected from the service.
    start: u64,
    /// The number of bytes still expected.
    limit: u64,
}
770
771#[cfg(test)]
772mod resume_tests;
773
#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::error::ChecksumMismatch;
    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
    use auth::credentials::anonymous::Builder as Anonymous;
    use futures::TryStreamExt;
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use std::collections::HashMap;
    use std::error::Error;
    use std::sync::Arc;
    use test_case::test_case;

    type Result = std::result::Result<(), Box<dyn std::error::Error>>;

    // Bucket and object requested by the tests that run against a fake server.
    const BUCKET: &str = "projects/_/buckets/test-bucket";
    const OBJECT: &str = "test-object";

    // Bucket used by the tests that only inspect the generated HTTP request,
    // and the URL those tests expect for an object named `object`.
    const REQUEST_BUCKET: &str = "projects/_/buckets/bucket";
    const MEDIA_URL: &str = "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media";

    // Returns the plain HTTP endpoint of a fake server.
    fn server_endpoint(server: &Server) -> String {
        format!("http://{}", server.addr())
    }

    // Creates a client that uses anonymous credentials and sends its requests
    // to `endpoint`.
    async fn anonymous_client(endpoint: String) -> std::result::Result<Storage, Box<dyn Error>> {
        let client = Storage::builder()
            .with_endpoint(endpoint)
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        Ok(client)
    }

    // Creates a `ReadObject` builder backed by the transport stub for `inner`.
    fn new_builder(inner: &Arc<StorageInner>, bucket: &'static str, object: &str) -> ReadObject {
        let stub = crate::storage::transport::Storage::new(inner.clone());
        ReadObject::new(stub, bucket, object, inner.options.clone())
    }

    // Wraps the request and options of `builder` in a `Reader` and returns the
    // HTTP request builder it produces.
    async fn http_request_builder(
        inner: Arc<StorageInner>,
        builder: ReadObject,
    ) -> crate::Result<reqwest::RequestBuilder> {
        let ReadObject {
            request, options, ..
        } = builder;
        let reader = Reader {
            inner,
            request,
            options,
        };
        reader.http_request_builder().await
    }

    // Like `http_request_builder()`, but also builds the final HTTP request.
    async fn http_request(
        inner: Arc<StorageInner>,
        builder: ReadObject,
    ) -> std::result::Result<reqwest::Request, Box<dyn Error>> {
        let request = http_request_builder(inner, builder).await?.build()?;
        Ok(request)
    }

    // Returns a header map with `val` as its `x-goog-hash` header, or an empty
    // map when `val` is empty.
    fn hash_headers(
        val: &str,
    ) -> std::result::Result<http::HeaderMap, http::header::InvalidHeaderValue> {
        let mut headers = http::HeaderMap::new();
        if !val.is_empty() {
            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
        }
        Ok(headers)
    }

    // Finishes `builder` with an empty body and converts it to the response
    // type consumed by the header parsing functions.
    fn empty_response(
        builder: http::response::Builder,
    ) -> std::result::Result<reqwest::Response, http::Error> {
        let response = builder.body(Vec::<u8>::new())?;
        Ok(reqwest::Response::from(response))
    }

    // Returns the checksum mismatch reported as the source of `err`, if any.
    fn checksum_mismatch<E: Error + 'static>(err: &E) -> Option<&ChecksumMismatch> {
        match err.source()?.downcast_ref::<ReadError>()? {
            ReadError::ChecksumMismatch(mismatch) => Some(mismatch),
            _ => None,
        }
    }

    // Verify `read_object()` meets normal Send, Sync, requirements, and that
    // the future returned by `send()` is at least `Send` and `'static`.
    #[tokio::test]
    async fn test_read_is_send_and_static() -> Result {
        fn assert_send_sync_static<T: Send + Sync + 'static>(_: &T) {}
        fn assert_send_static<T: Send + 'static>(_: &T) {}

        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        let builder = client.read_object(BUCKET, OBJECT);
        assert_send_sync_static(&builder);

        let pending = client.read_object(BUCKET, OBJECT).send();
        assert_send_static(&pending);

        Ok(())
    }

    // Reads a whole object chunk by chunk using `next()`.
    #[tokio::test]
    async fn read_object_normal() -> Result {
        let server = Server::run();
        let reply = status_code(200)
            .body("hello world")
            .append_header("x-goog-generation", 123456);
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(reply),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;
        let mut reader = client.read_object(BUCKET, OBJECT).send().await?;
        let mut contents = Vec::new();
        while let Some(chunk) = reader.next().await.transpose()? {
            contents.extend_from_slice(&chunk);
        }
        assert_eq!(bytes::Bytes::from_owner(contents), "hello world");

        Ok(())
    }

    // The object metadata is populated from the response headers.
    #[tokio::test]
    async fn read_object_metadata() -> Result {
        const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
        const MD5_BASE64: &str = "d63R1fQSI9VYL8pzalyzNQ==";

        let server = Server::run();
        let reply = status_code(200)
            .body(CONTENTS)
            .append_header(
                "x-goog-hash",
                "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
            )
            .append_header("x-goog-generation", 500)
            .append_header("x-goog-metageneration", "1")
            .append_header("x-goog-stored-content-length", 30)
            .append_header("x-goog-stored-content-encoding", "identity")
            .append_header("x-goog-storage-class", "STANDARD")
            .append_header("content-language", "en")
            .append_header("content-type", "text/plain")
            .append_header("content-disposition", "inline")
            .append_header("etag", "etagval");
        // NOTE(review): the expected path starts with `//`, presumably because
        // the endpoint returned by `server.url("")` already ends in `/` — confirm.
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(reply),
        );

        let client = anonymous_client(server.url("").to_string()).await?;
        let reader = client.read_object(BUCKET, OBJECT).send().await?;

        let object = reader.object();
        assert_eq!(object.generation, 500);
        assert_eq!(object.metageneration, 1);
        assert_eq!(object.size, 30);
        assert_eq!(object.content_encoding, "identity");

        let checksums = object.checksums.as_ref().unwrap();
        assert_eq!(
            checksums.crc32c.unwrap(),
            crc32c::crc32c(CONTENTS.as_bytes())
        );
        let want_md5 = base64::prelude::BASE64_STANDARD.decode(MD5_BASE64)?;
        assert_eq!(checksums.md5_hash, want_md5);

        Ok(())
    }

    // Reads a whole object by converting the response into a stream.
    #[tokio::test]
    async fn read_object_stream() -> Result {
        let server = Server::run();
        let reply = status_code(200)
            .append_header("x-goog-generation", 123456)
            .body("hello world");
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(reply),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;
        let chunks = client
            .read_object(BUCKET, OBJECT)
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await?;
        assert_eq!(chunks, vec![bytes::Bytes::from_static(b"hello world")]);

        Ok(())
    }

    // Mixes both read styles: one chunk via `next()`, the remainder via the
    // stream returned by `into_stream()`.
    #[tokio::test]
    async fn read_object_next_then_consume_response() -> Result {
        // Create a large enough file that will require multiple chunks to read.
        const BLOCK_SIZE: usize = 500;
        let contents: Vec<u8> = (0..50_u8).flat_map(|i| [i; BLOCK_SIZE]).collect();

        // Calculate and serialize the crc32c checksum
        let crc = crc32c::crc32c(&contents);
        let value = base64::prelude::BASE64_STANDARD.encode(crc.to_be_bytes());

        let server = Server::run();
        let reply = status_code(200)
            .body(contents.clone())
            .append_header("x-goog-hash", format!("crc32c={value}"))
            .append_header("x-goog-generation", 123456);
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(reply),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;

        // Read some bytes, then remainder with stream.
        let mut response = client.read_object(BUCKET, OBJECT).send().await?;

        let mut all_bytes = bytes::BytesMut::new();
        let first = response.next().await.transpose()?.unwrap();
        assert!(!first.is_empty());
        all_bytes.extend(first);

        let mut remainder = response.into_stream();
        while let Some(chunk) = remainder.try_next().await? {
            all_bytes.extend(chunk);
        }
        assert_eq!(all_bytes, contents);

        Ok(())
    }

    // A 404 response surfaces as an error carrying the HTTP status code.
    #[tokio::test]
    async fn read_object_not_found() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404).body("NOT FOUND")),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;
        let result = client.read_object(BUCKET, OBJECT).send().await;
        let err = result.expect_err("expected a not found error");
        assert_eq!(err.http_status_code(), Some(404));

        Ok(())
    }

    // A crc32c checksum that does not match the body is reported as an error
    // by each of the three ways to consume the response.
    #[tokio::test]
    async fn read_object_incorrect_crc32c_check() -> Result {
        // Calculate and serialize the crc32c checksum of contents that differ
        // from the response body.
        let crc = crc32c::crc32c("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(crc.to_be_bytes());

        let server = Server::run();
        let reply = status_code(200)
            .body("hello world")
            .append_header("x-goog-hash", format!("crc32c={value}"))
            .append_header("x-goog-generation", 123456);
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(3)
            .respond_with(reply),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;

        // Using `next()` and keeping the data received before the error.
        let mut response = client.read_object(BUCKET, OBJECT).send().await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(item) = response.next().await {
            match item {
                Ok(chunk) => partial.extend_from_slice(&chunk),
                Err(e) => err = Some(e),
            }
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect crc32c");
        assert!(
            matches!(
                checksum_mismatch(&err),
                Some(ChecksumMismatch::Crc32c { .. })
            ),
            "err={err:?}"
        );

        // Using `next()` and stopping at the first error.
        let mut response = client.read_object(BUCKET, OBJECT).send().await?;
        let drained: std::result::Result<(), crate::Error> = async {
            while response.next().await.transpose()?.is_some() {}
            Ok(())
        }
        .await;
        let err = drained.expect_err("expect error on incorrect crc32c");
        assert!(
            matches!(
                checksum_mismatch(&err),
                Some(ChecksumMismatch::Crc32c { .. })
            ),
            "err={err:?}"
        );

        // Using the stream returned by `into_stream()`.
        let collected = client
            .read_object(BUCKET, OBJECT)
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await;
        let err = collected.expect_err("expect error on incorrect crc32c");
        assert!(
            matches!(
                checksum_mismatch(&err),
                Some(ChecksumMismatch::Crc32c { .. })
            ),
            "err={err:?}"
        );
        Ok(())
    }

    // With `compute_md5()` set, an MD5 hash that does not match the body is
    // reported as an error after the data.
    #[tokio::test]
    async fn read_object_incorrect_md5_check() -> Result {
        // Calculate and serialize the md5 checksum of contents that differ
        // from the response body.
        let digest = md5::compute("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());

        let server = Server::run();
        let reply = status_code(200)
            .body("hello world")
            .append_header("x-goog-hash", format!("md5={value}"))
            .append_header("x-goog-generation", 123456);
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(reply),
        );

        let client = anonymous_client(server_endpoint(&server)).await?;
        let mut response = client
            .read_object(BUCKET, OBJECT)
            .compute_md5()
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(item) = response.next().await {
            match item {
                Ok(chunk) => partial.extend_from_slice(&chunk),
                Err(e) => err = Some(e),
            }
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect md5");
        assert!(
            matches!(checksum_mismatch(&err), Some(ChecksumMismatch::Md5 { .. })),
            "err={err:?}"
        );

        Ok(())
    }

    // A default builder produces a `GET` request for the object media.
    #[tokio::test]
    async fn read_object() -> Result {
        let inner = test_inner_client(test_builder());
        let builder = new_builder(&inner, REQUEST_BUCKET, "object");
        let request = http_request(inner, builder).await?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(request.url().as_str(), MEDIA_URL);
        Ok(())
    }

    // Failing credentials result in an authentication error.
    #[tokio::test]
    async fn read_object_error_credentials() -> Result {
        let inner = test_inner_client(
            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
        );
        let builder = new_builder(&inner, REQUEST_BUCKET, "object");
        let err = http_request_builder(inner, builder)
            .await
            .expect_err("invalid credentials should err");
        assert!(err.is_authentication());
        Ok(())
    }

    // A bucket name that is not in `projects/_/buckets/*` format is rejected.
    #[tokio::test]
    async fn read_object_bad_bucket() -> Result {
        let inner = test_inner_client(test_builder());
        let builder = new_builder(&inner, "malformed", "object");
        let result = http_request_builder(inner, builder).await;
        let _ = result.expect_err("malformed bucket string should error");
        Ok(())
    }

    // Generation and precondition setters become query parameters.
    #[tokio::test]
    async fn read_object_query_params() -> Result {
        let inner = test_inner_client(test_builder());
        let builder = new_builder(&inner, REQUEST_BUCKET, "object")
            .set_generation(5)
            .set_if_generation_match(10)
            .set_if_generation_not_match(20)
            .set_if_metageneration_match(30)
            .set_if_metageneration_not_match(40);
        let request = http_request(inner, builder).await?;

        assert_eq!(request.method(), reqwest::Method::GET);
        let want: HashMap<String, String> = [
            ("alt", "media"),
            ("generation", "5"),
            ("ifGenerationMatch", "10"),
            ("ifGenerationNotMatch", "20"),
            ("ifMetagenerationMatch", "30"),
            ("ifMetagenerationNotMatch", "40"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let got: HashMap<String, String> = request
            .url()
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect();
        assert_eq!(got.len(), want.len());
        assert_eq!(got, want);
        Ok(())
    }

    // A customer-supplied encryption key is sent in the encryption headers.
    #[tokio::test]
    async fn read_object_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        let inner = test_inner_client(test_builder());
        // The API takes the unencoded byte array.
        let builder =
            new_builder(&inner, REQUEST_BUCKET, "object").set_key(KeyAes256::new(&key)?);
        let request = http_request(inner, builder).await?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(request.url().as_str(), MEDIA_URL);

        let want = [
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];
        let headers = request.headers();
        for (name, value) in want {
            assert_eq!(
                headers.get(name).unwrap().as_bytes(),
                bytes::Bytes::from(value)
            );
        }
        Ok(())
    }

    // The requested read range determines the `range` header, if any.
    #[test_case(ReadRange::all(), None; "no headers needed")]
    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
    #[tokio::test]
    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
        let inner = test_inner_client(test_builder());
        let builder = new_builder(&inner, REQUEST_BUCKET, "object").set_read_range(input);
        let request = http_request(inner, builder).await?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(request.url().as_str(), MEDIA_URL);

        assert_eq!(request.headers().get("range"), want);
        Ok(())
    }

    // The object name is percent-encoded in the last segment of the URL path.
    #[test_case("projects/p", "projects%2Fp")]
    #[test_case("kebab-case", "kebab-case")]
    #[test_case("dot.name", "dot.name")]
    #[test_case("under_score", "under_score")]
    #[test_case("tilde~123", "tilde~123")]
    #[test_case("exclamation!point!", "exclamation%21point%21")]
    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
    #[test_case("preserve%percent%21", "preserve%percent%21")]
    #[test_case(
        "testall !#$&'()*+,/:;=?@[]",
        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
    )]
    #[tokio::test]
    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
        let inner = test_inner_client(test_builder());
        let builder = new_builder(&inner, REQUEST_BUCKET, name);
        let request = http_request(inner, builder).await?;

        let mut segments = request.url().path_segments().unwrap();
        let got = segments.next_back().unwrap();
        assert_eq!(got, want);
        Ok(())
    }

    // Documents how the crc32c values used in the tests below are encoded:
    // the big-endian bytes of the checksum, in standard base64.
    #[test]
    fn document_crc32c_values() {
        let bytes = 1234567890_u32.to_be_bytes();
        let encoded = base64::prelude::BASE64_STANDARD.encode(bytes);
        assert_eq!(encoded, "SZYC0g==", "{bytes:?}");
    }

    // Extracts the crc32c checksum from the `x-goog-hash` header.
    #[test_case("", None; "no header")]
    #[test_case("crc32c=hello", None; "invalid value")]
    #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
    #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
    #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
    #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
    fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
        let headers = hash_headers(val)?;
        assert_eq!(headers_to_crc32c(&headers), want);
        Ok(())
    }

    // Extracts the MD5 hash from the `x-goog-hash` header. A missing or
    // invalid hash yields an empty value.
    #[test_case("", None; "no header")]
    #[test_case("md5=invalid", None; "invalid value")]
    #[test_case("md5=AAAAAAAAAAAAAAAAAA==", Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
    #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
    fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
        let headers = hash_headers(val)?;
        let got = headers_to_md5_hash(&headers);
        if let Some(encoded) = want {
            let expected = base64::prelude::BASE64_STANDARD.decode(encoded)?;
            assert_eq!(got, expected);
        } else {
            assert!(got.is_empty());
        }
        Ok(())
    }

    // Checksums are only extracted from the response when they can be used to
    // validate the received data. An empty `want_md5` means no MD5 hash.
    #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
    #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
    fn test_checksums_validation_enabled(
        full_content_requested: bool,
        headers: Vec<(&str, &str)>,
        status: http::StatusCode,
        want_crc32c: Option<u32>,
        want_md5: &str,
    ) -> Result {
        let mut header_map = http::HeaderMap::new();
        for (key, value) in headers {
            let name = http::HeaderName::from_bytes(key.as_bytes())?;
            let value = http::HeaderValue::from_bytes(value.as_bytes())?;
            header_map.insert(name, value);
        }

        let got = checksums_from_response(full_content_requested, status, &header_map);
        assert_eq!(got.crc32c, want_crc32c);
        let want_md5_hash = base64::prelude::BASE64_STANDARD.decode(want_md5)?;
        assert_eq!(got.md5_hash, want_md5_hash);
        Ok(())
    }

    // A 200 response covers the object from the start; its length comes from
    // the `content-length` header.
    #[test_case(0)]
    #[test_case(1024)]
    fn response_range_success(limit: u64) -> Result {
        let response = empty_response(
            http::Response::builder()
                .status(200)
                .header("content-length", limit),
        )?;
        let want = super::ReadRange { start: 0, limit };
        assert_eq!(response_range(&response)?, want);
        Ok(())
    }

    // A 200 response without `content-length` is an error.
    #[test]
    fn response_range_missing() -> Result {
        let response = empty_response(http::Response::builder().status(200))?;
        let err = response_range(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "content-length"),
            "{err:?}"
        );
        Ok(())
    }

    // A `content-length` that is not an unsigned number is an error.
    #[test_case("")]
    #[test_case("abc")]
    #[test_case("-123")]
    fn response_range_format(value: &'static str) -> Result {
        let response = empty_response(
            http::Response::builder()
                .status(200)
                .header("content-length", value),
        )?;
        let err = response_range(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // A 206 response takes its range from the `content-range` header, where
    // the end offset is inclusive.
    #[test_case(0, 123)]
    #[test_case(123, 456)]
    fn response_range_partial_success(start: u64, end: u64) -> Result {
        let total = end + 1;
        let response = empty_response(
            http::Response::builder()
                .status(206)
                .header("content-range", format!("bytes {start}-{end}/{total}")),
        )?;
        let want = super::ReadRange {
            start,
            limit: total - start,
        };
        assert_eq!(response_range(&response)?, want);
        Ok(())
    }

    // A 206 response without `content-range` is an error.
    #[test]
    fn response_range_partial_missing() -> Result {
        let response = empty_response(http::Response::builder().status(206))?;
        let err = response_range(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "content-range"),
            "{err:?}"
        );
        Ok(())
    }

    // A malformed `content-range` header is an error.
    #[test_case("")]
    #[test_case("123-456/457"; "bad prefix")]
    #[test_case("bytes 123-456 457"; "bad separator")]
    #[test_case("bytes 123+456/457"; "bad separator [2]")]
    #[test_case("bytes abc-456/457"; "start is not numbers")]
    #[test_case("bytes 123-cde/457"; "end is not numbers")]
    #[test_case("bytes 123-0/457"; "invalid range")]
    fn response_range_partial_format(value: &'static str) -> Result {
        let response = empty_response(
            http::Response::builder()
                .status(206)
                .header("content-range", value),
        )?;
        let err = response_range(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // Success codes other than 200 and 206 are reported as errors.
    #[test]
    fn response_range_bad_response() -> Result {
        let code = reqwest::StatusCode::CREATED;
        let response = empty_response(http::Response::builder().status(code))?;
        let err = response_range(&response).expect_err("unexpected status creates error");
        assert!(
            matches!(err, ReadError::UnexpectedSuccessCode(c) if c == code),
            "{err:?}"
        );
        Ok(())
    }

    // The generation is parsed from the `x-goog-generation` header.
    #[test_case(0)]
    #[test_case(1024)]
    fn response_generation_success(value: i64) -> Result {
        let response = empty_response(
            http::Response::builder()
                .status(200)
                .header("x-goog-generation", value),
        )?;
        assert_eq!(response_generation(&response)?, value);
        Ok(())
    }

    // A response without `x-goog-generation` is an error.
    #[test]
    fn response_generation_missing() -> Result {
        let response = empty_response(http::Response::builder().status(200))?;
        let err =
            response_generation(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation"),
            "{err:?}"
        );
        Ok(())
    }

    // An `x-goog-generation` header that is not a number is an error.
    #[test_case("")]
    #[test_case("abc")]
    fn response_generation_format(value: &'static str) -> Result {
        let response = empty_response(
            http::Response::builder()
                .status(200)
                .header("x-goog-generation", value),
        )?;
        let err =
            response_generation(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // A header value that cannot be converted to a string is reported as a
    // format error for that header.
    #[test]
    fn required_header_not_str() -> Result {
        let name = "x-goog-test";
        let value = http::HeaderValue::from_bytes(b"invalid\xfa")?;
        let response = empty_response(http::Response::builder().status(200).header(name, value))?;
        let err =
            required_header(&response, name).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == name),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }
}