#![cfg(e2e_test)]
#[cfg(test)]
mod common;
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::convert::TryFrom;
use aws_sdk_s3::types::{ServerSideEncryption, StorageClass};
use common::*;
use s3sync::config::Config;
use s3sync::config::args::parse_from_args;
use s3sync::pipeline::Pipeline;
use s3sync::types::token::create_pipeline_cancellation_token;
use uuid::Uuid;
use super::*;
#[tokio::test]
async fn local_to_s3_without_prefix() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case1_{}", Uuid::new_v4());
TestHelper::copy_dir_all("./test_data/e2e_test/case1/", &test_dir);
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
for object in object_list {
assert!(TestHelper::verify_object_md5_digest(
object.key().unwrap(),
object.e_tag().unwrap()
));
}
}
{
let touch_path = format!("{}/data1", test_dir);
TestHelper::touch_file(&touch_path, TOUCH_FILE_SECS_FROM_NOW);
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_without_prefix_no_parallel_listing() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case1_{}", Uuid::new_v4());
TestHelper::copy_dir_all("./test_data/e2e_test/case1/", &test_dir);
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--max-parallel-listings",
"1",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
for object in object_list {
assert!(TestHelper::verify_object_md5_digest(
object.key().unwrap(),
object.e_tag().unwrap()
));
}
}
{
let touch_path = format!("{}/data1", test_dir);
TestHelper::touch_file(&touch_path, TOUCH_FILE_SECS_FROM_NOW);
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--max-parallel-listings",
"1",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_with_prefix() {
TestHelper::init_dummy_tracing_subscriber();
const TEST_PREFIX: &str = "mydir";
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}/{}/", bucket, TEST_PREFIX);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
for object in object_list {
let key_set = HashSet::from([
format!("{}/dir2/data2", TEST_PREFIX),
format!("{}/dir5/data3", TEST_PREFIX),
format!("{}/data1", TEST_PREFIX),
format!("{}/dir21/data1", TEST_PREFIX),
format!("{}/dir1/data1", TEST_PREFIX),
]);
assert!(key_set.contains(object.key.as_ref().unwrap()))
}
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_delete() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"./test_data/e2e_test/case2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
assert_eq!(object_list.len(), 4);
assert!(!helper.is_object_exist(&bucket, "data1", None).await);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_delete_if_match() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"--if-match",
"./test_data/e2e_test/case2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.sync_skip, 4);
assert_eq!(stats.sync_delete, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert!(!helper.is_object_exist(&bucket, "data1", None).await);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_if_none_match() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--if-none-match",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.sync_skip, 0);
assert_eq!(stats.sync_delete, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 10);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_max_delete() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"--max-delete",
"1",
"./test_data/e2e_test/case2_1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert!(pipeline.has_warning());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert!(stats.sync_warning > 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_delete_excluded() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--filter-exclude-regex",
"dir21/.+",
"--delete",
"--delete-excluded",
"./test_data/e2e_test/case2_1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
assert_eq!(object_list.len(), 3);
assert!(!helper.is_object_exist(&bucket, "dir21/data1", None).await);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_delete_without_excluded() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--filter-exclude-regex",
"dir21/.+",
"--delete",
"./test_data/e2e_test/case2_1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
assert_eq!(object_list.len(), 4);
assert!(helper.is_object_exist(&bucket, "dir21/data1", None).await);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_delete_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let target_bucket_url = format!("s3://{}", bucket);
{
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"--dry-run",
"./test_data/e2e_test/case2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
assert_eq!(object_list.len(), 5);
assert!(helper.is_object_exist(&bucket, "data1", None).await);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--dry-run",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
assert_eq!(object_list.len(), 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_all_skip() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_large_test_data(&target_bucket_url).await;
assert!(
helper
.verify_e_tag(&bucket, "large_file", None, LARGE_FILE_S3_ETAG)
.await
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_storage_class() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
"--storage-class",
"REDUCED_REDUNDANCY",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "data1", None).await;
assert_eq!(
*head_object_output.storage_class().unwrap(),
StorageClass::ReducedRedundancy
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_storage_class_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
LARGE_FILE_DIR,
"--storage-class",
"STANDARD_IA",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "large_file", None).await;
assert_eq!(
*head_object_output.storage_class().unwrap(),
StorageClass::StandardIa
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_not_found_error_warn() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_not_found_test_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--allow-e2e-test-dangerous-simulation",
NOT_FOUND_TEST_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
TestHelper::enable_not_found_dangerous_simulation();
pipeline.run().await;
TestHelper::disable_not_found_dangerous_simulation();
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
1
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_not_found_error() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_not_found_test_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--warn-as-error",
"--allow-e2e-test-dangerous-simulation",
NOT_FOUND_TEST_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
TestHelper::enable_not_found_dangerous_simulation();
pipeline.run().await;
TestHelper::disable_not_found_dangerous_simulation();
assert!(pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_guess_mime_type() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "img.png", None).await;
assert_eq!(head_object_output.content_type().unwrap(), "image/png");
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_without_guess_mime_type() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--no-guess-mime-type",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "img.png", None).await;
assert_eq!(
head_object_output.content_type().unwrap(),
"application/octet-stream"
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sse_kms() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--sse",
"aws:kms",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "img.png", None).await;
assert_eq!(
head_object_output.server_side_encryption().unwrap(),
&ServerSideEncryption::AwsKms
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sse_kms_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
LARGE_FILE_DIR,
"--sse",
"aws:kms",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "large_file", None).await;
assert_eq!(
head_object_output.server_side_encryption().unwrap(),
&ServerSideEncryption::AwsKms
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_dsse_kms() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--sse",
"aws:kms:dsse",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "img.png", None).await;
assert_eq!(
head_object_output.server_side_encryption().unwrap(),
&ServerSideEncryption::AwsKmsDsse
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_dsse_kms_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
LARGE_FILE_DIR,
"--sse",
"aws:kms:dsse",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let head_object_output = helper.head_object(&bucket, "large_file", None).await;
assert_eq!(
head_object_output.server_side_encryption().unwrap(),
&ServerSideEncryption::AwsKmsDsse
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_acl() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--acl",
"bucket-owner-read",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_acl_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--acl",
"bucket-owner-read",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn local_to_s3_with_rate_limit() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case1_{}", Uuid::new_v4());
TestHelper::copy_dir_all("./test_data/e2e_test/case1/", &test_dir);
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let object_list = helper.list_objects(&bucket, "").await;
for object in object_list {
assert!(TestHelper::verify_object_md5_digest(
object.key().unwrap(),
object.e_tag().unwrap()
));
}
}
{
let touch_path = format!("{}/data1", test_dir);
TestHelper::touch_file(&touch_path, TOUCH_FILE_SECS_FROM_NOW);
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
&test_dir,
"--rate-limit-objects",
"300",
"--rate-limit-bandwidth",
"100MiB",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case3/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum_disable_verify() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--disable-additional-checksum-verify",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum_crc32_full_object_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC32",
"--full-object-checksum",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum_crc32c_full_object_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC32C",
"--full-object-checksum",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_additional_checksum_crc64nvme_full_object_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_8mib_composite_checksum_test() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_8mib_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
TEST_8MIB_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_8mib_crc32_full_object_checksum_test() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_8mib_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--additional-checksum-algorithm",
"CRC32",
TEST_8MIB_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_8mib_crc32c_full_object_checksum_test() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_8mib_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--additional-checksum-algorithm",
"CRC32C",
TEST_8MIB_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_8mib_crc64nvme_full_object_checksum_test() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_8mib_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
TEST_8MIB_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_all_metadata_option() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_test_data_with_all_metadata_option(&target_bucket_url)
.await;
helper
.verify_test_object_metadata(&bucket, "dir1/data1", None)
.await;
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_all_metadata_option_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_all_metadata_option(&target_bucket_url)
.await;
helper
.verify_test_object_metadata(&bucket, LARGE_FILE_KEY, None)
.await;
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sse_c() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sse_c_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha1() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32_c() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc64_nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_last_modified_metadata() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--put-last-modified-metadata",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper.get_object(&bucket, "data1", None).await;
assert!(
object
.metadata
.unwrap()
.contains_key("s3sync_origin_last_modified")
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_cancel() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
cancellation_token.cancel();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_multiple_filters() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--filter-smaller-size",
"30",
"--filter-larger-size",
"10",
"./test_data/e2e_test/case5/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_e_tag_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-etag",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-etag",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-etag",
"--head-each-target",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_if_match() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case1_2_{}", Uuid::new_v4());
TestHelper::copy_dir_all("./test_data/e2e_test/case1_2/", &test_dir);
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let touch_path = format!("{}/data1", test_dir);
TestHelper::touch_file(&touch_path, TOUCH_FILE_SECS_FROM_NOW);
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--if-match",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.sync_skip, 4);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.sync_skip, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let touch_path = format!("{}/data1", test_dir);
TestHelper::touch_file(&touch_path, TOUCH_FILE_SECS_FROM_NOW);
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--delete",
"--if-match",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.sync_skip, 4);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_with_e_tag_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--check-etag",
"--auto-chunksize",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"--multipart-threshold",
"5MiB",
"--multipart-chunksize",
"5243236",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--check-etag",
"--auto-chunksize",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--check-etag",
"--auto-chunksize",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_e_tag_check_warn() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--head-each-target",
"--check-etag",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
5
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha256_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha1_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32_full_object_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32c_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32c_full_object_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc64nvme_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha256_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha256_check_if_match() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--if-match",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_sha1_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha1(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA1",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"SHA1",
"--additional-checksum-algorithm",
"SHA1",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32_full_object_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32_full_object_checksum(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"CRC32",
"--additional-checksum-algorithm",
"CRC32",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32c_full_object_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32_full_object_checksum(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--full-object-checksum",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc32c_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32c(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"CRC32C",
"--additional-checksum-algorithm",
"CRC32C",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_crc64nvme_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc64nvme(&target_bucket_url)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC64NVME",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--head-each-target",
"--check-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_checksum_check_kms() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_sha256(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_checksum_check_sse_c() {
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_checksum_check_warn() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
5
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_disable_payload_signing() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--disable-payload-signing",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_disable_payload_signing_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--disable-payload-signing",
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_mtime_etag_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case3_{}", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-chunksize",
"5MiB",
"--check-mtime-and-etag",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::update_large_file_mtime_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--check-mtime-and-etag",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::modify_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--check-mtime-and-etag",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_with_mtime_checksum_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let test_dir = format!("./playground/case3_{}", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-chunksize",
"5MiB",
"--check-mtime-and-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::update_large_file_mtime_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-mtime-and-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::modify_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-mtime-and-additional-checksum",
"CRC64NVME",
"--additional-checksum-algorithm",
"CRC64NVME",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn local_to_s3_with_website_redirect() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--website-redirect",
"/redirect",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper.get_object(&bucket, "data1", None).await;
assert_eq!(
object.website_redirect_location,
Some("/redirect".to_string())
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_single_file() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_single_file_check_etag() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-etag",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_single_file_check_additional_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_single_file_check_size() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-size",
"./test_data/e2e_test/case1/data1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_single_file_no_dir() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"README.md",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_without_slash() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_with_multipart_upload_website_redirect() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_website_redirect(&target_bucket_url, "/redirect")
.await;
let object = helper.head_object(&bucket, "large_file", None).await;
assert_eq!(
object.website_redirect_location,
Some("/redirect".to_string())
);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn local_to_s3_content_type_filtering_no_content_type() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--no-guess-mime-type",
"--filter-include-content-type-regex",
"application/.+",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--no-guess-mime-type",
"--filter-exclude-content-type-regex",
"video/.+",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
}