google-cloud-storage 1.11.0

Google Cloud Client Libraries for Rust - Storage
Documentation
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;
use crate::builder::storage::WriteObject;
use crate::model_ext::{KeyAes256, tests::create_key_helper};
use crate::storage::client::tests::{test_builder, test_inner_client};
use crate::streaming_source::Payload;
use google_cloud_auth::credentials::testing::error_credentials;
use serde_json::{Value, json};
use std::collections::BTreeMap;
use std::sync::Arc;
use test_case::test_case;

type Result = anyhow::Result<()>;

mod checksums;
mod preconditions;

fn response_body() -> Value {
    json!({
        "name": "test-object",
        "bucket": "test-bucket",
        "metadata": {
            "is-test-object": "true",
        }
    })
}

pub(crate) fn perform_upload<T>(
    inner: Arc<StorageInner>,
    builder: WriteObject<T>,
) -> PerformUpload<Payload<T>> {
    PerformUpload::new(
        builder.payload,
        inner,
        builder.request.spec,
        builder.request.params,
        builder.options,
    )
}

#[tokio::test]
async fn start_resumable_upload() -> Result {
    let inner = test_inner_client(test_builder()).await;
    let options = inner.options.clone();
    let stub = crate::storage::transport::Storage::new_test(inner.clone());
    let builder = WriteObject::new(
        stub,
        "projects/_/buckets/bucket",
        "object",
        "hello",
        options,
    );
    let mut request = perform_upload(inner, builder)
        .start_resumable_upload_request()
        .await?
        .build_for_tests()
        .await?;

    assert_eq!(request.method(), Method::POST);
    assert_eq!(
        request.url().as_str(),
        "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
    );
    let body = request.body_mut().take().unwrap();
    let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
    let json = serde_json::from_slice::<Value>(&contents)?;
    assert_eq!(json, json!({}));
    Ok(())
}

#[tokio::test]
async fn start_resumable_upload_headers() -> Result {
    // Make a 32-byte key.
    let (key, key_base64, _, key_sha256_base64) = create_key_helper();

    let inner = test_inner_client(test_builder()).await;
    let options = inner.options.clone();
    let stub = crate::storage::transport::Storage::new_test(inner.clone());
    let builder = WriteObject::new(
        stub,
        "projects/_/buckets/bucket",
        "object",
        "hello",
        options,
    )
    .set_key(KeyAes256::new(&key)?);
    let request = perform_upload(inner, builder)
        .start_resumable_upload_request()
        .await?
        .build_for_tests()
        .await?;

    assert_eq!(request.method(), Method::POST);
    assert_eq!(
        request.url().as_str(),
        "http://private.googleapis.com/upload/storage/v1/b/bucket/o?uploadType=resumable&name=object"
    );

    let want = vec![
        ("x-goog-encryption-algorithm", "AES256".to_string()),
        ("x-goog-encryption-key", key_base64),
        ("x-goog-encryption-key-sha256", key_sha256_base64),
    ];

    for (name, value) in want {
        assert_eq!(
            request.headers().get(name).unwrap().as_bytes(),
            bytes::Bytes::from(value)
        );
    }
    Ok(())
}

#[tokio::test]
async fn start_resumable_upload_bad_bucket() -> Result {
    let inner = test_inner_client(test_builder()).await;
    let options = inner.options.clone();
    let stub = crate::storage::transport::Storage::new_test(inner.clone());
    let builder = WriteObject::new(stub, "malformed", "object", "hello", options);
    let _ = perform_upload(inner, builder)
        .start_resumable_upload_request()
        .await
        .expect_err("malformed bucket string should error");
    Ok(())
}

#[tokio::test]
async fn start_resumable_upload_metadata_in_request() -> Result {
    use crate::model::ObjectAccessControl;
    let inner = test_inner_client(test_builder()).await;
    let options = inner.options.clone();
    let stub = crate::storage::transport::Storage::new_test(inner.clone());
    let builder = WriteObject::new(stub, "projects/_/buckets/bucket", "object", "", options)
        .set_if_generation_match(10)
        .set_if_generation_not_match(20)
        .set_if_metageneration_match(30)
        .set_if_metageneration_not_match(40)
        .set_predefined_acl("private")
        .set_acl([ObjectAccessControl::new()
            .set_entity("allAuthenticatedUsers")
            .set_role("READER")])
        .set_cache_control("public; max-age=7200")
        .set_content_disposition("inline")
        .set_content_encoding("gzip")
        .set_content_language("en")
        .set_content_type("text/plain")
        .set_custom_time(wkt::Timestamp::try_from("2025-07-07T18:11:00Z")?)
        .set_event_based_hold(true)
        .set_metadata([("k0", "v0"), ("k1", "v1")])
        .set_retention(
            crate::model::object::Retention::new()
                .set_mode(crate::model::object::retention::Mode::Locked)
                .set_retain_until_time(wkt::Timestamp::try_from("2035-07-07T18:14:00Z")?),
        )
        .set_storage_class("ARCHIVE")
        .set_temporary_hold(true)
        .set_kms_key("test-key")
        .with_known_crc32c(crc32c::crc32c(b""))
        .with_known_md5_hash(md5::compute(b"").0);
    let mut request = perform_upload(inner, builder)
        .start_resumable_upload_request()
        .await?
        .build_for_tests()
        .await?;

    assert_eq!(request.method(), Method::POST);
    let want_pairs: BTreeMap<String, String> = [
        ("uploadType", "resumable"),
        ("name", "object"),
        ("ifGenerationMatch", "10"),
        ("ifGenerationNotMatch", "20"),
        ("ifMetagenerationMatch", "30"),
        ("ifMetagenerationNotMatch", "40"),
        ("kmsKeyName", "test-key"),
        ("predefinedAcl", "private"),
    ]
    .iter()
    .map(|(k, v)| (k.to_string(), v.to_string()))
    .collect();
    let query_pairs: BTreeMap<String, String> = request
        .url()
        .query_pairs()
        .map(|param| (param.0.to_string(), param.1.to_string()))
        .collect();
    assert_eq!(query_pairs, want_pairs);

    let body = request.body_mut().take().unwrap();
    let contents = http_body_util::BodyExt::collect(body).await?.to_bytes();
    let json = serde_json::from_slice::<Value>(&contents)?;
    assert_eq!(
        json,
        json!({
            "acl": [{"entity": "allAuthenticatedUsers", "role": "READER"}],
            "cacheControl": "public; max-age=7200",
            "contentDisposition": "inline",
            "contentEncoding": "gzip",
            "contentLanguage": "en",
            "contentType": "text/plain",
            "crc32c": "AAAAAA==",
            "customTime": "2025-07-07T18:11:00Z",
            "eventBasedHold": true,
            "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
            "metadata": {"k0": "v0", "k1": "v1"},
            "retention": {"mode": "LOCKED", "retainUntilTime": "2035-07-07T18:14:00Z"},
            "storageClass": "ARCHIVE",
            "temporaryHold": true,
        })
    );
    Ok(())
}

#[tokio::test]
async fn start_resumable_upload_credentials() -> Result {
    let inner = test_inner_client(test_builder().with_credentials(error_credentials(false))).await;
    let options = inner.options.clone();
    let stub = crate::storage::transport::Storage::new_test(inner.clone());
    let builder = WriteObject::new(
        stub,
        "projects/_/buckets/bucket",
        "object",
        "hello",
        options,
    );
    let _ = perform_upload(inner, builder)
        .start_resumable_upload_request()
        .await?
        .build_for_tests()
        .await
        .inspect_err(|e| assert!(e.is_authentication()))
        .expect_err("invalid credentials should err");
    Ok(())
}

#[tokio::test]
async fn handle_start_resumable_upload_response() -> Result {
    let response = http::Response::builder()
        .header(
            "Location",
            "http://private.googleapis.com/test-only/session-123",
        )
        .body(Vec::new())?;
    let response = Response::from(response);
    let url = super::handle_start_resumable_upload_response(response).await?;
    assert_eq!(url, "http://private.googleapis.com/test-only/session-123");
    Ok(())
}

#[test_case(None, Some(0))]
#[test_case(Some("bytes=0-12345"), Some(12345))]
#[test_case(Some("bytes=0-1"), Some(1))]
#[test_case(Some("bytes=0-0"), Some(0))]
#[test_case(Some("bytes=1-12345"), None)]
#[test_case(Some(""), None)]
fn range_end(input: Option<&str>, want: Option<u64>) {
    use gaxi::http::reqwest::{HeaderMap, HeaderName, HeaderValue};
    let headers = HeaderMap::from_iter(input.into_iter().map(|s| {
        (
            HeaderName::from_static("range"),
            HeaderValue::from_str(s).unwrap(),
        )
    }));
    assert_eq!(super::parse_range_end(&headers), want, "{headers:?}");
}

#[test]
fn validate_status_code() {
    assert_eq!(RESUME_INCOMPLETE, 308);
}

#[tokio::test]
async fn query_resumable_upload_partial() -> Result {
    let response = http::Response::builder()
        .header("range", "bytes=0-99")
        .status(RESUME_INCOMPLETE)
        .body(Vec::new())?;
    let response = Response::from(response);
    let status = super::query_resumable_upload_handle_response(response).await?;
    assert_eq!(status, ResumableUploadStatus::Partial(100_u64));
    Ok(())
}

#[tokio::test]
async fn query_resumable_upload_finalized() -> Result {
    let response = http::Response::builder()
        .status(200)
        .body(response_body().to_string())?;
    let response = Response::from(response);
    let status = super::query_resumable_upload_handle_response(response).await?;
    assert!(
        matches!(status, ResumableUploadStatus::Finalized(_)),
        "{status:?}"
    );
    Ok(())
}

#[tokio::test]
async fn query_resumable_upload_http_error() -> Result {
    let response = http::Response::builder().status(429).body(Vec::new())?;
    let response = Response::from(response);
    let err = super::query_resumable_upload_handle_response(response)
        .await
        .expect_err("HTTP error should return error");
    assert_eq!(err.http_status_code(), Some(429), "{err:?}");
    Ok(())
}

#[tokio::test]
async fn query_resumable_upload_finalized_deser() -> Result {
    let response = http::Response::builder()
        .status(200)
        .body("a string is not a valid object".to_string())?;
    let response = Response::from(response);
    let err = super::query_resumable_upload_handle_response(response)
        .await
        .expect_err("bad response should return an error");
    assert!(err.is_deserialization(), "{err:?}");
    Ok(())
}

#[tokio::test]
async fn parse_range() -> Result {
    let response = http::Response::builder()
        .header("range", "bytes=0-99")
        .status(RESUME_INCOMPLETE)
        .body(Vec::new())?;
    let response = Response::from(response);
    let range = super::parse_range(response).await?;
    assert_eq!(range, ResumableUploadStatus::Partial(100_u64));
    Ok(())
}

#[tokio::test]
async fn parse_range_missing() -> Result {
    let response = http::Response::builder()
        .status(RESUME_INCOMPLETE)
        .body(Vec::new())?;
    let response = Response::from(response);
    let range = super::parse_range(response).await?;
    assert_eq!(range, ResumableUploadStatus::Partial(0));
    Ok(())
}

#[tokio::test]
async fn parse_range_invalid_range() -> Result {
    let response = http::Response::builder()
        .header("range", "bytes=100-999")
        .status(RESUME_INCOMPLETE)
        .body(Vec::new())?;
    let response = Response::from(response);
    let err = super::parse_range(response)
        .await
        .expect_err("invalid range should create an error");
    assert_eq!(err.http_status_code(), Some(308), "{err:?}");
    Ok(())
}