google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use super::client::*;
16use super::*;
17use crate::error::ReadError;
18use crate::model::ObjectChecksums;
19use crate::model_ext::KeyAes256;
20use crate::model_ext::ObjectHighlights;
21use crate::read_object::ReadObjectResponse;
22use crate::read_resume_policy::ReadResumePolicy;
23use crate::storage::checksum::details::{Md5, validate};
24use crate::storage::request_options::RequestOptions;
25use base64::Engine;
26use serde_with::DeserializeAs;
27
28/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
29///
30/// # Example: accumulate the contents of an object into a vector
31/// ```
32/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
33/// async fn sample(client: &Storage) -> anyhow::Result<()> {
34///     let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
35///     let mut reader = builder.send().await?;
36///     let mut contents = Vec::new();
37///     while let Some(chunk) = reader.next().await.transpose()? {
38///         contents.extend_from_slice(&chunk);
39///     }
40///     println!("object contents={:?}", contents);
41///     Ok(())
42/// }
43/// ```
44///
45/// # Example: read part of an object
46/// ```
47/// use google_cloud_storage::{client::Storage, builder::storage::ReadObject};
48/// use google_cloud_storage::model_ext::ReadRange;
49/// async fn sample(client: &Storage) -> anyhow::Result<()> {
50///     const MIB: u64 = 1024 * 1024;
51///     let mut contents = Vec::new();
52///     let mut reader = client
53///         .read_object("projects/_/buckets/my-bucket", "my-object")
54///         .set_read_range(ReadRange::segment(4 * MIB, 2 * MIB))
55///         .send()
56///         .await?;
57///     while let Some(chunk) = reader.next().await.transpose()? {
58///         contents.extend_from_slice(&chunk);
59///     }
60///     println!("range contents={:?}", contents);
61///     Ok(())
62/// }
63/// ```
#[derive(Clone, Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// The stub that receives the request when `send()` is called.
    stub: std::sync::Arc<S>,
    /// The request under construction; the `set_*` methods update it.
    request: crate::model::ReadObjectRequest,
    /// Per-request options: checksums, retry, backoff, throttler, and read
    /// resume policies.
    options: RequestOptions,
}
73
74impl<S> ReadObject<S>
75where
76    S: crate::storage::stub::Storage + 'static,
77{
78    pub(crate) fn new<B, O>(
79        stub: std::sync::Arc<S>,
80        bucket: B,
81        object: O,
82        options: RequestOptions,
83    ) -> Self
84    where
85        B: Into<String>,
86        O: Into<String>,
87    {
88        ReadObject {
89            stub,
90            request: crate::model::ReadObjectRequest::new()
91                .set_bucket(bucket)
92                .set_object(object),
93            options,
94        }
95    }
96
97    /// Enables computation of MD5 hashes.
98    ///
99    /// Crc32c hashes are checked by default.
100    ///
101    /// Checksum validation is supported iff:
102    /// 1. The full content is requested.
103    /// 2. All of the content is returned (status != PartialContent).
104    /// 3. The server sent a checksum header.
105    /// 4. The http stack did not uncompress the file.
106    /// 5. The server did not uncompress data on read.
107    ///
108    /// # Example
109    /// ```
110    /// # use google_cloud_storage::client::Storage;
111    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
112    /// let builder =  client
113    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
114    ///     .compute_md5();
115    /// let mut reader = builder
116    ///     .send()
117    ///     .await?;
118    /// let mut contents = Vec::new();
119    /// while let Some(chunk) = reader.next().await.transpose()? {
120    ///     contents.extend_from_slice(&chunk);
121    /// }
122    /// println!("object contents={:?}", contents);
123    /// # Ok(()) }
124    /// ```
125    pub fn compute_md5(self) -> Self {
126        let mut this = self;
127        this.options.checksum.md5_hash = Some(Md5::default());
128        this
129    }
130
131    /// If present, selects a specific revision of this object (as
132    /// opposed to the latest version, the default).
133    pub fn set_generation<T: Into<i64>>(mut self, v: T) -> Self {
134        self.request.generation = v.into();
135        self
136    }
137
138    /// Makes the operation conditional on whether the object's current generation
139    /// matches the given value. Setting to 0 makes the operation succeed only if
140    /// there are no live versions of the object.
141    pub fn set_if_generation_match<T>(mut self, v: T) -> Self
142    where
143        T: Into<i64>,
144    {
145        self.request.if_generation_match = Some(v.into());
146        self
147    }
148
149    /// Makes the operation conditional on whether the object's live generation
150    /// does not match the given value. If no live object exists, the precondition
151    /// fails. Setting to 0 makes the operation succeed only if there is a live
152    /// version of the object.
153    pub fn set_if_generation_not_match<T>(mut self, v: T) -> Self
154    where
155        T: Into<i64>,
156    {
157        self.request.if_generation_not_match = Some(v.into());
158        self
159    }
160
161    /// Makes the operation conditional on whether the object's current
162    /// metageneration matches the given value.
163    pub fn set_if_metageneration_match<T>(mut self, v: T) -> Self
164    where
165        T: Into<i64>,
166    {
167        self.request.if_metageneration_match = Some(v.into());
168        self
169    }
170
171    /// Makes the operation conditional on whether the object's current
172    /// metageneration does not match the given value.
173    pub fn set_if_metageneration_not_match<T>(mut self, v: T) -> Self
174    where
175        T: Into<i64>,
176    {
177        self.request.if_metageneration_not_match = Some(v.into());
178        self
179    }
180
    /// The range of bytes to return in the read.
    ///
    /// This can be all the bytes starting at a given offset
    /// (`ReadRange::offset()`), all the bytes in an explicit range
    /// (`ReadRange::segment()`), or the last N bytes of the object
    /// (`ReadRange::tail()`).
    ///
    /// # Examples
    ///
    /// Read starting at 100 bytes to end of file.
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # use google_cloud_storage::model_ext::ReadRange;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_read_range(ReadRange::offset(100))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// Read last 100 bytes of file:
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # use google_cloud_storage::model_ext::ReadRange;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_read_range(ReadRange::tail(100))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    ///
    /// Read bytes 1000 to 1099.
    /// ```
    /// # use google_cloud_storage::client::Storage;
    /// # use google_cloud_storage::model_ext::ReadRange;
    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
    /// let response = client
    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
    ///     .set_read_range(ReadRange::segment(1000, 100))
    ///     .send()
    ///     .await?;
    /// println!("response details={response:?}");
    /// # Ok(()) }
    /// ```
    pub fn set_read_range(mut self, range: crate::model_ext::ReadRange) -> Self {
        // Updates the request in place with the requested range.
        self.request.with_range(range);
        self
    }
235
236    /// The encryption key used with the Customer-Supplied Encryption Keys
237    /// feature. In raw bytes format (not base64-encoded).
238    ///
239    /// Example:
240    /// ```
241    /// # use google_cloud_storage::{model_ext::KeyAes256, client::Storage};
242    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
243    /// let key: &[u8] = &[97; 32];
244    /// let response = client
245    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
246    ///     .set_key(KeyAes256::new(key)?)
247    ///     .send()
248    ///     .await?;
249    /// println!("response details={response:?}");
250    /// # Ok(()) }
251    /// ```
252    pub fn set_key(mut self, v: KeyAes256) -> Self {
253        self.request.common_object_request_params = Some(v.into());
254        self
255    }
256
257    /// The retry policy used for this request.
258    ///
259    /// # Example
260    /// ```
261    /// # use google_cloud_storage::client::Storage;
262    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
263    /// use google_cloud_storage::retry_policy::RetryableErrors;
264    /// use std::time::Duration;
265    /// use gax::retry_policy::RetryPolicyExt;
266    /// let response = client
267    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
268    ///     .with_retry_policy(
269    ///         RetryableErrors
270    ///             .with_attempt_limit(5)
271    ///             .with_time_limit(Duration::from_secs(10)),
272    ///     )
273    ///     .send()
274    ///     .await?;
275    /// println!("response details={response:?}");
276    /// # Ok(()) }
277    /// ```
278    pub fn with_retry_policy<V: Into<gax::retry_policy::RetryPolicyArg>>(mut self, v: V) -> Self {
279        self.options.retry_policy = v.into().into();
280        self
281    }
282
283    /// The backoff policy used for this request.
284    ///
285    /// # Example
286    /// ```
287    /// # use google_cloud_storage::client::Storage;
288    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
289    /// use std::time::Duration;
290    /// use gax::exponential_backoff::ExponentialBackoff;
291    /// let response = client
292    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
293    ///     .with_backoff_policy(ExponentialBackoff::default())
294    ///     .send()
295    ///     .await?;
296    /// println!("response details={response:?}");
297    /// # Ok(()) }
298    /// ```
299    pub fn with_backoff_policy<V: Into<gax::backoff_policy::BackoffPolicyArg>>(
300        mut self,
301        v: V,
302    ) -> Self {
303        self.options.backoff_policy = v.into().into();
304        self
305    }
306
307    /// The retry throttler used for this request.
308    ///
309    /// Most of the time you want to use the same throttler for all the requests
310    /// in a client, and even the same throttler for many clients. Rarely it
311    /// may be necessary to use an custom throttler for some subset of the
312    /// requests.
313    ///
314    /// # Example
315    /// ```
316    /// # use google_cloud_storage::client::Storage;
317    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
318    /// let response = client
319    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
320    ///     .with_retry_throttler(adhoc_throttler())
321    ///     .send()
322    ///     .await?;
323    /// println!("response details={response:?}");
324    /// fn adhoc_throttler() -> gax::retry_throttler::SharedRetryThrottler {
325    ///     # panic!();
326    /// }
327    /// # Ok(()) }
328    /// ```
329    pub fn with_retry_throttler<V: Into<gax::retry_throttler::RetryThrottlerArg>>(
330        mut self,
331        v: V,
332    ) -> Self {
333        self.options.retry_throttler = v.into().into();
334        self
335    }
336
337    /// Configure the resume policy for read requests.
338    ///
339    /// The Cloud Storage client library can automatically resume a read that is
340    /// interrupted by a transient error. Applications may want to limit the
341    /// number of read attempts, or may wish to expand the type of errors
342    /// treated as retryable.
343    ///
344    /// # Example
345    /// ```
346    /// # use google_cloud_storage::client::Storage;
347    /// # async fn sample(client: &Storage) -> anyhow::Result<()> {
348    /// use google_cloud_storage::read_resume_policy::{AlwaysResume, ReadResumePolicyExt};
349    /// let response = client
350    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
351    ///     .with_read_resume_policy(AlwaysResume.with_attempt_limit(3))
352    ///     .send()
353    ///     .await?;
354    /// # Ok(()) }
355    /// ```
356    pub fn with_read_resume_policy<V>(mut self, v: V) -> Self
357    where
358        V: ReadResumePolicy + 'static,
359    {
360        self.options.read_resume_policy = std::sync::Arc::new(v);
361        self
362    }
363
364    /// Sends the request.
365    pub async fn send(self) -> Result<ReadObjectResponse> {
366        self.stub.read_object(self.request, self.options).await
367    }
368}
369
// A convenience struct that saves the request conditions and performs the read.
//
// It is cheap to clone: each read (including resumed reads) consumes a clone.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    // Shared client state: the HTTP client, the endpoint, and the means to
    // apply the authentication headers.
    pub inner: std::sync::Arc<StorageInner>,
    // The request to send; it is translated into an HTTP `GET` request.
    pub request: crate::model::ReadObjectRequest,
    // The retry, backoff, throttler, checksum, and resume settings.
    pub options: RequestOptions,
}
377
378impl Reader {
379    async fn read(self) -> Result<reqwest::Response> {
380        let throttler = self.options.retry_throttler.clone();
381        let retry = self.options.retry_policy.clone();
382        let backoff = self.options.backoff_policy.clone();
383
384        gax::retry_loop_internal::retry_loop(
385            async move |_| self.read_attempt().await,
386            async |duration| tokio::time::sleep(duration).await,
387            true,
388            throttler,
389            retry,
390            backoff,
391        )
392        .await
393    }
394
395    async fn read_attempt(&self) -> Result<reqwest::Response> {
396        let builder = self.http_request_builder().await?;
397        let response = builder.send().await.map_err(Error::io)?;
398        if !response.status().is_success() {
399            return gaxi::http::to_http_error(response).await;
400        }
401        Ok(response)
402    }
403
404    async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
405        // Collect the required bucket and object parameters.
406        let bucket = &self.request.bucket;
407        let bucket_id = bucket
408            .as_str()
409            .strip_prefix("projects/_/buckets/")
410            .ok_or_else(|| {
411                Error::binding(format!(
412                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
413                ))
414            })?;
415        let object = &self.request.object;
416
417        // Build the request.
418        let builder = self
419            .inner
420            .client
421            .request(
422                reqwest::Method::GET,
423                format!(
424                    "{}/storage/v1/b/{bucket_id}/o/{}",
425                    &self.inner.endpoint,
426                    enc(object)
427                ),
428            )
429            .query(&[("alt", "media")])
430            .header(
431                "x-goog-api-client",
432                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
433            );
434
435        // Add the optional query parameters.
436        let builder = if self.request.generation != 0 {
437            builder.query(&[("generation", self.request.generation)])
438        } else {
439            builder
440        };
441        let builder = self
442            .request
443            .if_generation_match
444            .iter()
445            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
446        let builder = self
447            .request
448            .if_generation_not_match
449            .iter()
450            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
451        let builder = self
452            .request
453            .if_metageneration_match
454            .iter()
455            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
456        let builder = self
457            .request
458            .if_metageneration_not_match
459            .iter()
460            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
461
462        let builder = apply_customer_supplied_encryption_headers(
463            builder,
464            &self.request.common_object_request_params,
465        );
466
467        // Apply "range" header for read limits and offsets.
468        let builder = match (self.request.read_offset, self.request.read_limit) {
469            // read_limit can't be negative.
470            (_, l) if l < 0 => {
471                unreachable!("ReadObject build never sets a negative read_limit value")
472            }
473            // negative offset can't also have a read_limit.
474            (o, l) if o < 0 && l > 0 => unreachable!(
475                "ReadObject builder never sets a positive read_offset value with a negative read_limit value"
476            ),
477            // If both are zero, we use default implementation (no range header).
478            (0, 0) => builder,
479            // negative offset with no limit means the last N bytes.
480            (o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
481            // read_limit is zero, means no limit. Read from offset to end of file.
482            // This handles cases like (5, 0) -> "bytes=5-"
483            (o, 0) => builder.header("range", format!("bytes={o}-")),
484            // General case: non-negative offset and positive limit.
485            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
486            (o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
487        };
488
489        self.inner.apply_auth_headers(builder).await
490    }
491}
492
493fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
494    headers
495        .get("x-goog-hash")
496        .and_then(|hash| hash.to_str().ok())
497        .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
498        .and_then(|hash| {
499            let hash = hash.trim_start_matches("crc32c=");
500            v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
501        })
502}
503
504fn headers_to_md5_hash(headers: &http::HeaderMap) -> Vec<u8> {
505    headers
506        .get("x-goog-hash")
507        .and_then(|hash| hash.to_str().ok())
508        .and_then(|hash| hash.split(",").find(|v| v.starts_with("md5")))
509        .and_then(|hash| {
510            let hash = hash.trim_start_matches("md5=");
511            base64::prelude::BASE64_STANDARD.decode(hash).ok()
512        })
513        .unwrap_or_default()
514}
515
/// A response to a [Storage::read_object] request.
///
/// Holds the in-flight HTTP response plus the state needed to validate the
/// checksums and to resume the read after an error.
#[derive(Debug)]
pub(crate) struct ReadObjectResponseImpl {
    // Issues the HTTP requests; its request fields are updated before a
    // resumed read.
    reader: Reader,
    // The active HTTP response. Reset to `None` when a resume starts, and
    // restored once the resumed request succeeds.
    response: Option<reqwest::Response>,
    // Object metadata captured from the headers of the initial response.
    highlights: ObjectHighlights,
    // The checksums reported by the service. These are compared against the
    // computed checksums once the read completes.
    response_checksums: ObjectChecksums,
    // Fields for resuming a read request.
    // `range.start` is the offset of the next expected byte and `range.limit`
    // is the number of bytes still expected.
    range: ReadRange,
    // The generation reported by the initial response; resumed reads request
    // this generation.
    generation: i64,
    // The number of resume attempts so far, it is passed to the resume policy.
    resume_count: u32,
}
529
530impl ReadObjectResponseImpl {
531    pub(crate) async fn new(reader: Reader) -> Result<Self> {
532        let response = reader.clone().read().await?;
533
534        let full = reader.request.read_offset == 0 && reader.request.read_limit == 0;
535        let response_checksums =
536            checksums_from_response(full, response.status(), response.headers());
537        let range = response_range(&response).map_err(Error::deser)?;
538        let generation = response_generation(&response).map_err(Error::deser)?;
539
540        let headers = response.headers();
541        let get_as_i64 = |header_name: &str| -> i64 {
542            headers
543                .get(header_name)
544                .and_then(|s| s.to_str().ok())
545                .and_then(|s| s.parse::<i64>().ok())
546                .unwrap_or_default()
547        };
548        let get_as_string = |header_name: &str| -> String {
549            headers
550                .get(header_name)
551                .and_then(|sc| sc.to_str().ok())
552                .map(|sc| sc.to_string())
553                .unwrap_or_default()
554        };
555        let highlights = ObjectHighlights {
556            generation,
557            metageneration: get_as_i64("x-goog-metageneration"),
558            size: get_as_i64("x-goog-stored-content-length"),
559            content_encoding: get_as_string("x-goog-stored-content-encoding"),
560            storage_class: get_as_string("x-goog-storage-class"),
561            content_type: get_as_string("content-type"),
562            content_language: get_as_string("content-language"),
563            content_disposition: get_as_string("content-disposition"),
564            etag: get_as_string("etag"),
565            checksums: headers.get("x-goog-hash").map(|_| {
566                crate::model::ObjectChecksums::new()
567                    .set_or_clear_crc32c(headers_to_crc32c(headers))
568                    .set_md5_hash(headers_to_md5_hash(headers))
569            }),
570        };
571
572        Ok(Self {
573            reader,
574            response: Some(response),
575            highlights,
576            // Fields for computing checksums.
577            response_checksums,
578            // Fields for resuming a read request.
579            range,
580            generation,
581            resume_count: 0,
582        })
583    }
584}
585
586#[async_trait::async_trait]
587impl crate::read_object::dynamic::ReadObjectResponse for ReadObjectResponseImpl {
588    fn object(&self) -> ObjectHighlights {
589        self.highlights.clone()
590    }
591
592    async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
593        match self.next_attempt().await {
594            None => None,
595            Some(Ok(b)) => Some(Ok(b)),
596            // Recursive async requires pin:
597            //     https://rust-lang.github.io/async-book/07_workarounds/04_recursion.html
598            Some(Err(e)) => Box::pin(self.resume(e)).await,
599        }
600    }
601}
602
603impl ReadObjectResponseImpl {
604    async fn next_attempt(&mut self) -> Option<Result<bytes::Bytes>> {
605        let response = self.response.as_mut()?;
606        let res = response.chunk().await.map_err(Error::io);
607        match res {
608            Ok(Some(chunk)) => {
609                self.reader
610                    .options
611                    .checksum
612                    .update(self.range.start, &chunk);
613                let len = chunk.len() as u64;
614                if self.range.limit < len {
615                    return Some(Err(Error::deser(ReadError::LongRead {
616                        expected: self.range.limit,
617                        got: len,
618                    })));
619                }
620                self.range.limit -= len;
621                self.range.start += len;
622                Some(Ok(chunk))
623            }
624            Ok(None) => {
625                if self.range.limit != 0 {
626                    return Some(Err(Error::io(ReadError::ShortRead(self.range.limit))));
627                }
628                let computed = self.reader.options.checksum.finalize();
629                let res = validate(&self.response_checksums, &Some(computed));
630                match res {
631                    Err(e) => Some(Err(Error::deser(ReadError::ChecksumMismatch(e)))),
632                    Ok(()) => None,
633                }
634            }
635            Err(e) => Some(Err(e)),
636        }
637    }
638
639    async fn resume(&mut self, error: Error) -> Option<Result<bytes::Bytes>> {
640        use crate::read_object::dynamic::ReadObjectResponse;
641        use crate::read_resume_policy::{ResumeQuery, ResumeResult};
642
643        // The existing read is no longer valid.
644        self.response = None;
645        self.resume_count += 1;
646        let query = ResumeQuery::new(self.resume_count);
647        match self
648            .reader
649            .options
650            .read_resume_policy
651            .on_error(&query, error)
652        {
653            ResumeResult::Continue(_) => {}
654            ResumeResult::Permanent(e) => return Some(Err(e)),
655            ResumeResult::Exhausted(e) => return Some(Err(e)),
656        };
657        self.reader.request.read_offset = self.range.start as i64;
658        self.reader.request.read_limit = self.range.limit as i64;
659        self.reader.request.generation = self.generation;
660        self.response = match self.reader.clone().read().await {
661            Ok(r) => Some(r),
662            Err(e) => return Some(Err(e)),
663        };
664        self.next().await
665    }
666}
667
668/// Returns the object checksums to validate against.
669///
670/// For some responses, the checksums are not expected to match the data.
671/// The function returns an empty `ObjectChecksums` in such a case.
672///
673/// Checksum validation is supported iff:
674/// 1. We requested the full content.
675/// 2. We got all the content (status != PartialContent).
676/// 3. The server sent a CRC header.
677/// 4. The http stack did not uncompress the file.
678/// 5. We were not served compressed data that was uncompressed on read.
679///
680/// For 4, we turn off automatic decompression in reqwest::Client when we
681/// create it,
682fn checksums_from_response(
683    full_content_requested: bool,
684    status: http::StatusCode,
685    headers: &http::HeaderMap,
686) -> ObjectChecksums {
687    let checksums = ObjectChecksums::new();
688    if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
689        return checksums;
690    }
691    let stored_encoding = headers
692        .get("x-goog-stored-content-encoding")
693        .and_then(|e| e.to_str().ok())
694        .map_or("", |e| e);
695    let content_encoding = headers
696        .get("content-encoding")
697        .and_then(|e| e.to_str().ok())
698        .map_or("", |e| e);
699    if stored_encoding == "gzip" && content_encoding != "gzip" {
700        return checksums;
701    }
702    checksums
703        .set_or_clear_crc32c(headers_to_crc32c(headers))
704        .set_md5_hash(headers_to_md5_hash(headers))
705}
706
707fn response_range(response: &reqwest::Response) -> std::result::Result<ReadRange, ReadError> {
708    match response.status() {
709        reqwest::StatusCode::OK => {
710            let header = required_header(response, "content-length")?;
711            let limit = header
712                .parse::<u64>()
713                .map_err(|e| ReadError::BadHeaderFormat("content-length", e.into()))?;
714            Ok(ReadRange { start: 0, limit })
715        }
716        reqwest::StatusCode::PARTIAL_CONTENT => {
717            let header = required_header(response, "content-range")?;
718            let header = header.strip_prefix("bytes ").ok_or_else(|| {
719                ReadError::BadHeaderFormat("content-range", "missing bytes prefix".into())
720            })?;
721            let (range, _) = header.split_once('/').ok_or_else(|| {
722                ReadError::BadHeaderFormat("content-range", "missing / separator".into())
723            })?;
724            let (start, end) = range.split_once('-').ok_or_else(|| {
725                ReadError::BadHeaderFormat("content-range", "missing - separator".into())
726            })?;
727            let start = start
728                .parse::<u64>()
729                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
730            let end = end
731                .parse::<u64>()
732                .map_err(|e| ReadError::BadHeaderFormat("content-range", e.into()))?;
733            // HTTP ranges are inclusive, we need to compute the number of bytes
734            // in the range:
735            let end = end + 1;
736            let limit = end
737                .checked_sub(start)
738                .ok_or_else(|| ReadError::BadHeaderFormat("content-range", format!("range start ({start}) should be less than or equal to the range end ({end})").into()))?;
739            Ok(ReadRange { start, limit })
740        }
741        s => Err(ReadError::UnexpectedSuccessCode(s.as_u16())),
742    }
743}
744
745fn response_generation(response: &reqwest::Response) -> std::result::Result<i64, ReadError> {
746    let header = required_header(response, "x-goog-generation")?;
747    header
748        .parse::<i64>()
749        .map_err(|e| ReadError::BadHeaderFormat("x-goog-generation", e.into()))
750}
751
752fn required_header<'a>(
753    response: &'a reqwest::Response,
754    name: &'static str,
755) -> std::result::Result<&'a str, ReadError> {
756    let header = response
757        .headers()
758        .get(name)
759        .ok_or_else(|| ReadError::MissingHeader(name))?;
760    header
761        .to_str()
762        .map_err(|e| ReadError::BadHeaderFormat(name, e.into()))
763}
764
/// The range of bytes covered by a read response.
///
/// This is private to the module and distinct from the public
/// `crate::model_ext::ReadRange` used to configure a request.
#[derive(Debug, PartialEq)]
struct ReadRange {
    /// The offset of the first byte in the range.
    start: u64,
    /// The number of bytes in the range.
    limit: u64,
}
770
771#[cfg(test)]
772mod resume_tests;
773
#[cfg(test)]
mod tests {
    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::error::ChecksumMismatch;
    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
    use futures::TryStreamExt;
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use std::collections::HashMap;
    use std::error::Error;
    use std::sync::Arc;
    use test_case::test_case;

    type Result = std::result::Result<(), Box<dyn std::error::Error>>;

    // Builds the HTTP request that `builder` would send, without sending it.
    async fn http_request_builder(
        inner: Arc<StorageInner>,
        builder: ReadObject,
    ) -> crate::Result<reqwest::RequestBuilder> {
        let reader = Reader {
            inner,
            request: builder.request,
            options: builder.options,
        };
        reader.http_request_builder().await
    }

    // Creates a client with test credentials that sends its requests to `server`.
    //
    // All the server-backed tests share this helper so they agree on how the
    // endpoint is formed (no trailing slash) and can match the canonical
    // `/storage/v1/...` request path.
    async fn client_for_server(
        server: &Server,
    ) -> std::result::Result<Storage, Box<dyn Error>> {
        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;
        Ok(client)
    }

    // Asserts that the source of `err` is a CRC32C checksum mismatch.
    #[track_caller]
    fn assert_crc32c_mismatch<E: Error>(err: &E) {
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );
    }

    // Verify `read_object()` meets normal Send, Sync, requirements.
    #[tokio::test]
    async fn test_read_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(auth::credentials::testing::test_credentials())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        // The request builder itself.
        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
        need_send(&read);
        need_sync(&read);
        need_static(&read);

        // The future returned by `send()`.
        let read = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send();
        need_send(&read);
        need_static(&read);

        Ok(())
    }

    // Verify a successful download returns the full body via `next()`.
    #[tokio::test]
    async fn read_object_normal() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = client_for_server(&server).await?;
        let mut reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut got = Vec::new();
        while let Some(b) = reader.next().await.transpose()? {
            got.extend_from_slice(&b);
        }
        assert_eq!(bytes::Bytes::from_owner(got), "hello world");

        Ok(())
    }

    // Verify the object metadata is populated from the response headers.
    #[tokio::test]
    async fn read_object_metadata() -> Result {
        const CONTENTS: &str = "the quick brown fox jumps over the lazy dog";
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body(CONTENTS)
                    .append_header(
                        "x-goog-hash",
                        "crc32c=PBj01g==,md5=d63R1fQSI9VYL8pzalyzNQ==",
                    )
                    .append_header("x-goog-generation", 500)
                    .append_header("x-goog-metageneration", "1")
                    .append_header("x-goog-stored-content-length", 30)
                    .append_header("x-goog-stored-content-encoding", "identity")
                    .append_header("x-goog-storage-class", "STANDARD")
                    .append_header("content-language", "en")
                    .append_header("content-type", "text/plain")
                    .append_header("content-disposition", "inline")
                    .append_header("etag", "etagval"),
            ),
        );

        let client = client_for_server(&server).await?;
        let reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let object = reader.object();
        assert_eq!(object.generation, 500);
        assert_eq!(object.metageneration, 1);
        assert_eq!(object.size, 30);
        assert_eq!(object.content_encoding, "identity");
        assert_eq!(
            object.checksums.as_ref().unwrap().crc32c.unwrap(),
            crc32c::crc32c(CONTENTS.as_bytes())
        );
        assert_eq!(
            object.checksums.as_ref().unwrap().md5_hash,
            base64::prelude::BASE64_STANDARD.decode("d63R1fQSI9VYL8pzalyzNQ==")?
        );

        Ok(())
    }

    // Verify the response can be consumed as a stream.
    #[tokio::test]
    async fn read_object_stream() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .append_header("x-goog-generation", 123456)
                    .body("hello world"),
            ),
        );

        let client = client_for_server(&server).await?;
        let response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let result: Vec<_> = response.into_stream().try_collect().await?;
        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);

        Ok(())
    }

    // Verify that calling `next()` and then converting the response into a
    // stream yields all the data, with a valid checksum.
    #[tokio::test]
    async fn read_object_next_then_consume_response() -> Result {
        // Create a large enough file that will require multiple chunks to read.
        const BLOCK_SIZE: usize = 500;
        let mut contents = Vec::new();
        for i in 0..50 {
            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
        }

        // Calculate and serialize the crc32c checksum
        let u = crc32c::crc32c(&contents);
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body(contents.clone())
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = client_for_server(&server).await?;

        // Read some bytes, then remainder with stream.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;

        let mut all_bytes = bytes::BytesMut::new();
        let chunk = response.next().await.transpose()?.unwrap();
        assert!(!chunk.is_empty());
        all_bytes.extend(chunk);
        use futures::StreamExt;
        let mut stream = response.into_stream();
        while let Some(chunk) = stream.next().await.transpose()? {
            all_bytes.extend(chunk);
        }
        assert_eq!(all_bytes, contents);

        Ok(())
    }

    // Verify a 404 response is reported as an error with the HTTP status code.
    #[tokio::test]
    async fn read_object_not_found() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404).body("NOT FOUND")),
        );

        let client = client_for_server(&server).await?;
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await
            .expect_err("expected a not found error");
        assert_eq!(err.http_status_code(), Some(404));

        Ok(())
    }

    // Verify a CRC32C mismatch is detected regardless of how the response is
    // consumed: collecting chunks, propagating with `?`, or as a stream.
    #[tokio::test]
    async fn read_object_incorrect_crc32c_check() -> Result {
        // Calculate and serialize the crc32c checksum
        let u = crc32c::crc32c("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            // One request for each way of consuming the response below.
            .times(3)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = client_for_server(&server).await?;

        // (1) Keep the data received before the error is reported.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect crc32c");
        assert_crc32c_mismatch(&err);

        // (2) Stop at the first error using `?`.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let err: crate::Error = async {
            while (response.next().await.transpose()?).is_some() {}
            Ok(())
        }
        .await
        .expect_err("expect error on incorrect crc32c");
        assert_crc32c_mismatch(&err);

        // (3) Consume the response as a stream.
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await
            .expect_err("expect error on incorrect crc32c");
        assert_crc32c_mismatch(&err);
        Ok(())
    }

    // Verify an MD5 mismatch is detected when MD5 computation is enabled.
    #[tokio::test]
    async fn read_object_incorrect_md5_check() -> Result {
        // Calculate and serialize the md5 checksum
        let digest = md5::compute("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("md5={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = client_for_server(&server).await?;
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .compute_md5()
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect md5");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
            ),
            "err={err:?}"
        );

        Ok(())
    }

    // Verify the method and URL of a request with default options.
    #[tokio::test]
    async fn read_object() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        );
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );
        Ok(())
    }

    // Verify credential failures surface as authentication errors.
    #[tokio::test]
    async fn read_object_error_credentials() -> Result {
        let inner = test_inner_client(
            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
        );
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        );
        let _ = http_request_builder(inner, builder)
            .await
            .inspect_err(|e| assert!(e.is_authentication()))
            .expect_err("invalid credentials should err");
        Ok(())
    }

    // Verify a malformed bucket name is rejected before sending the request.
    #[tokio::test]
    async fn read_object_bad_bucket() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
        let _ = http_request_builder(inner, builder)
            .await
            .expect_err("malformed bucket string should error");
        Ok(())
    }

    // Verify the generation and precondition options become query parameters.
    #[tokio::test]
    async fn read_object_query_params() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_generation(5)
        .set_if_generation_match(10)
        .set_if_generation_not_match(20)
        .set_if_metageneration_match(30)
        .set_if_metageneration_not_match(40);
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        let want_pairs: HashMap<String, String> = [
            ("alt", "media"),
            ("generation", "5"),
            ("ifGenerationMatch", "10"),
            ("ifGenerationNotMatch", "20"),
            ("ifMetagenerationMatch", "30"),
            ("ifMetagenerationNotMatch", "40"),
        ]
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let query_pairs: HashMap<String, String> = request
            .url()
            .query_pairs()
            .map(|param| (param.0.to_string(), param.1.to_string()))
            .collect();
        assert_eq!(query_pairs.len(), want_pairs.len());
        assert_eq!(query_pairs, want_pairs);
        Ok(())
    }

    // Verify a customer-supplied encryption key is sent in the request headers.
    #[tokio::test]
    async fn read_object_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        // The API takes the unencoded byte array.
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_key(KeyAes256::new(&key)?);
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        let want = vec![
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];

        for (name, value) in want {
            assert_eq!(
                request.headers().get(name).unwrap().as_bytes(),
                bytes::Bytes::from(value)
            );
        }
        Ok(())
    }

    // Verify each kind of read range maps to the expected `range` header.
    #[test_case(ReadRange::all(), None; "no headers needed")]
    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
    #[tokio::test]
    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_read_range(input.clone());
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        assert_eq!(request.headers().get("range"), want);
        Ok(())
    }

    // Verify object names are percent-encoded in the last path segment.
    #[test_case("projects/p", "projects%2Fp")]
    #[test_case("kebab-case", "kebab-case")]
    #[test_case("dot.name", "dot.name")]
    #[test_case("under_score", "under_score")]
    #[test_case("tilde~123", "tilde~123")]
    #[test_case("exclamation!point!", "exclamation%21point%21")]
    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
    #[test_case("preserve%percent%21", "preserve%percent%21")]
    #[test_case(
        "testall !#$&'()*+,/:;=?@[]",
        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
    )]
    #[tokio::test]
    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            name,
            inner.options.clone(),
        );
        let request = http_request_builder(inner, builder).await?.build()?;
        let got = request.url().path_segments().unwrap().next_back().unwrap();
        assert_eq!(got, want);
        Ok(())
    }

    // Document how the CRC32C values used in the tests below were obtained:
    // the big-endian bytes of the `u32`, encoded as standard base64.
    #[test]
    fn document_crc32c_values() {
        let bytes = (1234567890_u32).to_be_bytes();
        let base64 = base64::prelude::BASE64_STANDARD.encode(bytes);
        assert_eq!(base64, "SZYC0g==", "{bytes:?}");
    }

    // Verify the CRC32C value is extracted from the `x-goog-hash` header.
    #[test_case("", None; "no header")]
    #[test_case("crc32c=hello", None; "invalid value")]
    #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
    #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
    #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
    #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
    fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
        let mut headers = http::HeaderMap::new();
        if !val.is_empty() {
            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
        }
        let got = headers_to_crc32c(&headers);
        assert_eq!(got, want);
        Ok(())
    }

    // Verify the MD5 hash is extracted from the `x-goog-hash` header; a
    // missing or invalid value yields an empty hash.
    #[test_case("", None; "no header")]
    #[test_case("md5=invalid", None; "invalid value")]
    #[test_case("md5=AAAAAAAAAAAAAAAAAA==",Some("AAAAAAAAAAAAAAAAAA=="); "zero value")]
    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "value")]
    #[test_case("crc32c=something,md5=d63R1fQSI9VYL8pzalyzNQ==", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 after crc32c")]
    #[test_case("md5=d63R1fQSI9VYL8pzalyzNQ==,crc32c=something", Some("d63R1fQSI9VYL8pzalyzNQ=="); "md5 before crc32c")]
    fn test_headers_to_md5(val: &str, want: Option<&str>) -> Result {
        let mut headers = http::HeaderMap::new();
        if !val.is_empty() {
            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
        }
        let got = headers_to_md5_hash(&headers);
        match want {
            Some(w) => assert_eq!(got, base64::prelude::BASE64_STANDARD.decode(w)?),
            None => assert!(got.is_empty()),
        }
        Ok(())
    }

    // Verify which responses produce checksums to validate against. An empty
    // `want_md5` decodes to an empty hash, i.e. no MD5 is expected.
    #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, None, ""; "full content not requested")]
    #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None, ""; "No x-goog-hash")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None, ""; "server uncompressed")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "both gzip")]
    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==,md5=d63R1fQSI9VYL8pzalyzNQ==")], http::StatusCode::OK, Some(1234567890_u32), "d63R1fQSI9VYL8pzalyzNQ=="; "all ok")]
    fn test_checksums_validation_enabled(
        full_content_requested: bool,
        headers: Vec<(&str, &str)>,
        status: http::StatusCode,
        want_crc32c: Option<u32>,
        want_md5: &str,
    ) -> Result {
        let mut header_map = http::HeaderMap::new();
        for (key, value) in headers {
            header_map.insert(
                http::HeaderName::from_bytes(key.as_bytes())?,
                http::HeaderValue::from_bytes(value.as_bytes())?,
            );
        }

        let got = checksums_from_response(full_content_requested, status, &header_map);
        assert_eq!(got.crc32c, want_crc32c);
        assert_eq!(
            got.md5_hash,
            base64::prelude::BASE64_STANDARD.decode(want_md5)?
        );
        Ok(())
    }

    // Verify a 200 response uses `content-length` as the range limit.
    #[test_case(0)]
    #[test_case(1024)]
    fn response_range_success(limit: u64) -> Result {
        let response = http::Response::builder()
            .status(200)
            .header("content-length", limit)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let range = response_range(&response)?;
        assert_eq!(range, super::ReadRange { start: 0, limit });
        Ok(())
    }

    // Verify a 200 response without `content-length` is an error.
    #[test]
    fn response_range_missing() -> Result {
        let response = http::Response::builder().status(200).body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err = response_range(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "content-length"),
            "{err:?}"
        );
        Ok(())
    }

    // Verify a 200 response with an unparseable `content-length` is an error.
    #[test_case("")]
    #[test_case("abc")]
    #[test_case("-123")]
    fn response_range_format(value: &'static str) -> Result {
        let response = http::Response::builder()
            .status(200)
            .header("content-length", value)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err = response_range(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-length"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // Verify a 206 response derives the range from `content-range`.
    #[test_case(0, 123)]
    #[test_case(123, 456)]
    fn response_range_partial_success(start: u64, end: u64) -> Result {
        let response = http::Response::builder()
            .status(206)
            .header(
                "content-range",
                format!("bytes {}-{}/{}", start, end, end + 1),
            )
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let range = response_range(&response)?;
        assert_eq!(
            range,
            super::ReadRange {
                start,
                // The end of a content range is inclusive.
                limit: (end + 1 - start)
            }
        );
        Ok(())
    }

    // Verify a 206 response without `content-range` is an error.
    #[test]
    fn response_range_partial_missing() -> Result {
        let response = http::Response::builder().status(206).body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err = response_range(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "content-range"),
            "{err:?}"
        );
        Ok(())
    }

    // Verify a 206 response with a malformed `content-range` is an error.
    #[test_case("")]
    #[test_case("123-456/457"; "bad prefix")]
    #[test_case("bytes 123-456 457"; "bad separator")]
    #[test_case("bytes 123+456/457"; "bad separator [2]")]
    #[test_case("bytes abc-456/457"; "start is not numbers")]
    #[test_case("bytes 123-cde/457"; "end is not numbers")]
    #[test_case("bytes 123-0/457"; "invalid range")]
    fn response_range_partial_format(value: &'static str) -> Result {
        let response = http::Response::builder()
            .status(206)
            .header("content-range", value)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err = response_range(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "content-range"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // Verify a success status other than 200 or 206 is reported as an error.
    #[test]
    fn response_range_bad_response() -> Result {
        let code = reqwest::StatusCode::CREATED;
        let response = http::Response::builder().status(code).body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err = response_range(&response).expect_err("unexpected status creates error");
        assert!(
            matches!(err, ReadError::UnexpectedSuccessCode(c) if c == code),
            "{err:?}"
        );
        Ok(())
    }

    // Verify the generation is parsed from the `x-goog-generation` header.
    #[test_case(0)]
    #[test_case(1024)]
    fn response_generation_success(value: i64) -> Result {
        let response = http::Response::builder()
            .status(200)
            .header("x-goog-generation", value)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let got = response_generation(&response)?;
        assert_eq!(got, value);
        Ok(())
    }

    // Verify a response without `x-goog-generation` is an error.
    #[test]
    fn response_generation_missing() -> Result {
        let response = http::Response::builder().status(200).body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err =
            response_generation(&response).expect_err("missing header should result in an error");
        assert!(
            matches!(err, ReadError::MissingHeader(h) if h == "x-goog-generation"),
            "{err:?}"
        );
        Ok(())
    }

    // Verify an unparseable `x-goog-generation` header is an error.
    #[test_case("")]
    #[test_case("abc")]
    fn response_generation_format(value: &'static str) -> Result {
        let response = http::Response::builder()
            .status(200)
            .header("x-goog-generation", value)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err =
            response_generation(&response).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == "x-goog-generation"),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }

    // Verify a header value that is not a valid string is reported as a
    // format error.
    #[test]
    fn required_header_not_str() -> Result {
        let name = "x-goog-test";
        let response = http::Response::builder()
            .status(200)
            .header(name, http::HeaderValue::from_bytes(b"invalid\xfa")?)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let err =
            required_header(&response, name).expect_err("header value should result in an error");
        assert!(
            matches!(err, ReadError::BadHeaderFormat(h, _) if h == name),
            "{err:?}"
        );
        assert!(err.source().is_some(), "{err:?}");
        Ok(())
    }
}