use crate::s3::builders::{
ContentStream, MAX_MULTIPART_COUNT, ObjectContent, Size, calc_part_info,
};
use crate::s3::client::MinioClient;
use crate::s3::error::ValidationErr;
use crate::s3::error::{Error, IoError};
use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::{AppendObjectResponse, StatObjectResponse};
use crate::s3::response_traits::HasObjectSize;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse;
use crate::s3::types::{BucketName, ObjectKey, Region, S3Api, S3Request, ToS3Request};
use crate::s3::utils::{ChecksumAlgorithm, check_sse, compute_checksum_sb};
use http::Method;
use std::sync::Arc;
use typed_builder::TypedBuilder;
/// Argument struct for the low-level S3 `AppendObject` operation: appends
/// `data` to the object `bucket`/`object` starting at byte `offset_bytes`.
///
/// Build with the generated `AppendObject::builder()` (TypedBuilder) and
/// execute via `send()` from the `S3Api` impl. Field order matters: it
/// determines the tuple layout of the `AppendObjectBldr` type alias.
#[derive(Clone, Debug, TypedBuilder)]
pub struct AppendObject {
/// Required: client that executes the request.
#[builder(!default)] client: MinioClient,
/// Optional extra headers merged into the request headers.
#[builder(default, setter(into))]
extra_headers: Option<Multimap>,
/// Optional extra query parameters for the request.
#[builder(default, setter(into))]
extra_query_params: Option<Multimap>,
/// Required: bucket containing the object to append to.
#[builder(setter(into), !default)]
bucket: BucketName,
/// Required: key of the object to append to.
#[builder(setter(into), !default)]
object: ObjectKey,
/// Optional region override.
#[builder(default, setter(into))]
region: Option<Region>,
/// Optional server-side encryption; validated against the client by
/// `check_sse` when the request is built.
#[builder(default, setter(into))]
sse: Option<Arc<dyn Sse>>,
/// Required: the bytes to append.
#[builder(!default)] data: Arc<SegmentedBytes>,
/// Required: byte offset at which the append starts; sent as the
/// `x-amz-write-offset-bytes` header.
#[builder(!default)] offset_bytes: u64,
/// Optional checksum algorithm; when set, a checksum of `data` is
/// computed and sent in the corresponding `x-amz-checksum-*` header.
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Executing an `AppendObject` yields an [`AppendObjectResponse`].
impl S3Api for AppendObject {
type S3Response = AppendObjectResponse;
}
/// Alias for the TypedBuilder-generated builder state in which all required
/// fields (`client`, `bucket`, `object`, `data`, `offset_bytes`) are set and
/// all optional fields are still unset. Each tuple entry corresponds to one
/// struct field, in declaration order: `(T,)` means "set", `()` means "unset".
pub type AppendObjectBldr = AppendObjectBuilder<(
(MinioClient,),
(),
(),
(BucketName,),
(ObjectKey,),
(),
(),
(Arc<SegmentedBytes>,),
(u64,),
(),
)>;
impl ToS3Request for AppendObject {
    /// Converts this operation into a low-level [`S3Request`].
    ///
    /// Always sets the `x-amz-write-offset-bytes` header; when a checksum
    /// algorithm is configured, also sets the algorithm header plus the
    /// matching `x-amz-checksum-*` value header computed over `data`.
    ///
    /// # Errors
    /// Returns a [`ValidationErr`] when the configured SSE is incompatible
    /// with the client (see `check_sse`).
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        check_sse(&self.sse, &self.client)?;

        let mut headers: Multimap = self.extra_headers.unwrap_or_default();
        headers.add(X_AMZ_WRITE_OFFSET_BYTES, self.offset_bytes.to_string());

        if let Some(algorithm) = self.checksum_algorithm {
            let checksum_value = compute_checksum_sb(algorithm, &self.data);
            headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());
            // Select the header that carries the checksum value for this algorithm,
            // then add it once.
            let value_header = match algorithm {
                ChecksumAlgorithm::CRC32 => X_AMZ_CHECKSUM_CRC32,
                ChecksumAlgorithm::CRC32C => X_AMZ_CHECKSUM_CRC32C,
                ChecksumAlgorithm::SHA1 => X_AMZ_CHECKSUM_SHA1,
                ChecksumAlgorithm::SHA256 => X_AMZ_CHECKSUM_SHA256,
                ChecksumAlgorithm::CRC64NVME => X_AMZ_CHECKSUM_CRC64NVME,
            };
            headers.add(value_header, checksum_value);
        }

        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::PUT)
            .region(self.region)
            .bucket(self.bucket)
            .query_params(self.extra_query_params.unwrap_or_default())
            .object(self.object)
            .headers(headers)
            .body(self.data)
            .build())
    }
}
/// Argument struct for a higher-level append operation that takes an
/// [`ObjectContent`] (possibly of unknown size) and appends it to an
/// existing object, splitting it into multiple sequential append calls
/// when the content exceeds one part.
///
/// Field order matters: it determines the tuple layout of the
/// `AppendObjectContentBldr` type alias.
#[derive(TypedBuilder)]
pub struct AppendObjectContent {
/// Required: client that executes the request(s).
#[builder(!default)] client: MinioClient,
/// Optional extra headers passed through to each append call.
#[builder(default, setter(into))]
extra_headers: Option<Multimap>,
/// Optional extra query parameters passed through to each append call.
#[builder(default, setter(into))]
extra_query_params: Option<Multimap>,
/// Optional region override.
#[builder(default, setter(into))]
region: Option<Region>,
/// Required: bucket containing the object to append to.
#[builder(setter(into), !default)]
bucket: BucketName,
/// Required: key of the object to append to.
#[builder(setter(into), !default)]
object: ObjectKey,
/// Optional server-side encryption; validated by `check_sse` in `send()`.
#[builder(default)]
sse: Option<Arc<dyn Sse>>,
/// Desired part size; defaults to `Size::Unknown` and is resolved to a
/// concrete value by `calc_part_info` in `send()`.
#[builder(default = Size::Unknown)]
part_size: Size,
/// Required: the content to append.
#[builder(!default, setter(into))]
input_content: ObjectContent,
/// Internal: stream derived from `input_content` by `send()`; starts empty.
#[builder(default = ContentStream::empty())]
content_stream: ContentStream,
/// Internal: expected number of parts computed in `send()`; `None` when
/// the total size is unknown up-front.
#[builder(default)]
part_count: Option<u16>,
/// Optional checksum algorithm forwarded to each append call.
#[builder(default, setter(into))]
checksum_algorithm: Option<ChecksumAlgorithm>,
}
/// Alias for the TypedBuilder-generated builder state in which all required
/// fields (`client`, `bucket`, `object`, `input_content`) are set and all
/// optional fields are still unset. Each tuple entry corresponds to one
/// struct field, in declaration order: `(T,)` means "set", `()` means "unset".
pub type AppendObjectContentBldr = AppendObjectContentBuilder<(
(MinioClient,),
(),
(),
(),
(BucketName,),
(ObjectKey,),
(),
(),
(ObjectContent,),
(),
(),
(),
)>;
impl AppendObjectContent {
/// Appends `input_content` to the object, splitting it into parts.
///
/// Converts the input into a stream, resolves the part size, pre-reads
/// the first part, and stats the object to learn its current size (the
/// offset at which appending starts). Then either:
/// * performs a single `AppendObject` call when the content fits in one
///   part, or
/// * streams the remaining content through `send_mpa` as multiple
///   sequential append calls.
///
/// # Errors
/// Fails when SSE validation fails, the stream cannot be read, the stat
/// call fails, or a declared content size yields less data than one part
/// (`ValidationErr::InsufficientData`).
pub async fn send(mut self) -> Result<AppendObjectResponse, Error> {
check_sse(&self.sse, &self.client)?;
// `to_content_stream` consumes the content, so take it out of `self`,
// leaving a default ObjectContent behind.
self.content_stream = std::mem::take(&mut self.input_content)
.to_content_stream()
.await
.map_err(IoError::from)?;
// May be unknown for streaming input without a declared size.
let object_size = self.content_stream.get_size();
let (part_size, n_expected_parts) = calc_part_info(object_size, self.part_size)?;
self.part_size = Size::Known(part_size);
self.part_count = n_expected_parts;
// Pre-read the first part so we can decide between the single-call and
// multi-part paths below.
let seg_bytes = self
.content_stream
.read_upto(part_size as usize)
.await
.map_err(IoError::from)?;
// The object's current size is the offset at which appending starts.
let resp: StatObjectResponse = self
.client
.stat_object(&self.bucket, &self.object)?
.build()
.send()
.await?;
let current_file_size = resp.size()?;
if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size)
|| n_expected_parts == Some(1)
{
// Everything fits in a single append call.
let ao = AppendObject {
client: self.client,
extra_headers: self.extra_headers,
extra_query_params: self.extra_query_params,
bucket: self.bucket,
object: self.object,
region: self.region,
offset_bytes: current_file_size,
sse: self.sse,
data: Arc::new(seg_bytes),
checksum_algorithm: self.checksum_algorithm,
};
ao.send().await
} else if let Some(expected) = object_size.value()
&& (seg_bytes.len() as u64) < part_size
{
// A known size promised at least one full part, but the stream
// delivered fewer bytes.
let got = seg_bytes.len() as u64;
Err(ValidationErr::InsufficientData { expected, got })?
} else {
self.send_mpa(part_size, current_file_size, seg_bytes).await
}
}
/// Uploads the content as multiple sequential append calls.
///
/// `first_part` was already read by `send`; subsequent parts are read
/// from the stream. Each response's reported object size becomes the
/// write offset of the next part. Returns the response of the last part
/// uploaded.
async fn send_mpa(
&mut self,
part_size: u64,
object_size: u64,
first_part: SegmentedBytes,
) -> Result<AppendObjectResponse, Error> {
let mut done = false;
let mut part_number = 0;
let mut last_resp: Option<AppendObjectResponse> = None;
let mut next_offset_bytes: u64 = object_size;
// Wrapped in Option so the pre-read part is consumed only on the first
// iteration; later iterations read from the stream.
let mut first_part = Some(first_part);
while !done {
let part_content: SegmentedBytes = {
if let Some(v) = first_part.take() {
v
} else {
self.content_stream
.read_upto(part_size as usize)
.await
.map_err(IoError::from)?
}
};
part_number += 1;
let buffer_size = part_content.len() as u64;
// Invariant: read_upto never returns more than requested.
assert!(buffer_size <= part_size, "{buffer_size} <= {part_size}",);
// An empty read after the first part means the stream is exhausted.
// (An empty FIRST part is still uploaded, handling empty content.)
if buffer_size == 0 && part_number > 1 {
break;
}
// Enforce the part-count limit only when it was not pre-computed,
// i.e. the total size was unknown up-front.
if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
return Err(ValidationErr::TooManyParts(part_number as u64).into());
}
let append_object = AppendObject::builder()
.client(self.client.clone())
.extra_headers(self.extra_headers.clone())
.extra_query_params(self.extra_query_params.clone())
.bucket(&self.bucket)
.object(&self.object)
.region(self.region.clone())
.sse(self.sse.clone())
.data(Arc::new(part_content))
.offset_bytes(next_offset_bytes)
.checksum_algorithm(self.checksum_algorithm)
.build();
let resp: AppendObjectResponse = append_object.send().await?;
// The response's object size is the offset for the next append.
next_offset_bytes = resp.object_size();
last_resp = Some(resp);
// A short part can only be the final one.
if buffer_size < part_size {
done = true;
}
}
// The loop always runs at least once and uploads a part before it can
// break, so `last_resp` is Some here.
Ok(last_resp.expect("send_mpa always uploads at least one part"))
}
}