use std::{path::PathBuf, time::Duration};
use crate::{
aws_sdk::{complete_multipart_upload, create_multipart_upload},
opts::EsthriPutOptParams,
presign::n_parts,
Error, PendingUpload, Result,
};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::{presigning::PresigningConfig, types::CompletedPart};
use reqwest::Client as HttpClient;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncReadExt, BufReader},
};
use super::DEAFULT_EXPIRATION;
#[derive(Debug, Serialize, Deserialize)]
pub struct PresignedMultipartUpload {
pub upload_id: String,
#[serde(with = "tuple_vec_map")]
pub parts: Vec<(usize, String)>,
}
#[allow(clippy::too_many_arguments)]
pub async fn setup_presigned_multipart_upload(
s3: &S3Client,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
n_parts: usize,
expiration: Option<Duration>,
opts: EsthriPutOptParams,
) -> Result<PresignedMultipartUpload> {
let upload_id = create_multipart_upload(
s3,
bucket.as_ref(),
key.as_ref(),
None,
opts.storage_class.unwrap(),
)
.await?
.upload_id
.unwrap();
let fut: Vec<_> = (1..=n_parts)
.map(|part| {
let bucket = bucket.as_ref().to_string();
let key = key.as_ref().to_string();
let upload_id = upload_id.clone();
async move {
let url =
presign_multipart_upload(s3, bucket, key, part as i32, upload_id, expiration)
.await?;
Ok((part, url))
}
})
.collect();
let parts: Result<Vec<_>> = futures::future::try_join_all(fut).await;
Ok(PresignedMultipartUpload {
upload_id,
parts: parts?,
})
}
pub async fn upload_file_presigned_multipart_upload(
client: &HttpClient,
presigned_multipart_upload: PresignedMultipartUpload,
file: &PathBuf,
part_size: usize,
) -> Result<PresignedMultipartUpload> {
let file = File::open(file).await?;
assert!(
presigned_multipart_upload.parts.len()
== n_parts(file.metadata().await?.len() as usize, part_size)
);
let mut reader = BufReader::with_capacity(part_size, file);
let mut upload = PresignedMultipartUpload {
upload_id: presigned_multipart_upload.upload_id,
parts: Vec::new(),
};
for (n, url) in presigned_multipart_upload.parts.iter() {
let mut buf = vec![0; part_size];
let n_read = reader.read_exact(&mut buf).await?;
buf.shrink_to(n_read);
let e_tag = client
.put(url)
.header("Content-Length", n_read.to_string())
.body(buf)
.send()
.await?
.error_for_status()?
.headers()
.get("ETag")
.unwrap()
.to_str()
.unwrap()
.to_owned();
upload.parts.push((*n, e_tag));
}
Ok(upload)
}
pub async fn complete_presigned_multipart_upload(
s3: &S3Client,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
presigned_multipart_upload: PresignedMultipartUpload,
) -> Result<()> {
let parts: Vec<_> = presigned_multipart_upload
.parts
.into_iter()
.map(|(part, etag)| {
CompletedPart::builder()
.e_tag(etag)
.part_number(part as i32)
.build()
})
.collect();
complete_multipart_upload(
s3,
bucket.as_ref(),
key.as_ref(),
presigned_multipart_upload.upload_id.as_ref(),
parts.as_slice(),
)
.await
}
pub async fn abort_presigned_multipart_upload(
s3: &S3Client,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
upload_id: impl AsRef<str>,
) -> Result<()> {
PendingUpload::new(bucket.as_ref(), key.as_ref(), upload_id.as_ref())
.abort(s3)
.await
}
async fn presign_multipart_upload(
s3: &S3Client,
bucket: String,
key: String,
part: i32,
upload_id: String,
expiration: Option<Duration>,
) -> Result<String> {
let presigning_config = PresigningConfig::builder()
.expires_in(expiration.unwrap_or(DEAFULT_EXPIRATION))
.build()
.map_err(Error::PresigningConfigError)?;
let presigned_req = s3
.upload_part()
.bucket(bucket)
.key(key)
.part_number(part)
.upload_id(upload_id)
.presigned(presigning_config)
.await
.map_err(|e| match e {
SdkError::ServiceError(error) => Error::UploadPartFailed(Box::new(error.into_err())),
_ => Error::SdkError(e.to_string()),
})?;
Ok(presigned_req.uri().to_string())
}