mod non_resumable;
mod parse_http_response;
mod resumable;
use super::client::*;
use super::*;
use crate::model_ext::KeyAes256;
use crate::read_object::ReadObjectResponse;
use crate::read_resume_policy::ReadResumePolicy;
use crate::storage::checksum::details::Md5;
use crate::storage::request_options::RequestOptions;
/// A request builder for reading the contents of an object.
///
/// The builder accumulates a `ReadObjectRequest` together with the
/// per-request `RequestOptions`. Nothing is sent until `send()` is called,
/// which hands both to the stub's `read_object()` method.
///
/// The `S` type parameter selects the stub implementation and defaults to
/// `crate::storage::transport::Storage`.
#[derive(Clone, Debug)]
pub struct ReadObject<S = crate::storage::transport::Storage>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// The stub that executes the request when `send()` is called.
    stub: std::sync::Arc<S>,
    /// The request under construction: bucket, object, generation,
    /// preconditions, range and encryption parameters.
    request: crate::model::ReadObjectRequest,
    /// Per-request options: retry, backoff, throttling, read-resume policy,
    /// checksum selection and automatic decompression.
    options: RequestOptions,
}
impl<S> ReadObject<S>
where
    S: crate::storage::stub::Storage + 'static,
{
    /// Creates a builder that reads `object` from `bucket` using `stub`.
    ///
    /// `options` provides the initial per-request options; they can be
    /// overridden with the `with_*` methods.
    pub(crate) fn new<B, O>(
        stub: std::sync::Arc<S>,
        bucket: B,
        object: O,
        options: RequestOptions,
    ) -> Self
    where
        B: Into<String>,
        O: Into<String>,
    {
        let request = crate::model::ReadObjectRequest::new()
            .set_bucket(bucket)
            .set_object(object);
        Self {
            stub,
            request,
            options,
        }
    }

    /// Enables MD5 checksum computation for this read.
    ///
    /// Installs a default `Md5` in the request's checksum options.
    pub fn compute_md5(mut self) -> Self {
        self.options.checksum.md5_hash = Some(Md5::default());
        self
    }

    /// Sets the object generation to read.
    ///
    /// A value of `0` adds no `generation` query parameter to the request.
    pub fn set_generation<T>(mut self, v: T) -> Self
    where
        T: Into<i64>,
    {
        let generation: i64 = v.into();
        self.request.generation = generation;
        self
    }

    /// Sets the `ifGenerationMatch` precondition.
    pub fn set_if_generation_match<T: Into<i64>>(mut self, v: T) -> Self {
        let generation: i64 = v.into();
        self.request.if_generation_match = Some(generation);
        self
    }

    /// Sets the `ifGenerationNotMatch` precondition.
    pub fn set_if_generation_not_match<T: Into<i64>>(mut self, v: T) -> Self {
        let generation: i64 = v.into();
        self.request.if_generation_not_match = Some(generation);
        self
    }

    /// Sets the `ifMetagenerationMatch` precondition.
    pub fn set_if_metageneration_match<T: Into<i64>>(mut self, v: T) -> Self {
        let metageneration: i64 = v.into();
        self.request.if_metageneration_match = Some(metageneration);
        self
    }

    /// Sets the `ifMetagenerationNotMatch` precondition.
    pub fn set_if_metageneration_not_match<T: Into<i64>>(mut self, v: T) -> Self {
        let metageneration: i64 = v.into();
        self.request.if_metageneration_not_match = Some(metageneration);
        self
    }

    /// Restricts the read to `range`.
    ///
    /// The range is applied to the request in place via `with_range()`.
    pub fn set_read_range(self, range: crate::model_ext::ReadRange) -> Self {
        let Self {
            stub,
            mut request,
            options,
        } = self;
        request.with_range(range);
        Self {
            stub,
            request,
            options,
        }
    }

    /// Sets the customer-supplied AES-256 key used to read the object.
    pub fn set_key(mut self, v: KeyAes256) -> Self {
        let params = v.into();
        self.request.common_object_request_params = Some(params);
        self
    }

    /// Overrides the retry policy for this request.
    pub fn with_retry_policy<V>(mut self, v: V) -> Self
    where
        V: Into<gax::retry_policy::RetryPolicyArg>,
    {
        let arg: gax::retry_policy::RetryPolicyArg = v.into();
        self.options.retry_policy = arg.into();
        self
    }

    /// Overrides the backoff policy for this request.
    pub fn with_backoff_policy<V>(mut self, v: V) -> Self
    where
        V: Into<gax::backoff_policy::BackoffPolicyArg>,
    {
        let arg: gax::backoff_policy::BackoffPolicyArg = v.into();
        self.options.backoff_policy = arg.into();
        self
    }

    /// Overrides the retry throttler for this request.
    pub fn with_retry_throttler<V>(mut self, v: V) -> Self
    where
        V: Into<gax::retry_throttler::RetryThrottlerArg>,
    {
        let arg: gax::retry_throttler::RetryThrottlerArg = v.into();
        self.options.retry_throttler = arg.into();
        self
    }

    /// Overrides the read resume policy for this request.
    pub fn with_read_resume_policy<V: ReadResumePolicy + 'static>(mut self, v: V) -> Self {
        self.options.read_resume_policy = std::sync::Arc::new(v);
        self
    }

    /// Enables or disables automatic decompression for this request.
    ///
    /// When disabled, the HTTP request carries an `accept-encoding: gzip`
    /// header; when enabled, that header is not added.
    pub fn with_automatic_decompression(self, v: bool) -> Self {
        let mut options = self.options;
        options.automatic_decompression = v;
        Self { options, ..self }
    }

    /// Sends the request.
    ///
    /// Hands the accumulated request and options to the stub's
    /// `read_object()` and returns its result.
    pub async fn send(self) -> Result<ReadObjectResponse> {
        let Self {
            stub,
            request,
            options,
        } = self;
        stub.read_object(request, options).await
    }
}
/// Internal helper that performs the HTTP download for a read request.
///
/// It builds the HTTP request from `request` and `options`, sends it within
/// a retry loop, and wraps the HTTP response in a `ReadObjectResponse`.
#[derive(Clone, Debug)]
pub(crate) struct Reader {
    /// Shared client state: supplies the HTTP client, the endpoint, and the
    /// authentication headers.
    pub inner: std::sync::Arc<StorageInner>,
    /// The request describing what to read.
    pub request: crate::model::ReadObjectRequest,
    /// The options (retry, backoff, throttler, decompression) for this read.
    pub options: RequestOptions,
}
impl Reader {
async fn read(self) -> Result<reqwest::Response> {
let throttler = self.options.retry_throttler.clone();
let retry = self.options.retry_policy.clone();
let backoff = self.options.backoff_policy.clone();
gax::retry_loop_internal::retry_loop(
async move |_| self.read_attempt().await,
async |duration| tokio::time::sleep(duration).await,
true,
throttler,
retry,
backoff,
)
.await
}
async fn read_attempt(&self) -> Result<reqwest::Response> {
let builder = self.http_request_builder().await?;
let response = builder.send().await.map_err(Error::io)?;
if !response.status().is_success() {
return gaxi::http::to_http_error(response).await;
}
Ok(response)
}
async fn http_request_builder(&self) -> Result<reqwest::RequestBuilder> {
let bucket = &self.request.bucket;
let bucket_id = bucket
.as_str()
.strip_prefix("projects/_/buckets/")
.ok_or_else(|| {
Error::binding(format!(
"malformed bucket name, it must start with `projects/_/buckets/`: {bucket}"
))
})?;
let object = &self.request.object;
let builder = self
.inner
.client
.request(
reqwest::Method::GET,
format!(
"{}/storage/v1/b/{bucket_id}/o/{}",
&self.inner.endpoint,
enc(object)
),
)
.query(&[("alt", "media")])
.header(
"x-goog-api-client",
reqwest::header::HeaderValue::from_static(&self::info::X_GOOG_API_CLIENT_HEADER),
);
let builder = if self.options.automatic_decompression {
builder
} else {
builder.header(
"accept-encoding",
reqwest::header::HeaderValue::from_static("gzip"),
)
};
let builder = if self.request.generation != 0 {
builder.query(&[("generation", self.request.generation)])
} else {
builder
};
let builder = self
.request
.if_generation_match
.iter()
.fold(builder, |b, v| b.query(&[("ifGenerationMatch", v)]));
let builder = self
.request
.if_generation_not_match
.iter()
.fold(builder, |b, v| b.query(&[("ifGenerationNotMatch", v)]));
let builder = self
.request
.if_metageneration_match
.iter()
.fold(builder, |b, v| b.query(&[("ifMetagenerationMatch", v)]));
let builder = self
.request
.if_metageneration_not_match
.iter()
.fold(builder, |b, v| b.query(&[("ifMetagenerationNotMatch", v)]));
let builder = apply_customer_supplied_encryption_headers(
builder,
&self.request.common_object_request_params,
);
let builder = match (self.request.read_offset, self.request.read_limit) {
(_, l) if l < 0 => {
unreachable!("ReadObject build never sets a negative read_limit value")
}
(o, l) if o < 0 && l > 0 => unreachable!(
"ReadObject builder never sets a positive read_offset value with a negative read_limit value"
),
(0, 0) => builder,
(o, 0) if o < 0 => builder.header("range", format!("bytes={o}")),
(o, 0) => builder.header("range", format!("bytes={o}-")),
(o, l) => builder.header("range", format!("bytes={o}-{}", o + l - 1)),
};
self.inner.apply_auth_headers(builder).await
}
fn is_gunzipped(response: &reqwest::Response) -> bool {
const TRANSFORMATION: &str = "x-guploader-response-body-transformations";
use http::header::WARNING;
if response
.headers()
.get(TRANSFORMATION)
.is_some_and(|h| h.as_bytes() == "gunzipped".as_bytes())
{
return true;
}
response
.headers()
.get(WARNING)
.is_some_and(|h| h.as_bytes() == "214 UploadServer gunzipped".as_bytes())
}
pub(crate) async fn response(self) -> Result<ReadObjectResponse> {
let response = self.clone().read().await?;
if Self::is_gunzipped(&response) {
return Ok(ReadObjectResponse::new(Box::new(
non_resumable::NonResumableResponse::new(response)?,
)));
}
Ok(ReadObjectResponse::new(Box::new(
resumable::ResumableResponse::new(self, response)?,
)))
}
}
#[cfg(test)]
mod resume_tests;
#[cfg(test)]
mod tests {
    //! Unit tests for the `ReadObject` builder and the `Reader` helper.
    //!
    //! Some tests run against a local `httptest` server; others inspect the
    //! HTTP request produced by `Reader::http_request_builder()`.

    use super::client::tests::{test_builder, test_inner_client};
    use super::*;
    use crate::error::{ChecksumMismatch, ReadError};
    use crate::model_ext::{KeyAes256, ReadRange, tests::create_key_helper};
    use auth::credentials::anonymous::Builder as Anonymous;
    use base64::Engine;
    use futures::TryStreamExt;
    use httptest::{Expectation, Server, matchers::*, responders::status_code};
    use std::collections::HashMap;
    use std::error::Error;
    use std::sync::Arc;
    use test_case::test_case;

    type Result = anyhow::Result<()>;

    /// Builds the HTTP request that `builder` would send using `inner`.
    async fn http_request_builder(
        inner: Arc<StorageInner>,
        builder: ReadObject,
    ) -> crate::Result<reqwest::RequestBuilder> {
        let reader = Reader {
            inner,
            request: builder.request,
            options: builder.options,
        };
        reader.http_request_builder().await
    }

    // Verify the builder and the future returned by `send()` are usable
    // across threads.
    #[tokio::test]
    async fn test_read_is_send_and_static() -> Result {
        let client = Storage::builder()
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        fn need_send<T: Send>(_val: &T) {}
        fn need_sync<T: Sync>(_val: &T) {}
        fn need_static<T: 'static>(_val: &T) {}

        let read = client.read_object("projects/_/buckets/test-bucket", "test-object");
        need_send(&read);
        need_sync(&read);
        need_static(&read);

        let read = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send();
        need_send(&read);
        need_static(&read);

        Ok(())
    }

    // A successful read returns the full body via `next()`.
    #[tokio::test]
    async fn read_object_normal() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::headers(contains(("accept-encoding", "gzip"))),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let mut reader = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut got = Vec::new();
        while let Some(b) = reader.next().await.transpose()? {
            got.extend_from_slice(&b);
        }
        assert_eq!(bytes::Bytes::from_owner(got), "hello world");

        Ok(())
    }

    // A successful read returns the full body via `into_stream()`.
    #[tokio::test]
    async fn read_object_stream() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(
                status_code(200)
                    .append_header("x-goog-generation", 123456)
                    .body("hello world"),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let result: Vec<_> = response.into_stream().try_collect().await?;
        assert_eq!(result, vec![bytes::Bytes::from_static(b"hello world")]);

        Ok(())
    }

    // Calling `next()` and then converting to a stream must not lose data.
    #[tokio::test]
    async fn read_object_next_then_consume_response() -> Result {
        // Create a large enough payload that it is likely delivered in
        // multiple chunks.
        const BLOCK_SIZE: usize = 500;
        let mut contents = Vec::new();
        for i in 0..50 {
            contents.extend_from_slice(&[i as u8; BLOCK_SIZE]);
        }

        // Calculate and serialize the crc32c checksum.
        let u = crc32c::crc32c(&contents);
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body(contents.clone())
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        // Read some bytes, then convert the remainder to a stream.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut all_bytes = bytes::BytesMut::new();
        let chunk = response.next().await.transpose()?.unwrap();
        assert!(!chunk.is_empty());
        all_bytes.extend(chunk);
        use futures::StreamExt;
        let mut stream = response.into_stream();
        while let Some(chunk) = stream.next().await.transpose()? {
            all_bytes.extend(chunk);
        }
        assert_eq!(all_bytes, contents);

        Ok(())
    }

    // A 404 response is surfaced as an error with the HTTP status code.
    #[tokio::test]
    async fn read_object_not_found() -> Result {
        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .respond_with(status_code(404).body("NOT FOUND")),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await
            .expect_err("expected a not found error");
        assert_eq!(err.http_status_code(), Some(404));

        Ok(())
    }

    // A CRC32C mismatch is reported after the data has been delivered.
    #[tokio::test]
    async fn read_object_incorrect_crc32c_check() -> Result {
        // Calculate and serialize the crc32c checksum of different contents.
        let u = crc32c::crc32c("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(u.to_be_bytes());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            // One request for each of the three reads below.
            .times(3)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("crc32c={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;

        // First read: collect the partial data and the error separately.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // Second read: propagate the error with `?` from `next()`.
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?;
        let err: crate::Error = async {
            {
                while (response.next().await.transpose()?).is_some() {}
                Ok(())
            }
        }
        .await
        .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        // Third read: the stream interface reports the same error.
        let err = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .send()
            .await?
            .into_stream()
            .try_collect::<Vec<bytes::Bytes>>()
            .await
            .expect_err("expect error on incorrect crc32c");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(
                    ChecksumMismatch::Crc32c { .. }
                ))
            ),
            "err={err:?}"
        );

        Ok(())
    }

    // An MD5 mismatch is reported when `compute_md5()` is enabled.
    #[tokio::test]
    async fn read_object_incorrect_md5_check() -> Result {
        // Calculate and serialize the md5 checksum of different contents.
        let digest = md5::compute("goodbye world".as_bytes());
        let value = base64::prelude::BASE64_STANDARD.encode(digest.as_ref());

        let server = Server::run();
        server.expect(
            Expectation::matching(all_of![
                request::method_path("GET", "/storage/v1/b/test-bucket/o/test-object"),
                request::query(url_decoded(contains(("alt", "media")))),
            ])
            .times(1)
            .respond_with(
                status_code(200)
                    .body("hello world")
                    .append_header("x-goog-hash", format!("md5={value}"))
                    .append_header("x-goog-generation", 123456),
            ),
        );

        let client = Storage::builder()
            .with_endpoint(format!("http://{}", server.addr()))
            .with_credentials(Anonymous::new().build())
            .build()
            .await?;
        let mut response = client
            .read_object("projects/_/buckets/test-bucket", "test-object")
            .compute_md5()
            .send()
            .await?;
        let mut partial = Vec::new();
        let mut err = None;
        while let Some(r) = response.next().await {
            match r {
                Ok(b) => partial.extend_from_slice(&b),
                Err(e) => err = Some(e),
            };
        }
        assert_eq!(bytes::Bytes::from_owner(partial), "hello world");
        let err = err.expect("expect error on incorrect md5");
        let source = err.source().and_then(|e| e.downcast_ref::<ReadError>());
        assert!(
            matches!(
                source,
                Some(&ReadError::ChecksumMismatch(ChecksumMismatch::Md5 { .. }))
            ),
            "err={err:?}"
        );

        Ok(())
    }

    // The default request is a GET with only the `alt=media` parameter.
    #[tokio::test]
    async fn read_object() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        );
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );
        Ok(())
    }

    // Credential failures surface as authentication errors.
    #[tokio::test]
    async fn read_object_error_credentials() -> Result {
        let inner = test_inner_client(
            test_builder().with_credentials(auth::credentials::testing::error_credentials(false)),
        );
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        );
        let _ = http_request_builder(inner, builder)
            .await
            .inspect_err(|e| assert!(e.is_authentication()))
            .expect_err("invalid credentials should err");
        Ok(())
    }

    // Bucket names without the `projects/_/buckets/` prefix are rejected.
    #[tokio::test]
    async fn read_object_bad_bucket() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(stub, "malformed", "object", inner.options.clone());
        let _ = http_request_builder(inner, builder)
            .await
            .expect_err("malformed bucket string should error");
        Ok(())
    }

    // Generation and preconditions are sent as query parameters.
    #[tokio::test]
    async fn read_object_query_params() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_generation(5)
        .set_if_generation_match(10)
        .set_if_generation_not_match(20)
        .set_if_metageneration_match(30)
        .set_if_metageneration_not_match(40);
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        let want_pairs: HashMap<String, String> = [
            ("alt", "media"),
            ("generation", "5"),
            ("ifGenerationMatch", "10"),
            ("ifGenerationNotMatch", "20"),
            ("ifMetagenerationMatch", "30"),
            ("ifMetagenerationNotMatch", "40"),
        ]
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        let query_pairs: HashMap<String, String> = request
            .url()
            .query_pairs()
            .map(|param| (param.0.to_string(), param.1.to_string()))
            .collect();
        assert_eq!(query_pairs.len(), want_pairs.len());
        assert_eq!(query_pairs, want_pairs);
        Ok(())
    }

    // By default the request carries `accept-encoding: gzip`.
    #[tokio::test]
    async fn read_object_default_headers() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        );
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        let want = [("accept-encoding", "gzip")];
        let headers = request.headers();
        for (name, value) in want {
            assert_eq!(
                headers.get(name).and_then(|h| h.to_str().ok()),
                Some(value),
                "{request:?}"
            );
        }
        Ok(())
    }

    // With automatic decompression no `accept-encoding` header is set.
    #[tokio::test]
    async fn read_object_automatic_decompression_headers() -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .with_automatic_decompression(true);
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        let headers = request.headers();
        assert!(headers.get("accept-encoding").is_none(), "{request:?}");
        Ok(())
    }

    // Customer-supplied encryption keys are sent as headers.
    #[tokio::test]
    async fn read_object_encryption_headers() -> Result {
        // Make a 32-byte key.
        let (key, key_base64, _, key_sha256_base64) = create_key_helper();

        // The API takes the unencoded byte array.
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_key(KeyAes256::new(&key)?);
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        let want = [
            ("x-goog-encryption-algorithm", "AES256".to_string()),
            ("x-goog-encryption-key", key_base64),
            ("x-goog-encryption-key-sha256", key_sha256_base64),
        ];

        let headers = request.headers();
        for (name, value) in want {
            assert_eq!(
                headers.get(name).and_then(|h| h.to_str().ok()),
                Some(value.as_str())
            );
        }
        Ok(())
    }

    // Each `ReadRange` maps to the expected `range` header.
    #[test_case(ReadRange::all(), None; "no headers needed")]
    #[test_case(ReadRange::offset(10), Some(&http::HeaderValue::from_static("bytes=10-")); "offset only")]
    #[test_case(ReadRange::tail(2000), Some(&http::HeaderValue::from_static("bytes=-2000")); "negative offset")]
    #[test_case(ReadRange::segment(0, 100), Some(&http::HeaderValue::from_static("bytes=0-99")); "limit only")]
    #[test_case(ReadRange::segment(1000, 100), Some(&http::HeaderValue::from_static("bytes=1000-1099")); "offset and limit")]
    #[tokio::test]
    async fn range_header(input: ReadRange, want: Option<&http::HeaderValue>) -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            "object",
            inner.options.clone(),
        )
        .set_read_range(input.clone());
        let request = http_request_builder(inner, builder).await?.build()?;

        assert_eq!(request.method(), reqwest::Method::GET);
        assert_eq!(
            request.url().as_str(),
            "http://private.googleapis.com/storage/v1/b/bucket/o/object?alt=media"
        );

        assert_eq!(request.headers().get("range"), want);
        Ok(())
    }

    // Object names are percent-encoded in the request path.
    #[test_case("projects/p", "projects%2Fp")]
    #[test_case("kebab-case", "kebab-case")]
    #[test_case("dot.name", "dot.name")]
    #[test_case("under_score", "under_score")]
    #[test_case("tilde~123", "tilde~123")]
    #[test_case("exclamation!point!", "exclamation%21point%21")]
    // Three consecutive spaces, matching the three `%20` in the expectation.
    #[test_case("spaces   spaces", "spaces%20%20%20spaces")]
    #[test_case("preserve%percent%21", "preserve%percent%21")]
    #[test_case(
        "testall !#$&'()*+,/:;=?@[]",
        "testall%20%21%23%24%26%27%28%29%2A%2B%2C%2F%3A%3B%3D%3F%40%5B%5D"
    )]
    #[tokio::test]
    async fn test_percent_encoding_object_name(name: &str, want: &str) -> Result {
        let inner = test_inner_client(test_builder());
        let stub = crate::storage::transport::Storage::new(inner.clone());
        let builder = ReadObject::new(
            stub,
            "projects/_/buckets/bucket",
            name,
            inner.options.clone(),
        );
        let request = http_request_builder(inner, builder).await?.build()?;
        let got = request.url().path_segments().unwrap().next_back().unwrap();
        assert_eq!(got, want);
        Ok(())
    }

    // Only the exact header values mark a response as gunzipped.
    #[test_case("x-guploader-response-body-transformations", "gunzipped", true)]
    #[test_case("x-guploader-response-body-transformations", "no match", false)]
    #[test_case("warning", "214 UploadServer gunzipped", true)]
    #[test_case("warning", "no match", false)]
    #[test_case("unused", "unused", false)]
    fn test_is_gunzipped(name: &'static str, value: &'static str, want: bool) -> Result {
        let response = http::Response::builder()
            .status(200)
            .header(name, value)
            .body(Vec::new())?;
        let response = reqwest::Response::from(response);
        let got = Reader::is_gunzipped(&response);
        assert_eq!(got, want, "{response:?}");
        Ok(())
    }
}