#![cfg(e2e_test)]
#[cfg(test)]
mod common;
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use common::*;
use s3sync::config::Config;
use s3sync::config::args::parse_from_args;
use s3sync::pipeline::Pipeline;
use s3sync::types::token::create_pipeline_cancellation_token;
use uuid::Uuid;
use super::*;
#[tokio::test]
async fn s3_to_local_without_prefix() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
{
tokio::time::sleep(std::time::Duration::from_secs(SLEEP_SECS_BEFORE_RESYNC)).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_without_slash() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
download_dir.trim_end_matches('/'),
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_without_prefix_no_parallel_listing() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--max-parallel-listings",
"1",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
{
tokio::time::sleep(std::time::Duration::from_secs(SLEEP_SECS_BEFORE_RESYNC)).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--max-parallel-listings",
"1",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_prefix() {
TestHelper::init_dummy_tracing_subscriber();
const TEST_PREFIX: &str = "dir2";
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}/{}", bucket, TEST_PREFIX);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 2);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_same_prefix() {
TestHelper::init_dummy_tracing_subscriber();
const TEST_PREFIX: &str = "dir1/data1";
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}/{}", bucket, TEST_PREFIX);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_delete() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
helper.delete_object(&bucket, "data1", None).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--delete",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 4);
assert!(!TestHelper::is_file_exist(&format!(
"{}/data1",
&download_dir
)));
}
TestHelper::delete_all_files(&download_dir);
helper.delete_all_objects(&bucket).await;
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_delete_excluded() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
helper.delete_object(&bucket, "data1", None).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--filter-exclude-regex",
"data1",
"--delete",
"--delete-excluded",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 4);
assert!(!TestHelper::is_file_exist(&format!(
"{}/data1",
&download_dir
)));
}
TestHelper::delete_all_files(&download_dir);
helper.delete_all_objects(&bucket).await;
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_delete_no_excluded() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
helper.delete_object(&bucket, "data1", None).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--filter-exclude-regex",
"data1",
"--delete",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
assert!(TestHelper::is_file_exist(&format!(
"{}/data1",
&download_dir
)));
}
TestHelper::delete_all_files(&download_dir);
helper.delete_all_objects(&bucket).await;
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_delete_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
helper.delete_object(&bucket, "data1", None).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--delete",
"--dry-run",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
assert!(TestHelper::is_file_exist(&format!(
"{}/data1",
&download_dir
)));
}
TestHelper::delete_all_files(&download_dir);
helper.delete_all_objects(&bucket).await;
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--dry-run",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_skip_all() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
assert_eq!(dir_entry_list.len(), 5);
}
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_empty_directory() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper.put_empty_object(&bucket, "dir1/dir2/").await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert!(TestHelper::is_file_exist(&format!(
"{}{}",
&download_dir, "dir1/dir2/"
)));
}
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert!(TestHelper::is_file_exist(&format!(
"{}{}",
&download_dir, "dir1/dir2/"
)));
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_empty_directory_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper.put_empty_object(&bucket, "dir1/dir2/").await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--dry-run",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert!(!TestHelper::is_file_exist(&format!(
"{}{}",
&download_dir, "dir1/dir2/"
)));
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn s3_to_local_with_rate_limit() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
{
tokio::time::sleep(std::time::Duration::from_secs(SLEEP_SECS_BEFORE_RESYNC)).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--rate-limit-objects",
"300",
"--rate-limit-bandwidth",
"100MiB",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_sha256(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
{
tokio::time::sleep(std::time::Duration::from_secs(SLEEP_SECS_BEFORE_RESYNC)).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_dry_run() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--dry-run",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_sha1() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha1(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_disable_multipart_verify() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha1(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--disable-multipart-verify",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_crc32() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_crc32_full_object() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32_full_object_checksum(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_crc32c_full_object() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32c_full_object_checksum(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_crc32c() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32c(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_crc64nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc64nvme(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_custom_chunksize(&target_bucket_url, "7340033")
.await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_with_8mib_composite_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_8mib_test_data_with_sha256(&target_bucket_url)
.await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_with_8mib_crc32_full_object_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_8mib_test_data_with_full_object_crc32(&target_bucket_url)
.await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_with_8mib_crc32c_full_object_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_8mib_test_data_with_full_object_crc32c(&target_bucket_url)
.await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_with_8mib_crc64nvme_full_object_checksum() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_8mib_test_data_with_full_object_crc64nvme(&target_bucket_url)
.await;
}
let source_bucket_url = format!("s3://{}", bucket);
{
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
}
#[tokio::test]
async fn s3_to_local_with_sse_c() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--source-sse-c",
"AES256",
"--source-sse-c-key",
TEST_SSE_C_KEY_1,
"--source-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_sse_c_multipart_upload() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
TestHelper::create_large_file();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
LARGE_FILE_DIR,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--source-sse-c",
"AES256",
"--source-sse-c-key",
TEST_SSE_C_KEY_1,
"--source-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_empty_data_checksum_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_empty_data_with_sha256(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_e_tag_warning() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_custom_chunksize(&target_bucket_url, "5MiB")
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
1
);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_max_keys() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--max-keys",
"2",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let dir_entry_list = TestHelper::list_all_files(&download_dir);
for entry in dir_entry_list {
let path = entry.path().to_string_lossy().replace(&download_dir, "");
assert!(TestHelper::verify_file_md5_digest(
&format!("./test_data/e2e_test/case1/{}", &path),
&TestHelper::md5_digest(&entry.path().to_string_lossy()),
));
}
assert_eq!(
helper
.get_object_last_modified(&bucket, "data1", None)
.await,
TestHelper::get_file_last_modified(&format!("{}/data1", &download_dir))
);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_multipart_upload_checksum_max_keys() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--max-keys",
"1",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_directory_traversal_error() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper
.put_object_with_metadata(
&bucket,
"data1/../data2",
"./test_data/e2e_test/case1/data1",
)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_directory_traversal_warn_as_error() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper
.put_object_with_metadata(
&bucket,
"data1/../data2",
"./test_data/e2e_test/case1/data1",
)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--warn-as-error",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_incompatible_object() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper
.put_object_with_metadata(&bucket, "data1/", "./test_data/e2e_test/case1/data1")
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_access_denied() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
}
{
helper.put_bucket_policy_deny_get_object(&bucket).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_warn_as_error() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
helper
.put_object_with_metadata(&bucket, "data1", "./test_data/e2e_test/case1/data1")
.await;
}
{
helper.put_bucket_policy_deny_get_object(&bucket).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
"--warn-as-error",
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(pipeline.has_error());
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_etag_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--head-each-target",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_etag_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_large_test_data(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
helper
.sync_large_test_data_with_custom_chunksize(&target_bucket_url, "6000000")
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
LARGE_FILE_DIR_CASE2,
"--multipart-chunksize",
"7MiB",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_etag_check_warn() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--head-each-target",
"--check-etag",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--head-each-target",
"--check-etag",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
5
);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_sha256_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_sha256(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_sha256_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha256(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
"--multipart-chunksize",
"7MiB",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_sha1_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_sha1(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA1",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_sha1_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_sha1(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA1",
"--multipart-chunksize",
"7MiB",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA1",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc32_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_crc32(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc32_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32",
"--multipart-chunksize",
"7MiB",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc32c_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper.sync_test_data_with_crc32c(&target_bucket_url).await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc64nvme_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_test_data_with_crc64nvme(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC64NVME",
"./test_data/e2e_test/case1_2/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc32c_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc32c(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC32C",
"--multipart-chunksize",
"7MiB",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC32C",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_crc64nvme_check_auto_chunksize() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
helper
.sync_large_test_data_with_crc64nvme(&target_bucket_url)
.await;
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::create_large_file_case2();
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"CRC64NVME",
"--multipart-chunksize",
"7MiB",
LARGE_FILE_DIR_CASE2,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 1);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 1);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_checksum_check_kms() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_checksum_check_dsse_kms() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--sse",
"aws:kms:dsse",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_checksum_check_sse_c() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper
.create_bucket_with_sse_c_encryption(&bucket, REGION)
.await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
"--target-sse-c",
"AES256",
"--target-sse-c-key",
TEST_SSE_C_KEY_1,
"--target-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
"--source-sse-c",
"AES256",
"--source-sse-c-key",
TEST_SSE_C_KEY_1,
"--source-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
"--source-sse-c",
"AES256",
"--source-sse-c-key",
TEST_SSE_C_KEY_1,
"--source-sse-c-key-md5",
TEST_SSE_C_KEY_1_MD5,
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(TestHelper::get_skip_count(pipeline.get_stats_receiver()), 5);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_with_checksum_check_warn() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
{
helper.create_bucket(&bucket, REGION).await;
}
{
let target_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"./test_data/e2e_test/case1/",
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 5);
assert_eq!(stats.e_tag_verified, 5);
assert_eq!(stats.checksum_verified, 5);
assert_eq!(stats.sync_warning, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
assert_eq!(
TestHelper::get_warning_count(pipeline.get_stats_receiver()),
5
);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
}
#[tokio::test]
async fn s3_to_local_mtime_etag_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
let test_dir = format!("./playground/case3_{}", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-chunksize",
"5MiB",
"--check-mtime-and-etag",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-mtime-and-etag",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::update_large_file_mtime_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--check-mtime-and-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::modify_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--check-mtime-and-etag",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--check-mtime-and-etag",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
let _ = std::fs::remove_dir_all(&test_dir);
}
#[tokio::test]
async fn s3_to_local_mtime_checksum_check() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
let test_dir = format!("./playground/case3_{}", Uuid::new_v4());
{
let target_bucket_url = format!("s3://{}", bucket);
helper.create_bucket(&bucket, REGION).await;
TestHelper::create_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-chunksize",
"5MiB",
"--check-mtime-and-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--check-mtime-and-additional-checksum",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::update_large_file_mtime_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--check-mtime-and-additional-checksum",
"SHA256",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 0);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 1);
}
{
let target_bucket_url = format!("s3://{}", bucket);
TestHelper::modify_large_file_in(&test_dir);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--check-mtime-and-additional-checksum",
"SHA256",
"--additional-checksum-algorithm",
"SHA256",
&test_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
{
let source_bucket_url = format!("s3://{}", bucket);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--check-mtime-and-additional-checksum",
"SHA256",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
assert_eq!(stats.sync_skip, 0);
}
helper.delete_bucket_with_cascade(&bucket).await;
let _ = std::fs::remove_dir_all(&download_dir);
let _ = std::fs::remove_dir_all(&test_dir);
}
}