use std::io::Read;
use std::sync::Arc;
use std::sync::Mutex;
use rusoto_core::Region;
use rusoto_s3::CompleteMultipartUploadRequest;
use rusoto_s3::CompletedMultipartUpload;
use rusoto_s3::CompletedPart;
use rusoto_s3::CreateMultipartUploadRequest;
use rusoto_s3::S3Client;
use rusoto_s3::UploadPartRequest;
use rusoto_s3::S3;
pub async fn s3_upload_file(
file_path: &str,
bucket: &str,
key: &str,
) -> Result<String, String> {
let mut file = std::fs::File::open(file_path).unwrap();
let chunk_size: usize = 6_000_000;
let mut buffer = Vec::with_capacity(chunk_size);
let s3_bucket = String::from(bucket);
let s3_key = String::from(key);
let s3_bucket_copy = String::from(bucket);
let s3_key_copy = String::from(key);
let server_side_encryption = "AES256";
let storage_class = std::env::var("S3_STORAGE_CLASS")
.unwrap_or_else(|_| "STANDARD".to_string());
let client = S3Client::new(Region::UsEast2);
info!(
"s3_upload_file - start - \
{file_path} \
to s3://{bucket}/{key} with \
sse={server_side_encryption} \
sc={storage_class}"
);
let create_multipart_request = CreateMultipartUploadRequest {
bucket: s3_bucket,
key: s3_key,
server_side_encryption: Some(server_side_encryption.to_string()),
storage_class: Some(storage_class.to_string()),
..Default::default()
};
let create_response_result = client
.create_multipart_upload(create_multipart_request)
.await;
info!(
"s3_upload_file - waiting - \
{file_path} \
to s3://{bucket}/{key} with \
sse={server_side_encryption} \
sc={storage_class}"
);
let create_response = match create_response_result {
Ok(d) => d,
Err(e) => {
let full_err_msg = format!(
"s3_upload_file - failed to create s3 multipart upload \
s3://{s3_bucket_copy}/{s3_key_copy} \
with err='{e}'"
);
if full_err_msg.contains("<Code>AccessDenied</Code>") {
let err_msg = format!(
"s3_upload_file - failed with access denied - \
please confirm the environment variables \
AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID \
are set correctly (or other aws account credentials) \
s3://{s3_bucket_copy}/{s3_key_copy}"
);
return Err(err_msg);
}
else {
return Err(full_err_msg);
}
}
};
let upload_id = create_response.upload_id.unwrap();
let upload_id_clone = upload_id.clone();
let s3_bucket = String::from(bucket);
let s3_key = String::from(key);
let create_upload_part =
move |body: Vec<u8>, part_number: i64| -> UploadPartRequest {
UploadPartRequest {
body: Some(body.into()),
bucket: s3_bucket.clone(),
key: s3_key.clone(),
upload_id: upload_id_clone.to_owned(),
part_number,
..Default::default()
}
};
let create_upload_part_arc = Arc::new(create_upload_part);
let completed_parts = Arc::new(Mutex::new(vec![]));
let mut part_number = 1;
let mut multiple_parts_futures = Vec::new();
loop {
let maximum_bytes_to_read = chunk_size - buffer.len();
file.by_ref()
.take(maximum_bytes_to_read as u64)
.read_to_end(&mut buffer)
.unwrap();
if buffer.is_empty() {
break;
}
let next_buffer = Vec::with_capacity(chunk_size);
let data_to_send = buffer;
let completed_parts_cloned = completed_parts.clone();
let create_upload_part_arc_cloned = create_upload_part_arc.clone();
let send_part_task_future = tokio::task::spawn(async move {
let part = create_upload_part_arc_cloned(
data_to_send.to_vec(),
part_number,
);
{
let part_number = part.part_number;
let internal_loop_client = S3Client::new(Region::UsEast2);
let response = internal_loop_client.upload_part(part).await;
completed_parts_cloned.lock().unwrap().push(CompletedPart {
e_tag: response
.expect("Couldn't complete multipart upload")
.e_tag,
part_number: Some(part_number),
});
}
});
multiple_parts_futures.push(send_part_task_future);
buffer = next_buffer;
part_number += 1;
}
let final_client = S3Client::new(Region::UsEast2);
let _results = futures::future::join_all(multiple_parts_futures).await;
let mut completed_parts_vector = completed_parts.lock().unwrap().to_vec();
completed_parts_vector.sort_by_key(|part| part.part_number);
let completed_upload = CompletedMultipartUpload {
parts: Some(completed_parts_vector),
};
let s3_bucket = String::from(bucket);
let s3_key = String::from(key);
let complete_req = CompleteMultipartUploadRequest {
bucket: s3_bucket,
key: s3_key,
upload_id: upload_id.to_owned(),
multipart_upload: Some(completed_upload),
..Default::default()
};
final_client
.complete_multipart_upload(complete_req)
.await
.expect("Couldn't complete multipart upload");
info!(
"s3_upload_file - done - \
{file_path} to s3://{bucket}/{key} \
with sse={server_side_encryption} \
sc={storage_class}"
);
Ok("Success".to_string())
}