google_cloud_storage/storage/
read_object.rs

1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use serde_with::DeserializeAs;
16
17use super::client::*;
18use super::*;
19#[cfg(feature = "unstable-stream")]
20use futures::Stream;
21
22/// The request builder for [Storage::read_object][crate::client::Storage::read_object] calls.
23///
24/// # Example
25/// ```
26/// # tokio_test::block_on(async {
27/// # use google_cloud_storage::client::Storage;
28/// use google_cloud_storage::builder::storage::ReadObject;
29/// # let client = Storage::builder()
30/// #   .with_endpoint("https://storage.googleapis.com")
31/// #    .build().await?;
32/// let builder: ReadObject = client.read_object("projects/_/buckets/my-bucket", "my-object");
33/// let contents = builder.send().await?.all_bytes().await?;
34/// println!("object contents={contents:?}");
35/// # Ok::<(), anyhow::Error>(()) });
36/// ```
37pub struct ReadObject {
38    inner: std::sync::Arc<StorageInner>,
39    request: crate::model::ReadObjectRequest,
40}
41
42impl ReadObject {
43    pub(crate) fn new<B, O>(inner: std::sync::Arc<StorageInner>, bucket: B, object: O) -> Self
44    where
45        B: Into<String>,
46        O: Into<String>,
47    {
48        ReadObject {
49            inner,
50            request: crate::model::ReadObjectRequest::new()
51                .set_bucket(bucket)
52                .set_object(object),
53        }
54    }
55
56    /// If present, selects a specific revision of this object (as
57    /// opposed to the latest version, the default).
58    pub fn with_generation<T: Into<i64>>(mut self, v: T) -> Self {
59        self.request.generation = v.into();
60        self
61    }
62
63    /// Makes the operation conditional on whether the object's current generation
64    /// matches the given value. Setting to 0 makes the operation succeed only if
65    /// there are no live versions of the object.
66    pub fn with_if_generation_match<T>(mut self, v: T) -> Self
67    where
68        T: Into<i64>,
69    {
70        self.request.if_generation_match = Some(v.into());
71        self
72    }
73
74    /// Makes the operation conditional on whether the object's live generation
75    /// does not match the given value. If no live object exists, the precondition
76    /// fails. Setting to 0 makes the operation succeed only if there is a live
77    /// version of the object.
78    pub fn with_if_generation_not_match<T>(mut self, v: T) -> Self
79    where
80        T: Into<i64>,
81    {
82        self.request.if_generation_not_match = Some(v.into());
83        self
84    }
85
86    /// Makes the operation conditional on whether the object's current
87    /// metageneration matches the given value.
88    pub fn with_if_metageneration_match<T>(mut self, v: T) -> Self
89    where
90        T: Into<i64>,
91    {
92        self.request.if_metageneration_match = Some(v.into());
93        self
94    }
95
96    /// Makes the operation conditional on whether the object's current
97    /// metageneration does not match the given value.
98    pub fn with_if_metageneration_not_match<T>(mut self, v: T) -> Self
99    where
100        T: Into<i64>,
101    {
102        self.request.if_metageneration_not_match = Some(v.into());
103        self
104    }
105
106    /// The offset for the first byte to return in the read, relative to
107    /// the start of the object.
108    ///
109    /// A negative `read_offset` value will be interpreted as the number of bytes
110    /// back from the end of the object to be returned.
111    ///
112    /// # Examples
113    ///
114    /// Read starting at 100 bytes to end of file.
115    /// ```
116    /// # tokio_test::block_on(async {
117    /// # use google_cloud_storage::client::Storage;
118    /// # let client = Storage::builder().build().await?;
119    /// let response = client
120    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
121    ///     .with_read_offset(100)
122    ///     .send()
123    ///     .await?;
124    /// println!("response details={response:?}");
125    /// # Ok::<(), anyhow::Error>(()) });
126    /// ```
127    ///
128    /// Read last 100 bytes of file:
129    /// ```
130    /// # tokio_test::block_on(async {
131    /// # use google_cloud_storage::client::Storage;
132    /// # let client = Storage::builder().build().await?;
133    /// let response = client
134    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
135    ///     .with_read_offset(-100)
136    ///     .send()
137    ///     .await?;
138    /// println!("response details={response:?}");
139    /// # Ok::<(), anyhow::Error>(()) });
140    /// ```
141    ///
142    /// Read bytes 1000 to 1099.
143    /// ```
144    /// # tokio_test::block_on(async {
145    /// # use google_cloud_storage::client::Storage;
146    /// # let client = Storage::builder().build().await?;
147    /// let response = client
148    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
149    ///     .with_read_offset(1000)
150    ///     .with_read_limit(100)
151    ///     .send()
152    ///     .await?;
153    /// println!("response details={response:?}");
154    /// # Ok::<(), anyhow::Error>(()) });
155    /// ```
156    pub fn with_read_offset<T>(mut self, v: T) -> Self
157    where
158        T: Into<i64>,
159    {
160        self.request.read_offset = v.into();
161        self
162    }
163
164    /// The maximum number of `data` bytes the server is allowed to
165    /// return.
166    ///
167    /// A `read_limit` of zero indicates that there is no limit,
168    /// and a negative `read_limit` will cause an error.
169    ///
170    /// # Examples:
171    ///
172    /// Read first 100 bytes.
173    /// ```
174    /// # tokio_test::block_on(async {
175    /// # use google_cloud_storage::client::Storage;
176    /// # let client = Storage::builder().build().await?;
177    /// let response = client
178    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
179    ///     .with_read_limit(100)
180    ///     .send()
181    ///     .await?;
182    /// println!("response details={response:?}");
183    /// # Ok::<(), anyhow::Error>(()) });
184    /// ```
185    ///
186    /// Read bytes 1000 to 1099.
187    /// ```
188    /// # tokio_test::block_on(async {
189    /// # use google_cloud_storage::client::Storage;
190    /// # let client = Storage::builder().build().await?;
191    /// let response = client
192    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
193    ///     .with_read_offset(1000)
194    ///     .with_read_limit(100)
195    ///     .send()
196    ///     .await?;
197    /// println!("response details={response:?}");
198    /// # Ok::<(), anyhow::Error>(()) });
199    /// ```
200    pub fn with_read_limit<T>(mut self, v: T) -> Self
201    where
202        T: Into<i64>,
203    {
204        self.request.read_limit = v.into();
205        self
206    }
207
208    /// The encryption key used with the Customer-Supplied Encryption Keys
209    /// feature. In raw bytes format (not base64-encoded).
210    ///
211    /// Example:
212    /// ```
213    /// # tokio_test::block_on(async {
214    /// # use google_cloud_storage::client::Storage;
215    /// # use google_cloud_storage::client::KeyAes256;
216    /// # let client = Storage::builder().build().await?;
217    /// let key: &[u8] = &[97; 32];
218    /// let response = client
219    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
220    ///     .with_key(KeyAes256::new(key)?)
221    ///     .send()
222    ///     .await?;
223    /// println!("response details={response:?}");
224    /// # Ok::<(), anyhow::Error>(()) });
225    /// ```
226    pub fn with_key(mut self, v: KeyAes256) -> Self {
227        self.request.common_object_request_params = Some(v.into());
228        self
229    }
230
231    /// Sends the request.
232    pub async fn send(self) -> Result<ReadObjectResponse> {
233        let full_content_requested = self.request.read_offset == 0 && self.request.read_limit == 0;
234
235        let builder = self.http_request_builder().await?;
236
237        tracing::info!("builder={builder:?}");
238
239        let response = builder.send().await.map_err(Error::io)?;
240        if !response.status().is_success() {
241            return gaxi::http::to_http_error(response).await;
242        }
243        let response_crc32c = crc32c_from_response(
244            full_content_requested,
245            response.status(),
246            response.headers(),
247        );
248        Ok(ReadObjectResponse {
249            inner: response,
250            response_crc32c,
251            crc32c: 0, // no bytes read yet.
252        })
253    }
254
255    async fn http_request_builder(self) -> Result<reqwest::RequestBuilder> {
256        // Collect the required bucket and object parameters.
257        let bucket: String = self.request.bucket;
258        let bucket_id = bucket
259            .as_str()
260            .strip_prefix("projects/_/buckets/")
261            .ok_or_else(|| {
262                Error::binding(format!(
263                    "malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
264                ))
265            })?;
266        let object: String = self.request.object;
267
268        // Build the request.
269        let builder = self
270            .inner
271            .client
272            .request(
273                reqwest::Method::GET,
274                format!(
275                    "{}/storage/v1/b/{bucket_id}/o/{}",
276                    &self.inner.endpoint,
277                    enc(&object)
278                ),
279            )
280            .query(&[("alt", "media")])
281            .header(
282                "x-goog-api-client",
283                reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
284            );
285
286        // Add the optional query parameters.
287        let builder = if self.request.generation != 0 {
288            builder.query(&[("generation", self.request.generation)])
289        } else {
290            builder
291        };
292        let builder = self
293            .request
294            .if_generation_match
295            .iter()
296            .fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
297        let builder = self
298            .request
299            .if_generation_not_match
300            .iter()
301            .fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
302        let builder = self
303            .request
304            .if_metageneration_match
305            .iter()
306            .fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
307        let builder = self
308            .request
309            .if_metageneration_not_match
310            .iter()
311            .fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
312
313        let builder = apply_customer_supplied_encryption_headers(
314            builder,
315            &self.request.common_object_request_params,
316        );
317
318        // Apply "range" header for read limits and offsets.
319        let builder = match (self.request.read_offset, self.request.read_limit) {
320            // read_limit can't be negative.
321            (_, l) if l < 0 => Err(RangeError::NegativeLimit),
322            // negative offset can't also have a read_limit.
323            (o, l) if o < 0 && l > 0 => Err(RangeError::NegativeOffsetWithLimit),
324            // If both are zero, we use default implementation (no range header).
325            (0, 0) => Ok(builder),
326            // read_limit is zero, means no limit. Read from offset to end of file.
327            // This handles cases like (5, 0) -> "bytes=5-"
328            (o, 0) => Ok(builder.header("range", format!("bytes={o}-"))),
329            // General case: non-negative offset and positive limit.
330            // This covers cases like (0, 100) -> "bytes=0-99", (5, 100) -> "bytes=5-104"
331            (o, l) => Ok(builder.header("range", format!("bytes={o}-{}", o + l - 1))),
332        }
333        .map_err(Error::ser)?;
334
335        self.inner.apply_auth_headers(builder).await
336    }
337}
338
339fn headers_to_crc32c(headers: &http::HeaderMap) -> Option<u32> {
340    headers
341        .get("x-goog-hash")
342        .and_then(|hash| hash.to_str().ok())
343        .and_then(|hash| hash.split(",").find(|v| v.starts_with("crc32c")))
344        .and_then(|hash| {
345            let hash = hash.trim_start_matches("crc32c=");
346            v1::Crc32c::deserialize_as(serde_json::json!(hash)).ok()
347        })
348}
349
350/// A response to a [Storage::read_object] request.
351#[derive(Debug)]
352pub struct ReadObjectResponse {
353    inner: reqwest::Response,
354    // Fields for tracking the crc checksum checks.
355    response_crc32c: Option<u32>,
356    crc32c: u32,
357}
358
359impl ReadObjectResponse {
360    // Get the full object as bytes.
361    //
362    /// # Example
363    /// ```
364    /// # tokio_test::block_on(async {
365    /// # use google_cloud_storage::client::Storage;
366    /// # let client = Storage::builder().build().await?;
367    /// let contents = client
368    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
369    ///     .send()
370    ///     .await?
371    ///     .all_bytes()
372    ///     .await?;
373    /// println!("object contents={contents:?}");
374    /// # Ok::<(), anyhow::Error>(()) });
375    /// ```
376    pub async fn all_bytes(self) -> Result<bytes::Bytes> {
377        let bytes = self.inner.bytes().await.map_err(Error::io)?;
378        if self.response_crc32c.is_some() {
379            let crc32c = crc32c::crc32c_append(self.crc32c, &bytes); // bytes may have already been read by `next`.
380            check_crc32c_match(crc32c, self.response_crc32c)?;
381        }
382        Ok(bytes)
383    }
384
385    /// Stream the next bytes of the object.
386    ///
387    /// When the response has been exhausted, this will return None.
388    ///
389    /// # Example
390    /// ```
391    /// # tokio_test::block_on(async {
392    /// # use google_cloud_storage::client::Storage;
393    /// # let client = Storage::builder().build().await?;
394    /// let mut resp = client
395    ///     .read_object("projects/_/buckets/my-bucket", "my-object")
396    ///     .send()
397    ///     .await?;
398    ///
399    /// while let Some(next) = resp.next().await {
400    ///     println!("next={:?}", next?);
401    /// }
402    /// # Ok::<(), anyhow::Error>(()) });
403    /// ```
404    pub async fn next(&mut self) -> Option<Result<bytes::Bytes>> {
405        let res = self.inner.chunk().await.map_err(Error::io);
406        match res {
407            Ok(Some(chunk)) => {
408                if self.response_crc32c.is_some() {
409                    self.crc32c = crc32c::crc32c_append(self.crc32c, &chunk);
410                }
411                Some(Ok(chunk))
412            }
413            Ok(None) => {
414                let res = check_crc32c_match(self.crc32c, self.response_crc32c);
415                match res {
416                    Err(e) => Some(Err(e)),
417                    Ok(()) => None,
418                }
419            }
420            Err(e) => Some(Err(e)),
421        }
422    }
423
424    #[cfg(feature = "unstable-stream")]
425    #[cfg_attr(docsrs, doc(cfg(feature = "unstable-stream")))]
426    /// Convert the response to a [Stream].
427    pub fn into_stream(self) -> impl Stream<Item = Result<bytes::Bytes>> + Unpin {
428        use futures::stream::unfold;
429        Box::pin(unfold(Some(self), move |state| async move {
430            if let Some(mut this) = state {
431                if let Some(chunk) = this.next().await {
432                    return Some((chunk, Some(this)));
433                }
434            };
435            None
436        }))
437    }
438}
439
440/// Represents an error that can occur when reading response data.
441#[derive(thiserror::Error, Debug, PartialEq)]
442#[non_exhaustive]
443enum ReadError {
444    /// The calculated crc32c did not match server provided crc32c.
445    #[error("bad CRC on read: got {got}, want {want}")]
446    BadCrc { got: u32, want: u32 },
447}
448
449fn crc32c_from_response(
450    full_content_requested: bool,
451    status: http::StatusCode,
452    headers: &http::HeaderMap,
453) -> Option<u32> {
454    // Check the CRC iff all of the following hold:
455    // 1. We requested the full content (request.read_limit = 0, request.read_offset = 0).
456    // 2. We got all the content (status != PartialContent).
457    // 3. The server sent a CRC header.
458    // 4. The http stack did not uncompress the file.
459    // 5. We were not served compressed data that was uncompressed on download.
460    //
461    // For 4, we turn off automatic decompression in reqwest::Client when we create it,
462    // so it will not be turned on.
463    if !full_content_requested || status == http::StatusCode::PARTIAL_CONTENT {
464        return None;
465    }
466    let stored_encoding = headers
467        .get("x-goog-stored-content-encoding")
468        .and_then(|e| e.to_str().ok())
469        .map_or("", |e| e);
470    let content_encoding = headers
471        .get("content-encoding")
472        .and_then(|e| e.to_str().ok())
473        .map_or("", |e| e);
474    if stored_encoding == "gzip" && content_encoding != "gzip" {
475        return None;
476    }
477    headers_to_crc32c(headers)
478}
479
480fn check_crc32c_match(crc32c: u32, response: Option<u32>) -> Result<()> {
481    if let Some(response) = response {
482        if crc32c != response {
483            return Err(Error::deser(ReadError::BadCrc {
484                got: crc32c,
485                want: response,
486            }));
487        }
488    }
489    Ok(())
490}
491
492#[cfg(test)]
493mod tests {
494    use super::client::tests::{create_key_helper, test_builder, test_inner_client};
495    use super::*;
496    use base64::Engine as _;
497    use futures::TryStreamExt;
498    use httptest::{Expectation, Server, matchers::*, responders::status_code};
499    use std::collections::HashMap;
500    use std::error::Error;
501    use test_case::test_case;
502
503    type Result = std::result::Result<(), Box<dyn std::error::Error>>;
504
505    #[tokio::test]
506    async fn read_object_normal() -> Result {
507        let server = Server::run();
508        server.expect(
509            Expectation::matching(all_of![
510                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
511                request::query(url_decoded(contains(("alt", "media")))),
512            ])
513            .respond_with(status_code(200).body("hello world")),
514        );
515
516        let endpoint = server.url("");
517        let client = Storage::builder()
518            .with_endpoint(endpoint.to_string())
519            .with_credentials(auth::credentials::testing::test_credentials())
520            .build()
521            .await?;
522        let reader = client
523            .read_object("projects/_/buckets/test-bucket", "test-object")
524            .send()
525            .await?;
526        let got = reader.all_bytes().await?;
527        assert_eq!(got, "hello world");
528
529        Ok(())
530    }
531
532    #[tokio::test]
533    async fn read_object_stream() -> Result {
534        let server = Server::run();
535        server.expect(
536            Expectation::matching(all_of![
537                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
538                request::query(url_decoded(contains(("alt", "media")))),
539            ])
540            .respond_with(status_code(200).body("hello world")),
541        );
542
543        let endpoint = server.url("");
544        let client = Storage::builder()
545            .with_endpoint(endpoint.to_string())
546            .with_credentials(auth::credentials::testing::test_credentials())
547            .build()
548            .await?;
549        let response = client
550            .read_object("projects/_/buckets/test-bucket", "test-object")
551            .send()
552            .await?;
553        let result: Vec<_> = response.into_stream().try_collect().await?;
554        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);
555
556        Ok(())
557    }
558
559    #[tokio::test]
560    async fn read_object_next_then_consume_response() -> Result {
561        // Create a large enough file that will require multiple chunks to download.
562        const BLOCK_SIZE: usize = 500;
563        let mut contents = Vec::new();
564        for i in 0..50 {
565            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
566        }
567
568        // Calculate and serialize the crc32c checksum
569        let u = crc32c::crc32c(&contents);
570        let bytes = [
571            (u >> 24 & 0xFF) as u8,
572            (u >> 16 & 0xFF) as u8,
573            (u >> 8 & 0xFF) as u8,
574            (u & 0xFF) as u8,
575        ];
576        let value = base64::prelude::BASE64_STANDARD.encode(bytes);
577
578        let server = Server::run();
579        server.expect(
580            Expectation::matching(all_of![
581                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
582                request::query(url_decoded(contains(("alt", "media")))),
583            ])
584            .times(2)
585            .respond_with(
586                status_code(200)
587                    .body(contents.clone())
588                    .append_header("x-goog-hash", format!("crc32c={value}")),
589            ),
590        );
591
592        let endpoint = server.url("");
593        let client = Storage::builder()
594            .with_endpoint(endpoint.to_string())
595            .with_credentials(auth::credentials::testing::test_credentials())
596            .build()
597            .await?;
598
599        // Read some bytes, then get all.
600        let mut response = client
601            .read_object("projects/_/buckets/test-bucket", "test-object")
602            .send()
603            .await?;
604
605        let mut all_bytes = bytes::BytesMut::new();
606        let chunk = response.next().await.transpose()?.unwrap();
607        assert!(!chunk.is_empty());
608        all_bytes.extend(chunk);
609        let remainder = response.all_bytes().await?;
610        assert!(!remainder.is_empty());
611        all_bytes.extend(remainder);
612        assert_eq!(all_bytes, contents);
613
614        // Read some bytes, then remainder with stream.
615        let mut response = client
616            .read_object("projects/_/buckets/test-bucket", "test-object")
617            .send()
618            .await?;
619
620        let mut all_bytes = bytes::BytesMut::new();
621        let chunk = response.next().await.transpose()?.unwrap();
622        assert!(!chunk.is_empty());
623        all_bytes.extend(chunk);
624        use futures::StreamExt;
625        let mut stream = response.into_stream();
626        while let Some(chunk) = stream.next().await.transpose()? {
627            all_bytes.extend(chunk);
628        }
629        assert_eq!(all_bytes, contents);
630
631        Ok(())
632    }
633
634    #[tokio::test]
635    async fn read_object_not_found() -> Result {
636        let server = Server::run();
637        server.expect(
638            Expectation::matching(all_of![
639                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
640                request::query(url_decoded(contains(("alt", "media")))),
641            ])
642            .respond_with(status_code(404).body("NOT FOUND")),
643        );
644
645        let endpoint = server.url("");
646        let client = Storage::builder()
647            .with_endpoint(endpoint.to_string())
648            .with_credentials(auth::credentials::testing::test_credentials())
649            .build()
650            .await?;
651        let err = client
652            .read_object("projects/_/buckets/test-bucket", "test-object")
653            .send()
654            .await
655            .expect_err("expected a not found error");
656        assert_eq!(err.http_status_code(), Some(404));
657
658        Ok(())
659    }
660
661    #[tokio::test]
662    async fn read_object_incorrect_crc32c_check() -> Result {
663        // Calculate and serialize the crc32c checksum
664        let u = crc32c::crc32c("goodbye world".as_bytes());
665        let bytes = [
666            (u >> 24 & 0xFF) as u8,
667            (u >> 16 & 0xFF) as u8,
668            (u >> 8 & 0xFF) as u8,
669            (u & 0xFF) as u8,
670        ];
671        let value = base64::prelude::BASE64_STANDARD.encode(bytes);
672
673        let server = Server::run();
674        server.expect(
675            Expectation::matching(all_of![
676                request::method_path("GET", "//storage/v1/b/test-bucket/o/test-object"),
677                request::query(url_decoded(contains(("alt", "media")))),
678            ])
679            .times(3)
680            .respond_with(
681                status_code(200)
682                    .body("hello world")
683                    .append_header("x-goog-hash", format!("crc32c={value}")),
684            ),
685        );
686
687        let want_err = ReadError::BadCrc {
688            got: crc32c::crc32c("hello world".as_bytes()), // calculated from data.
689            want: crc32c::crc32c("goodbye world".as_bytes()), // returned from server.
690        };
691
692        let endpoint = server.url("");
693        let client = Storage::builder()
694            .with_endpoint(endpoint.to_string())
695            .with_credentials(auth::credentials::testing::test_credentials())
696            .build()
697            .await?;
698        let response = client
699            .read_object("projects/_/buckets/test-bucket", "test-object")
700            .send()
701            .await?;
702        let err = response
703            .all_bytes()
704            .await
705            .expect_err("expect error on incorrect crc32c");
706        assert_eq!(
707            err.source().unwrap().downcast_ref::<ReadError>().unwrap(),
708            &want_err
709        );
710
711        let mut response = client
712            .read_object("projects/_/buckets/test-bucket", "test-object")
713            .send()
714            .await?;
715        let err: crate::Error = async {
716            {
717                while (response.next().await.transpose()?).is_some() {}
718                Ok(())
719            }
720        }
721        .await
722        .expect_err("expect error on incorrect crc32c");
723        assert_eq!(
724            err.source().unwrap().downcast_ref::<ReadError>().unwrap(),
725            &want_err
726        );
727
728        use futures::TryStreamExt;
729        let err = client
730            .read_object("projects/_/buckets/test-bucket", "test-object")
731            .send()
732            .await?
733            .into_stream()
734            .try_collect::<Vec<bytes::Bytes>>()
735            .await
736            .expect_err("expect error on incorrect crc32c");
737        assert_eq!(
738            err.source().unwrap().downcast_ref::<ReadError>().unwrap(),
739            &want_err
740        );
741        Ok(())
742    }
743
744    #[tokio::test]
745    async fn read_object() -> Result {
746        let inner = test_inner_client(test_builder());
747        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
748            .http_request_builder()
749            .await?
750            .build()?;
751
752        assert_eq!(request.method(), reqwest::Method::GET);
753        assert_eq!(
754            request.url().as_str(),
755            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
756        );
757        Ok(())
758    }
759
760    #[tokio::test]
761    async fn read_object_error_credentials() -> Result {
762        let inner = test_inner_client(
763            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
764        );
765        let _ = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
766            .http_request_builder()
767            .await
768            .inspect_err(|e| assert!(e.is_authentication()))
769            .expect_err("invalid credentials should err");
770        Ok(())
771    }
772
773    #[tokio::test]
774    async fn read_object_bad_bucket() -> Result {
775        let inner = test_inner_client(test_builder());
776        ReadObject::new(inner, "malformed", "object")
777            .http_request_builder()
778            .await
779            .expect_err("malformed bucket string should error");
780        Ok(())
781    }
782
783    #[tokio::test]
784    async fn read_object_query_params() -> Result {
785        let inner = test_inner_client(test_builder());
786        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
787            .with_generation(5)
788            .with_if_generation_match(10)
789            .with_if_generation_not_match(20)
790            .with_if_metageneration_match(30)
791            .with_if_metageneration_not_match(40)
792            .http_request_builder()
793            .await?
794            .build()?;
795
796        assert_eq!(request.method(), reqwest::Method::GET);
797        let want_pairs: HashMap<String, String> = [
798            ("alt", "media"),
799            ("generation", "5"),
800            ("ifGenerationMatch", "10"),
801            ("ifGenerationNotMatch", "20"),
802            ("ifMetagenerationMatch", "30"),
803            ("ifMetagenerationNotMatch", "40"),
804        ]
805        .iter()
806        .map(|(k, v)| (k.to_string(), v.to_string()))
807        .collect();
808        let query_pairs: HashMap<String, String> = request
809            .url()
810            .query_pairs()
811            .map(|param| (param.0.to_string(), param.1.to_string()))
812            .collect();
813        assert_eq!(query_pairs.len(), want_pairs.len());
814        assert_eq!(query_pairs, want_pairs);
815        Ok(())
816    }
817
818    #[tokio::test]
819    async fn read_object_headers() -> Result {
820        // Make a 32-byte key.
821        let (key, key_base64, _, key_sha256_base64) = create_key_helper();
822
823        // The API takes the unencoded byte array.
824        let inner = test_inner_client(test_builder());
825        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
826            .with_key(KeyAes256::new(&key)?)
827            .http_request_builder()
828            .await?
829            .build()?;
830
831        assert_eq!(request.method(), reqwest::Method::GET);
832        assert_eq!(
833            request.url().as_str(),
834            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
835        );
836
837        let want = vec![
838            ("x-goog-encryption-algorithm", "AES256".to_string()),
839            ("x-goog-encryption-key", key_base64),
840            ("x-goog-encryption-key-sha256", key_sha256_base64),
841        ];
842
843        for (name, value) in want {
844            assert_eq!(
845                request.headers().get(name).unwrap().as_bytes(),
846                bytes::Bytes::from(value)
847            );
848        }
849        Ok(())
850    }
851
852    #[test_case(0, 0, None; "no headers needed")]
853    #[test_case(10, 0, Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
854    #[test_case(-2000, 0, Some(&http::HeaderValue::from_static("bytes=-2000-")); "negative offset")]
855    #[test_case(0, 100, Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
856    #[test_case(1000, 100, Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
857    #[tokio::test]
858    async fn range_header(offset: i64, limit: i64, want: Option<&http::HeaderValue>) -> Result {
859        let inner = test_inner_client(test_builder());
860        let request = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
861            .with_read_offset(offset)
862            .with_read_limit(limit)
863            .http_request_builder()
864            .await?
865            .build()?;
866
867        assert_eq!(request.method(), reqwest::Method::GET);
868        assert_eq!(
869            request.url().as_str(),
870            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
871        );
872
873        assert_eq!(request.headers().get("range"), want);
874        Ok(())
875    }
876
877    #[test_case(0, -100, RangeError::NegativeLimit; "negative limit")]
878    #[test_case(-100, 100, RangeError::NegativeOffsetWithLimit; "negative offset with positive limit")]
879    #[tokio::test]
880    async fn test_range_header_error(offset: i64, limit: i64, want_err: RangeError) -> Result {
881        let inner = test_inner_client(test_builder());
882        let err = ReadObject::new(inner, "projects/_/buckets/bucket", "object")
883            .with_read_offset(offset)
884            .with_read_limit(limit)
885            .http_request_builder()
886            .await
887            .unwrap_err();
888
889        assert_eq!(
890            err.source().unwrap().downcast_ref::<RangeError>().unwrap(),
891            &want_err
892        );
893        Ok(())
894    }
895
896    #[test_case("projects/p", "projects%2Fp")]
897    #[test_case("kebab-case", "kebab-case")]
898    #[test_case("dot.name", "dot.name")]
899    #[test_case("under_score", "under_score")]
900    #[test_case("tilde~123", "tilde~123")]
901    #[test_case("exclamation!point!", "exclamation%21point%21")]
902    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
903    #[test_case("preserve%percent%21", "preserve%percent%21")]
904    #[test_case(
905        "testall !#$&'()*+,/:;=?@[]",
906        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
907    )]
908    #[tokio::test]
909    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
910        let inner = test_inner_client(test_builder());
911        let request = ReadObject::new(inner, "projects/_/buckets/bucket", name)
912            .http_request_builder()
913            .await?
914            .build()?;
915        let got = request.url().path_segments().unwrap().next_back().unwrap();
916        assert_eq!(got, want);
917        Ok(())
918    }
919
920    #[test_case(10, Some(10), false; "Match values")]
921    #[test_case(10, None, false; "None response")]
922    #[test_case(10, Some(20), true; "Different values")]
923    fn test_check_crc(crc: u32, resp_crc: Option<u32>, want_err: bool) {
924        let res = check_crc32c_match(crc, resp_crc);
925        if want_err {
926            assert_eq!(
927                res.unwrap_err()
928                    .source()
929                    .unwrap()
930                    .downcast_ref::<ReadError>()
931                    .unwrap(),
932                &ReadError::BadCrc {
933                    got: crc,
934                    want: resp_crc.unwrap(),
935                }
936            );
937        } else {
938            res.unwrap();
939        }
940    }
941
942    #[test_case("", None; "no header")]
943    #[test_case("crc32c=hello", None; "invalid value")]
944    #[test_case("crc32c=AAAAAA==", Some(0); "zero value")]
945    #[test_case("crc32c=SZYC0g==", Some(1234567890_u32); "value")]
946    #[test_case("crc32c=SZYC0g==,md5=something", Some(1234567890_u32); "md5 after crc32c")]
947    #[test_case("md5=something,crc32c=SZYC0g==", Some(1234567890_u32); "md5 before crc32c")]
948    fn test_headers_to_crc(val: &str, want: Option<u32>) -> Result {
949        let mut headers = http::HeaderMap::new();
950        if !val.is_empty() {
951            headers.insert("x-goog-hash", http::HeaderValue::from_str(val)?);
952        }
953        let got = headers_to_crc32c(&headers);
954        assert_eq!(got, want);
955        Ok(())
956    }
957
958    #[test_case(false, vec![("x-goog-hash", "crc32c=SZYC0g==")], http::StatusCode::OK, None; "full content not requested")]
959    #[test_case(true, vec![], http::StatusCode::PARTIAL_CONTENT, None; "No x-goog-hash")]
960    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "json")], http::StatusCode::OK, None; "server uncompressed")]
961    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g=="), ("x-goog-stored-content-encoding", "gzip"), ("content-encoding", "gzip")], http::StatusCode::OK, Some(1234567890_u32); "both gzip")]
962    #[test_case(true, vec![("x-goog-hash", "crc32c=SZYC0g==")], http::StatusCode::OK, Some(1234567890_u32); "all ok")]
963    fn test_check_crc_enabled(
964        full_content_requested: bool,
965        headers: Vec<(&str, &str)>,
966        status: http::StatusCode,
967        want: Option<u32>,
968    ) -> Result {
969        let mut header_map = http::HeaderMap::new();
970        for (key, value) in headers {
971            header_map.insert(
972                http::HeaderName::from_bytes(key.as_bytes())?,
973                http::HeaderValue::from_bytes(value.as_bytes())?,
974            );
975        }
976
977        let got = crc32c_from_response(full_content_requested, status, &header_map);
978        assert_eq!(got, want);
979        Ok(())
980    }
981}