use crate::s3::builders::{MAX_MULTIPART_COUNT, MAX_PART_SIZE};
use crate::s3::client::MinioClient;
use crate::s3::error::{Error, ValidationErr};
use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::{
AbortMultipartUploadResponse, CompleteMultipartUploadResponse, ComposeObjectResponse,
CopyObjectInternalResponse, CopyObjectResponse, CreateMultipartUploadResponse,
StatObjectResponse, UploadPartCopyResponse,
};
use crate::s3::response_traits::HasChecksumHeaders;
use crate::s3::response_traits::HasEtagFromBody;
use crate::s3::sse::{Sse, SseCustomerKey};
use crate::s3::types::Directive;
use crate::s3::types::PartInfo;
use crate::s3::types::Retention;
use crate::s3::types::{
BucketName, ObjectKey, Region, S3Api, S3Request, ToS3Request, UploadId, VersionId,
};
use crate::s3::utils::{
ChecksumAlgorithm, UtcTime, check_sse, check_ssec, encode_tags, to_http_header_value,
to_iso8601utc, url_encode,
};
use async_recursion::async_recursion;
use http::Method;
use std::collections::HashMap;
use std::sync::Arc;
use typed_builder::TypedBuilder;
/// Argument builder for the S3 `UploadPartCopy` API: copies data from an
/// existing object into one part of an in-progress multipart upload.
#[derive(Clone, Debug, TypedBuilder)]
pub struct UploadPartCopy {
    /// Client that executes the request (required).
    #[builder(!default)]
    client: MinioClient,
    /// Extra headers merged into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Extra query parameters merged into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    region: Option<Region>,
    /// Destination bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Destination object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Multipart upload the part belongs to; must be non-empty (required).
    #[builder(setter(into))]
    upload_id: UploadId,
    /// 1-based part number. The default of 0 is intentionally invalid and is
    /// rejected by `to_s3request`, forcing callers to set it explicitly.
    #[builder(default = 0)]
    part_number: u16,
    /// Per-request headers, e.g. `x-amz-copy-source` and range/condition headers.
    #[builder(default)]
    headers: Multimap,
    /// Optional checksum algorithm advertised via `x-amz-checksum-algorithm`.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
/// Ties [`UploadPartCopy`] to its response type for request execution.
impl S3Api for UploadPartCopy {
    type S3Response = UploadPartCopyResponse;
}
/// Builder-state alias for an [`UploadPartCopy`] builder whose required fields
/// (`client`, `bucket`, `object`, `upload_id`) are set and all optional fields
/// are still unset.
pub type UploadPartCopyBldr = UploadPartCopyBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (UploadId,),
    (),
    (),
    (),
)>;
impl ToS3Request for UploadPartCopy {
    /// Validates the builder state and lowers it into an executable [`S3Request`].
    ///
    /// # Errors
    /// - [`ValidationErr::InvalidUploadId`] if `upload_id` is empty.
    /// - [`ValidationErr::InvalidPartNumber`] if `part_number` lies outside
    ///   `1..=MAX_MULTIPART_COUNT`.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        // Reject requests that could never succeed server-side.
        if self.upload_id.is_empty() {
            return Err(ValidationErr::InvalidUploadId(
                "upload ID cannot be empty".into(),
            ));
        }
        if !(1..=MAX_MULTIPART_COUNT).contains(&self.part_number) {
            return Err(ValidationErr::InvalidPartNumber(format!(
                "part number must be between 1 and {MAX_MULTIPART_COUNT}"
            )));
        }

        // The query string identifies the part within the multipart upload.
        let mut query: Multimap = self.extra_query_params.unwrap_or_default();
        query.add("partNumber", self.part_number.to_string());
        query.add("uploadId", self.upload_id.to_string());

        // Header set: caller extras first, then per-request headers, then the
        // optional checksum algorithm.
        let mut header_map: Multimap = self.extra_headers.unwrap_or_default();
        header_map.add_multimap(self.headers);
        if let Some(algo) = self.checksum_algorithm {
            header_map.add(X_AMZ_CHECKSUM_ALGORITHM, algo.as_str().to_string());
        }

        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::PUT)
            .region(self.region)
            .bucket(self.bucket)
            .object(self.object)
            .query_params(query)
            .headers(header_map)
            .build())
    }
}
/// Argument builder for a single-request server-side object copy
/// (the `CopyObject` S3 API without multipart handling).
#[derive(Clone, Debug, TypedBuilder)]
pub struct CopyObjectInternal {
    /// Client that executes the request (required).
    #[builder(!default)]
    client: MinioClient,
    /// Extra headers merged into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Extra query parameters merged into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    pub(crate) region: Option<Region>,
    /// Destination bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Destination object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Base headers for the destination object.
    #[builder(default)]
    headers: Multimap,
    /// User metadata headers for the destination object.
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    /// Server-side encryption settings for the destination object.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    /// Tags applied to the destination object (sent URL-encoded).
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    /// Object-lock retention for the destination object.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    /// Whether to place an object-lock legal hold on the destination.
    #[builder(default)]
    legal_hold: bool,
    /// Source object and its copy conditions (required).
    #[builder(!default)]
    source: CopySource,
    /// COPY vs REPLACE directive for metadata.
    #[builder(default, setter(into))]
    metadata_directive: Option<Directive>,
    /// COPY vs REPLACE directive for tags.
    #[builder(default, setter(into))]
    tagging_directive: Option<Directive>,
    /// Optional checksum algorithm advertised via `x-amz-checksum-algorithm`.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
/// Ties [`CopyObjectInternal`] to its response type for request execution.
impl S3Api for CopyObjectInternal {
    type S3Response = CopyObjectInternalResponse;
}
/// Builder-state alias for a [`CopyObjectInternal`] builder whose required
/// fields (`client`, `bucket`, `object`) are set; `source` and all optional
/// fields are still unset.
pub type CopyObjectInternalBldr = CopyObjectInternalBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
)>;
impl ToS3Request for CopyObjectInternal {
    /// Assembles the single-request server-side copy into an [`S3Request`].
    ///
    /// All copy parameters (tags, retention, directives, source conditions,
    /// SSE headers) are encoded as request headers; the only query parameters
    /// are the caller-supplied extras.
    ///
    /// # Errors
    /// Returns a [`ValidationErr`] from `check_sse` / `check_ssec` when the
    /// configured encryption settings are not usable with this client.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        // Target SSE and source SSE-C are validated against the client first.
        check_sse(&self.sse, &self.client)?;
        check_ssec(&self.source.ssec, &self.client)?;
        let mut headers = self.headers;
        {
            if let Some(v) = self.extra_headers {
                headers.add_multimap(v);
            }
            if let Some(v) = self.user_metadata {
                headers.add_multimap(v);
            }
            if let Some(v) = self.sse {
                headers.add_multimap(v.headers());
            }
            // Tags travel URL-encoded in a single header; empty sets are skipped.
            if let Some(v) = self.tags {
                let tagging = encode_tags(&v);
                if !tagging.is_empty() {
                    headers.add(X_AMZ_TAGGING, tagging);
                }
            }
            // Object-lock retention requires both a mode and an expiry date.
            if let Some(v) = self.retention {
                headers.add(X_AMZ_OBJECT_LOCK_MODE, v.mode.to_string());
                headers.add(
                    X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
                    to_iso8601utc(v.retain_until_date),
                );
            }
            if self.legal_hold {
                headers.add(X_AMZ_OBJECT_LOCK_LEGAL_HOLD, "ON");
            }
            // COPY vs REPLACE directives for metadata and tagging.
            if let Some(v) = self.metadata_directive {
                headers.add(X_AMZ_METADATA_DIRECTIVE, v.to_string());
            }
            if let Some(v) = self.tagging_directive {
                headers.add(X_AMZ_TAGGING_DIRECTIVE, v.to_string());
            }
            // Build "/<bucket>/<object>[?versionId=...]" for x-amz-copy-source.
            // NOTE(review): only the versionId is URL-encoded here; bucket and
            // object names are inserted verbatim — confirm that keys with
            // special characters are escaped upstream.
            let mut copy_source = String::from("/");
            copy_source.push_str(self.source.bucket.as_str());
            copy_source.push('/');
            copy_source.push_str(self.source.object.as_str());
            if let Some(v) = &self.source.version_id {
                copy_source.push_str("?versionId=");
                copy_source.push_str(&url_encode(v.as_str()));
            }
            headers.add(X_AMZ_COPY_SOURCE, copy_source);
            // Optional byte range of the source to copy.
            let range = self.source.get_range_value();
            if !range.is_empty() {
                headers.add(X_AMZ_COPY_SOURCE_RANGE, range);
            }
            // Conditional-copy preconditions on the source object.
            if let Some(v) = self.source.match_etag {
                headers.add(X_AMZ_COPY_SOURCE_IF_MATCH, v);
            }
            if let Some(v) = self.source.not_match_etag {
                headers.add(X_AMZ_COPY_SOURCE_IF_NONE_MATCH, v);
            }
            if let Some(v) = self.source.modified_since {
                headers.add(X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE, to_http_header_value(v));
            }
            if let Some(v) = self.source.unmodified_since {
                headers.add(
                    X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE,
                    to_http_header_value(v),
                );
            }
            // SSE-C key headers needed to read an encrypted source.
            if let Some(v) = self.source.ssec {
                headers.add_multimap(v.copy_headers());
            }
            if let Some(algorithm) = self.checksum_algorithm {
                headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
            }
        };
        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::PUT)
            .region(self.region)
            .bucket(self.bucket)
            .object(self.object)
            .query_params(self.extra_query_params.unwrap_or_default())
            .headers(headers)
            .build())
    }
}
/// High-level argument builder for copying an object, automatically choosing
/// between a single server-side copy and the multipart compose path
/// (for ranged copies or sources larger than 5 GiB).
#[derive(Clone, Debug, TypedBuilder)]
pub struct CopyObject {
    /// Client that executes the request (required).
    #[builder(!default)]
    client: MinioClient,
    /// Extra headers merged into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Extra query parameters merged into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    pub(crate) region: Option<Region>,
    /// Destination bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Destination object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Base headers for the destination object.
    #[builder(default, setter(into))]
    headers: Option<Multimap>,
    /// User metadata headers for the destination object.
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    /// Server-side encryption settings for the destination object.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    /// Tags applied to the destination object.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    /// Object-lock retention for the destination object.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    /// Whether to place an object-lock legal hold on the destination.
    #[builder(default = false)]
    legal_hold: bool,
    /// Source object and its copy conditions (required).
    #[builder(!default)]
    source: CopySource,
    /// COPY vs REPLACE directive for metadata.
    #[builder(default, setter(into))]
    metadata_directive: Option<Directive>,
    /// COPY vs REPLACE directive for tags.
    #[builder(default, setter(into))]
    tagging_directive: Option<Directive>,
    /// Optional checksum algorithm advertised via `x-amz-checksum-algorithm`.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<crate::s3::utils::ChecksumAlgorithm>,
}
/// Builder-state alias for a [`CopyObject`] builder whose required fields
/// (`client`, `bucket`, `object`) are set; `source` and all optional fields
/// are still unset.
pub type CopyObjectBldr = CopyObjectBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
)>;
impl CopyObject {
    /// Executes the copy.
    ///
    /// The source is stat'ed first; if a byte range (`offset`/`length`) was
    /// requested or the source object exceeds `MAX_PART_SIZE` (5 GiB), the
    /// copy goes through the multipart compose path, otherwise a single
    /// server-side copy request is issued.
    ///
    /// # Errors
    /// - SSE / SSE-C validation failures.
    /// - Failure to stat the source object.
    /// - [`ValidationErr::InvalidCopyDirective`] when a `COPY` metadata or
    ///   tagging directive is combined with a source that requires the
    ///   compose path (S3 cannot server-side copy those for >5 GiB objects).
    /// - Any error from the underlying compose/copy request.
    pub async fn send(self) -> Result<CopyObjectResponse, Error> {
        check_sse(&self.sse, &self.client)?;
        check_ssec(&self.source.ssec, &self.client)?;
        // Stat the source: its size decides which strategy applies below.
        let source = self.source.clone();
        let stat_resp: StatObjectResponse = self
            .client
            .stat_object(&source.bucket, &source.object)?
            .extra_headers(source.extra_headers)
            .extra_query_params(source.extra_query_params)
            .region(source.region)
            .version_id(source.version_id)
            .match_etag(source.match_etag)
            .not_match_etag(source.not_match_etag)
            .modified_since(source.modified_since)
            .unmodified_since(source.unmodified_since)
            .build()
            .send()
            .await?;
        if self.source.offset.is_some()
            || self.source.length.is_some()
            || stat_resp.size()? > MAX_PART_SIZE
        {
            // Multipart compose path. A COPY directive cannot be honored here
            // (metadata/tags are rewritten by compose), so it is rejected.
            // Other directives such as REPLACE are valid and must be no-ops:
            // these arms were previously `todo!()`, which panicked at runtime
            // whenever REPLACE was combined with a large/ranged source.
            if matches!(&self.metadata_directive, Some(Directive::Copy)) {
                return Err(ValidationErr::InvalidCopyDirective(
                    "COPY metadata directive is not applicable to source object size greater than 5 GiB".into()
                ).into());
            }
            if matches!(&self.tagging_directive, Some(Directive::Copy)) {
                return Err(ValidationErr::InvalidCopyDirective(
                    "COPY tagging directive is not applicable to source object size greater than 5 GiB".into()
                ).into());
            }
            // Re-express the copy source as a single compose source, carrying
            // over every source-side condition.
            let src: ComposeSource = {
                let mut src = ComposeSource::new(&self.source.bucket, &self.source.object)?;
                src.extra_headers = self.source.extra_headers;
                src.extra_query_params = self.source.extra_query_params;
                src.region = self.source.region;
                src.ssec = self.source.ssec;
                src.offset = self.source.offset;
                src.length = self.source.length;
                src.match_etag = self.source.match_etag;
                src.not_match_etag = self.source.not_match_etag;
                src.modified_since = self.source.modified_since;
                src.unmodified_since = self.source.unmodified_since;
                src
            };
            let sources: Vec<ComposeSource> = vec![src];
            let resp: ComposeObjectResponse = self
                .client
                .compose_object(&self.source.bucket, &self.source.object, sources)?
                .extra_headers(self.extra_headers)
                .extra_query_params(self.extra_query_params)
                .region(self.region)
                .headers(self.headers)
                .user_metadata(self.user_metadata)
                .sse(self.sse)
                .tags(self.tags)
                .retention(self.retention)
                .legal_hold(self.legal_hold)
                // Consistency fix: honor a requested checksum algorithm on the
                // compose path too (the single-copy path already forwards it).
                .checksum_algorithm(self.checksum_algorithm)
                .build()
                .send()
                .await?;
            // `CopyObjectResponse` is the same underlying response type.
            let resp: CopyObjectResponse = resp;
            Ok(resp)
        } else {
            // Single-request server-side copy.
            let resp: CopyObjectInternalResponse = self
                .client
                .copy_object_internal(self.bucket.clone(), self.object.clone())
                .extra_headers(self.extra_headers)
                .extra_query_params(self.extra_query_params)
                .region(self.region)
                .headers(self.headers.unwrap_or_default())
                .user_metadata(self.user_metadata)
                .sse(self.sse)
                .tags(self.tags)
                .retention(self.retention)
                .legal_hold(self.legal_hold)
                .source(self.source)
                .metadata_directive(self.metadata_directive)
                .tagging_directive(self.tagging_directive)
                .checksum_algorithm(self.checksum_algorithm)
                .build()
                .send()
                .await?;
            let resp: CopyObjectResponse = resp;
            Ok(resp)
        }
    }
}
/// Internal argument builder for composing an object from multiple sources;
/// drives the multipart-upload machinery in [`ComposeObjectInternal::send`].
#[derive(Clone, Debug, TypedBuilder)]
pub struct ComposeObjectInternal {
    /// Client that executes the requests (required).
    #[builder(!default)]
    client: MinioClient,
    /// Extra headers merged into the requests.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Extra query parameters merged into the requests.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    pub(crate) region: Option<Region>,
    /// Destination bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Destination object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Base headers for the destination object.
    #[builder(default, setter(into))]
    headers: Option<Multimap>,
    /// User metadata headers for the destination object.
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    /// Server-side encryption settings for the destination object.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    /// Tags applied to the destination object.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    /// Object-lock retention for the destination object.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    /// Whether to place an object-lock legal hold on the destination.
    #[builder(default)]
    legal_hold: bool,
    /// Source objects (with optional byte ranges) concatenated in order.
    #[builder(default)]
    sources: Vec<ComposeSource>,
    /// Optional checksum algorithm forwarded to each upload request.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Builder-state alias for a [`ComposeObjectInternal`] builder whose required
/// fields (`client`, `bucket`, `object`) are set and all remaining fields are
/// still unset.
pub type ComposeObjectInternalBldr = ComposeObjectInternalBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
)>;
impl ComposeObjectInternal {
    /// Executes the compose.
    ///
    /// Strategy:
    /// - If there is exactly one source with no offset/length, fall back to a
    ///   single server-side `CopyObject`.
    /// - Otherwise start a multipart upload and issue one `UploadPartCopy`
    ///   per source (splitting sources larger than `MAX_PART_SIZE` into
    ///   multiple parts), then complete the upload.
    ///
    /// Returns the result together with the multipart `UploadId` (when one
    /// was created) so the caller can abort a failed upload.
    #[async_recursion]
    pub async fn send(self) -> (Result<ComposeObjectResponse, Error>, Option<UploadId>) {
        let mut sources = self.sources;
        // `calculate_part_count` also populates each source's cached size and
        // copy headers as a side effect (see ComposeSource::build_headers).
        let part_count: u16 = match self.client.calculate_part_count(&mut sources).await {
            Ok(v) => v,
            Err(e) => return (Err(e), None),
        };
        let sources = sources;
        if (part_count == 1) && sources[0].offset.is_none() && sources[0].length.is_none() {
            // Fast path: a whole-object single source is a plain copy.
            let copy_bldr = match self.client.copy_object(&self.bucket, &self.object) {
                Ok(v) => v,
                Err(e) => return (Err(e.into()), None),
            };
            let resp: CopyObjectResponse = match copy_bldr
                .extra_headers(self.extra_headers)
                .extra_query_params(self.extra_query_params)
                .region(self.region)
                .headers(self.headers)
                .user_metadata(self.user_metadata)
                .sse(self.sse)
                .tags(self.tags)
                .retention(self.retention)
                .legal_hold(self.legal_hold)
                .source(
                    CopySource::builder()
                        .bucket(&sources[0].bucket)
                        .object(&sources[0].object)
                        .build(),
                )
                .build()
                .send()
                .await
            {
                Ok(v) => v,
                Err(e) => return (Err(e), None),
            };
            let resp: ComposeObjectResponse = resp;
            (Ok(resp), None)
        } else {
            // Destination-object settings (metadata, SSE, tags, retention,
            // legal hold) all go on the create-multipart-upload request.
            let headers: Multimap = into_headers_copy_object(
                self.extra_headers,
                self.headers,
                self.user_metadata,
                self.sse.clone(),
                self.tags,
                self.retention,
                self.legal_hold,
            );
            let cmu_bldr = match self
                .client
                .create_multipart_upload(&self.bucket, &self.object)
            {
                Ok(v) => v,
                Err(e) => return (Err(e.into()), None),
            };
            let cmu: CreateMultipartUploadResponse = match cmu_bldr
                .extra_query_params(self.extra_query_params.clone())
                .region(self.region.clone())
                .extra_headers(Some(headers))
                .checksum_algorithm(self.checksum_algorithm)
                .build()
                .send()
                .await
            {
                Ok(v) => v,
                Err(e) => return (Err(e), None),
            };
            // From this point on, failures return the upload id so the caller
            // can abort the pending multipart upload.
            let upload_id: UploadId = match cmu.upload_id().await {
                Ok(v) => v,
                Err(e) => return (Err(e.into()), None),
            };
            let mut part_number = 0_u16;
            // Only customer-key SSE (SSE-C) headers must accompany every part
            // copy; other SSE kinds apply to the upload creation alone.
            let ssec_headers: Multimap = match self.sse {
                Some(v) => match v.as_any().downcast_ref::<SseCustomerKey>() {
                    Some(_) => v.headers(),
                    _ => Multimap::new(),
                },
                _ => Multimap::new(),
            };
            let mut parts: Vec<PartInfo> = Vec::new();
            for source in sources.iter() {
                // Effective number of bytes contributed by this source.
                let mut size = source.get_object_size();
                if let Some(l) = source.length {
                    size = l;
                } else if let Some(o) = source.offset {
                    size -= o;
                }
                let offset = source.offset.unwrap_or_default();
                let mut headers = source.get_headers();
                headers.add_multimap(ssec_headers.clone());
                if size <= MAX_PART_SIZE {
                    // The source fits into a single part.
                    part_number += 1;
                    if let Some(l) = source.length {
                        headers.add(
                            X_AMZ_COPY_SOURCE_RANGE,
                            format!("bytes={}-{}", offset, offset + l - 1),
                        );
                    } else if source.offset.is_some() {
                        headers.add(
                            X_AMZ_COPY_SOURCE_RANGE,
                            format!("bytes={}-{}", offset, offset + size - 1),
                        );
                    }
                    let upload_part_copy_bldr = match self.client.upload_part_copy(
                        &self.bucket,
                        &self.object,
                        upload_id.clone(),
                    ) {
                        Ok(v) => v,
                        Err(e) => return (Err(e.into()), Some(upload_id)),
                    };
                    let resp: UploadPartCopyResponse = match upload_part_copy_bldr
                        .region(self.region.clone())
                        .part_number(part_number)
                        .headers(headers)
                        .checksum_algorithm(self.checksum_algorithm)
                        .build()
                        .send()
                        .await
                    {
                        Ok(v) => v,
                        Err(e) => return (Err(e), Some(upload_id)),
                    };
                    let etag = match resp.etag() {
                        Ok(v) => v,
                        Err(e) => return (Err(e.into()), Some(upload_id)),
                    };
                    // Keep the checksum only if an algorithm was requested and
                    // the server returned a value for it.
                    let checksum = self
                        .checksum_algorithm
                        .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
                    parts.push(PartInfo::new(part_number, etag, size, checksum));
                } else {
                    // The source exceeds the part-size limit: split its byte
                    // range into MAX_PART_SIZE chunks, one UploadPartCopy each.
                    let part_ranges = calculate_part_ranges(offset, size, MAX_PART_SIZE);
                    for (part_offset, length) in part_ranges {
                        part_number += 1;
                        let end_bytes = part_offset + length - 1;
                        let mut headers_copy = headers.clone();
                        headers_copy.add(
                            X_AMZ_COPY_SOURCE_RANGE,
                            format!("bytes={part_offset}-{end_bytes}"),
                        );
                        let upload_part_copy_bldr = match self.client.upload_part_copy(
                            &self.bucket,
                            &self.object,
                            upload_id.clone(),
                        ) {
                            Ok(v) => v,
                            Err(e) => return (Err(e.into()), Some(upload_id)),
                        };
                        let resp: UploadPartCopyResponse = match upload_part_copy_bldr
                            .region(self.region.clone())
                            .part_number(part_number)
                            .headers(headers_copy)
                            .checksum_algorithm(self.checksum_algorithm)
                            .build()
                            .send()
                            .await
                        {
                            Ok(v) => v,
                            Err(e) => return (Err(e), Some(upload_id)),
                        };
                        let etag = match resp.etag() {
                            Ok(v) => v,
                            Err(e) => return (Err(e.into()), Some(upload_id)),
                        };
                        let checksum = self
                            .checksum_algorithm
                            .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
                        parts.push(PartInfo::new(part_number, etag, length, checksum));
                    }
                }
            }
            // Stitch the uploaded parts together into the final object.
            let complete_bldr = match self.client.complete_multipart_upload(
                &self.bucket,
                &self.object,
                &upload_id,
                parts,
            ) {
                Ok(v) => v,
                Err(e) => return (Err(e.into()), Some(upload_id)),
            };
            let resp: Result<CompleteMultipartUploadResponse, Error> =
                complete_bldr.region(self.region).build().send().await;
            match resp {
                Ok(v) => {
                    // Repackage the completion response as a compose response.
                    let resp = ComposeObjectResponse {
                        request: v.request,
                        headers: v.headers,
                        body: v.body,
                    };
                    (Ok(resp), Some(upload_id))
                }
                Err(e) => (Err(e), Some(upload_id)),
            }
        }
    }
}
/// Public argument builder for composing an object from multiple sources
/// (server-side concatenation via multipart copy).
#[derive(Clone, Debug, TypedBuilder)]
pub struct ComposeObject {
    /// Client that executes the requests (required).
    #[builder(!default)]
    client: MinioClient,
    /// Extra headers merged into the requests.
    #[builder(default)]
    extra_headers: Option<Multimap>,
    /// Extra query parameters merged into the requests.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    region: Option<Region>,
    /// Destination bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Destination object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Base headers for the destination object.
    #[builder(default, setter(into))]
    headers: Option<Multimap>,
    /// User metadata headers for the destination object.
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    /// Server-side encryption settings for the destination object.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    /// Tags applied to the destination object.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    /// Object-lock retention for the destination object.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    /// Whether to place an object-lock legal hold on the destination.
    #[builder(default)]
    legal_hold: bool,
    /// Source objects (with optional byte ranges) concatenated in order.
    #[builder(default)]
    sources: Vec<ComposeSource>,
    /// Optional checksum algorithm forwarded to each upload request.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Builder-state alias for a [`ComposeObject`] builder whose `client`,
/// `bucket`, `object` and `sources` are already set and the remaining
/// optional fields are still unset.
pub type ComposeObjectBldr = ComposeObjectBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (Vec<ComposeSource>,),
    (),
)>;
impl ComposeObject {
pub async fn send(self) -> Result<ComposeObjectResponse, Error> {
check_sse(&self.sse, &self.client)?;
let (res, upload_id): (Result<ComposeObjectResponse, Error>, Option<UploadId>) = self
.client
.compose_object_internal(self.bucket.clone(), self.object.clone())
.extra_headers(self.extra_headers)
.extra_query_params(self.extra_query_params)
.region(self.region)
.headers(self.headers)
.user_metadata(self.user_metadata)
.sse(self.sse)
.tags(self.tags)
.retention(self.retention)
.legal_hold(self.legal_hold)
.sources(self.sources)
.checksum_algorithm(self.checksum_algorithm)
.build()
.send()
.await;
match res {
Ok(v) => Ok(v),
Err(e) => {
if let Some(upload_id) = upload_id {
let _resp: AbortMultipartUploadResponse = self
.client
.abort_multipart_upload(self.bucket, self.object, upload_id)?
.build()
.send()
.await?;
}
Err(e)
}
}
}
}
/// One source object (optionally a byte range of it) used in a compose
/// operation, together with its copy conditions and cached stat results.
#[derive(Clone, Debug, TypedBuilder)]
pub struct ComposeSource {
    /// Extra headers merged into requests for this source.
    #[builder(default, setter(into))]
    pub extra_headers: Option<Multimap>,
    /// Extra query parameters merged into requests for this source.
    #[builder(default, setter(into))]
    pub extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    pub region: Option<Region>,
    /// Source bucket (required).
    #[builder(!default, setter(into))]
    pub bucket: BucketName,
    /// Source object key (required).
    #[builder(!default, setter(into))]
    pub object: ObjectKey,
    /// Specific version of the source object to copy.
    #[builder(default, setter(into))]
    pub version_id: Option<VersionId>,
    /// SSE-C key needed to read an encrypted source.
    #[builder(default, setter(into))]
    pub ssec: Option<SseCustomerKey>,
    /// Byte offset into the source to start copying from.
    #[builder(default, setter(into))]
    pub offset: Option<u64>,
    /// Number of bytes to copy from the source.
    #[builder(default, setter(into))]
    pub length: Option<u64>,
    /// Copy only if the source etag matches.
    #[builder(default, setter(into))]
    pub match_etag: Option<String>,
    /// Copy only if the source etag does not match.
    #[builder(default, setter(into))]
    pub not_match_etag: Option<String>,
    /// Copy only if the source was modified since this time.
    #[builder(default, setter(into))]
    pub modified_since: Option<UtcTime>,
    /// Copy only if the source was not modified since this time.
    #[builder(default, setter(into))]
    pub unmodified_since: Option<UtcTime>,
    // Cached stat'ed size; populated by `build_headers`, not settable.
    #[builder(default, setter(skip))]
    object_size: Option<u64>,
    // Cached copy-source headers; populated by `build_headers`, not settable.
    #[builder(default, setter(skip))]
    headers: Option<Multimap>,
}
impl ComposeSource {
    /// Creates a compose source for `bucket`/`object` with every optional
    /// condition unset.
    ///
    /// # Errors
    /// Returns a [`ValidationErr`] if the bucket or object name fails
    /// validation during conversion.
    pub fn new<B, O>(bucket: B, object: O) -> Result<Self, ValidationErr>
    where
        B: TryInto<BucketName>,
        B::Error: Into<ValidationErr>,
        O: TryInto<ObjectKey>,
        O::Error: Into<ValidationErr>,
    {
        let bucket = bucket.try_into().map_err(Into::into)?;
        let object = object.try_into().map_err(Into::into)?;
        Ok(Self {
            extra_headers: None,
            extra_query_params: None,
            region: None,
            bucket,
            object,
            version_id: None,
            ssec: None,
            offset: None,
            length: None,
            match_etag: None,
            not_match_etag: None,
            modified_since: None,
            unmodified_since: None,
            object_size: None,
            headers: None,
        })
    }
    /// Returns the source object's size recorded by [`Self::build_headers`].
    ///
    /// # Panics
    /// Panics if `build_headers` has not been called yet.
    pub fn get_object_size(&self) -> u64 {
        self.object_size.expect("A: ABORT: ComposeSource::build_headers() must be called prior to this method invocation. This should not happen.")
    }
    /// Returns a clone of the copy headers recorded by [`Self::build_headers`].
    ///
    /// # Panics
    /// Panics if `build_headers` has not been called yet.
    pub fn get_headers(&self) -> Multimap {
        self.headers.as_ref().expect("B: ABORT: ComposeSource::build_headers() must be called prior to this method invocation. This should not happen.").clone()
    }
    /// Validates `offset`/`length` against the actual `object_size`, then
    /// caches the size and the precomputed `x-amz-copy-source*` headers.
    ///
    /// When no `if-match` condition was supplied, the stat'ed `etag` is used
    /// as the condition so the source cannot change between stat and copy.
    ///
    /// # Errors
    /// Returns a [`ValidationErr`] when the requested offset, length, or
    /// offset+length range does not fit inside the object.
    pub fn build_headers(&mut self, object_size: u64, etag: String) -> Result<(), ValidationErr> {
        if let Some(v) = self.offset
            && v >= object_size
        {
            return Err(ValidationErr::InvalidComposeSourceOffset {
                bucket: self.bucket.to_string(),
                object: self.object.to_string(),
                version: self.version_id.as_ref().map(|v| v.to_string()),
                offset: v,
                object_size,
            });
        }
        if let Some(v) = self.length {
            if v > object_size {
                return Err(ValidationErr::InvalidComposeSourceLength {
                    bucket: self.bucket.to_string(),
                    object: self.object.to_string(),
                    version: self.version_id.as_ref().map(|v| v.to_string()),
                    length: v,
                    object_size,
                });
            }
            if (self.offset.unwrap_or_default() + v) > object_size {
                return Err(ValidationErr::InvalidComposeSourceSize {
                    bucket: self.bucket.to_string(),
                    object: self.object.to_string(),
                    version: self.version_id.as_ref().map(|v| v.to_string()),
                    compose_size: self.offset.unwrap_or_default() + v,
                    object_size,
                });
            }
        }
        self.object_size = Some(object_size);
        let mut headers = Multimap::new();
        // Build "/<bucket>/<object>[?versionId=...]" for x-amz-copy-source.
        // NOTE(review): only the versionId is URL-encoded here; bucket and
        // object names are inserted verbatim — confirm that keys with special
        // characters are escaped upstream.
        let mut copy_source = String::from("/");
        copy_source.push_str(self.bucket.as_ref());
        copy_source.push('/');
        copy_source.push_str(self.object.as_ref());
        if let Some(v) = &self.version_id {
            copy_source.push_str("?versionId=");
            copy_source.push_str(&url_encode(v.as_ref()));
        }
        headers.add(X_AMZ_COPY_SOURCE, copy_source);
        // Conditional-copy preconditions on the source object.
        if let Some(v) = &self.match_etag {
            headers.add(X_AMZ_COPY_SOURCE_IF_MATCH, v);
        }
        if let Some(v) = &self.not_match_etag {
            headers.add(X_AMZ_COPY_SOURCE_IF_NONE_MATCH, v);
        }
        if let Some(v) = self.modified_since {
            headers.add(X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE, to_http_header_value(v));
        }
        if let Some(v) = self.unmodified_since {
            headers.add(
                X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE,
                to_http_header_value(v),
            );
        }
        if let Some(v) = &self.ssec {
            headers.add_multimap(v.copy_headers());
        }
        // Default the if-match condition to the stat'ed etag when unset.
        if !headers.contains_key(X_AMZ_COPY_SOURCE_IF_MATCH) {
            headers.add(X_AMZ_COPY_SOURCE_IF_MATCH, etag);
        }
        self.headers = Some(headers);
        Ok(())
    }
}
/// The source object (and its copy conditions) of a [`CopyObject`] /
/// [`CopyObjectInternal`] operation.
#[derive(Clone, Debug, TypedBuilder)]
pub struct CopySource {
    /// Extra headers merged into requests for this source.
    #[builder(default, setter(into))]
    pub extra_headers: Option<Multimap>,
    /// Extra query parameters merged into requests for this source.
    #[builder(default, setter(into))]
    pub extra_query_params: Option<Multimap>,
    /// Overrides automatic region resolution when set.
    #[builder(default, setter(into))]
    pub region: Option<Region>,
    /// Source bucket (required).
    #[builder(setter(into))]
    pub bucket: BucketName,
    /// Source object key (required).
    #[builder(setter(into))]
    pub object: ObjectKey,
    /// Specific version of the source object to copy.
    #[builder(default, setter(into))]
    pub version_id: Option<VersionId>,
    /// SSE-C key needed to read an encrypted source.
    #[builder(default, setter(into))]
    pub ssec: Option<SseCustomerKey>,
    /// Byte offset into the source to start copying from.
    #[builder(default, setter(into))]
    pub offset: Option<u64>,
    /// Number of bytes to copy from the source.
    #[builder(default, setter(into))]
    pub length: Option<u64>,
    /// Copy only if the source etag matches.
    #[builder(default, setter(into))]
    pub match_etag: Option<String>,
    /// Copy only if the source etag does not match.
    #[builder(default, setter(into))]
    pub not_match_etag: Option<String>,
    /// Copy only if the source was modified since this time.
    #[builder(default, setter(into))]
    pub modified_since: Option<UtcTime>,
    /// Copy only if the source was not modified since this time.
    #[builder(default, setter(into))]
    pub unmodified_since: Option<UtcTime>,
}
impl CopySource {
    /// Builds the value for the `x-amz-copy-source-range` header.
    ///
    /// Returns an empty string when neither `offset` nor `length` is set.
    /// A `length` without an explicit `offset` is treated as starting at 0;
    /// an `offset` without a `length` yields an open-ended range (`bytes=o-`).
    fn get_range_value(&self) -> String {
        match (self.offset, self.length) {
            // No range requested at all.
            (None, None) => String::new(),
            // Open-ended range from the given offset.
            (Some(start), None) => format!("bytes={start}-"),
            // Bounded range; a missing offset defaults to the object start.
            (maybe_start, Some(len)) => {
                let start = maybe_start.unwrap_or(0);
                format!("bytes={}-{}", start, start + len - 1)
            }
        }
    }
}
/// Flattens all optional destination-object settings (headers, user metadata,
/// SSE, tags, retention, legal hold) into one header [`Multimap`], used by the
/// compose path when creating the multipart upload.
fn into_headers_copy_object(
    extra_headers: Option<Multimap>,
    headers: Option<Multimap>,
    user_metadata: Option<Multimap>,
    sse: Option<Arc<dyn Sse>>,
    tags: Option<HashMap<String, String>>,
    retention: Option<Retention>,
    legal_hold: bool,
) -> Multimap {
    let mut merged = Multimap::new();
    // Merge the three plain header maps in their original precedence order.
    for extra in [extra_headers, headers, user_metadata].into_iter().flatten() {
        merged.add_multimap(extra);
    }
    if let Some(sse) = sse {
        merged.add_multimap(sse.headers());
    }
    // Tags travel URL-encoded in a single header; empty sets are skipped.
    if let Some(tags) = tags {
        let encoded = encode_tags(&tags);
        if !encoded.is_empty() {
            merged.add(X_AMZ_TAGGING, encoded);
        }
    }
    // Object-lock retention requires both a mode and an expiry date.
    if let Some(r) = retention {
        merged.add(X_AMZ_OBJECT_LOCK_MODE, r.mode.to_string());
        merged.add(
            X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE,
            to_iso8601utc(r.retain_until_date),
        );
    }
    if legal_hold {
        merged.add(X_AMZ_OBJECT_LOCK_LEGAL_HOLD, "ON");
    }
    merged
}
/// Splits `total_size` bytes beginning at `start_offset` into consecutive
/// `(offset, length)` ranges of at most `max_part_size` bytes each.
///
/// Returns an empty vector when `total_size` is 0. Every range except
/// possibly the last is exactly `max_part_size` long, and the ranges are
/// contiguous starting at `start_offset`.
fn calculate_part_ranges(
    start_offset: u64,
    total_size: u64,
    max_part_size: u64,
) -> Vec<(u64, u64)> {
    let mut parts = Vec::new();
    let mut consumed: u64 = 0;
    while consumed < total_size {
        // Final chunk may be shorter than the maximum part size.
        let len = max_part_size.min(total_size - consumed);
        parts.push((start_offset + consumed, len));
        consumed += len;
    }
    parts
}
#[cfg(test)]
mod tests {
    use super::*;
    // Unit tests for `calculate_part_ranges`, the pure helper that splits a
    // source byte range into part-sized chunks for multipart copy.
    #[test]
    fn test_calculate_part_ranges_single_part() {
        let ranges = calculate_part_ranges(0, 1000, 5000);
        assert_eq!(ranges, vec![(0, 1000)]);
    }
    #[test]
    fn test_calculate_part_ranges_exact_multiple() {
        let ranges = calculate_part_ranges(0, 10000, 5000);
        assert_eq!(ranges, vec![(0, 5000), (5000, 5000)]);
    }
    #[test]
    fn test_calculate_part_ranges_with_remainder() {
        let ranges = calculate_part_ranges(0, 12000, 5000);
        assert_eq!(ranges, vec![(0, 5000), (5000, 5000), (10000, 2000)]);
    }
    #[test]
    fn test_calculate_part_ranges_with_start_offset() {
        // Offsets must be shifted by the starting offset; lengths are unchanged.
        let ranges = calculate_part_ranges(1000, 12000, 5000);
        assert_eq!(ranges, vec![(1000, 5000), (6000, 5000), (11000, 2000)]);
    }
    #[test]
    fn test_calculate_part_ranges_zero_size() {
        let ranges = calculate_part_ranges(0, 0, 5000);
        assert!(ranges.is_empty());
    }
    #[test]
    fn test_calculate_part_ranges_realistic() {
        // 12 GiB object split by the 5 GiB part-size cap.
        let total_size: u64 = 12 * 1024 * 1024 * 1024;
        let max_part_size: u64 = 5 * 1024 * 1024 * 1024;
        let ranges = calculate_part_ranges(0, total_size, max_part_size);
        assert_eq!(ranges.len(), 3);
        assert_eq!(ranges[0], (0, max_part_size));
        assert_eq!(ranges[1], (max_part_size, max_part_size));
        assert_eq!(ranges[2], (2 * max_part_size, 2 * 1024 * 1024 * 1024));
        // Ranges must cover the whole object and be contiguous.
        let total: u64 = ranges.iter().map(|(_, len)| len).sum();
        assert_eq!(total, total_size);
        let mut expected_offset = 0;
        for (offset, length) in &ranges {
            assert_eq!(*offset, expected_offset);
            expected_offset += length;
        }
    }
    #[test]
    fn test_calculate_part_ranges_each_part_correct_length() {
        let ranges = calculate_part_ranges(0, 17000, 5000);
        assert_eq!(
            ranges,
            vec![(0, 5000), (5000, 5000), (10000, 5000), (15000, 2000)]
        );
        // All parts except the last must be exactly max_part_size long.
        for (i, (_, length)) in ranges.iter().enumerate() {
            if i < ranges.len() - 1 {
                assert_eq!(*length, 5000, "Part {} should be max_part_size", i);
            }
        }
    }
}