use std::fmt::Debug;
use async_channel::{Receiver, Sender, bounded, unbounded};
use bytes::Bytes;
use futures::future::RemoteHandle;
use futures::task::SpawnExt as _;
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{
ChecksumAlgorithm, ChecksumMode, ETag, HeadObjectParams, PutObjectResult, PutObjectSingleParams, UploadChecksum,
};
use tracing::{Instrument, debug_span, trace};
use crate::ServerSideEncryption;
use crate::async_util::Runtime;
use crate::mem_limiter::{BufferArea, MemoryLimiter};
use crate::memory::{BufferKind, PagedPool, PoolBufferMut};
use crate::sync::Arc;
use super::hasher::ChecksumHasher;
use super::{ChecksumHasherError, UploadError};
/// An in-progress incremental (append) upload to a single object.
///
/// Writes must arrive sequentially (see [`AppendUploadRequest::write`]). Data is accumulated in
/// buffers of `buffer_size` bytes, which are handed to a background [`AppendUploadQueue`] as soon
/// as they are full.
#[derive(Debug)]
pub struct AppendUploadRequest<Client: ObjectClient> {
    /// Buffer currently being filled, if any. `None` before the first write and right after a
    /// full buffer has been pushed to the queue.
    buffer: Option<UploadBuffer>,
    /// Offset at which the next write is expected to start.
    offset: u64,
    /// Capacity requested for each new buffer.
    buffer_size: usize,
    /// Queue whose background task sends buffers as append requests.
    upload_queue: AppendUploadQueue<Client>,
}
/// Parameters used to start an [`AppendUploadQueue`].
pub struct AppendUploadQueueParams {
    /// Bucket containing the object.
    pub bucket: String,
    /// Key of the object being appended to.
    pub key: String,
    /// Offset at which appending starts. With `0` no existing object is looked up and the first
    /// request is a plain (non-append) PutObject.
    pub initial_offset: u64,
    /// ETag the existing object is expected to have. When `initial_offset` is non-zero it is
    /// compared against the `HeadObject` response and passed as the `if_match` condition of the
    /// first append request.
    pub initial_etag: Option<ETag>,
    /// Server-side encryption settings applied to every PutObject request.
    pub server_side_encryption: ServerSideEncryption,
    /// Checksum algorithm used when `initial_offset` is `0`. For non-zero offsets the existing
    /// object's checksum algorithm is used instead.
    pub default_checksum_algorithm: Option<ChecksumAlgorithm>,
    /// Capacity of the bounded channel of buffers waiting to be uploaded. Must be greater than 0.
    pub capacity: usize,
}
impl<Client> AppendUploadRequest<Client>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    /// Starts a new append upload and its background queue.
    ///
    /// The request expects its first write at `params.initial_offset`.
    pub(super) fn new(
        runtime: &Runtime,
        client: Client,
        buffer_size: usize,
        pool: PagedPool,
        mem_limiter: Arc<MemoryLimiter>,
        params: AppendUploadQueueParams,
    ) -> Self {
        // Read the offset before `params` is moved into the queue.
        let start_offset = params.initial_offset;
        Self {
            buffer: None,
            offset: start_offset,
            buffer_size,
            upload_queue: AppendUploadQueue::new(runtime, client, pool, mem_limiter, params),
        }
    }

    /// Appends `data` at `offset`, which must equal [`Self::current_offset`].
    ///
    /// Every buffer that becomes full is pushed to the upload queue. Returns the number of bytes
    /// accepted.
    ///
    /// # Errors
    ///
    /// Returns [`UploadError::OutOfOrderWrite`] for a non-sequential offset. Also returns any error
    /// reported by the upload queue, for example a failed earlier request or a terminated upload.
    pub async fn write(&mut self, offset: u64, data: &[u8]) -> Result<usize, UploadError<Client::ClientError>> {
        self.upload_queue.verify().await?;

        let expected_offset = self.offset;
        if offset != expected_offset {
            return Err(UploadError::OutOfOrderWrite {
                write_offset: offset,
                expected_offset,
            });
        }

        let mut pending = data;
        while !pending.is_empty() {
            let current = match self.buffer.as_mut() {
                Some(existing) => existing,
                None => self
                    .buffer
                    .insert(self.upload_queue.get_buffer(self.buffer_size).await?),
            };

            let before = pending.len();
            pending = current.write(pending)?;
            self.offset += (before - pending.len()) as u64;

            if current.is_full() {
                trace!("push full buffer to append queue");
                if let Some(full) = self.buffer.take() {
                    self.upload_queue.push(full).await?;
                }
            }
        }

        let written = self.offset - offset;
        Ok(written as usize)
    }

    /// Flushes any buffered data and waits for all queued requests to finish.
    ///
    /// If nothing was ever written and the upload started at offset `0`, an empty buffer is pushed
    /// so that an (empty) object is still created. Returns the result of the last successful
    /// request, if any.
    pub async fn complete(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
        match self.buffer.take() {
            Some(remaining) => {
                trace!("push remaining buffer to append queue");
                self.upload_queue.push(remaining).await?;
            }
            None if self.offset == 0 => {
                trace!("push empty buffer to append queue");
                let empty = self.upload_queue.get_buffer(0).await?;
                self.upload_queue.push(empty).await?;
            }
            None => {}
        }
        self.upload_queue.join().await
    }

    /// Offset at which the next write must start.
    pub fn current_offset(&self) -> u64 {
        self.offset
    }
}
/// Events sent from the background append task to [`AppendUploadQueue`].
#[derive(Debug)]
enum AppendUploadEvent<E> {
    /// Checksum algorithm to use for every buffer. Sent at most once, before any `PutResponse`.
    ChecksumAlgorithm(Option<ChecksumAlgorithm>),
    /// Result of one successful append request.
    PutResponse(PutObjectResult),
    /// The task failed. It finishes after sending this, so no further events follow.
    Error(UploadError<E>),
}
/// Handle to the background task that performs append requests in order.
#[derive(Debug)]
struct AppendUploadQueue<Client: ObjectClient> {
    /// Sends buffers to the background task (bounded channel).
    buffer_sender: Sender<UploadBuffer>,
    /// Receives checksum-algorithm, response and error events from the background task.
    event_receiver: Receiver<AppendUploadEvent<Client::ClientError>>,
    /// Pool that new buffers are allocated from.
    pool: PagedPool,
    /// Limiter against which buffer memory is reserved.
    mem_limiter: Arc<MemoryLimiter>,
    /// Handle to the spawned task. It is stored but never read; presumably it is held so that
    /// dropping the queue also drops the task. Confirm against the `futures` `RemoteHandle` docs.
    _task_handle: RemoteHandle<()>,
    /// Checksum algorithm reported by the task: `None` until the event has been received.
    checksum_algorithm: Option<Option<ChecksumAlgorithm>>,
    /// Most recent successful append result.
    last_known_result: Option<PutObjectResult>,
    /// Number of pushed buffers whose response has not been consumed yet.
    requests_in_queue: usize,
}
impl<Client> AppendUploadQueue<Client>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    /// Creates the queue and spawns the background task that performs the append requests.
    ///
    /// The task first reports the checksum algorithm, then sends one
    /// [`AppendUploadEvent::PutResponse`] per buffer. On failure it closes the buffer channel and
    /// sends a single [`AppendUploadEvent::Error`].
    ///
    /// # Panics
    ///
    /// Panics if `params.capacity` is 0, or if the task cannot be spawned on `runtime`.
    pub fn new(
        runtime: &Runtime,
        client: Client,
        pool: PagedPool,
        mem_limiter: Arc<MemoryLimiter>,
        params: AppendUploadQueueParams,
    ) -> Self {
        assert!(params.capacity > 0, "append queue capacity must be greater than 0");
        let span = debug_span!("append", key = params.key, initial_offset = params.initial_offset);
        // At most `capacity` buffers can wait for upload. The event channel is unbounded so the
        // task never blocks while reporting.
        let (buffer_sender, buffer_receiver) = bounded(params.capacity);
        let (event_sender, event_receiver) = unbounded();
        let task_handle = runtime
            .spawn_with_handle(
                async move {
                    match run_append_loop(client, params, &buffer_receiver, &event_sender).await {
                        Ok(()) => {}
                        Err(error) => {
                            trace!("append upload task failed");
                            // Close the buffer channel so `push`/`verify` notice the failure,
                            // then report the error (best effort: the receiver may be gone).
                            buffer_receiver.close();
                            _ = event_sender.send(AppendUploadEvent::Error(error)).await;
                        }
                    }
                    trace!("append upload task finished");
                }
                .instrument(span),
            )
            .unwrap();
        Self {
            buffer_sender,
            event_receiver,
            last_known_result: None,
            requests_in_queue: 0,
            checksum_algorithm: None,
            pool,
            mem_limiter,
            _task_handle: task_handle,
        }
    }

    /// Sends `buffer` to the background task, waiting if the channel is at capacity.
    ///
    /// If the channel is already closed, events are drained first so that an error reported by
    /// the task is returned. Otherwise [`UploadError::UploadAlreadyTerminated`] is returned.
    pub async fn push(&mut self, buffer: UploadBuffer) -> Result<(), UploadError<Client::ClientError>> {
        if let Err(_send_error) = self.buffer_sender.send(buffer).await {
            trace!("upload queue is already closed");
            while self.consume_next_response().await? {}
            return Err(UploadError::UploadAlreadyTerminated);
        }
        self.requests_in_queue += 1;
        Ok(())
    }

    /// Checks that the upload is still running.
    ///
    /// If the buffer channel has been closed, events are drained and any reported error is
    /// returned. Otherwise [`UploadError::UploadAlreadyTerminated`] is returned.
    pub async fn verify(&mut self) -> Result<(), UploadError<Client::ClientError>> {
        if self.buffer_sender.is_closed() {
            trace!("upload queue is already closed");
            while self.consume_next_response().await? {}
            return Err(UploadError::UploadAlreadyTerminated);
        }
        Ok(())
    }

    /// Closes the queue and waits for all outstanding requests.
    ///
    /// Returns the result of the last successful request, if any. If the channel had already
    /// been closed (by the task on failure), returns the reported error or
    /// [`UploadError::UploadAlreadyTerminated`].
    pub async fn join(mut self) -> Result<Option<PutObjectResult>, UploadError<Client::ClientError>> {
        // `close` returns `false` when the channel was already closed.
        let terminated = !self.buffer_sender.close();
        while self.consume_next_response().await? {}
        if terminated {
            return Err(UploadError::UploadAlreadyTerminated);
        }
        Ok(self.last_known_result.take())
    }

    /// Allocates a new buffer of `capacity` bytes using the task's checksum algorithm.
    ///
    /// On first use this waits for the checksum algorithm event. While requests are in flight it
    /// tries to reserve memory without exceeding the limit, and waits for a response each time
    /// the reservation fails. With nothing in flight it allocates via [`UploadBuffer::new`].
    pub async fn get_buffer(&mut self, capacity: usize) -> Result<UploadBuffer, UploadError<Client::ClientError>> {
        let checksum_algorithm = if let Some(checksum_algorithm) = self.checksum_algorithm.clone() {
            checksum_algorithm
        } else {
            match self
                .event_receiver
                .recv()
                .await
                .map_err(|_| UploadError::UploadAlreadyTerminated)?
            {
                AppendUploadEvent::ChecksumAlgorithm(checksum_algorithm) => {
                    self.checksum_algorithm.insert(checksum_algorithm).clone()
                }
                AppendUploadEvent::PutResponse(_) => {
                    // The task always sends the checksum algorithm before any response.
                    unreachable!("cannot receive a response event before the checksum algorithm");
                }
                AppendUploadEvent::Error(upload_error) => return Err(upload_error),
            }
        };
        while self.requests_in_queue > 0 {
            match UploadBuffer::try_new(capacity, &checksum_algorithm, &self.pool, self.mem_limiter.clone())? {
                Some(buffer) => return Ok(buffer),
                None => {
                    trace!("wait for the next request to be processed");
                    if !self.consume_next_response().await? {
                        return Err(UploadError::UploadAlreadyTerminated);
                    }
                }
            }
        }
        // Nothing in flight, so waiting cannot free memory: fall back to `UploadBuffer::new`.
        Ok(UploadBuffer::new(
            capacity,
            &checksum_algorithm,
            &self.pool,
            self.mem_limiter.clone(),
        )?)
    }

    /// Waits for the next response from the background task.
    ///
    /// Returns `Ok(true)` after recording a successful response, and `Ok(false)` once the event
    /// channel is closed. Returns the task's error if one was reported. Checksum algorithm events
    /// are recorded and skipped.
    async fn consume_next_response(&mut self) -> Result<bool, UploadError<Client::ClientError>> {
        loop {
            let Ok(event) = self.event_receiver.recv().await else {
                return Ok(false);
            };
            match event {
                AppendUploadEvent::ChecksumAlgorithm(checksum_algorithm) => {
                    trace!(?checksum_algorithm, "received checksum algorithm");
                    self.checksum_algorithm = Some(checksum_algorithm);
                    continue;
                }
                AppendUploadEvent::PutResponse(result) => {
                    trace!(?result, "received result");
                    self.requests_in_queue -= 1;
                    self.last_known_result = Some(result);
                    return Ok(true);
                }
                AppendUploadEvent::Error(upload_error) => return Err(upload_error),
            }
        }
    }
}
/// Determines the checksum algorithm to use for the appended data.
///
/// When appending from offset `0`, the configured default is returned and no request is made.
/// Otherwise the existing object is looked up with `HeadObject` (checksum mode enabled), and its
/// ETag must match `params.initial_etag`. The first checksum algorithm the object reports, if
/// any, is returned so appended data uses the same algorithm as the existing object.
///
/// # Errors
///
/// Returns an error if the `HeadObject` request fails. Returns
/// [`UploadError::PutRequestFailed`] with [`PutObjectError::PreconditionFailed`] if the object's
/// ETag differs from `params.initial_etag`, for example because the object was replaced.
async fn get_checksum_algorithm<Client>(
    client: &Client,
    params: &AppendUploadQueueParams,
) -> Result<Option<ChecksumAlgorithm>, UploadError<Client::ClientError>>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    if params.initial_offset == 0 {
        return Ok(params.default_checksum_algorithm.clone());
    }
    let head_object = client
        .head_object(
            &params.bucket,
            &params.key,
            &HeadObjectParams::new().checksum_mode(Some(ChecksumMode::Enabled)),
        )
        .await?;
    trace!(?head_object, "received head_object response");
    if Some(&head_object.etag) != params.initial_etag.as_ref() {
        trace!(?head_object, initial_etag=?params.initial_etag, "mismatching etag");
        return Err(UploadError::PutRequestFailed(ObjectClientError::ServiceError(
            PutObjectError::PreconditionFailed,
        )));
    }
    Ok(head_object.checksum.algorithms().first().cloned())
}
/// Body of the background append task.
///
/// First resolves the checksum algorithm and reports it with
/// [`AppendUploadEvent::ChecksumAlgorithm`]. Then, for each buffer received until the channel is
/// closed and empty, it issues one append request at the current offset. Each request is
/// conditioned on the ETag returned by the previous one, and its result is reported with
/// [`AppendUploadEvent::PutResponse`].
///
/// # Errors
///
/// Returns the first request failure. Returns [`UploadError::UploadAlreadyTerminated`] if events
/// can no longer be delivered because the event channel is closed.
async fn run_append_loop<Client>(
    client: Client,
    params: AppendUploadQueueParams,
    buffer_receiver: &Receiver<UploadBuffer>,
    event_sender: &Sender<AppendUploadEvent<Client::ClientError>>,
) -> Result<(), UploadError<Client::ClientError>>
where
    Client: ObjectClient + Send + Sync + 'static,
{
    let checksum_algorithm = get_checksum_algorithm(&client, &params).await?;
    event_sender
        .send(AppendUploadEvent::ChecksumAlgorithm(checksum_algorithm))
        .await
        .map_err(|_| UploadError::UploadAlreadyTerminated)?;
    let bucket = params.bucket;
    let key = params.key;
    let sse = params.server_side_encryption;
    let mut etag = params.initial_etag;
    let mut offset = params.initial_offset;
    while let Ok(buffer) = buffer_receiver.recv().await {
        let buffer_len = buffer.len();
        let result = append(&client, &bucket, &key, buffer, offset, etag.take(), sse.clone()).await?;
        offset += buffer_len as u64;
        // The next append must match the object version this request produced.
        etag = Some(result.etag.clone());
        event_sender
            .send(AppendUploadEvent::PutResponse(result))
            .await
            .map_err(|_| UploadError::UploadAlreadyTerminated)?;
    }
    Ok(())
}
/// Sends one buffer as a PutObject request.
///
/// At offset `0` a plain PutObject is issued. At any other offset the request appends at
/// `offset` and is conditioned on `etag`. The buffer's checksum and the server-side encryption
/// settings are attached to the request.
///
/// # Errors
///
/// Returns an error if the checksum cannot be finalized, if the SSE settings are corrupted
/// ([`UploadError::SseCorruptedError`]), or if the request fails
/// ([`UploadError::PutRequestFailed`]).
async fn append<Client: ObjectClient>(
    client: &Client,
    bucket: &str,
    key: &str,
    buffer: UploadBuffer,
    offset: u64,
    etag: Option<ETag>,
    server_side_encryption: ServerSideEncryption,
) -> Result<PutObjectResult, UploadError<Client::ClientError>> {
    trace!(key, offset, len = buffer.len(), "preparing PutObject request");
    let (body, checksum) = buffer.freeze()?;

    let mut request_params = match offset {
        0 => PutObjectSingleParams::new(),
        _ => PutObjectSingleParams::new_for_append(offset).if_match(etag),
    };

    let sse_settings = server_side_encryption.into_inner();
    let (sse_type, key_id) = sse_settings.map_err(UploadError::SseCorruptedError)?;

    request_params.checksum = checksum;
    request_params.server_side_encryption = sse_type;
    request_params.ssekms_key_id = key_id;

    let response = client.put_object_single(bucket, key, &request_params, body).await;
    response.map_err(UploadError::PutRequestFailed)
}
/// A buffer of data to append, together with a running checksum of its contents.
#[derive(Debug)]
struct UploadBuffer {
    /// Buffered bytes. The memory reservation is released when this is dropped.
    data: ReservedBuffer,
    /// Checksum updated as data is written.
    hasher: ChecksumHasher,
}
impl UploadBuffer {
    /// Allocates a buffer of `capacity` bytes from `pool`.
    ///
    /// The memory is reserved with `MemoryLimiter::reserve`, which, unlike `try_reserve`, reports
    /// no failure. The hasher is created first so that a hasher error leaves nothing reserved.
    fn new(
        capacity: usize,
        checksum_algorithm: &Option<ChecksumAlgorithm>,
        pool: &PagedPool,
        mem_limiter: Arc<MemoryLimiter>,
    ) -> Result<Self, ChecksumHasherError> {
        let hasher = ChecksumHasher::new(checksum_algorithm)?;
        mem_limiter.reserve(BufferArea::Upload, capacity as u64);
        let data = ReservedBuffer {
            buffer: pool.get_buffer_mut(capacity, BufferKind::Append),
            mem_limiter,
        };
        Ok(Self { data, hasher })
    }

    /// Allocates a buffer of `capacity` bytes only if the memory limiter accepts the reservation.
    ///
    /// Returns `Ok(None)` if `try_reserve` rejects the reservation.
    fn try_new(
        capacity: usize,
        checksum_algorithm: &Option<ChecksumAlgorithm>,
        pool: &PagedPool,
        mem_limiter: Arc<MemoryLimiter>,
    ) -> Result<Option<Self>, ChecksumHasherError> {
        // Build the hasher before reserving. Only `ReservedBuffer::drop` releases the
        // reservation, so failing after a successful `try_reserve` would leak it.
        let hasher = ChecksumHasher::new(checksum_algorithm)?;
        if !mem_limiter.try_reserve(BufferArea::Upload, capacity as u64) {
            return Ok(None);
        }
        let data = ReservedBuffer {
            buffer: pool.get_buffer_mut(capacity, BufferKind::Append),
            mem_limiter,
        };
        Ok(Some(Self { data, hasher }))
    }

    /// Copies as much of `slice` as fits into the buffer and updates the checksum.
    ///
    /// Returns the part of `slice` that did not fit.
    fn write<'a>(&mut self, mut slice: &'a [u8]) -> Result<&'a [u8], ChecksumHasherError> {
        // NOTE(review): this relies on `append_from_slice` leaving `slice` pointing at the bytes
        // it copied, since those are what gets hashed. Confirm against `PoolBufferMut`.
        let remaining = self.data.buffer.append_from_slice(&mut slice);
        self.hasher.update(slice)?;
        Ok(remaining)
    }

    /// Whether the buffer has no room left.
    fn is_full(&self) -> bool {
        self.data.buffer.is_full()
    }

    /// Number of bytes written so far.
    fn len(&self) -> usize {
        self.data.buffer.len()
    }

    /// Consumes the buffer and returns its contents with the finalized checksum.
    ///
    /// The returned `Bytes` owns the `ReservedBuffer`, so the memory reservation lives until the
    /// bytes are dropped.
    fn freeze(self) -> Result<(Bytes, Option<UploadChecksum>), ChecksumHasherError> {
        let checksum = self.hasher.finalize()?;
        let bytes = Bytes::from_owner(self.data);
        Ok((bytes, checksum))
    }
}
/// A pooled buffer whose capacity is accounted for in the [`MemoryLimiter`] under
/// [`BufferArea::Upload`]. The reservation is released when the buffer is dropped.
#[derive(Debug)]
struct ReservedBuffer {
    /// The underlying pooled buffer.
    buffer: PoolBufferMut,
    /// Limiter the buffer's capacity is released to on drop.
    mem_limiter: Arc<MemoryLimiter>,
}
impl Drop for ReservedBuffer {
    /// Returns this buffer's capacity to the memory limiter.
    fn drop(&mut self) {
        let reserved = self.buffer.capacity() as u64;
        self.mem_limiter.release(BufferArea::Upload, reserved);
    }
}
impl AsRef<[u8]> for ReservedBuffer {
    /// Exposes the buffered bytes, which lets the buffer back a `Bytes` via `Bytes::from_owner`.
    fn as_ref(&self) -> &[u8] {
        let contents: &[u8] = &self.buffer;
        contents
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::time::Duration;
    use crate::mem_limiter::MINIMUM_MEM_LIMIT;
    use crate::memory::PagedPool;
    use super::super::{Uploader, UploaderConfig};
    use super::*;
    use futures::executor::ThreadPool;
    use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
    use mountpoint_s3_client::failure_client::{CountdownFailureConfig, countdown_failure_client};
    use mountpoint_s3_client::mock_client::{MockClient, MockObject};
    use mountpoint_s3_client::types::{ChecksumAlgorithm, ETag, GetObjectParams, GetObjectResponse};
    use test_case::{test_case, test_matrix};
    use tokio::time::sleep;

    // Builds an `Uploader` on a single-threaded pool with the minimum memory limit.
    fn new_uploader_for_test<Client>(
        client: Client,
        buffer_size: usize,
        server_side_encryption: Option<ServerSideEncryption>,
        default_checksum_algorithm: Option<ChecksumAlgorithm>,
    ) -> Uploader<Client>
    where
        Client: ObjectClient + Clone + Send + Sync + 'static,
    {
        let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
        let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
        let mem_limiter = MemoryLimiter::new(pool.clone(), MINIMUM_MEM_LIMIT);
        Uploader::new(
            client,
            runtime,
            pool,
            mem_limiter.into(),
            UploaderConfig::new(buffer_size)
                .server_side_encryption(server_side_encryption.unwrap_or_default())
                .default_checksum_algorithm(default_checksum_algorithm),
        )
    }

    // Two writes that together exactly fill one buffer: the buffer is pushed and cleared.
    #[test_case(None)]
    #[test_case(Some(MockObject::ramp(0xaa, 2 * 1024 * 1024, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::constant(0xab, 20, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([0xbb; 128])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc64nvme])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc32])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Sha1])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Sha256])))]
    #[tokio::test]
    async fn test_append_align_with_buffer(existing_object: Option<MockObject>) {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut existing_object = existing_object;
        if let Some(object) = &mut existing_object {
            client.add_object(key, object.clone());
            expected_content.extend_from_slice(&object.read(0, object.len()));
        }
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
        let initial_etag = existing_object.map(|object| object.etag());
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
        let append_data = [0xaa; 128];
        expected_content.extend_from_slice(&append_data);
        offset += upload_request
            .write(offset, &append_data)
            .await
            .expect("write should succeed") as u64;
        let buffer = upload_request.buffer.as_ref().unwrap();
        assert_eq!(buffer_size, buffer.data.buffer.capacity());
        assert_eq!(&append_data, &buffer.data.as_ref()[..buffer.len()]);
        let append_data = [0xab; 128];
        expected_content.extend_from_slice(&append_data);
        upload_request
            .write(offset, &append_data)
            .await
            .expect("write should succeed");
        assert!(upload_request.buffer.is_none());
        upload_request
            .complete()
            .await
            .expect("upload should complete successfully");
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // Writes that straddle buffer boundaries leave a partially filled buffer behind.
    #[test_case(None)]
    #[test_case(Some(MockObject::ramp(0xaa, 2 * 1024 * 1024, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::constant(0xab, 20, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([0xbb; 128])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc64nvme])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc32])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Sha1])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Sha256])))]
    #[tokio::test]
    async fn test_append_not_align_with_buffer(existing_object: Option<MockObject>) {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut existing_object = existing_object;
        if let Some(object) = &mut existing_object {
            client.add_object(key, object.clone());
            expected_content.extend_from_slice(&object.read(0, object.len()));
        }
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let mut offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
        let initial_etag = existing_object.map(|object| object.etag());
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
        let append_data = [0xaa; 384];
        expected_content.extend_from_slice(&append_data);
        offset += upload_request
            .write(offset, &append_data)
            .await
            .expect("write should succeed") as u64;
        let buffer = upload_request.buffer.as_ref().unwrap();
        assert!(buffer.len() < buffer_size);
        let append_data = [0xab; 256];
        expected_content.extend_from_slice(&append_data);
        upload_request
            .write(offset, &append_data)
            .await
            .expect("write should succeed");
        let buffer = upload_request.buffer.as_ref().unwrap();
        assert!(buffer.len() < buffer_size);
        upload_request
            .complete()
            .await
            .expect("upload should complete successfully");
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // New objects use the default checksum; appends keep the existing object's checksum.
    #[test_case(None, None)]
    #[test_case(None, Some(ChecksumAlgorithm::Crc64nvme))]
    #[test_case(None, Some(ChecksumAlgorithm::Crc32c))]
    #[test_case(None, Some(ChecksumAlgorithm::Crc32))]
    #[test_case(None, Some(ChecksumAlgorithm::Sha1))]
    #[test_case(None, Some(ChecksumAlgorithm::Sha256))]
    #[test_case(Some(&[]), Some(ChecksumAlgorithm::Crc32c))]
    #[test_case(Some(&[ChecksumAlgorithm::Crc64nvme]), None)]
    #[test_case(Some(&[ChecksumAlgorithm::Crc32c]), None)]
    #[test_case(Some(&[ChecksumAlgorithm::Crc32]), None)]
    #[test_case(Some(&[ChecksumAlgorithm::Sha1]), None)]
    #[test_case(Some(&[ChecksumAlgorithm::Sha256]), None)]
    #[tokio::test]
    async fn test_append_respects_checksums(
        existing_object_checksum_algorithms: Option<&[ChecksumAlgorithm]>,
        default_checksum_algorithm: Option<ChecksumAlgorithm>,
    ) {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut expected_checksum_algorithm = default_checksum_algorithm.as_slice();
        let mut expected_content = Vec::new();
        let mut offset = 0;
        let mut initial_etag = None;
        if let Some(algorithms) = existing_object_checksum_algorithms {
            let existing_content = [0xbb; 128];
            let object = MockObject::from(&existing_content).with_computed_checksums(algorithms);
            expected_content.extend_from_slice(&existing_content);
            expected_checksum_algorithm = algorithms;
            offset = object.len() as u64;
            initial_etag = Some(object.etag());
            client.add_object(key, object);
        }
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, default_checksum_algorithm.clone());
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, initial_etag);
        let append_data = [0xaa; 384];
        expected_content.extend_from_slice(&append_data);
        upload_request
            .write(offset, &append_data)
            .await
            .expect("write should succeed");
        upload_request
            .complete()
            .await
            .expect("upload should complete successfully");
        let get_request = client
            .get_object(
                bucket,
                key,
                &GetObjectParams::default().checksum_mode(Some(ChecksumMode::Enabled)),
            )
            .await
            .expect("get_object failed");
        let checksum = get_request.get_object_checksum().expect("failed to get checksum");
        assert_eq!(checksum.algorithms(), expected_checksum_algorithm);
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // Completing without writing leaves existing content unchanged, or creates an empty object.
    #[test_case(None)]
    #[test_case(Some(MockObject::ramp(0xaa, 2 * 1024 * 1024, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::constant(0xab, 20, ETag::for_tests()).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[test_case(Some(MockObject::from([0xbb; 128]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c])))]
    #[tokio::test]
    async fn test_append_empty(existing_object: Option<MockObject>) {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut existing_object = existing_object;
        if let Some(object) = &mut existing_object {
            client.add_object(key, object.clone());
            expected_content.extend_from_slice(&object.read(0, object.len()));
        }
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let initial_offset = existing_object.as_ref().map_or(0, |object| object.len() as u64);
        let initial_etag = existing_object.map(|object| object.etag());
        let upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, initial_etag);
        upload_request
            .complete()
            .await
            .expect("upload should complete successfully");
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // A wrong starting offset makes the single request fail, reported at completion.
    #[tokio::test]
    async fn test_append_failure_at_completion() {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let initial_offset = (existing_object.len() - 1) as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
        let append_data = [0xaa; 128];
        upload_request
            .write(initial_offset, &append_data)
            .await
            .expect("write should succeed");
        assert!(matches!(
            upload_request.complete().await,
            Err(UploadError::PutRequestFailed(_))
        ));
    }

    // The second request fails: only the first buffer is persisted.
    #[tokio::test]
    async fn test_append_partial_failure_at_completion() {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut put_single_failures = HashMap::new();
        put_single_failures.insert(2, ObjectClientError::ServiceError(PutObjectError::BadChecksum));
        let failure_client = Arc::new(countdown_failure_client(
            client.clone(),
            CountdownFailureConfig {
                put_single_failures,
                ..Default::default()
            },
        ));
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        expected_content.extend_from_slice(&existing_object.read(0, existing_object.len()));
        let buffer_size = 256;
        let uploader = new_uploader_for_test(failure_client, buffer_size, None, None);
        let initial_offset = existing_object.len() as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
        let append_data = [0xab; 384];
        expected_content.extend_from_slice(&append_data[..buffer_size]);
        upload_request
            .write(initial_offset, &append_data)
            .await
            .expect("write should succeed");
        let result = upload_request.complete().await;
        assert!(matches!(result, Err(UploadError::PutRequestFailed(_))));
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // A request failure surfaces on a later write; after that the upload is terminated.
    #[tokio::test]
    async fn test_append_failure_during_write() {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let mut offset = (existing_object.len() - 1) as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
        let mut write_success_count = 0;
        let max_retries = 10000;
        let append_data = [0xab; 256];
        while write_success_count < max_retries {
            match upload_request.write(offset, &append_data).await {
                Ok(len) => {
                    offset += len as u64;
                    write_success_count += 1;
                }
                Err(e) => {
                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                    break;
                }
            }
        }
        assert!(
            write_success_count < max_retries,
            "retry count should not have been exhausted"
        );
        assert!(matches!(
            upload_request.write(offset, b"some data").await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
        assert!(matches!(
            upload_request.complete().await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
    }

    // As above, but the first request succeeds, so its data is persisted.
    #[tokio::test]
    async fn test_append_partial_failure_during_write() {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let mut put_single_failures = HashMap::new();
        put_single_failures.insert(2, ObjectClientError::ServiceError(PutObjectError::BadChecksum));
        let failure_client = Arc::new(countdown_failure_client(
            client.clone(),
            CountdownFailureConfig {
                put_single_failures,
                ..Default::default()
            },
        ));
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        expected_content.extend_from_slice(&existing_object.read(0, existing_object.len()));
        let buffer_size = 256;
        let uploader = new_uploader_for_test(failure_client, buffer_size, None, None);
        let mut offset = existing_object.len() as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
        let mut write_success_count = 0;
        let max_retries = 10000;
        let append_data = [0xab; 256];
        expected_content.extend_from_slice(&append_data);
        while write_success_count < max_retries {
            match upload_request.write(offset, &append_data).await {
                Ok(len) => {
                    offset += len as u64;
                    write_success_count += 1;
                }
                Err(e) => {
                    assert!(matches!(e, UploadError::PutRequestFailed(_)));
                    break;
                }
            }
        }
        assert!(
            write_success_count < max_retries,
            "retry count should not have been exhausted"
        );
        assert!(matches!(
            upload_request.write(offset, b"some data").await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
        assert!(matches!(
            upload_request.complete().await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // Replacing the object before or after starting the upload yields PreconditionFailed.
    // `test_matrix` must come before `#[tokio::test]` so each generated case is wrapped by it.
    #[test_matrix([true, false], [Duration::ZERO, Duration::from_secs(1)])]
    #[tokio::test]
    async fn test_append_failure_on_object_replaced(replace_before_start: bool, sleep_before_write: Duration) {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket.to_owned()).part_size(32).build());
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        if replace_before_start {
            let replacing_object =
                MockObject::from(vec![0xcc; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
            client.add_object(key, replacing_object.clone());
        }
        let mut offset = existing_object.len() as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, Some(initial_etag));
        if !replace_before_start {
            let replacing_object =
                MockObject::from(vec![0xcc; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
            client.add_object(key, replacing_object.clone());
        }
        if !sleep_before_write.is_zero() {
            sleep(sleep_before_write).await;
        }
        let mut write_success_count = 0;
        let max_retries = 10000;
        let append_data = [0xab; 256];
        while write_success_count < max_retries {
            match upload_request.write(offset, &append_data).await {
                Ok(len) => {
                    offset += len as u64;
                    write_success_count += 1;
                }
                Err(e) => {
                    assert!(matches!(
                        e,
                        UploadError::PutRequestFailed(ObjectClientError::ServiceError(
                            PutObjectError::PreconditionFailed
                        ))
                    ));
                    break;
                }
            }
        }
        assert!(
            write_success_count < max_retries,
            "retry count should not have been exhausted"
        );
        assert!(matches!(
            upload_request.write(offset, b"some data").await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
        assert!(matches!(
            upload_request.complete().await,
            Err(UploadError::UploadAlreadyTerminated)
        ));
    }

    // Writing at an offset other than the current one is rejected.
    #[tokio::test]
    async fn test_append_failure_on_out_of_order() {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let buffer_size = 256;
        let uploader = new_uploader_for_test(client.clone(), buffer_size, None, None);
        let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), 0, None);
        let append_data = [0xaa; 128];
        upload_request
            .write(0, &append_data)
            .await
            .expect("write should succeed");
        let next_offset = append_data.len() as u64;
        let wrong_offset = next_offset + 1;
        let error = upload_request
            .write(wrong_offset, &append_data)
            .await
            .expect_err("out-of-order write should fail");
        assert!(matches!(error, UploadError::OutOfOrderWrite { .. }));
    }

    // Corrupted SSE settings are detected when the request is prepared.
    #[test_case(Some("aws:kmr"), Some("some_key_alias"))]
    #[test_case(Some("aws:kms"), Some("some_key_ali`s"))]
    #[test_case(None, Some("some_key_alias"))]
    #[test_case(Some("aws:kms"), None)]
    #[tokio::test]
    async fn test_append_with_corrupted_sse_test(sse_type_corrupted: Option<&str>, key_id_corrupted: Option<&str>) {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        let buffer_size = 256;
        let server_side_encryption =
            ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key_alias".to_string()));
        let mut uploader = new_uploader_for_test(client.clone(), buffer_size, Some(server_side_encryption), None);
        uploader
            .server_side_encryption
            .corrupt_data(sse_type_corrupted.map(String::from), key_id_corrupted.map(String::from));
        let initial_offset = existing_object.len() as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
        let append_data = [0xaa; 128];
        upload_request
            .write(initial_offset, &append_data)
            .await
            .expect("write should succeed");
        assert!(matches!(
            upload_request.complete().await,
            Err(UploadError::SseCorruptedError(_))
        ));
    }

    // Valid SSE settings do not prevent the append.
    #[tokio::test]
    async fn test_append_with_good_sse_test() {
        let bucket = "bucket";
        let key = "hello";
        let mut expected_content = Vec::new();
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let existing_object = MockObject::from([0xbb; 20]).with_computed_checksums(&[ChecksumAlgorithm::Crc32c]);
        client.add_object(key, existing_object.clone());
        expected_content.extend_from_slice(&existing_object.read(0, existing_object.len()));
        let buffer_size = 256;
        let server_side_encryption =
            ServerSideEncryption::new(Some("aws:kms".to_string()), Some("some_key_alias".to_string()));
        let uploader = new_uploader_for_test(client.clone(), buffer_size, Some(server_side_encryption), None);
        let initial_offset = existing_object.len() as u64;
        let initial_etag = existing_object.etag();
        let mut upload_request =
            uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), initial_offset, Some(initial_etag));
        let append_data = [0xaa; 128];
        expected_content.extend_from_slice(&append_data);
        upload_request
            .write(initial_offset, &append_data)
            .await
            .expect("write should succeed");
        upload_request
            .complete()
            .await
            .expect("upload with sse should complete successfully");
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }

    // With a zero memory limit, uploads still make progress by waiting for in-flight requests.
    #[test_case(1024, 128, 10)]
    #[test_case(1024, 4096, 20)]
    #[tokio::test]
    async fn test_append_on_low_memory(part_size: usize, write_size: usize, part_count: usize) {
        let bucket = "bucket";
        let key = "hello";
        let client = Arc::new(MockClient::config().bucket(bucket).part_size(32).build());
        let pool = PagedPool::new_with_candidate_sizes([32]);
        let mem_limiter = MemoryLimiter::new(pool.clone(), 0);
        let uploader = Uploader::new(
            client.clone(),
            Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap()),
            pool,
            mem_limiter.into(),
            UploaderConfig::new(part_size),
        );
        let mut offset = 0;
        let mut upload_request = uploader.start_incremental_upload(bucket.to_owned(), key.to_owned(), offset, None);
        let mut expected_content = Vec::new();
        while expected_content.len() < part_count * part_size {
            let append_data = vec![0xaa; write_size];
            expected_content.extend_from_slice(&append_data);
            offset += upload_request
                .write(offset, &append_data)
                .await
                .expect("write should succeed") as u64;
        }
        upload_request
            .complete()
            .await
            .expect("upload should complete successfully");
        let get_request = client
            .get_object(bucket, key, &GetObjectParams::default())
            .await
            .expect("get_object failed");
        let actual = get_request.collect().await.expect("failed to collect body");
        assert_eq!(expected_content, *actual);
    }
}