use crate::s3::Client;
use crate::s3::builders::{
ContentStream, MAX_MULTIPART_COUNT, ObjectContent, Size, calc_part_info,
};
use crate::s3::error::Error;
use crate::s3::multimap::{Multimap, MultimapExt};
use crate::s3::response::{AppendObjectResponse, StatObjectResponse};
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse;
use crate::s3::types::{S3Api, S3Request, ToS3Request};
use crate::s3::utils::{check_bucket_name, check_object_name};
use http::Method;
use std::sync::Arc;
#[derive(Clone, Debug, Default)]
pub struct AppendObject {
client: Client,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
bucket: String,
object: String,
region: Option<String>,
sse: Option<Arc<dyn Sse>>,
data: SegmentedBytes,
offset_bytes: u64,
}
impl AppendObject {
pub fn new(
client: Client,
bucket: String,
object: String,
data: SegmentedBytes,
offset_bytes: u64,
) -> Self {
Self {
client,
bucket,
object,
offset_bytes,
data,
..Default::default()
}
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
}
impl S3Api for AppendObject {
type S3Response = AppendObjectResponse;
}
impl ToS3Request for AppendObject {
fn to_s3request(self) -> Result<S3Request, Error> {
{
check_bucket_name(&self.bucket, true)?;
check_object_name(&self.object)?;
if let Some(v) = &self.sse {
if v.tls_required() && !self.client.is_secure() {
return Err(Error::SseTlsRequired(None));
}
}
}
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
headers.add("x-amz-write-offset-bytes", self.offset_bytes.to_string());
Ok(S3Request::new(self.client, Method::PUT)
.region(self.region)
.bucket(Some(self.bucket))
.query_params(self.extra_query_params.unwrap_or_default())
.object(Some(self.object))
.headers(headers)
.body(Some(self.data)))
}
}
#[derive(Default)]
pub struct AppendObjectContent {
client: Client,
extra_headers: Option<Multimap>,
extra_query_params: Option<Multimap>,
region: Option<String>,
bucket: String,
object: String,
sse: Option<Arc<dyn Sse>>,
part_size: Size,
input_content: ObjectContent,
content_stream: ContentStream,
part_count: Option<u16>,
offset_bytes: u64,
}
impl AppendObjectContent {
pub fn new(
client: Client,
bucket: String,
object: String,
content: impl Into<ObjectContent>,
) -> Self {
Self {
client,
bucket,
object,
input_content: content.into(),
extra_headers: None,
extra_query_params: None,
region: None,
sse: None,
part_size: Size::Unknown,
content_stream: ContentStream::empty(),
part_count: None,
offset_bytes: 0,
}
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<String>) -> Self {
self.region = region;
self
}
pub fn part_size(mut self, part_size: impl Into<Size>) -> Self {
self.part_size = part_size.into();
self
}
pub fn offset_bytes(mut self, offset_bytes: u64) -> Self {
self.offset_bytes = offset_bytes;
self
}
pub async fn send(mut self) -> Result<AppendObjectResponse, Error> {
{
check_bucket_name(&self.bucket, true)?;
check_object_name(&self.object)?;
if let Some(v) = &self.sse {
if v.tls_required() && !self.client.is_secure() {
return Err(Error::SseTlsRequired(None));
}
}
}
{
let mut headers: Multimap = match self.extra_headers {
Some(ref headers) => headers.clone(),
None => Multimap::new(),
};
headers.add("x-amz-write-offset-bytes", self.offset_bytes.to_string());
self.extra_query_params = Some(headers);
}
self.content_stream = std::mem::take(&mut self.input_content)
.to_content_stream()
.await
.map_err(Error::IOError)?;
let object_size = self.content_stream.get_size();
let (part_size, n_expected_parts) = calc_part_info(object_size, self.part_size)?;
self.part_size = Size::Known(part_size);
self.part_count = n_expected_parts;
let seg_bytes = self.content_stream.read_upto(part_size as usize).await?;
let resp: StatObjectResponse = self
.client
.stat_object(&self.bucket, &self.object)
.send()
.await?;
let current_file_size = resp.size;
if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size)
|| n_expected_parts == Some(1)
{
let ao = AppendObject {
client: self.client,
extra_headers: self.extra_headers,
extra_query_params: self.extra_query_params,
bucket: self.bucket,
object: self.object,
region: self.region,
offset_bytes: current_file_size,
sse: self.sse,
data: seg_bytes,
};
ao.send().await
} else if object_size.is_known() && (seg_bytes.len() as u64) < part_size {
let expected = object_size.as_u64().unwrap();
let got = seg_bytes.len() as u64;
Err(Error::InsufficientData(expected, got))
} else {
self.send_mpa(part_size, current_file_size, seg_bytes).await
}
}
async fn send_mpa(
&mut self,
part_size: u64,
object_size: u64,
first_part: SegmentedBytes,
) -> Result<AppendObjectResponse, Error> {
let mut done = false;
let mut part_number = 0;
let mut last_resp: Option<AppendObjectResponse> = None;
let mut next_offset_bytes: u64 = object_size;
let mut first_part = Some(first_part);
while !done {
let part_content: SegmentedBytes = {
if let Some(v) = first_part.take() {
v
} else {
self.content_stream.read_upto(part_size as usize).await?
}
};
part_number += 1;
let buffer_size = part_content.len() as u64;
assert!(buffer_size <= part_size, "{buffer_size} <= {part_size}",);
if buffer_size == 0 && part_number > 1 {
break;
}
if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
return Err(Error::TooManyParts);
}
let append_object = AppendObject {
client: self.client.clone(),
extra_headers: self.extra_headers.clone(),
extra_query_params: self.extra_query_params.clone(),
bucket: self.bucket.clone(),
object: self.object.clone(),
region: self.region.clone(),
sse: self.sse.clone(),
data: part_content,
offset_bytes: next_offset_bytes,
};
let resp: AppendObjectResponse = append_object.send().await?;
next_offset_bytes = resp.object_size;
if buffer_size < part_size {
done = true;
last_resp = Some(resp);
}
}
Ok(last_resp.unwrap())
}
}