use aws_sdk_s3::{
operation::create_multipart_upload::CreateMultipartUploadOutput,
primitives::SdkBody,
types::{CompletedMultipartUpload, CompletedPart},
Client,
};
use aws_smithy_runtime_api::http::Request;
use aws_smithy_types::byte_stream::{ByteStream, Length};
use std::{
convert::Infallible,
fmt::Write,
fs::File,
io::prelude::*,
path::{Path, PathBuf},
pin::Pin,
task::{Context, Poll},
};
use bytes::Bytes;
use http_body::{Body, SizeHint};
use indicatif::{ProgressBar, ProgressState, ProgressStyle};
use tracing::{debug, info};
use crate::constants;
use crate::file_management;
/// Chunk size, in bytes (5 MiB), used when splitting a file into multipart-upload parts.
const PART_SIZE: u64 = 5 * 1024 * 1024;
/// Running byte count and progress bar for one upload body.
struct ProgressTracker {
/// Total number of bytes reported to `track` so far.
bytes_written: u64,
/// Expected total size of the body, in bytes.
content_length: u64,
/// Progress bar that `track` advances and eventually finishes.
bar: ProgressBar,
}
impl ProgressTracker {
    /// Records that `len` more bytes were read, logs the running percentage,
    /// and moves the progress bar.
    ///
    /// The bar is finished once the running total reaches (or exceeds)
    /// `content_length`; until then its position is set to the running total.
    fn track(&mut self, len: u64) {
        self.bytes_written += len;
        // An empty body is complete by definition; this also avoids 0/0 = NaN.
        let percent = if self.content_length == 0 {
            100.0
        } else {
            self.bytes_written as f64 / self.content_length as f64 * 100.0
        };
        info!("Read {} bytes, progress: {:.2}%", len, percent);
        // `<` rather than `!=` so an overshoot still finishes the bar instead
        // of leaving it open forever.
        if self.bytes_written < self.content_length {
            self.bar.set_position(self.bytes_written);
        } else {
            self.bar.finish_with_message("GOODBYE UPLOAD");
        }
    }
}
/// Body wrapper that pairs an inner body with a [`ProgressTracker`].
#[pin_project::pin_project]
pub struct ProgressBody<InnerBody> {
// Structurally pinned so the inner body's pinned poll methods can be called
// through the projection.
#[pin]
inner: InnerBody,
// Byte counter and progress bar associated with this body.
progress_tracker: ProgressTracker,
}
impl ProgressBody<SdkBody> {
    /// Swaps the body of `value` for a progress-reporting [`ProgressBody`].
    ///
    /// The wrapper needs the total size up front, so a body that does not
    /// report a content length is passed through untouched (without progress
    /// reporting) instead of panicking.
    ///
    /// # Errors
    ///
    /// Never fails; the `Result` only exists to match the signature expected
    /// by the caller that installs this hook.
    pub fn replace(value: Request<SdkBody>) -> Result<Request<SdkBody>, Infallible> {
        let value = value.map(|body| match body.content_length() {
            Some(len) => SdkBody::from_body_0_4(ProgressBody::new(body, len)),
            None => body,
        });
        Ok(value)
    }
}
impl<InnerBody> ProgressBody<InnerBody>
where
    InnerBody: Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
    /// Wraps `body` together with a freshly styled progress bar whose length
    /// is `content_length` bytes.
    ///
    /// # Panics
    ///
    /// Panics if the hard-coded progress template fails to parse.
    pub fn new(body: InnerBody, content_length: u64) -> Self {
        let style = ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
            .unwrap()
            .with_key("eta", |state: &ProgressState, w: &mut dyn Write| {
                write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()
            })
            .progress_chars("#>-");

        let bar = ProgressBar::new(content_length);
        bar.set_style(style);

        // The tracker starts at zero bytes and counts up as chunks are polled.
        let progress_tracker = ProgressTracker {
            bytes_written: 0,
            content_length,
            bar,
        };

        Self {
            inner: body,
            progress_tracker,
        }
    }
}
impl<InnerBody> Body for ProgressBody<InnerBody>
where
InnerBody: Body<Data = Bytes, Error = aws_smithy_types::body::Error>,
{
type Data = Bytes;
type Error = aws_smithy_types::body::Error;
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut this = self.project();
let result = match this.inner.as_mut().poll_data(cx) {
Poll::Ready(Some(Ok(data))) => {
this.progress_tracker.track(data.len() as u64);
Poll::Ready(Some(Ok(data)))
}
Poll::Ready(None) => {
debug!("done");
Poll::Ready(None)
}
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Pending => Poll::Pending,
};
result
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
self.project().inner.poll_trailers(cx)
}
fn size_hint(&self) -> http_body::SizeHint {
SizeHint::with_exact(self.progress_tracker.content_length)
}
}
pub async fn upload_object(
client: &Client,
bucket_name: &str,
file_name: &PathBuf,
prefix: Option<String>,
key: &str,
presigned_time: u64,
) -> Result<(), anyhow::Error> {
let mut file = File::open(file_name).expect("Failed to open file");
let metadata = file.metadata().expect("Failed to get file metadata");
let file_size = metadata.len();
let pref_key = format!("{}{}", prefix.clone().unwrap_or("".into()), key);
if file_size > 4294967296 {
println!("========================================");
println!("💾 | File size is bigger than 4GB");
println!("💾 | Using multi-part upload");
println!(
"🚀 | Uploading file: {}, to S3 Bucket: {} | 🚀",
key, bucket_name
);
println!("========================================");
let bar = ProgressBar::new(file_size);
bar.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
.unwrap()
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w,"{:.1}s", state.eta().as_secs_f64()).unwrap())
.progress_chars("#>-"));
let multipart_upload_res: CreateMultipartUploadOutput = client
.create_multipart_upload()
.bucket(bucket_name)
.key(&pref_key)
.tagging(constants::DEFAULT_OBJECT_TAG)
.send()
.await
.unwrap();
let upload_id = multipart_upload_res.upload_id().unwrap();
let mut completed_parts = Vec::new();
let mut part_number = 1;
let mut file_position: u64 = 0;
while file_position < file_size {
let bytes_remaining = file_size - file_position;
let part_size = std::cmp::min(bytes_remaining, PART_SIZE);
let mut part_data = vec![0; part_size as usize];
file.read_exact(&mut part_data)
.expect("Failed to read file");
let stream = ByteStream::read_from()
.path(file_name)
.offset(file_position)
.length(Length::Exact(part_size))
.build()
.await
.unwrap();
bar.set_position(file_position);
let upload_part_res = client
.upload_part()
.bucket(bucket_name)
.key(&pref_key)
.upload_id(upload_id)
.part_number(part_number)
.body(stream)
.send()
.await?;
let completed_part = CompletedPart::builder()
.part_number(part_number)
.e_tag(upload_part_res.e_tag.expect("Was unable to upload part"))
.build();
completed_parts.push(completed_part);
file_position += part_size;
part_number += 1;
}
let completed_multipart_upload = CompletedMultipartUpload::builder()
.set_parts(Some(completed_parts))
.build();
let _complete_multipart_upload_res = client
.complete_multipart_upload()
.bucket(bucket_name)
.key(&pref_key)
.multipart_upload(completed_multipart_upload)
.upload_id(upload_id)
.send()
.await
.unwrap();
} else {
println!("========================================");
println!(
"🚀 | Uploading file: {}, to S3 Bucket: {} | 🚀",
key, bucket_name
);
println!("========================================");
let body = ByteStream::read_from()
.path(Path::new(file_name))
.buffer_size(2048)
.build()
.await?;
let request = client
.put_object()
.bucket(bucket_name)
.key(&pref_key)
.tagging(constants::DEFAULT_OBJECT_TAG)
.body(body);
let customized = request
.customize()
.map_request(ProgressBody::<SdkBody>::replace);
let out = customized.send().await?;
debug!("PutObjectOutput: {:?}", out);
}
let presigned_url =
file_management::presign_file(client, bucket_name, key, prefix, presigned_time).await?;
println!("========================================");
println!("📋 | Good job, here is your file: ");
println!("📋 | {}", presigned_url);
Ok(())
}