use anyhow::{Context, Result, anyhow};
use async_channel::Sender;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::{debug, warn};
use crate::Config;
use crate::storage::Storage;
use crate::storage::additional_checksum_verify::is_multipart_upload_checksum;
use crate::storage::checksum::AdditionalChecksum;
use crate::storage::e_tag_verify::{generate_e_tag_hash, normalize_e_tag, verify_e_tag};
use crate::transfer::TransferOutcome;
use crate::types::error::S3syncError;
use crate::types::token::PipelineCancellationToken;
use crate::types::{SyncStatistics, detect_additional_checksum, is_full_object_checksum};
/// Streams a single S3 object to `writer` and verifies its integrity on the fly.
///
/// The object is fetched with `source.get_object`, copied to `writer` in 64 KiB
/// reads, and, depending on `config`, checked in two independent ways while the
/// bytes pass through:
///
/// * **ETag** (unless `config.disable_etag_verify` is set or the source is an
///   Express One Zone storage): an MD5 digest is taken of every
///   `multipart_chunksize`-sized part and the result is compared with the
///   source ETag through `verify_e_tag`.
/// * **Additional checksum** (when `config.additional_checksum_mode` is set,
///   verification is not disabled and the response carries a checksum): the
///   checksum is recomputed locally and compared with the one reported by S3.
///
/// # Parameters
///
/// * `config` - transfer settings (version id, SSE-C material, chunk size and
///   threshold, verification switches).
/// * `source` - storage the object is downloaded from; also receives
///   `set_warning()` on a non-fatal verification mismatch.
/// * `source_key` - key of the object to download.
/// * `writer` - destination of the object bytes; flushed before returning.
/// * `cancellation_token` - polled before the download, after the request
///   completes and before every read.
/// * `stats_sender` - receives progress and verification events. Send failures
///   are deliberately ignored so that a closed statistics channel never aborts
///   a transfer.
///
/// # Returns
///
/// `TransferOutcome::default()` on success, and also when cancellation is
/// observed before any byte has been written.
///
/// # Errors
///
/// * the object cannot be downloaded, or reading the body / writing to or
///   flushing `writer` fails;
/// * `S3syncError::Cancelled` when cancellation is observed after the copy loop;
/// * a full-object additional checksum does not match. Other mismatches are
///   reported as warnings only.
pub async fn transfer(
    config: &Config,
    source: Storage,
    source_key: &str,
    mut writer: impl tokio::io::AsyncWrite + Unpin + Send,
    cancellation_token: PipelineCancellationToken,
    stats_sender: Sender<SyncStatistics>,
) -> Result<TransferOutcome> {
    /// Size of the buffer used for each read from the response body.
    const READ_BUFFER_SIZE: usize = 64 * 1024;

    if cancellation_token.is_cancelled() {
        return Ok(TransferOutcome::default());
    }

    let get_object_output = source
        .get_object(
            source_key,
            config.version_id.clone(),
            config.additional_checksum_mode.clone(),
            None,
            config.source_sse_c.clone(),
            config.source_sse_c_key.clone(),
            config.source_sse_c_key_md5.clone(),
        )
        .await
        // Lazily formatted: the message is only built when the request failed.
        .with_context(|| format!("failed to download source object: {source_key}"))?;

    if cancellation_token.is_cancelled() {
        return Ok(TransferOutcome::default());
    }

    let source_e_tag = get_object_output.e_tag().map(|e| e.to_string());
    let source_sse = get_object_output.server_side_encryption().cloned();

    // The checksum is only looked up when checksum mode was requested, so a
    // detected algorithm implies `additional_checksum_mode.is_some()`.
    let (detected_algorithm, source_final_checksum) = if config.additional_checksum_mode.is_some()
    {
        detect_additional_checksum(&get_object_output)
            .map(|(algorithm, checksum)| (Some(algorithm), Some(checksum)))
            .unwrap_or((None, None))
    } else {
        (None, None)
    };

    let multipart_chunksize = config.transfer_config.multipart_chunksize as usize;
    let multipart_threshold = config.transfer_config.multipart_threshold as usize;

    // A local checksum calculator exists only when there is something to compare against.
    let mut additional_checksum = match (&detected_algorithm, &source_final_checksum) {
        (Some(algorithm), Some(_)) if !config.disable_additional_checksum_verify => Some(
            AdditionalChecksum::new(algorithm.clone(), config.full_object_checksum),
        ),
        _ => None,
    };

    // Composite (multipart) checksums are built from per-part checksums, so the
    // data has to be cut at part boundaries; any other checksum is a plain
    // running hash over the whole object.
    let checksum_is_multipart = additional_checksum.is_some()
        && is_multipart_upload_checksum(&source_final_checksum)
        && !is_full_object_checksum(&source_final_checksum);

    let mut body = get_object_output.body.into_async_read();
    let rate_limit_bandwidth = source.get_rate_limit_bandwidth();

    let mut concatenated_md5_hash: Vec<u8> = Vec::new();
    let mut parts_count: i64 = 0;
    let mut total_bytes = 0u64;
    // Bytes received but not yet folded into a complete part, for the ETag and
    // for the composite checksum respectively.
    let mut etag_part_buffer: Vec<u8> = Vec::new();
    let mut checksum_part_buffer: Vec<u8> = Vec::new();
    let mut buf = vec![0u8; READ_BUFFER_SIZE];

    loop {
        if cancellation_token.is_cancelled() {
            break;
        }

        let n = body
            .read(&mut buf)
            .await
            .context("s3_to_stdio: failed to read body")?;
        if n == 0 {
            break;
        }
        let chunk = &buf[..n];

        if let Some(limiter) = &rate_limit_bandwidth {
            limiter.acquire(n).await;
        }

        writer
            .write_all(chunk)
            .await
            .context("s3_to_stdio: failed to write to stdout")?;
        total_bytes += n as u64;
        let _ = stats_sender.send(SyncStatistics::SyncBytes(n as u64)).await;

        if !config.disable_etag_verify {
            etag_part_buffer.extend_from_slice(chunk);
            // Split on buffer length alone. Reads are not aligned to part
            // boundaries, so the final read may complete a part *and* carry
            // the first bytes of the last one; both must be hashed separately.
            //
            // NOTE(review): an object with chunksize <= size < threshold still
            // yields several part digests here while a part count of 0 is used
            // below - confirm the configuration rules out threshold > chunksize.
            while etag_part_buffer.len() >= multipart_chunksize {
                let md5_digest = md5::compute(&etag_part_buffer[..multipart_chunksize]);
                concatenated_md5_hash.extend_from_slice(md5_digest.as_slice());
                parts_count += 1;
                // Drop the consumed part in place instead of reallocating the tail.
                etag_part_buffer.drain(..multipart_chunksize);
            }
        }

        if let Some(checksum) = additional_checksum.as_mut() {
            if checksum_is_multipart {
                checksum_part_buffer.extend_from_slice(chunk);
                while checksum_part_buffer.len() >= multipart_chunksize {
                    checksum.update(&checksum_part_buffer[..multipart_chunksize]);
                    // Closes the current part; the returned value is not needed here.
                    checksum.finalize();
                    checksum_part_buffer.drain(..multipart_chunksize);
                }
            } else {
                // Feed the running hash directly so memory use stays constant
                // regardless of the object size.
                checksum.update(chunk);
            }
        }
    }

    // Trailing bytes form the last (short) part.
    if !config.disable_etag_verify && !etag_part_buffer.is_empty() {
        let md5_digest = md5::compute(&etag_part_buffer);
        concatenated_md5_hash.extend_from_slice(md5_digest.as_slice());
        parts_count += 1;
    }
    if let Some(checksum) = additional_checksum.as_mut() {
        // The buffer is only ever filled in composite mode, so a non-empty
        // buffer always denotes an unfinished part.
        if !checksum_part_buffer.is_empty() {
            checksum.update(&checksum_part_buffer);
            checksum.finalize();
        }
    }

    writer
        .flush()
        .await
        .context("s3_to_stdio: failed to flush stdout")?;

    // Output may already have been written at this point, so cancellation is
    // reported as an error rather than as a clean outcome.
    if cancellation_token.is_cancelled() {
        return Err(anyhow!(S3syncError::Cancelled));
    }

    if !config.disable_etag_verify && !source.is_express_onezone_storage() {
        // Below the multipart threshold a part count of 0 is passed.
        let reported_parts = if total_bytes < multipart_threshold as u64 {
            0
        } else {
            parts_count
        };
        let target_e_tag = Some(generate_e_tag_hash(&concatenated_md5_hash, reported_parts));

        let verify_result = verify_e_tag(
            !config.disable_multipart_verify,
            &config.source_sse_c,
            &None,
            &source_sse,
            &source_e_tag,
            &None,
            &target_e_tag,
        );
        match verify_result {
            Some(true) => {
                debug!(
                    key = source_key,
                    source_e_tag = normalize_e_tag(&source_e_tag),
                    target_e_tag = normalize_e_tag(&target_e_tag),
                    "e_tag verified.",
                );
                let _ = stats_sender
                    .send(SyncStatistics::ETagVerified {
                        key: source_key.to_string(),
                    })
                    .await;
            }
            Some(false) => {
                warn!(
                    key = source_key,
                    source_e_tag = normalize_e_tag(&source_e_tag),
                    target_e_tag = normalize_e_tag(&target_e_tag),
                    "e_tag mismatch.",
                );
                let _ = stats_sender
                    .send(SyncStatistics::ETagMismatch {
                        key: source_key.to_string(),
                    })
                    .await;
                source.set_warning();
            }
            None => {
                debug!(
                    key = source_key,
                    "e_tag verification skipped (SSE-C or unsupported SSE).",
                );
            }
        }
    }

    if let (Some(checksum), Some(algorithm), Some(source_checksum)) = (
        additional_checksum.as_mut(),
        detected_algorithm.as_ref(),
        source_final_checksum.as_ref(),
    ) {
        let additional_checksum_algorithm = algorithm.as_str();
        let target_final_checksum = if checksum_is_multipart {
            checksum.finalize_all()
        } else {
            checksum.finalize()
        };

        if *source_checksum == target_final_checksum {
            debug!(
                key = source_key,
                additional_checksum_algorithm = additional_checksum_algorithm,
                source_final_checksum = source_checksum,
                target_final_checksum = target_final_checksum,
                "additional checksum verified."
            );
            let _ = stats_sender
                .send(SyncStatistics::ChecksumVerified {
                    key: source_key.to_string(),
                })
                .await;
        } else {
            // A full-object checksum mismatch is returned as a hard error;
            // any other mismatch is only reported as a warning.
            if is_full_object_checksum(&source_final_checksum) {
                return Err(anyhow!(
                    "additional checksum mismatch. output data may be corrupted. \
                     key={}, algorithm={}, source_final_checksum={}, target_final_checksum={}",
                    source_key,
                    additional_checksum_algorithm,
                    source_checksum,
                    target_final_checksum
                ));
            }
            warn!(
                key = source_key,
                additional_checksum_algorithm = additional_checksum_algorithm,
                source_final_checksum = source_checksum,
                target_final_checksum = target_final_checksum,
                "additional checksum mismatch. output data may be corrupted."
            );
            let _ = stats_sender
                .send(SyncStatistics::ChecksumMismatch {
                    key: source_key.to_string(),
                })
                .await;
            source.set_warning();
        }
    }

    let _ = stats_sender
        .send(SyncStatistics::SyncComplete {
            key: source_key.to_string(),
        })
        .await;

    Ok(TransferOutcome::default())
}