use std::{path::PathBuf, time::Duration};
use crate::{
opts::EsthriPutOptParams,
presign::n_parts,
rusoto::{complete_multipart_upload, create_multipart_upload},
PendingUpload, Result,
};
use esthri_internals::rusoto::{
util::{PreSignedRequest, PreSignedRequestOption},
AwsCredentials, CompletedPart, Region, S3Client, UploadPartRequest,
};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use tokio::{
fs::File,
io::{AsyncReadExt, BufReader},
};
use super::DEAFULT_EXPIRATION;
#[derive(Debug, Serialize, Deserialize)]
pub struct PresignedMultipartUpload {
pub upload_id: String,
#[serde(with = "tuple_vec_map")]
pub parts: Vec<(usize, String)>,
}
#[allow(clippy::too_many_arguments)]
pub async fn setup_presigned_multipart_upload(
client: &S3Client,
credentials: &AwsCredentials,
region: &Region,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
n_parts: usize,
expiration: Option<Duration>,
opts: EsthriPutOptParams,
) -> Result<PresignedMultipartUpload> {
let upload_id = create_multipart_upload(
client,
bucket.as_ref(),
key.as_ref(),
None,
opts.storage_class.unwrap(),
)
.await?
.upload_id
.unwrap();
let parts = (1..n_parts + 1)
.map(|part| {
(
part,
presign_multipart_upload(
credentials,
region,
bucket.as_ref(),
key.as_ref(),
part as i64,
upload_id.clone(),
expiration,
),
)
})
.collect();
Ok(PresignedMultipartUpload { upload_id, parts })
}
pub async fn upload_file_presigned_multipart_upload(
client: &Client,
presigned_multipart_upload: PresignedMultipartUpload,
file: &PathBuf,
part_size: usize,
) -> Result<PresignedMultipartUpload> {
let file = File::open(file).await?;
assert!(
presigned_multipart_upload.parts.len()
== n_parts(file.metadata().await?.len() as usize, part_size)
);
let mut reader = BufReader::with_capacity(part_size, file);
let mut upload = PresignedMultipartUpload {
upload_id: presigned_multipart_upload.upload_id,
parts: Vec::new(),
};
for (n, url) in presigned_multipart_upload.parts.iter() {
let mut buf = vec![0; part_size];
let n_read = reader.read_exact(&mut buf).await?;
buf.shrink_to(n_read);
let e_tag = client
.put(url)
.header("Content-Length", n_read.to_string())
.body(buf)
.send()
.await?
.error_for_status()?
.headers()
.get("ETag")
.unwrap()
.to_str()
.unwrap()
.to_owned();
upload.parts.push((*n, e_tag));
}
Ok(upload)
}
pub async fn complete_presigned_multipart_upload(
s3: &S3Client,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
presigned_multipart_upload: PresignedMultipartUpload,
) -> Result<()> {
let parts: Vec<_> = presigned_multipart_upload
.parts
.into_iter()
.map(|(part, etag)| CompletedPart {
e_tag: Some(etag),
part_number: Some(part as i64),
})
.collect();
complete_multipart_upload(
s3,
bucket.as_ref(),
key.as_ref(),
presigned_multipart_upload.upload_id.as_ref(),
&parts,
)
.await
}
pub async fn abort_presigned_multipart_upload(
s3: &S3Client,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
upload_id: impl AsRef<str>,
) -> Result<()> {
PendingUpload::new(bucket.as_ref(), key.as_ref(), upload_id.as_ref())
.abort(s3)
.await
}
fn presign_multipart_upload(
credentials: &AwsCredentials,
region: &Region,
bucket: impl AsRef<str>,
key: impl AsRef<str>,
part: i64,
upload_id: String,
expiration: Option<Duration>,
) -> String {
let options = PreSignedRequestOption {
expires_in: expiration.unwrap_or(DEAFULT_EXPIRATION),
};
UploadPartRequest {
bucket: bucket.as_ref().to_owned(),
key: key.as_ref().to_owned(),
part_number: part,
upload_id,
..Default::default()
}
.get_presigned_url(region, credentials, &options)
}