use super::ObjectContent;
use crate::s3::builders::{ContentStream, Size};
use crate::s3::client::MinioClient;
use crate::s3::error::{Error, IoError, ValidationErr};
use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::{
AbortMultipartUploadResponse, CompleteMultipartUploadResponse, CreateMultipartUploadResponse,
PutObjectContentResponse, PutObjectResponse, UploadPartResponse,
};
use crate::s3::response_traits::{HasChecksumHeaders, HasEtagFromHeaders};
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse;
use crate::s3::types::PartInfo;
use crate::s3::types::Retention;
use crate::s3::types::{BucketName, ObjectKey, Region, S3Api, S3Request, ToS3Request, UploadId};
use crate::s3::utils::{ChecksumAlgorithm, check_sse, compute_checksum_sb, insert};
use crate::s3::utils::{encode_tags, md5sum_hash, to_iso8601utc, url_encode};
use bytes::{Bytes, BytesMut};
use http::Method;
use std::{collections::HashMap, sync::Arc};
use typed_builder::TypedBuilder;
/// Argument builder for the S3 `CreateMultipartUpload` API operation.
///
/// Initiates a multipart upload for `object` in `bucket`; the response
/// carries the upload ID that `UploadPart`, `CompleteMultipartUpload` and
/// `AbortMultipartUpload` subsequently require.
#[derive(Clone, Debug, TypedBuilder)]
pub struct CreateMultipartUpload {
    // Client used to execute the request (required builder field).
    #[builder(!default)]
    client: MinioClient,
    // Extra headers merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    // Extra query parameters merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    // Overrides the region otherwise resolved by the client.
    #[builder(default, setter(into))]
    region: Option<Region>,
    #[builder(setter(into), !default)]
    bucket: BucketName,
    #[builder(setter(into), !default)]
    object: ObjectKey,
    // User metadata; keys are validated to start with "x-amz-meta-".
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    // Server-side encryption configuration, if any.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    // Object tags, encoded into the x-amz-tagging header.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    // Object-lock retention settings.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    // When true, sets the object-lock legal-hold header to "ON".
    #[builder(default = false)]
    legal_hold: bool,
    // Content type; defaults to "application/octet-stream" when unset.
    #[builder(default, setter(into))]
    content_type: Option<String>,
    // Checksum algorithm advertised via x-amz-checksum-algorithm.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// TypedBuilder type-state alias: required fields (`client`, `bucket`,
/// `object`) set, all optional fields still unset.
pub type CreateMultipartUploadBldr = CreateMultipartUploadBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
)>;
impl S3Api for CreateMultipartUpload {
    type S3Response = CreateMultipartUploadResponse;
}
impl ToS3Request for CreateMultipartUpload {
    /// Builds the POST request (with the `uploads` query marker) that
    /// initiates a multipart upload.
    ///
    /// # Errors
    /// Propagates `ValidationErr` from user-metadata validation performed by
    /// `into_headers_put_object`.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        // Shared put-object header assembly (metadata, SSE, tags, lock, type).
        let mut headers = into_headers_put_object(
            self.extra_headers,
            self.user_metadata,
            self.sse,
            self.tags,
            self.retention,
            self.legal_hold,
            self.content_type,
        )?;
        if let Some(alg) = self.checksum_algorithm {
            headers.add(X_AMZ_CHECKSUM_ALGORITHM, alg.as_str().to_string());
        }
        // The bare "uploads" query key marks this POST as CreateMultipartUpload.
        let query_params = insert(self.extra_query_params, "uploads");
        let request = S3Request::builder()
            .client(self.client)
            .method(Method::POST)
            .region(self.region)
            .bucket(self.bucket)
            .object(self.object)
            .query_params(query_params)
            .headers(headers)
            .build();
        Ok(request)
    }
}
/// Argument builder for the S3 `AbortMultipartUpload` API operation.
///
/// Aborts an in-progress multipart upload identified by `upload_id`,
/// discarding any parts already uploaded.
#[derive(Clone, Debug, TypedBuilder)]
pub struct AbortMultipartUpload {
    // Client used to execute the request (required builder field).
    #[builder(!default)]
    client: MinioClient,
    // Extra headers merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    // Extra query parameters merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    // Overrides the region otherwise resolved by the client.
    #[builder(default, setter(into))]
    region: Option<Region>,
    #[builder(setter(into), !default)]
    bucket: BucketName,
    #[builder(setter(into), !default)]
    object: ObjectKey,
    // Upload ID returned by CreateMultipartUpload.
    #[builder(setter(into))]
    upload_id: UploadId,
}
/// TypedBuilder type-state alias: required fields (`client`, `bucket`,
/// `object`, `upload_id`) set, optional fields unset.
pub type AbortMultipartUploadBldr = AbortMultipartUploadBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (UploadId,),
)>;
impl S3Api for AbortMultipartUpload {
    type S3Response = AbortMultipartUploadResponse;
}
impl ToS3Request for AbortMultipartUpload {
fn to_s3request(self) -> Result<S3Request, ValidationErr> {
let headers: Multimap = self.extra_headers.unwrap_or_default();
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
query_params.add("uploadId", url_encode(self.upload_id.as_str()).to_string());
Ok(S3Request::builder()
.client(self.client)
.method(Method::DELETE)
.region(self.region)
.bucket(self.bucket)
.object(self.object)
.query_params(query_params)
.headers(headers)
.build())
}
}
/// Argument builder for the S3 `CompleteMultipartUpload` API operation.
///
/// Finishes a multipart upload by sending the list of uploaded parts
/// (numbers, ETags and optional per-part checksums) as an XML body.
#[derive(Clone, Debug, TypedBuilder)]
pub struct CompleteMultipartUpload {
    // Client used to execute the request (required builder field).
    #[builder(!default)]
    client: MinioClient,
    // Extra headers merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    // Extra query parameters merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    // Overrides the region otherwise resolved by the client.
    #[builder(default, setter(into))]
    region: Option<Region>,
    #[builder(setter(into), !default)]
    bucket: BucketName,
    #[builder(setter(into), !default)]
    object: ObjectKey,
    // Upload ID returned by CreateMultipartUpload.
    #[builder(setter(into))]
    upload_id: UploadId,
    // Uploaded parts, in part-number order; must be non-empty (required).
    #[builder(!default)]
    parts: Vec<PartInfo>,
    // Checksum algorithm advertised via x-amz-checksum-algorithm.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// TypedBuilder type-state alias: required fields (`client`, `bucket`,
/// `object`, `upload_id`, `parts`) set, optional fields unset.
pub type CompleteMultipartUploadBldr = CompleteMultipartUploadBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (UploadId,),
    (Vec<PartInfo>,),
    (),
)>;
impl S3Api for CompleteMultipartUpload {
    type S3Response = CompleteMultipartUploadResponse;
}
impl ToS3Request for CompleteMultipartUpload {
    /// Builds the POST request that completes a multipart upload: an XML
    /// `<CompleteMultipartUpload>` body listing every part's number, ETag and
    /// optional checksum, with Content-MD5 computed over the body.
    ///
    /// # Errors
    /// Returns `ValidationErr::InvalidUploadId` for an empty upload ID and
    /// `ValidationErr::EmptyParts` for an empty part list.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        if self.upload_id.is_empty() {
            return Err(ValidationErr::InvalidUploadId(
                "upload ID cannot be empty".into(),
            ));
        }
        if self.parts.is_empty() {
            return Err(ValidationErr::EmptyParts("parts cannot be empty".into()));
        }
        // Build the XML payload; capacity is a rough per-part estimate to
        // avoid repeated reallocation.
        let mut buf = BytesMut::with_capacity(200 * self.parts.len() + 100);
        buf.extend_from_slice(b"<CompleteMultipartUpload>");
        for part in &self.parts {
            buf.extend_from_slice(b"<Part><PartNumber>");
            buf.extend_from_slice(part.number.to_string().as_bytes());
            buf.extend_from_slice(b"</PartNumber><ETag>");
            buf.extend_from_slice(part.etag.as_str().as_bytes());
            buf.extend_from_slice(b"</ETag>");
            if let Some((algorithm, ref value)) = part.checksum {
                // Element name depends on the checksum algorithm of the part.
                let tag: &str = match algorithm {
                    ChecksumAlgorithm::CRC32 => "ChecksumCRC32",
                    ChecksumAlgorithm::CRC32C => "ChecksumCRC32C",
                    ChecksumAlgorithm::SHA1 => "ChecksumSHA1",
                    ChecksumAlgorithm::SHA256 => "ChecksumSHA256",
                    ChecksumAlgorithm::CRC64NVME => "ChecksumCRC64NVME",
                };
                buf.extend_from_slice(format!("<{tag}>{value}</{tag}>").as_bytes());
            }
            buf.extend_from_slice(b"</Part>");
        }
        buf.extend_from_slice(b"</CompleteMultipartUpload>");
        let bytes: Bytes = buf.freeze();

        let mut headers: Multimap = self.extra_headers.unwrap_or_default();
        headers.add(CONTENT_TYPE, "application/xml");
        headers.add(CONTENT_MD5, md5sum_hash(bytes.as_ref()));
        if let Some(alg) = self.checksum_algorithm {
            headers.add(X_AMZ_CHECKSUM_ALGORITHM, alg.as_str().to_string());
        }

        let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
        query_params.add("uploadId", self.upload_id.as_str());

        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::POST)
            .region(self.region)
            .bucket(self.bucket)
            .object(self.object)
            .query_params(query_params)
            .headers(headers)
            .body(Arc::new(SegmentedBytes::from(bytes)))
            .build())
    }
}
/// Argument builder for the S3 `UploadPart` API operation.
///
/// Also serves as the inner payload of `PutObject` (when `upload_id` and
/// `part_number` are `None`, the request is a plain single-shot PUT).
#[derive(Debug, Clone, TypedBuilder)]
pub struct UploadPart {
    // Client used to execute the request (required builder field).
    #[builder(!default)]
    client: MinioClient,
    // Extra headers merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    // Extra query parameters merged verbatim into the request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    #[builder(setter(into), !default)]
    bucket: BucketName,
    #[builder(setter(into), !default)]
    object: ObjectKey,
    // Overrides the region otherwise resolved by the client.
    #[builder(default, setter(into))]
    region: Option<Region>,
    // Server-side encryption configuration, if any.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    // Object tags, encoded into the x-amz-tagging header.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    // Object-lock retention settings.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    // When true, sets the object-lock legal-hold header to "ON".
    #[builder(default = false)]
    legal_hold: bool,
    // Part payload (required builder field).
    #[builder(!default)]
    data: Arc<SegmentedBytes>,
    // Content type; defaults to "application/octet-stream" when unset.
    #[builder(default, setter(into))]
    content_type: Option<String>,
    // User metadata; keys are validated to start with "x-amz-meta-".
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    // Upload ID; None means this is a plain PutObject request.
    #[builder(default, setter(into))]
    upload_id: Option<String>,
    // 1-based part number; None means this is a plain PutObject request.
    #[builder(default, setter(into))]
    part_number: Option<u16>,
    // Checksum algorithm for the part payload.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
    // When true, the checksum is sent as a trailer instead of a header.
    #[builder(default = false)]
    use_trailing_checksum: bool,
    // When true, uses signed (chunked) streaming for the payload.
    #[builder(default = false)]
    use_signed_streaming: bool,
}
/// TypedBuilder type-state alias: required fields (`client`, `bucket`,
/// `object`, `data`, `upload_id`, `part_number`) set, others unset.
pub type UploadPartBldr = UploadPartBuilder<(
    (MinioClient,),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (Arc<SegmentedBytes>,),
    (),
    (),
    (Option<String>,),
    (Option<u16>,),
    (),
    (),
    (),
)>;
impl S3Api for UploadPart {
    type S3Response = UploadPartResponse;
}
impl ToS3Request for UploadPart {
    /// Builds the PUT request for one part of a multipart upload (or for a
    /// whole object, when `upload_id`/`part_number` are `None`).
    ///
    /// When a checksum algorithm is configured and trailing checksums are
    /// disabled, the checksum is computed eagerly over `data` and attached as
    /// a header; otherwise the algorithm is handed to the request as a
    /// trailing checksum.
    ///
    /// # Errors
    /// Returns `ValidationErr::InvalidUploadId` for an empty upload ID and
    /// `ValidationErr::InvalidPartNumber` for a part number outside
    /// `1..=MAX_MULTIPART_COUNT`.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        if let Some(id) = &self.upload_id
            && id.is_empty()
        {
            return Err(ValidationErr::InvalidUploadId(
                "upload ID cannot be empty".into(),
            ));
        }
        if let Some(num) = self.part_number
            && !(1..=MAX_MULTIPART_COUNT).contains(&num)
        {
            return Err(ValidationErr::InvalidPartNumber(format!(
                "part number must be between 1 and {MAX_MULTIPART_COUNT}"
            )));
        }

        // Shared put-object header assembly (metadata, SSE, tags, lock, type).
        let mut headers = into_headers_put_object(
            self.extra_headers,
            self.user_metadata,
            self.sse,
            self.tags,
            self.retention,
            self.legal_hold,
            self.content_type,
        )?;

        // Algorithm is forwarded as a trailing checksum only when requested.
        let trailing_checksum = self
            .checksum_algorithm
            .filter(|_| self.use_trailing_checksum);

        // Otherwise compute the checksum now and attach it as a header.
        if let Some(algorithm) = self.checksum_algorithm
            && !self.use_trailing_checksum
        {
            let value = compute_checksum_sb(algorithm, &self.data);
            headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
            let header_name = match algorithm {
                ChecksumAlgorithm::CRC32 => X_AMZ_CHECKSUM_CRC32,
                ChecksumAlgorithm::CRC32C => X_AMZ_CHECKSUM_CRC32C,
                ChecksumAlgorithm::SHA1 => X_AMZ_CHECKSUM_SHA1,
                ChecksumAlgorithm::SHA256 => X_AMZ_CHECKSUM_SHA256,
                ChecksumAlgorithm::CRC64NVME => X_AMZ_CHECKSUM_CRC64NVME,
            };
            headers.add(header_name, value);
        }

        let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
        if let Some(id) = self.upload_id {
            query_params.add("uploadId", id);
        }
        if let Some(num) = self.part_number {
            query_params.add("partNumber", num.to_string());
        }

        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::PUT)
            .region(self.region)
            .bucket(self.bucket)
            .query_params(query_params)
            .object(self.object)
            .headers(headers)
            .body(self.data)
            .trailing_checksum(trailing_checksum)
            .use_signed_streaming(self.use_signed_streaming)
            .build())
    }
}
/// Argument builder for the S3 `PutObject` API operation.
///
/// Thin wrapper around `UploadPart`: the same request shape is used, with
/// `upload_id` and `part_number` left as `None`.
#[derive(Debug, Clone, TypedBuilder)]
pub struct PutObject {
    // Inner UploadPart carrying all request arguments.
    pub(crate) inner: UploadPart,
}
/// TypedBuilder type-state alias: `inner` set.
pub type PutObjectBldr = PutObjectBuilder<((UploadPart,),)>;
impl S3Api for PutObject {
    type S3Response = PutObjectResponse;
}
impl ToS3Request for PutObject {
    /// Delegates request construction to the wrapped `UploadPart`.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        self.inner.to_s3request()
    }
}
/// High-level put-object builder that streams arbitrary `ObjectContent`,
/// choosing automatically between a single PUT and a multipart upload based
/// on the content size and configured part size.
#[derive(TypedBuilder)]
pub struct PutObjectContent {
    // Client used to execute the request (required builder field).
    #[builder(!default)]
    client: MinioClient,
    // Extra headers merged verbatim into the request(s).
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    // Extra query parameters merged verbatim into the request(s).
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    // Overrides the region otherwise resolved by the client.
    #[builder(default, setter(into))]
    region: Option<Region>,
    #[builder(setter(into), !default)]
    bucket: BucketName,
    #[builder(setter(into), !default)]
    object: ObjectKey,
    // User metadata; keys are validated to start with "x-amz-meta-".
    #[builder(default, setter(into))]
    user_metadata: Option<Multimap>,
    // Server-side encryption configuration, if any.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,
    // Object tags, encoded into the x-amz-tagging header.
    #[builder(default, setter(into))]
    tags: Option<HashMap<String, String>>,
    // Object-lock retention settings.
    #[builder(default, setter(into))]
    retention: Option<Retention>,
    // When true, sets the object-lock legal-hold header to "ON".
    #[builder(default = false)]
    legal_hold: bool,
    // Requested part size; resolved by calc_part_info in send().
    #[builder(default, setter(into))]
    part_size: Size,
    // Content type; defaults to "application/octet-stream" when unset.
    #[builder(default, setter(into))]
    content_type: Option<String>,
    // Checksum algorithm applied to each uploaded part.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
    // When true, checksums are sent as trailers instead of headers.
    #[builder(default = false)]
    use_trailing_checksum: bool,
    // When true, uses signed (chunked) streaming for payloads.
    #[builder(default = false)]
    use_signed_streaming: bool,
    // Content to upload (required builder field); consumed by send().
    #[builder(!default, setter(into))]
    input_content: ObjectContent,
    // Internal: stream derived from input_content; not caller-settable.
    #[builder(default, setter(skip))]
    content_stream: ContentStream,
    // Internal: expected number of parts computed by calc_part_info.
    #[builder(default, setter(skip))]
    part_count: Option<u16>,
}
/// TypedBuilder type-state alias: required fields (`client`, `bucket`,
/// `object`, `input_content`) set, all optional fields unset.
pub type PutObjectContentBldr = PutObjectContentBuilder<(
    (MinioClient,),
    (),
    (),
    (),
    (BucketName,),
    (ObjectKey,),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (),
    (ObjectContent,),
)>;
impl PutObjectContent {
    /// Uploads `input_content`, picking the cheapest strategy:
    ///
    /// * a single `PutObject` when the whole content fits in one part
    ///   (either the stream ended before `part_size` bytes with unknown
    ///   size, or exactly one part was predicted), or
    /// * a multipart upload (`CreateMultipartUpload` + `send_mpu`) otherwise,
    ///   aborting the upload on any mid-stream failure.
    ///
    /// # Errors
    /// Validation errors from SSE/part-size checks, I/O errors from the
    /// content stream, and any error returned by the underlying S3 calls.
    pub async fn send(mut self) -> Result<PutObjectContentResponse, Error> {
        check_sse(&self.sse, &self.client)?;
        // Take ownership of the content and turn it into a readable stream.
        let input_content = std::mem::take(&mut self.input_content);
        self.content_stream = input_content
            .to_content_stream()
            .await
            .map_err(IoError::from)?;
        // object_size may be unknown; calc_part_info validates the requested
        // part size and derives the effective one plus the expected count.
        let object_size = self.content_stream.get_size();
        let (part_size, expected_parts) = calc_part_info(object_size, self.part_size)?;
        self.part_size = Size::Known(part_size);
        self.part_count = expected_parts;
        // Read the first part up-front so we can detect the single-part case.
        let seg_bytes = self
            .content_stream
            .read_upto(part_size as usize)
            .await
            .map_err(IoError::from)?;
        if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size)
            || expected_parts == Some(1)
        {
            // Fast path: everything fits in one part — do a plain PutObject.
            let size = seg_bytes.len() as u64;
            let resp: PutObjectResponse = PutObject::builder()
                .inner(UploadPart {
                    client: self.client.clone(),
                    extra_headers: self.extra_headers.clone(),
                    extra_query_params: self.extra_query_params.clone(),
                    bucket: self.bucket.clone(),
                    object: self.object.clone(),
                    region: self.region.clone(),
                    user_metadata: self.user_metadata.clone(),
                    sse: self.sse.clone(),
                    tags: self.tags.clone(),
                    retention: self.retention.clone(),
                    legal_hold: self.legal_hold,
                    part_number: None,
                    upload_id: None,
                    data: Arc::new(seg_bytes),
                    content_type: self.content_type.clone(),
                    checksum_algorithm: self.checksum_algorithm,
                    use_trailing_checksum: self.use_trailing_checksum,
                    use_signed_streaming: self.use_signed_streaming,
                })
                .build()
                .send()
                .await?;
            Ok(PutObjectContentResponse::new(resp, size))
        } else if let Some(expected) = object_size.value()
            && (seg_bytes.len() as u64) < part_size
        {
            // A known size promised more data than the stream delivered.
            let got: u64 = seg_bytes.len() as u64;
            Err(ValidationErr::InsufficientData { expected, got }.into())
        } else {
            // Multipart path: create the upload, stream the parts, and abort
            // the upload on failure so no orphaned parts remain.
            let create_mpu_resp: CreateMultipartUploadResponse = CreateMultipartUpload::builder()
                .client(self.client.clone())
                .extra_headers(self.extra_headers.clone())
                .extra_query_params(self.extra_query_params.clone())
                .region(self.region.clone())
                .bucket(&self.bucket)
                .object(&self.object)
                .user_metadata(self.user_metadata.clone())
                .sse(self.sse.clone())
                .tags(self.tags.clone())
                .retention(self.retention.clone())
                .legal_hold(self.legal_hold)
                .content_type(self.content_type.clone())
                .checksum_algorithm(self.checksum_algorithm)
                .build()
                .send()
                .await?;
            // Clone what the abort path needs before self is consumed below.
            let client = self.client.clone();
            let bucket = self.bucket.clone();
            let object = self.object.clone();
            let upload_id: UploadId = create_mpu_resp.upload_id().await?;
            let mpu_res = self
                .send_mpu(part_size, upload_id.clone(), object_size, seg_bytes)
                .await;
            if mpu_res.is_err() {
                // Best-effort abort; the original error is what we report.
                let _ = AbortMultipartUpload::builder()
                    .client(client)
                    .bucket(bucket)
                    .object(object)
                    .upload_id(upload_id)
                    .build()
                    .send()
                    .await;
            }
            mpu_res
        }
    }

    /// Drives the multipart upload: uploads `first_part` and then successive
    /// `part_size` chunks from the content stream, finally completing the
    /// upload with the collected part list.
    ///
    /// # Errors
    /// I/O errors from the stream; `TooManyParts` when the stream exceeds
    /// `MAX_MULTIPART_COUNT` parts with an unknown size; `TooMuchData` /
    /// `InsufficientData` when a known size disagrees with the bytes read;
    /// and any error from the part-upload or completion calls.
    async fn send_mpu(
        mut self,
        part_size: u64,
        upload_id: UploadId,
        object_size: Size,
        first_part: SegmentedBytes,
    ) -> Result<PutObjectContentResponse, Error> {
        let mut done = false;
        let mut part_number = 0;
        // Pre-size the part list when the expected count is known.
        let mut parts: Vec<PartInfo> = if let Some(pc) = self.part_count {
            Vec::with_capacity(pc as usize)
        } else {
            Vec::new()
        };
        // first_part was already read by send(); consume it on iteration one.
        let mut first_part = Some(first_part);
        let mut total_read = 0;
        while !done {
            let part_content = {
                if let Some(v) = first_part.take() {
                    v
                } else {
                    self.content_stream
                        .read_upto(part_size as usize)
                        .await
                        .map_err(IoError::from)?
                }
            };
            part_number += 1;
            let buffer_size = part_content.len() as u64;
            total_read += buffer_size;
            // read_upto never returns more than requested.
            assert!(buffer_size <= part_size, "{buffer_size} <= {part_size}",);
            // An empty read after the first part means the stream is done.
            if (buffer_size == 0) && (part_number > 1) {
                break;
            }
            // With unknown size the part count is unbounded — enforce the cap.
            if self.part_count.is_none() && (part_number > MAX_MULTIPART_COUNT) {
                return Err(ValidationErr::TooManyParts(part_number as u64).into());
            }
            if let Some(exp) = object_size.value()
                && exp < total_read
            {
                return Err(ValidationErr::TooMuchData(exp).into());
            }
            let resp: UploadPartResponse = UploadPart {
                client: self.client.clone(),
                extra_headers: self.extra_headers.clone(),
                extra_query_params: self.extra_query_params.clone(),
                bucket: self.bucket.clone(),
                object: self.object.clone(),
                region: self.region.clone(),
                // Metadata/tags/retention were set on CreateMultipartUpload;
                // per-part requests only repeat what they must.
                user_metadata: None,
                sse: self.sse.clone(),
                tags: self.tags.clone(),
                retention: self.retention.clone(),
                legal_hold: self.legal_hold,
                part_number: Some(part_number),
                upload_id: Some(upload_id.to_string()),
                data: Arc::new(part_content),
                content_type: self.content_type.clone(),
                checksum_algorithm: self.checksum_algorithm,
                use_trailing_checksum: self.use_trailing_checksum,
                use_signed_streaming: self.use_signed_streaming,
            }
            .send()
            .await?;
            // Record the part's checksum (if the server echoed one) for the
            // CompleteMultipartUpload body.
            let checksum = self
                .checksum_algorithm
                .and_then(|alg| resp.get_checksum(alg).map(|v| (alg, v)));
            parts.push(PartInfo::new(
                part_number,
                resp.etag()?,
                buffer_size,
                checksum,
            ));
            // A short read signals the final part.
            if buffer_size < part_size {
                done = true;
            }
        }
        let size = parts.iter().map(|p| p.size).sum();
        if let Some(expected) = object_size.value()
            && expected != size
        {
            return Err(ValidationErr::InsufficientData {
                expected,
                got: size,
            }
            .into());
        }
        let resp: CompleteMultipartUploadResponse = CompleteMultipartUpload {
            client: self.client,
            extra_headers: self.extra_headers,
            extra_query_params: self.extra_query_params,
            bucket: self.bucket,
            object: self.object,
            region: self.region,
            parts,
            upload_id,
            checksum_algorithm: self.checksum_algorithm,
        }
        .send()
        .await?;
        Ok(PutObjectContentResponse::new(resp, size))
    }
}
/// Assembles the common request headers for put-object style operations
/// (`PutObject`, `UploadPart`, `CreateMultipartUpload`).
///
/// Merges, in order: caller-supplied extra headers, validated user metadata,
/// SSE headers, encoded tags, object-lock retention and legal-hold headers,
/// and finally a `Content-Type` (defaulting to `application/octet-stream`
/// when neither `content_type` nor an extra header supplies one).
///
/// # Errors
/// Returns `ValidationErr::InvalidUserMetadata` when a user-metadata key is
/// empty or does not start with the `x-amz-meta-` prefix.
fn into_headers_put_object(
    extra_headers: Option<Multimap>,
    user_metadata: Option<Multimap>,
    sse: Option<Arc<dyn Sse>>,
    tags: Option<HashMap<String, String>>,
    retention: Option<Retention>,
    legal_hold: bool,
    content_type: Option<String>,
) -> Result<Multimap, ValidationErr> {
    let mut map = Multimap::new();
    if let Some(v) = extra_headers {
        map.add_multimap(v);
    }
    if let Some(v) = user_metadata {
        for (k, _) in v.iter() {
            if k.is_empty() {
                return Err(ValidationErr::InvalidUserMetadata(
                    "user metadata key cannot be empty".into(),
                ));
            }
            // NOTE(review): this check is case-sensitive, so keys like
            // "X-Amz-Meta-Foo" are rejected — confirm callers always pass
            // lowercase keys.
            if !k.starts_with("x-amz-meta-") {
                return Err(ValidationErr::InvalidUserMetadata(format!(
                    "user metadata key '{k}' does not start with 'x-amz-meta-'",
                )));
            }
        }
        map.add_multimap(v);
    }
    if let Some(v) = sse {
        map.add_multimap(v.headers());
    }
    if let Some(v) = tags {
        let tagging = encode_tags(&v);
        if !tagging.is_empty() {
            map.insert(X_AMZ_TAGGING.into(), tagging);
        }
    }
    if let Some(v) = retention {
        map.insert(X_AMZ_OBJECT_LOCK_MODE.into(), v.mode.to_string());
        map.insert(
            X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE.into(),
            to_iso8601utc(v.retain_until_date),
        );
    }
    if legal_hold {
        map.insert(X_AMZ_OBJECT_LOCK_LEGAL_HOLD.into(), "ON".into());
    }
    if !map.contains_key(CONTENT_TYPE) {
        // unwrap_or_else avoids allocating the default string when the
        // caller provided a content type (the eager unwrap_or always did).
        map.insert(
            CONTENT_TYPE.into(),
            content_type.unwrap_or_else(|| "application/octet-stream".into()),
        );
    }
    Ok(map)
}
/// Minimum multipart part size accepted by S3: 5 MiB.
pub const MIN_PART_SIZE: u64 = 5 * 1024 * 1024;
/// Part size used when the caller specifies none: 64 MiB.
pub const DEFAULT_PART_SIZE: u64 = 64 * 1024 * 1024;
/// Maximum multipart part size: 1024 × MIN_PART_SIZE = 5 GiB.
pub const MAX_PART_SIZE: u64 = 1024 * MIN_PART_SIZE;
/// Maximum object size: 1024 × MAX_PART_SIZE = 5 TiB.
pub const MAX_OBJECT_SIZE: u64 = 1024 * MAX_PART_SIZE;
/// Maximum number of parts in a multipart upload.
pub const MAX_MULTIPART_COUNT: u16 = 10_000;
/// Resolves the effective part size and expected part count for an upload.
///
/// * Both sizes unknown → `MissingPartSize` error.
/// * Only the part size known → use it; part count unknown (`None`).
/// * Only the object size known → derive a part size: `DEFAULT_PART_SIZE`
///   unless that would exceed `MAX_MULTIPART_COUNT` parts, in which case the
///   size is scaled up to the next `MIN_PART_SIZE` multiple; then clamped
///   down to the object size for small objects.
/// * Both known → validate that the object fits in at most
///   `MAX_MULTIPART_COUNT` parts of the given size.
///
/// All arithmetic uses exact integer ceiling division (`u64::div_ceil`)
/// instead of the previous `f64` round-trips, avoiding any dependence on
/// float rounding and on saturating `as u16` casts.
///
/// # Errors
/// `InvalidMinPartSize` / `InvalidMaxPartSize` for out-of-range part sizes,
/// `InvalidObjectSize` for objects over `MAX_OBJECT_SIZE`,
/// `MissingPartSize` when neither size is known, and `InvalidPartCount`
/// when the (object size, part size) pair needs zero or too many parts.
pub fn calc_part_info(
    object_size: Size,
    part_size: Size,
) -> Result<(u64, Option<u16>), ValidationErr> {
    if let Size::Known(v) = part_size {
        if v < MIN_PART_SIZE {
            return Err(ValidationErr::InvalidMinPartSize(v));
        }
        if v > MAX_PART_SIZE {
            return Err(ValidationErr::InvalidMaxPartSize(v));
        }
    }
    if let Size::Known(v) = object_size
        && v > MAX_OBJECT_SIZE
    {
        return Err(ValidationErr::InvalidObjectSize(v));
    }
    match (object_size, part_size) {
        (Size::Unknown, Size::Unknown) => Err(ValidationErr::MissingPartSize),
        (Size::Unknown, Size::Known(part_size)) => Ok((part_size, None)),
        (Size::Known(object_size), Size::Unknown) => {
            let mut psize = if object_size > DEFAULT_PART_SIZE * MAX_MULTIPART_COUNT as u64 {
                // The default part size would need too many parts: take the
                // exact per-part minimum and round it up to a MIN_PART_SIZE
                // multiple.
                let raw = object_size.div_ceil(MAX_MULTIPART_COUNT as u64);
                raw.div_ceil(MIN_PART_SIZE) * MIN_PART_SIZE
            } else {
                DEFAULT_PART_SIZE
            };
            // Small objects upload as a single part of their own size.
            if psize > object_size {
                psize = object_size;
            }
            let part_count = if psize > 0 {
                // Fits in u16: by construction the count is <= 10_000.
                object_size.div_ceil(psize) as u16
            } else {
                // Zero-byte object: treated as one (empty) part.
                1
            };
            Ok((psize, Some(part_count)))
        }
        (Size::Known(object_size), Size::Known(part_size)) => {
            let part_count = object_size.div_ceil(part_size);
            if part_count == 0 || part_count > MAX_MULTIPART_COUNT as u64 {
                return Err(ValidationErr::InvalidPartCount {
                    object_size,
                    part_size,
                    // The error reports the limit, not the computed count,
                    // matching the existing error contract and tests.
                    part_count: MAX_MULTIPART_COUNT,
                });
            }
            Ok((part_size, Some(part_count as u16)))
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Unit tests pin the part-size derivation of calc_part_info at the
    // interesting boundaries; the quickcheck property below covers the
    // whole input space.
    #[test]
    fn calc_part_info_uses_default_part_size_below_threshold() {
        let (psize, count) = calc_part_info(Size::Known(100 * 1024 * 1024), Size::Unknown).unwrap();
        assert_eq!(psize, DEFAULT_PART_SIZE);
        assert_eq!(count, Some(2));
    }
    #[test]
    fn calc_part_info_uses_default_at_threshold() {
        // Exactly DEFAULT_PART_SIZE * MAX_MULTIPART_COUNT still fits the default.
        let object_size = DEFAULT_PART_SIZE * MAX_MULTIPART_COUNT as u64;
        let (psize, count) = calc_part_info(Size::Known(object_size), Size::Unknown).unwrap();
        assert_eq!(psize, DEFAULT_PART_SIZE);
        assert_eq!(count, Some(MAX_MULTIPART_COUNT));
    }
    #[test]
    fn calc_part_info_scales_up_above_threshold() {
        // One byte over the threshold must grow the part size in
        // MIN_PART_SIZE multiples while keeping the count within the cap.
        let object_size = DEFAULT_PART_SIZE * MAX_MULTIPART_COUNT as u64 + 1;
        let (psize, count) = calc_part_info(Size::Known(object_size), Size::Unknown).unwrap();
        assert!(psize > DEFAULT_PART_SIZE);
        assert_eq!(psize % MIN_PART_SIZE, 0);
        assert!(count.unwrap() <= MAX_MULTIPART_COUNT);
    }
    #[test]
    fn calc_part_info_scales_up_near_max_object_size() {
        let (psize, count) = calc_part_info(Size::Known(MAX_OBJECT_SIZE), Size::Unknown).unwrap();
        assert_eq!(psize % MIN_PART_SIZE, 0);
        assert!((MIN_PART_SIZE..=MAX_PART_SIZE).contains(&psize));
        let c = count.unwrap();
        assert!(c > 0 && c <= MAX_MULTIPART_COUNT);
        // The chosen size must cover the whole object.
        assert!(psize.saturating_mul(c as u64) >= MAX_OBJECT_SIZE);
    }
    #[test]
    fn calc_part_info_clamps_small_object_to_single_part() {
        // Objects smaller than MIN_PART_SIZE become one part of their own size.
        let (psize, count) = calc_part_info(Size::Known(1024 * 1024), Size::Unknown).unwrap();
        assert_eq!(psize, 1024 * 1024);
        assert_eq!(count, Some(1));
    }
    // Property test: for arbitrary (object_size, part_size) pairs the result
    // must match the documented contract exactly, including which error
    // variant fires first.
    quickcheck! {
        fn test_calc_part_info(object_size: Size, part_size: Size) -> bool {
            let res = calc_part_info(object_size, part_size);
            // Part-size range errors take precedence over everything else.
            if let Size::Known(v) = part_size {
                if v < MIN_PART_SIZE {
                    return match res {
                        Err(ValidationErr::InvalidMinPartSize(v_err)) => v == v_err,
                        _ => false,
                    }
                }
                if v > MAX_PART_SIZE {
                    return match res {
                        Err(ValidationErr::InvalidMaxPartSize(v_err)) => v == v_err,
                        _ => false,
                    }
                }
            }
            // Then the object-size cap.
            if let Size::Known(v) = object_size
                && v > MAX_OBJECT_SIZE {
                return match res {
                    Err(ValidationErr::InvalidObjectSize(v_err)) => v == v_err,
                    _ => false,
                }
            }
            match (object_size, part_size, res) {
                (Size::Unknown, Size::Unknown, Err(ValidationErr::MissingPartSize)) => true,
                (Size::Unknown, Size::Unknown, _) => false,
                (Size::Unknown, Size::Known(part_size), Ok((psize, None))) => {
                    psize == part_size
                }
                (Size::Unknown, Size::Known(_), _) => false,
                (Size::Known(object_size), Size::Unknown, Ok((psize, Some(part_count)))) => {
                    if object_size < MIN_PART_SIZE {
                        return psize == object_size && part_count == 1;
                    }
                    if !(MIN_PART_SIZE..=MAX_PART_SIZE).contains(&psize){
                        return false;
                    }
                    if psize > object_size {
                        return false;
                    }
                    (part_count > 0) && (part_count <= MAX_MULTIPART_COUNT)
                }
                (Size::Known(_), Size::Unknown, _) => false,
                (Size::Known(object_size), Size::Known(part_size), res) => {
                    // Either the pair is infeasible (error expected)...
                    if (part_size > object_size) || ((part_size * (MAX_MULTIPART_COUNT as u64)) < object_size) {
                        return match res {
                            Err(ValidationErr::InvalidPartCount{object_size:v1, part_size:v2, part_count:v3}) => {
                                (v1 == object_size) && (v2 == part_size) && (v3 == MAX_MULTIPART_COUNT)
                            }
                            _ => false,
                        }
                    }
                    // ...or the requested part size is used as-is.
                    match res {
                        Ok((psize, part_count)) => {
                            let expected_part_count = (object_size as f64 / part_size as f64).ceil() as u16;
                            (psize == part_size) && (part_count == Some(expected_part_count))
                        }
                        _ => false,
                    }
                }
            }
        }
    }
}