#![cfg(e2e_test)]
#[cfg(test)]
mod common;
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use common::*;
use s3sync::config::Config;
use s3sync::config::args::parse_from_args;
use s3sync::pipeline::Pipeline;
use s3sync::types::token::create_pipeline_cancellation_token;
const SHA256_30M_FILE_8M_CHUNK: &str = "5NrHBc0Z1wNCbADRDy8mJaIvc53oxncCrw/Fa48VhxY=-4";
const SHA256_30M_FILE_NO_CHUNK: &str = "BcHHcdSIbkzv3wpMC5B5E/4vgp3XZ0GMlOoniwuLw/k=";
const CRC64NVME_30M_FILE_8M_CHUNK: &str = "rrk4q4lsMS4=";
const CRC64NVME_30M_FILE_NO_CHUNK: &str = "rrk4q4lsMS4=";
const ETAG_30M_FILE_8M_CHUNK: &str = "\"a81230a7666d413e511f9c2c2523947a-4\"";
const ETAG_30M_FILE_NO_CHUNK: &str = "\"94189ebb786dbc25aaf22d3d96e88aeb\"";
const SHA256_30M_FILE_WHOLE_HEX: &str =
"05c1c771d4886e4cefdf0a4c0b907913fe2f829dd767418c94ea278b0b8bc3f9";
const SHA256_1M_FILE: &str = "nHjjstwawGidD0Sn7WX1sNMcsjAoK7gP8bUyhDOlnRU=";
const CRC64NVME_1M_FILE: &str = "IWamHyhf59M=";
const ETAG_1M_FILE: &str = "\"1ebdcfc23acf32f84f462e721e9db32c\"";
const SHA256_1M_WHOLE: &str =
"9c78e3b2dc1ac0689d0f44a7ed65f5b0d31cb230282bb80ff1b5328433a59d15";
const SHA256_8M_FILE_8M_CHUNK: &str = "U+ZIEj2OXjCTTOp7PlJy43aKT7mL2X5NzgfjpYzeozw=-1";
const CRC64NVME_8M_FILE_8M_CHUNK: &str = "io2hnVvxKgU=";
const ETAG_8M_FILE_8M_CHUNK: &str = "\"13698b45ee34dbf0611fe527f76abfc7-1\"";
const ETAG_8M_FILE_5M_CHUNK: &str = "\"ebff86fc334a63cefaad7a0b621a0109-2\"";
const SHA256_8M_FILE_NO_CHUNK: &str = "zV9Xxv/j9oUQSrpuxyaLqrh5BgMDS97IMCKLVy2ExaQ=";
const CRC64NVME_8M_FILE_NO_CHUNK: &str = "io2hnVvxKgU=";
const ETAG_8M_FILE_NO_CHUNK: &str = "\"e9d3e2caa0ac28fd50b183dac706ee29\"";
const SHA256_8M_FILE_5M_CHUNK: &str = "EZAvWUpvGrpch+0S5qFJhcwxd6bw9HtocRRVc/FAwQA=-2";
const SHA256_8M_FILE_WHOLE: &str =
"cd5f57c6ffe3f685104aba6ec7268baab8790603034bdec830228b572d84c5a4";
use uuid::Uuid;
use super::*;
#[tokio::test]
async fn test_multipart_upload_30mb() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 30, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 1);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 1);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--remove-modified-filter",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 1);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_30M_FILE_WHOLE_HEX);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_30M_FILE_WHOLE_HEX);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_30mb_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 30, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_30M_FILE_NO_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 2);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_30M_FILE_8M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 2);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_30M_FILE_8M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_30M_FILE_NO_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--remove-modified-filter",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_30M_FILE_NO_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 1);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_30M_FILE_WHOLE_HEX);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_30M_FILE_WHOLE_HEX);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_30mb_crc64nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 30, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"CRC64NVME",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_30M_FILE_NO_CHUNK
);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 1);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_30M_FILE_8M_CHUNK
);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 1);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_8M_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_30M_FILE_8M_CHUNK
);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--remove-modified-filter",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_30M_FILE_8M_CHUNK
);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--remove-modified-filter",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_30M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_30M_FILE_8M_CHUNK
);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 0);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 1);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_30M_FILE_WHOLE_HEX);
}
helper.delete_all_objects(&bucket1).await;
helper.delete_all_objects(&bucket2).await;
}
#[tokio::test]
async fn test_upload_1m() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 1, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_1M_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_upload_1m_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 1, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_1M_FILE);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_1M_FILE);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_1M_FILE);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_1M_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_upload_1m_crc64nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 1, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"CRC64NVME",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_crc64_nvme.unwrap(), CRC64NVME_1M_FILE);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_crc64_nvme.unwrap(), CRC64NVME_1M_FILE);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_1M_FILE);
assert_eq!(object.checksum_crc64_nvme.unwrap(), CRC64NVME_1M_FILE);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_1M_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_crc64nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_8M_CHUNK
);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_8M_CHUNK
);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_8M_CHUNK
);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_no_chunk() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 0);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_no_chunk_sha256() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_NO_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_NO_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_NO_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_no_chunk_crc64nvme() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"50MiB",
"--multipart-chunksize",
"50MiB",
"--additional-checksum-algorithm",
"CRC64NVME",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_NO_CHUNK
);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_NO_CHUNK
);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--auto-chunksize",
"--additional-checksum-algorithm",
"CRC64NVME",
"--enable-additional-checksum",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(
object.checksum_crc64_nvme.unwrap(),
CRC64NVME_8M_FILE_NO_CHUNK
);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--auto-chunksize",
"--enable-additional-checksum",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_7mb_threshold_5mb_chunk() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"7MiB",
"--multipart-chunksize",
"5MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_5M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_5M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_5M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_5M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_5M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_5M_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_7mb_threshold_9mb_chunk() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"7MiB",
"--multipart-chunksize",
"9MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
helper.delete_all_objects(&bucket2).await;
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--server-side-copy",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_8M_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_8M_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--auto-chunksize",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
#[tokio::test]
async fn test_multipart_upload_8mb_9mb_threshold_5mb_chunk() {
TestHelper::init_dummy_tracing_subscriber();
let helper = TestHelper::new().await;
let bucket1 = TestHelper::generate_bucket_name();
let bucket2 = TestHelper::generate_bucket_name();
let download_dir = format!("./playground/download_{}/", Uuid::new_v4());
helper.create_bucket(&bucket1, REGION).await;
helper.create_bucket(&bucket2, REGION).await;
let random_data_dir = format!("./playground/random_data_{}/", Uuid::new_v4());
TestHelper::create_random_test_data_file_in(&random_data_dir, 8, 0).unwrap();
{
let target_bucket_url = format!("s3://{}", bucket1);
let args = vec![
"s3sync",
"--target-profile",
"s3sync-e2e-test",
"--multipart-threshold",
"9MiB",
"--multipart-chunksize",
"5MiB",
"--additional-checksum-algorithm",
"SHA256",
&random_data_dir,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket1, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_NO_CHUNK);
}
{
let source_bucket_url = format!("s3://{}", bucket1);
let target_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--target-profile",
"s3sync-e2e-test",
"--additional-checksum-algorithm",
"SHA256",
"--enable-additional-checksum",
"--multipart-threshold",
"9MiB",
"--multipart-chunksize",
"5MiB",
&source_bucket_url,
&target_bucket_url,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let object = helper
.head_object(&bucket2, TEST_RANDOM_DATA_FILE_KEY, None)
.await;
assert_eq!(object.e_tag.unwrap(), ETAG_8M_FILE_NO_CHUNK);
assert_eq!(object.checksum_sha256.unwrap(), SHA256_8M_FILE_NO_CHUNK);
}
{
TestHelper::delete_all_files(&download_dir);
let source_bucket_url = format!("s3://{}", bucket2);
let args = vec![
"s3sync",
"--source-profile",
"s3sync-e2e-test",
"--enable-additional-checksum",
"--multipart-threshold",
"9MiB",
"--multipart-chunksize",
"5MiB",
&source_bucket_url,
&download_dir,
];
let config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token).await;
pipeline.run().await;
assert!(!pipeline.has_error());
let stats = TestHelper::get_stats_count(pipeline.get_stats_receiver());
assert_eq!(stats.sync_complete, 1);
assert_eq!(stats.e_tag_verified, 1);
assert_eq!(stats.checksum_verified, 1);
assert_eq!(stats.sync_warning, 0);
let sha256_hash = TestHelper::get_sha256_from_file(&format!(
"{}/{}",
&download_dir, TEST_RANDOM_DATA_FILE_KEY
));
assert_eq!(sha256_hash, SHA256_8M_FILE_WHOLE);
}
helper.delete_bucket_with_cascade(&bucket1).await;
helper.delete_bucket_with_cascade(&bucket2).await;
}
}