use async_trait::async_trait;
use aws_sdk_s3::operation::get_object::GetObjectOutput;
use std::collections::HashMap;
use s3sync::config::Config;
use s3sync::config::args::parse_from_args;
use s3sync::pipeline::Pipeline;
use s3sync::types::event_callback::{EventCallback, EventData, EventType};
use s3sync::types::filter_callback::FilterCallback;
use s3sync::types::preprocess_callback::{PreprocessCallback, PreprocessError, UploadMetadata};
use s3sync::types::token::create_pipeline_cancellation_token;
#[allow(unused_imports)]
use s3sync::types::{S3syncObject, SyncStatistics};
#[tokio::main]
async fn main() {
let args = vec![
"program_name",
"--aws-max-attempts",
"7",
"./test_data/e2e_test/case1",
"s3://XXXXXXX-TEST-BUCKET/",
];
let mut config = Config::try_from(parse_from_args(args).unwrap()).unwrap();
config.event_manager.register_callback(
EventType::SYNC_START
| EventType::SYNC_COMPLETE
| EventType::SYNC_CANCEL
| EventType::PIPELINE_ERROR,
DebugEventCallback {},
config.dry_run,
);
config
.filter_config
.filter_manager
.register_callback(DebugFilterCallback {});
config
.preprocess_manager
.register_callback(DebugPreprocessCallback {});
let cancellation_token = create_pipeline_cancellation_token();
let mut pipeline = Pipeline::new(config.clone(), cancellation_token.clone()).await;
pipeline.close_stats_sender();
let sync_task = pipeline.run();
sync_task.await;
let sync_stats = pipeline.get_sync_stats().await;
println!(
"stats_transferred_byte: {}",
sync_stats.stats_transferred_byte
);
println!(
"stats_transferred_byte_per_sec: {}",
sync_stats.stats_transferred_byte_per_sec
);
println!(
"stats_transferred_object: {}",
sync_stats.stats_transferred_object
);
println!(
"stats_transferred_object_per_sec: {}",
sync_stats.stats_transferred_object_per_sec
);
println!("stats_etag_verified: {}", sync_stats.stats_etag_verified);
println!("stats_etag_mismatch: {}", sync_stats.stats_etag_mismatch);
println!(
"stats_checksum_verified: {}",
sync_stats.stats_checksum_verified
);
println!(
"stats_checksum_mismatch: {}",
sync_stats.stats_checksum_mismatch
);
println!("stats_deleted: {}", sync_stats.stats_deleted);
println!("stats_skipped: {}", sync_stats.stats_skipped);
println!("stats_error: {}", sync_stats.stats_error);
println!("stats_warning: {}", sync_stats.stats_warning);
println!("stats_duration_sec: {}", sync_stats.stats_duration_sec);
if pipeline.has_error() {
println!("An error has occurred.\n\n");
println!("{:?}", pipeline.get_errors_and_consume().unwrap()[0]);
}
if pipeline.has_warning() {
println!("A warning has occurred.\n\n");
}
let sync_stats_report_to_be_locked = pipeline.get_sync_stats_report();
let sync_stats_report = sync_stats_report_to_be_locked.lock().unwrap();
if sync_stats_report.number_of_objects != sync_stats_report.etag_matches {
println!("Some objects could not be transferred correctly.");
}
}
/// Event callback that prints every event it receives to stdout.
pub struct DebugEventCallback;

#[async_trait]
impl EventCallback for DebugEventCallback {
    /// Prints `event_data` in debug form, prefixed with a label chosen from
    /// its `event_type`; any event type without a dedicated label is printed
    /// under "Other events".
    async fn on_event(&mut self, event_data: EventData) {
        let kind = &event_data.event_type;

        if *kind == EventType::SYNC_START {
            println!("Sync started: {event_data:?}");
        } else if *kind == EventType::SYNC_COMPLETE {
            println!("Sync complete: {event_data:?}");
        } else if *kind == EventType::SYNC_CANCEL {
            println!("Sync cancelled: {event_data:?}");
        } else if *kind == EventType::PIPELINE_ERROR {
            println!("Pipeline error: {event_data:?}");
        } else {
            println!("Other events: {event_data:?}");
        }
    }
}
/// Filter callback that rejects objects whose key begins with
/// `should_be_skipped/`.
pub struct DebugFilterCallback;

#[async_trait]
impl FilterCallback for DebugFilterCallback {
    /// Returns `Ok(false)` when the key of `source_object` starts with
    /// `should_be_skipped/`, and `Ok(true)` otherwise. Never returns `Err`.
    async fn filter(&mut self, source_object: &S3syncObject) -> anyhow::Result<bool> {
        let under_skipped_prefix = source_object.key().starts_with("should_be_skipped/");
        Ok(!under_skipped_prefix)
    }
}
/// Preprocess callback that cancels the upload of selected keys and tags every
/// other upload with the source object's content length.
pub struct DebugPreprocessCallback;

#[async_trait]
impl PreprocessCallback for DebugPreprocessCallback {
    /// Adds a `mycontent-length` entry, holding the source object's content
    /// length as a decimal string, to the user-defined upload metadata. The
    /// metadata map is created if it does not exist yet.
    ///
    /// # Errors
    ///
    /// Returns `PreprocessError::Cancelled` when `key` is
    /// `callback_cancel_test` or `data1`. Returns an error, rather than
    /// panicking, when `source_object` carries no content length.
    async fn preprocess_before_upload(
        &mut self,
        key: &str,
        source_object: &GetObjectOutput,
        metadata: &mut UploadMetadata,
    ) -> anyhow::Result<()> {
        if matches!(key, "callback_cancel_test" | "data1") {
            return Err(anyhow::Error::from(PreprocessError::Cancelled));
        }

        // A missing length is reported through the callback's `Result` so the
        // caller receives an error instead of a panic.
        let content_length = source_object
            .content_length
            .ok_or_else(|| {
                anyhow::anyhow!("source object for key {:?} has no content length", key)
            })?
            .to_string();

        // Reuse the existing metadata map, or create an empty one on first use.
        metadata
            .metadata
            .get_or_insert_with(HashMap::new)
            .insert("mycontent-length".to_string(), content_length);

        Ok(())
    }
}