use super::RESUMABLE_UPLOAD_QUANTUM;
use crate::model_ext::{KeyAes256, tests::create_key_helper};
use crate::storage::client::tests::{
MockBackoffPolicy, MockRetryPolicy, MockRetryThrottler, test_builder,
};
use crate::streaming_source::{BytesSource, SizeHint, tests::UnknownSize};
use google_cloud_auth::credentials::anonymous::Builder as Anonymous;
use google_cloud_gax::retry_policy::RetryPolicyExt;
use google_cloud_gax::retry_result::RetryResult;
use httptest::{Expectation, Server, matchers::*, responders::*};
use serde_json::{Value, json};
use std::time::Duration;
/// Return type shared by the tests in this file; it lets them propagate errors with `?`.
type Result = anyhow::Result<()>;
/// Builds the JSON object resource that the mock server returns in these tests.
///
/// The value carries a `name`, a `bucket`, and a single `metadata` entry.
fn response_body() -> Value {
    let metadata = json!({ "is-test-object": "true" });
    json!({
        "name": "test-object",
        "bucket": "test-bucket",
        "metadata": metadata,
    })
}
/// Uploads an empty payload with `send_buffered()` and expects success even
/// though the session-creating `POST` and the finalizing `PUT` each receive
/// one 429 before the successful response.
///
/// The mock server also expects a single zero-length `PUT` carrying
/// `content-range: bytes */*`, answered with a 308 -- presumably the status
/// query issued after the failed `PUT`.
#[tokio::test]
async fn empty_success() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");

    // Session creation: a 429 first, then the session location.
    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200).append_header("location", session_url.to_string()),
    ]);
    server.expect(create_session);

    // Finalizing the zero-byte object: a 429 first, then the object resource.
    let finalize = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */0")))
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200)
            .append_header("content-type", "application/json")
            .body(response_body().to_string())
    ]);
    server.expect(finalize);

    // One zero-length `bytes */*` request, answered with 308.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(1)
    .respond_with(status_code(308));
    server.expect(query_status);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;
    let object = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await?;

    assert_eq!(object.name, "test-object");
    assert_eq!(object.bucket, "projects/_/buckets/test-bucket");
    let marker = object.metadata.get("is-test-object").map(String::as_str);
    assert_eq!(marker, Some("true"));
    Ok(())
}
/// Same scenario as an empty upload with one transient failure per step, but
/// the empty payload is wrapped in `UnknownSize`.
///
/// The mock server expects the finalizing `PUT` to carry
/// `content-range: bytes */0`, plus one zero-length `bytes */*` request that
/// it answers with a 308.
#[tokio::test]
async fn resumable_empty_unknown() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");

    // Session creation: a 429 first, then the session location.
    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200).append_header("location", session_url.to_string()),
    ]);
    server.expect(create_session);

    // Finalizing the zero-byte object: a 429 first, then the object resource.
    let finalize = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */0")))
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200)
            .append_header("content-type", "application/json")
            .body(response_body().to_string())
    ]);
    server.expect(finalize);

    // One zero-length `bytes */*` request, answered with 308.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(1)
    .respond_with(status_code(308));
    server.expect(query_status);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;

    let payload = UnknownSize::new(BytesSource::new(bytes::Bytes::from_static(b"")));
    let object = client
        .write_object("projects/_/buckets/test-bucket", "test-object", payload)
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await?;

    assert_eq!(object.name, "test-object");
    assert_eq!(object.bucket, "projects/_/buckets/test-bucket");
    let marker = object.metadata.get("is-test-object").map(String::as_str);
    assert_eq!(marker, Some("true"));
    Ok(())
}
/// Uploads an empty payload with a customer-supplied AES-256 key and expects
/// the `x-goog-encryption-*` headers on both the session-creating `POST` and
/// the finalizing `PUT`.
///
/// Each of those two requests receives one 429 before succeeding. The
/// zero-length `bytes */*` request is matched without checking the
/// encryption headers.
#[tokio::test]
async fn empty_csek() -> Result {
    let (key, key_base64, _, key_sha256_base64) = create_key_helper();

    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");

    // Session creation must carry the encryption headers.
    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
        request::headers(contains(("x-goog-encryption-algorithm", "AES256"))),
        request::headers(contains(("x-goog-encryption-key", key_base64.clone()))),
        request::headers(contains((
            "x-goog-encryption-key-sha256",
            key_sha256_base64.clone()
        ))),
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200).append_header("location", session_url.to_string()),
    ]);
    server.expect(create_session);

    // The finalizing `PUT` must carry the same encryption headers.
    let finalize = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */0"))),
        request::headers(contains(("x-goog-encryption-algorithm", "AES256"))),
        request::headers(contains(("x-goog-encryption-key", key_base64.clone()))),
        request::headers(contains((
            "x-goog-encryption-key-sha256",
            key_sha256_base64.clone()
        ))),
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(200)
            .append_header("content-type", "application/json")
            .body(response_body().to_string())
    ]);
    server.expect(finalize);

    // One zero-length `bytes */*` request, answered with 308.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(1)
    .respond_with(status_code(308));
    server.expect(query_status);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;
    let object = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .set_if_generation_match(0_i64)
        .set_key(KeyAes256::new(&key)?)
        .send_buffered()
        .await?;

    assert_eq!(object.name, "test-object");
    assert_eq!(object.bucket, "projects/_/buckets/test-bucket");
    let marker = object.metadata.get("is-test-object").map(String::as_str);
    assert_eq!(marker, Some("true"));
    Ok(())
}
/// Expects `send_buffered()` to fail with a serialization error when the
/// payload source yields an I/O error from `next()`.
///
/// The mock source reports an exact size of 1024 bytes, and the mock server
/// only expects the session-creating `POST`.
#[tokio::test]
async fn source_next_error() -> Result {
    use crate::streaming_source::tests::MockSimpleSource;
    use std::io::{Error as IoError, ErrorKind};

    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");

    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .times(1)
    .respond_with(cycle![
        status_code(200).append_header("location", session_url.to_string()),
    ]);
    server.expect(create_session);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_credentials(Anonymous::new().build())
        .build()
        .await?;

    // A source that advertises 1024 bytes but fails when asked for data.
    let mut failing_source = MockSimpleSource::new();
    failing_source
        .expect_size_hint()
        .once()
        .returning(|| Ok(SizeHint::with_exact(1024)));
    failing_source.expect_next().once().returning(|| {
        let failure = IoError::new(ErrorKind::ConnectionAborted, "test-only");
        Some(Err(failure))
    });

    let upload = client
        .write_object(
            "projects/_/buckets/test-bucket",
            "test-object",
            failing_source,
        )
        .set_if_generation_match(0)
        .with_resumable_upload_threshold(0_usize);
    let err = upload
        .send_buffered()
        .await
        .expect_err("expected a serialization error");
    assert!(err.is_serialization(), "{err:?}");
    Ok(())
}
/// Expects the upload to fail with HTTP 403 when the session-creating `POST`
/// is answered first with a 429 and then with a 403.
#[tokio::test]
async fn start_permanent_error() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());

    // Two attempts: one 429, then the 403.
    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(403).body("uh-oh"),
    ]);
    server.expect(create_session);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;
    let err = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await
        .expect_err("request should fail");

    assert_eq!(err.http_status_code(), Some(403), "{err:?}");
    Ok(())
}
/// Expects the upload to fail with HTTP 429 when every session-creating
/// `POST` is answered with a 429 and the retry policy is limited to three
/// attempts.
#[tokio::test]
async fn start_too_many_transients() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());

    // Three attempts are expected, matching the attempt limit below.
    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .times(3)
    .respond_with(status_code(429).body("try-again"));
    server.expect(create_session);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;

    let limited_retry = crate::retry_policy::RetryableErrors.with_attempt_limit(3);
    let err = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .with_retry_policy(limited_retry)
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await
        .expect_err("request should fail");

    assert_eq!(err.http_status_code(), Some(429), "{err:?}");
    Ok(())
}
/// Expects the upload to fail with HTTP 412 when the finalizing `PUT` is
/// answered first with a 429 and then with a 412.
///
/// The mock server also expects one zero-length `bytes */*` request,
/// answered with a 308 -- presumably the status query issued after the 429.
#[tokio::test]
async fn put_permanent_error() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");

    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .respond_with(status_code(200).append_header("location", session_url.to_string()));
    server.expect(create_session);

    // Two attempts at finalizing: one 429, then the 412.
    let finalize = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */0")))
    ])
    .times(2)
    .respond_with(cycle![
        status_code(429).body("try-again"),
        status_code(412).body("precondition"),
    ]);
    server.expect(finalize);

    // One zero-length `bytes */*` request, answered with 308.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_url.path().to_string()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(1)
    .respond_with(status_code(308));
    server.expect(query_status);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;
    let err = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await
        .expect_err("request should fail");

    assert_eq!(err.http_status_code(), Some(412), "{err:?}");
    Ok(())
}
/// Expects the upload to fail with HTTP 429 when every finalizing `PUT` is
/// answered with a 429 and the retry policy is limited to three attempts.
///
/// The mock server expects three finalizing `PUT` requests and two
/// zero-length `bytes */*` requests, the latter answered with a 308.
#[tokio::test]
async fn put_too_many_transients() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");
    let session_path = session_url.path().to_string();

    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .respond_with(status_code(200).append_header("location", session_url.to_string()));
    server.expect(create_session);

    // Every attempt at finalizing gets a 429.
    let finalize = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", "bytes */0")))
    ])
    .times(3)
    .respond_with(status_code(429).body("try-again"));
    server.expect(finalize);

    // Two zero-length `bytes */*` requests, each answered with 308.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(2)
    .respond_with(status_code(308));
    server.expect(query_status);

    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;

    let limited_retry = crate::retry_policy::RetryableErrors.with_attempt_limit(3);
    let err = client
        .write_object("projects/_/buckets/test-bucket", "test-object", "")
        .with_retry_policy(limited_retry)
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await
        .expect_err("request should fail");

    assert_eq!(err.http_status_code(), Some(429), "{err:?}");
    Ok(())
}
/// Uploads 3.5 quanta of data of unknown size with a buffer of two quanta,
/// and expects success after a failed first chunk and a partially persisted
/// state reported by the server.
///
/// Scripted exchange, as matched by the `content-range` headers:
/// - `bytes 0-{TARGET-1}/*` is answered with a 429.
/// - The zero-length `bytes */*` request is answered with a 308 reporting
///   that the first quantum is persisted.
/// - `bytes {QUANTUM}-{QUANTUM+TARGET-1}/*` is answered with a 308 reporting
///   that the first three quanta are persisted.
/// - `bytes {QUANTUM+TARGET}-{FULL-1}/{FULL}` is answered with the object
///   resource.
#[tokio::test]
async fn put_partial_and_recover() -> Result {
    const QUANTUM: usize = RESUMABLE_UPLOAD_QUANTUM;
    const TARGET: usize = 2 * QUANTUM;
    const FULL: usize = 3 * QUANTUM + QUANTUM / 2;

    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");
    let session_path = session_url.path().to_string();

    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .respond_with(status_code(200).append_header("location", session_url.to_string()));
    server.expect(create_session);

    // The first full buffer, starting at offset zero, is rejected with a 429.
    let first_range = format!("bytes 0-{}/*", TARGET - 1);
    let first_chunk = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", first_range)))
    ])
    .respond_with(status_code(429).body("try-again"));
    server.expect(first_chunk);

    // A buffer starting at the second quantum; the server reports three quanta persisted.
    let second_range = format!("bytes {QUANTUM}-{}/*", QUANTUM + TARGET - 1);
    let persisted_after_second = format!("bytes=0-{}", 3 * QUANTUM - 1);
    let second_chunk = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", second_range)))
    ])
    .respond_with(status_code(308).append_header("range", persisted_after_second));
    server.expect(second_chunk);

    // The remaining half quantum finalizes the upload.
    let last_range = format!("bytes {}-{}/{FULL}", QUANTUM + TARGET, FULL - 1);
    let last_chunk = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", last_range)))
    ])
    .respond_with(status_code(200).body(response_body().to_string()));
    server.expect(last_chunk);

    // The zero-length `bytes */*` request reports only the first quantum as persisted.
    let persisted_after_failure = format!("bytes=0-{}", QUANTUM - 1);
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .respond_with(status_code(308).append_header("range", persisted_after_failure));
    server.expect(query_status);

    let data = bytes::Bytes::from_owner(vec![0_u8; FULL]);
    let payload = UnknownSize::new(BytesSource::new(data));
    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .with_resumable_upload_buffer_size(TARGET)
        .build()
        .await?;

    let limited_retry = crate::retry_policy::RetryableErrors.with_attempt_limit(3);
    let object = client
        .write_object("projects/_/buckets/test-bucket", "test-object", payload)
        .with_retry_policy(limited_retry)
        .set_if_generation_match(0_i64)
        .with_resumable_upload_buffer_size(TARGET)
        .send_buffered()
        .await?;

    assert_eq!(object.name, "test-object");
    assert_eq!(object.bucket, "projects/_/buckets/test-bucket");
    let marker = object.metadata.get("is-test-object").map(String::as_str);
    assert_eq!(marker, Some("true"));
    Ok(())
}
/// Expects the upload to succeed when the data `PUT` receives a 429 but the
/// following zero-length `bytes */*` request is answered with a 200 and the
/// object resource.
///
/// Presumably this models an upload that the service finalized even though
/// the client saw an error -- the test only pins the request/response script
/// and the returned object.
#[tokio::test]
async fn put_error_and_finalized() -> Result {
    let server = Server::run();
    let endpoint = format!("http://{}", server.addr());
    let session_url = server.url("/upload/session/test-only-001");
    let session_path = session_url.path().to_string();

    let create_session = Expectation::matching(all_of![
        request::method_path("POST", "/upload/storage/v1/b/test-bucket/o"),
        request::query(url_decoded(contains(("name", "test-object")))),
        request::query(url_decoded(contains(("ifGenerationMatch", "0")))),
        request::query(url_decoded(contains(("uploadType", "resumable")))),
    ])
    .respond_with(status_code(200).append_header("location", session_url.to_string()));
    server.expect(create_session);

    // The single request carrying all 1000 bytes gets a 429.
    let send_data = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", "bytes 0-999/1000")))
    ])
    .respond_with(status_code(429).body("try-again"));
    server.expect(send_data);

    // The zero-length `bytes */*` request returns the finished object.
    let query_status = Expectation::matching(all_of![
        request::method_path("PUT", session_path.clone()),
        request::headers(contains(("content-range", "bytes */*"))),
        request::headers(contains(("content-length", "0"))),
    ])
    .times(1)
    .respond_with(cycle![status_code(200).body(response_body().to_string()),]);
    server.expect(query_status);

    let payload = bytes::Bytes::from_owner(vec![0_u8; 1_000]);
    let client = test_builder()
        .with_endpoint(endpoint)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;

    let limited_retry = crate::retry_policy::RetryableErrors.with_attempt_limit(3);
    let object = client
        .write_object("projects/_/buckets/test-bucket", "test-object", payload)
        .with_retry_policy(limited_retry)
        .set_if_generation_match(0_i64)
        .send_buffered()
        .await?;

    assert_eq!(object.name, "test-object");
    assert_eq!(object.bucket, "projects/_/buckets/test-bucket");
    let marker = object.metadata.get("is-test-object").map(String::as_str);
    assert_eq!(marker, Some("true"));
    Ok(())
}
#[tokio::test]
async fn start_resumable_upload_request_retry_options() -> Result {
let server = Server::run();
let matching = || {
Expectation::matching(all_of![
request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
request::query(url_decoded(contains(("name", "object")))),
request::query(url_decoded(contains(("uploadType", "resumable")))),
])
};
server.expect(
matching()
.times(3)
.respond_with(status_code(503).body("try-again")),
);
let mut retry = MockRetryPolicy::new();
retry
.expect_on_error()
.times(1..)
.returning(|_, e| RetryResult::Continue(e));
let mut backoff = MockBackoffPolicy::new();
backoff
.expect_on_failure()
.times(1..)
.return_const(Duration::from_micros(1));
let mut throttler = MockRetryThrottler::new();
throttler
.expect_throttle_retry_attempt()
.times(1..)
.return_const(false);
throttler
.expect_on_retry_failure()
.times(1..)
.return_const(());
throttler.expect_on_success().never().return_const(());
let client = test_builder()
.with_endpoint(format!("http://{}", server.addr()))
.with_resumable_upload_threshold(0_usize)
.build()
.await?;
let err = client
.write_object("projects/_/buckets/bucket", "object", "hello")
.with_retry_policy(retry.with_attempt_limit(3))
.with_backoff_policy(backoff)
.with_retry_throttler(throttler)
.send_buffered()
.await
.expect_err("request should fail after 3 retry attempts");
assert_eq!(err.http_status_code(), Some(503), "{err:?}");
Ok(())
}
/// Checks that retry options configured on the client are used when starting
/// a resumable upload.
///
/// The mock retry policy, backoff policy and retry throttler are installed on
/// the client builder rather than on the request. Each must be called at
/// least once, and the throttler's `on_success` must never be called. The
/// server answers all three expected `POST` requests with a 503, and the test
/// asserts that this status reaches the caller.
#[tokio::test]
async fn start_resumable_upload_client_retry_options() -> Result {
    // `RetryPolicyExt` (needed for `with_attempt_limit`) comes from the
    // file-level imports; no function-local `use` is required.
    let server = Server::run();
    let matching = || {
        Expectation::matching(all_of![
            request::method_path("POST", "/upload/storage/v1/b/bucket/o"),
            request::query(url_decoded(contains(("name", "object")))),
            request::query(url_decoded(contains(("uploadType", "resumable")))),
        ])
    };
    // Every attempt to create the session fails with a 503.
    server.expect(
        matching()
            .times(3)
            .respond_with(status_code(503).body("try-again")),
    );

    // Treat every error as retryable; the attempt limit is added below.
    let mut retry = MockRetryPolicy::new();
    retry
        .expect_on_error()
        .times(1..)
        .returning(|_, e| RetryResult::Continue(e));

    let mut backoff = MockBackoffPolicy::new();
    backoff
        .expect_on_failure()
        .times(1..)
        .return_const(Duration::from_micros(1));

    let mut throttler = MockRetryThrottler::new();
    throttler
        .expect_throttle_retry_attempt()
        .times(1..)
        .return_const(false);
    throttler
        .expect_on_retry_failure()
        .times(1..)
        .return_const(());
    throttler.expect_on_success().never().return_const(());

    // The retry options are set on the client, not on the request.
    let client = test_builder()
        .with_endpoint(format!("http://{}", server.addr()))
        .with_retry_policy(retry.with_attempt_limit(3))
        .with_backoff_policy(backoff)
        .with_retry_throttler(throttler)
        .with_resumable_upload_threshold(0_usize)
        .build()
        .await?;
    let err = client
        .write_object("projects/_/buckets/bucket", "object", "hello")
        .send_buffered()
        .await
        .expect_err("request should fail after 3 retry attempts");
    assert_eq!(err.http_status_code(), Some(503), "{err:?}");
    Ok(())
}