gcs-rsync 0.4.8

rsync support for gcs with higher perf than gsutil rsync
Documentation
use std::ops::Not;

use futures::{StreamExt, TryStreamExt};
use gcs_rsync::{
    oauth2::token::ServiceAccountCredentials,
    storage::{
        Metadata, Object, ObjectClient, ObjectMetadata, ObjectsListRequest, PartialObject,
        StorageResult,
    },
};

mod config;
use config::gcs::GcsTestConfig;

#[tokio::test]
async fn test_test_config() {
    let t = GcsTestConfig::from_env().await;

    assert!(
        t.list_prefix().is_empty().not(),
        "list prefix should not be empty"
    );
    let name = t.object("object_name").name;
    assert!(
        name.ends_with("/object_name"),
        "object name should end with /object_name"
    );

    assert!(t.bucket().is_empty().not(), "bucket should not be empty");
}

async fn assert_delete_err(object_client: &ObjectClient, object: &Object) {
    let delete_result = object_client.delete(object).await;
    assert!(
        delete_result.is_err(),
        "expected an error got {:?} for {}",
        delete_result,
        object
    );
}

async fn assert_delete_ok(object_client: &ObjectClient, object: &Object) {
    let delete_result = object_client.delete(object).await;
    assert!(
        delete_result.is_ok(),
        "unexpected error {:?} for {}",
        delete_result,
        object
    );
}

async fn upload_bytes(
    object_client: &ObjectClient,
    object: &Object,
    content: &str,
) -> StorageResult<()> {
    let data = bytes::Bytes::copy_from_slice(content.as_bytes());
    let stream = futures::stream::once(futures::future::ok::<bytes::Bytes, String>(data));
    object_client.upload(object, stream).await
}

async fn assert_upload_bytes(object_client: &ObjectClient, object: &Object, content: &str) {
    let upload_result = upload_bytes(object_client, object, content).await;
    assert!(
        upload_result.is_ok(),
        "unexpected error got {:?} for {}",
        upload_result,
        object
    );
}

async fn assert_download_bytes(object_client: &ObjectClient, object: &Object, expected: &str) {
    let bytes = futures::stream::once(object_client.download(object))
        .try_flatten()
        .try_fold(Vec::new(), |mut bytes, buffer| {
            for byte in buffer {
                bytes.push(byte);
            }
            futures::future::ok(bytes)
        })
        .await
        .unwrap();
    let actual = String::from_utf8(bytes).unwrap();
    assert_eq!(expected, actual);
}

#[tokio::test]
async fn test_upload_multipart() {
    let test_config = GcsTestConfig::from_env().await;
    let object = test_config.object("with/path/object_mutlipart.txt");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();

    let content = b"test multipart";

    let data = bytes::Bytes::copy_from_slice(content);
    let stream = futures::stream::once(futures::future::ok::<bytes::Bytes, String>(data));
    let now = chrono::offset::Utc::now().timestamp();
    let metadata = ObjectMetadata {
        metadata: Metadata {
            modification_time: Some(now),
        },
    };
    object_client
        .upload_with_metadata(&metadata, &object, stream)
        .await
        .unwrap();
    let actual = object_client
        .get(&object, "size, metadata/goog-reserved-file-mtime")
        .await
        .unwrap();
    assert_delete_ok(&object_client, &object).await;
    assert_eq!(Some(now), actual.metadata.and_then(|x| x.modification_time));
    assert_eq!(Some(content.len() as u64), actual.size);
}

#[tokio::test]
async fn test_delete_upload_download_delete() {
    let test_config = GcsTestConfig::from_env().await;
    let object = test_config.object("object.txt");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();

    let content = "hello";
    assert_delete_err(&object_client, &object).await;
    assert_upload_bytes(&object_client, &object, content).await;
    assert_download_bytes(&object_client, &object, content).await;
    assert_delete_ok(&object_client, &object).await;
}

#[tokio::test]
async fn test_get_object_ok() {
    let test_config = GcsTestConfig::from_env().await;
    let object = test_config.object("object.txt");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();

    let content = "hello";
    assert_delete_err(&object_client, &object).await;
    assert_upload_bytes(&object_client, &object, content).await;

    let partial_object = object_client.get(&object, "name,selfLink").await.unwrap();

    assert!(partial_object.name.unwrap().ends_with("object.txt"));
    let self_link = partial_object.self_link.unwrap();
    assert!(self_link.ends_with("%2Fobject.txt") || self_link.ends_with("%5Cobject.txt"));
    assert_eq!(None, partial_object.crc32c);
    assert_delete_ok(&object_client, &object).await;
}

#[tokio::test]
async fn test_get_object_size() {
    let test_config = GcsTestConfig::from_env().await;
    let object = test_config.object("object.txt");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();

    let content = "hello";
    assert_delete_err(&object_client, &object).await;
    assert_upload_bytes(&object_client, &object, content).await;

    let partial_object = object_client.get(&object, "size").await.unwrap();

    assert_eq!(5, partial_object.size.unwrap());
    assert_delete_ok(&object_client, &object).await;
}

#[tokio::test]
async fn test_get_object_not_found() {
    let test_config = GcsTestConfig::from_env().await;
    let object = test_config.object("object.txt");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();

    let err = object_client
        .get(&object, "name,selfLink")
        .await
        .unwrap_err();

    assert_not_found_response(err);
}

#[tokio::test]
async fn test_upload_with_detailed_error() {
    let test_config = GcsTestConfig::from_env().await;
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();
    let object = Object::new("the_bad_bucket", "name").unwrap();

    let err = upload_bytes(&object_client, &object, "").await.unwrap_err();

    assert_unexpected_response(err, r#""code": 403"#);
}

#[tokio::test]
async fn test_api_list_objects() {
    let test_config = GcsTestConfig::from_env().await;

    let count = 11;
    let prefix = test_config.list_prefix();
    let bucket = test_config.bucket();
    let test_objects = (0..count + 2)
        .map(|i| test_config.object(format!("object_{}", i).as_str()))
        .collect::<Vec<_>>();

    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();
    futures::stream::iter(test_objects.iter())
        .for_each_concurrent(config::default::CONCURRENCY_LEVEL, |object| {
            assert_upload_bytes(&object_client, object, "hello")
        })
        .await;

    let objects_list_request = ObjectsListRequest {
        prefix: Some(prefix),
        fields: Some("items(selfLink,name),nextPageToken".to_owned()),
        max_results: Some(2),
        ..Default::default()
    };

    let result: Vec<PartialObject> = object_client
        .list(bucket.as_str(), &objects_list_request)
        .await
        .take(count)
        .try_collect()
        .await
        .unwrap();

    assert_eq!(count, result.len());

    futures::stream::iter(test_objects.iter())
        .for_each_concurrent(config::default::CONCURRENCY_LEVEL, |object| {
            assert_delete_ok(&object_client, object)
        })
        .await;
}

#[tokio::test]
async fn test_crc32c_object() {
    let test_config = GcsTestConfig::from_env().await;
    let bucket = test_config.bucket();
    let prefix = test_config.list_prefix();
    let test_object = &test_config.object("test_crc32c");
    let object_client = ObjectClient::new(Box::new(test_config.token()))
        .await
        .unwrap();
    assert_upload_bytes(&object_client, test_object, "hello world!").await;

    let objects_list_request = ObjectsListRequest {
        prefix: Some(prefix),
        fields: Some("items(name,crc32c),nextPageToken".to_owned()),
        max_results: Some(2),
        ..Default::default()
    };

    let mut result: Vec<PartialObject> = object_client
        .list(bucket.as_str(), &objects_list_request)
        .await
        .try_collect()
        .await
        .unwrap();

    assert_eq!(1, result.len());
    let crc32c = result.pop().unwrap().crc32c.unwrap_or_default().to_u32();
    assert_eq!(1238062967, crc32c);
    assert_delete_ok(&object_client, test_object).await;
}

fn assert_unexpected_response(err: gcs_rsync::storage::Error, content: &str) {
    match err {
        gcs_rsync::storage::Error::GcsUnexpectedResponse { value: actual, .. } => {
            assert!(
                actual.contains(content),
                "{:?} not found in json {}",
                content,
                actual
            );
        }
        e => panic!("expected UnexpectedApiResponse error got {:?}", e),
    }
}

fn assert_not_found_response(err: gcs_rsync::storage::Error) {
    match err {
        gcs_rsync::storage::Error::GcsResourceNotFound { .. } => (),
        e => panic!("expected GcsResourceNotFound error got {:?}", e),
    }
}

#[tokio::test]
async fn test_api_list_objects_not_found_error() {
    let path = env!("TEST_SERVICE_ACCOUNT");
    let sac = ServiceAccountCredentials::from_file(path)
        .await
        .unwrap()
        .with_scope("https://www.googleapis.com/auth/devstorage.full_control");

    let object_client = ObjectClient::new(Box::new(sac)).await.unwrap();

    let objects_list_request = ObjectsListRequest::default();

    let err = object_client
        .list("my_very_bad_bucket", &objects_list_request)
        .await
        .take(1)
        .try_collect::<Vec<_>>()
        .await
        .unwrap_err();

    assert_not_found_response(err);
}