use futures::{future, stream::Stream, Future, StreamExt, TryFutureExt};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
use aws_sdk_s3::Client;
use log::{debug, error, info};
use log_derive::logfn;
use crate::errors::Result;
use crate::Error;
/// Maximum number of keys grouped into each `DeleteObjects` request issued by
/// `delete_streaming` (the key stream is chunked by this size).
const DELETE_BATCH_SIZE: usize = 50;
/// Deletes `keys` from `bucket` using a single S3 `DeleteObjects` request.
///
/// An empty `keys` slice is treated as a no-op: nothing is sent and `Ok(())`
/// is returned, because a `DeleteObjects` request without any object
/// identifiers is rejected by S3 as malformed.
///
/// # Errors
///
/// * `Error::DeleteObjectsFailed` when the service answers with a modelled
///   `DeleteObjects` error.
/// * `Error::SdkError` (carrying the stringified SDK error) for every other
///   SDK failure.
///
/// NOTE(review): the response body is discarded, so per-key failures that S3
/// reports inside an otherwise successful response are not surfaced here.
/// NOTE(review): `keys` is not split into batches; S3 documents a per-request
/// key limit for `DeleteObjects`, so callers must stay within it.
#[logfn(err = "ERROR")]
pub async fn delete(s3: &Client, bucket: impl AsRef<str>, keys: &[impl AsRef<str>]) -> Result<()> {
    let bucket_name = bucket.as_ref();
    info!(
        "delete: bucket={}, keys.len()={:?}",
        bucket_name,
        keys.len()
    );

    // Nothing to delete: skip the round trip instead of sending a request
    // with an empty object list.
    if keys.is_empty() {
        return Ok(());
    }

    s3.delete_objects()
        .bucket(bucket_name)
        .delete(create_delete(keys, false))
        .send()
        .await
        .map_err(|e| match e {
            SdkError::ServiceError(error) => Error::DeleteObjectsFailed(Box::new(error.into_err())),
            _ => Error::SdkError(e.to_string()),
        })?;

    Ok(())
}
pub fn delete_streaming<'a>(
s3: &'a Client,
bucket: impl AsRef<str> + 'a,
keys: impl Stream<Item = Result<String>> + Unpin + 'a,
) -> impl Stream<Item = impl Future<Output = Result<usize>> + 'a> + 'a {
info!(
"delete_streaming: bucket={}, batch_size={}",
bucket.as_ref(),
DELETE_BATCH_SIZE
);
let chunks = keys.chunks(DELETE_BATCH_SIZE);
chunks.map(move |keys| {
let keys: Result<Vec<String>> = keys.into_iter().collect();
match keys {
Ok(keys) => {
debug!("delete_streaming: keys={:?}", keys);
let len = keys.len();
let delete = create_delete(&keys, false);
let fut = s3
.delete_objects()
.bucket(bucket.as_ref().to_string())
.delete(delete)
.send();
future::Either::Left(fut.map_ok(move |_| len).map_err(|e| match e {
SdkError::ServiceError(error) => {
Error::DeleteObjectsFailed(Box::new(error.into_err()))
}
_ => Error::SdkError(e.to_string()),
}))
}
Err(err) => {
error!("nothing found in delete_streaming keys");
future::Either::Right(future::ready(Err(err)))
}
}
})
}
/// Builds the `Delete` payload for a `DeleteObjects` request.
///
/// Every entry of `keys` becomes one `ObjectIdentifier` carrying only the key,
/// in the same order as the input slice. `quiet` is forwarded unchanged to the
/// `Delete` builder.
fn create_delete(keys: &[impl AsRef<str>], quiet: bool) -> Delete {
    let mut identifiers = Vec::with_capacity(keys.len());
    for key in keys {
        let identifier = ObjectIdentifier::builder()
            .key(key.as_ref().to_string())
            .build();
        identifiers.push(identifier);
    }

    Delete::builder()
        .quiet(quiet)
        .set_objects(Some(identifiers))
        .build()
}