use futures::{future, stream::Stream, Future, StreamExt, TryFutureExt};
use log::{debug, info};
use log_derive::logfn;
use crate::{
errors::Result,
rusoto::{Delete, DeleteObjectsRequest, ObjectIdentifier, S3},
};
/// Maximum number of keys that `delete_streaming` groups into a single
/// delete request (the chunk size applied to the incoming key stream).
const DELETE_BATCH_SIZE: usize = 50;
#[logfn(err = "ERROR")]
pub async fn delete<T>(s3: &T, bucket: impl AsRef<str>, keys: &[impl AsRef<str>]) -> Result<()>
where
T: S3 + Sync + Send + Clone,
{
info!(
"delete: bucket={}, keys.len()={:?}",
bucket.as_ref(),
keys.len()
);
let dor = create_delete_request(bucket, keys);
s3.delete_objects(dor).await?;
Ok(())
}
pub fn delete_streaming<'a, T>(
s3: &'a T,
bucket: impl AsRef<str> + 'a,
keys: impl Stream<Item = Result<String>> + Unpin + 'a,
) -> impl Stream<Item = impl Future<Output = Result<usize>> + 'a> + 'a
where
T: S3 + Sync + Send + Clone,
{
info!(
"delete_streaming: bucket={}, batch_size={}",
bucket.as_ref(),
DELETE_BATCH_SIZE
);
let chunks = keys.chunks(DELETE_BATCH_SIZE);
chunks.map(move |keys| {
let keys: Result<Vec<String>> = keys.into_iter().collect();
match keys {
Ok(keys) => {
debug!("delete_streaming: keys={:?}", keys);
let len = keys.len();
let dor = create_delete_request(&bucket, &keys);
let fut = s3.delete_objects(dor);
future::Either::Left(fut.map_ok(move |_| len).map_err(|e| e.into()))
}
Err(err) => {
println!("nothing found in delete_streaming keys");
future::Either::Right(future::ready(Err(err)))
}
}
})
}
/// Assembles a `DeleteObjectsRequest` targeting `bucket` with one
/// `ObjectIdentifier` per entry of `keys`, in the same order.
///
/// Every field not set here is left at its `Default` value.
fn create_delete_request(
    bucket: impl AsRef<str>,
    keys: &[impl AsRef<str>],
) -> DeleteObjectsRequest {
    let mut identifiers = Vec::with_capacity(keys.len());
    for key in keys {
        identifiers.push(ObjectIdentifier {
            key: key.as_ref().into(),
            ..Default::default()
        });
    }

    DeleteObjectsRequest {
        bucket: bucket.as_ref().into(),
        delete: Delete {
            objects: identifiers,
            ..Default::default()
        },
        ..Default::default()
    }
}