use super::*;
use futures::future::{ok, ready};
use futures::stream::Stream;
use rusoto_s3::ListObjectsV2Output;
use snafu::futures::TryStreamExt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
/// One raw result page from the S3 `ListObjectsV2` API, as produced by the
/// underlying page stream in [`ListObjects`].
pub type ListObjectsV2Result = Result<ListObjectsV2Output, RusotoError<ListObjectsV2Error>>;
/// A stream of `ListObjectsV2` result pages together with the context needed
/// to run follow-up operations (delete / copy / move) on the listed objects.
///
/// `C` is the S3 client type, `S` the concrete page stream type
/// (see [`ListObjects::boxed`] for type erasure).
pub struct ListObjects<C, S> {
    // Client used for the follow-up requests issued by e.g. `delete_all`.
    s3: C,
    // Carries timeout/retry settings (`config.request`) for `s3_request` calls.
    config: Config,
    // Bucket the listing was taken from; also the default destination bucket.
    bucket: String,
    // Prefix this listing was created with (empty for plain `list_objects`);
    // used by `move_to_prefix` to rewrite keys.
    prefix: String,
    // Underlying stream of `ListObjectsV2Result` pages.
    stream: S,
}
impl<C, S> ListObjects<C, S>
where
    C: S3 + Clone + Send + Sync + Unpin + 'static,
    S: Stream<Item = ListObjectsV2Result> + Sized + Send + 'static,
{
    /// Type-erase the inner page stream by boxing it, so the listing can be
    /// stored or returned without naming the concrete stream type.
    pub fn boxed(self) -> ListObjects<C, Pin<Box<dyn Stream<Item = ListObjectsV2Result> + Send>>> {
        ListObjects {
            s3: self.s3,
            config: self.config,
            bucket: self.bucket,
            stream: self.stream.boxed(),
            prefix: self.prefix,
        }
    }
    /// Delete every object produced by this listing.
    ///
    /// Each result page becomes one batch `DeleteObjects` request; pages are
    /// processed concurrently with no concurrency limit
    /// (`try_for_each_concurrent(None, ..)`).
    ///
    /// NOTE(review): the `DeleteObjectsOutput` is discarded (`map_ok(drop)`),
    /// so per-key failures reported inside an otherwise-successful response
    /// are ignored — only request-level errors abort the future. Confirm this
    /// is intended.
    pub fn delete_all(self) -> impl Future<Output = Result<(), Error>> {
        let ListObjects {
            s3,
            config: _,
            bucket,
            stream,
            prefix: _,
        } = self;
        stream
            // Drop pages with no `contents`; keep page-level errors flowing.
            .filter_map(|response| ready(response.map(|r| r.contents).transpose()))
            .map_err(|e| e.into())
            .try_for_each_concurrent(None, move |contents| {
                let s3 = s3.clone();
                let bucket = bucket.clone();
                async move {
                    s3.delete_objects(DeleteObjectsRequest {
                        bucket,
                        delete: Delete {
                            // Objects without a key cannot be addressed; skip them.
                            objects: contents
                                .iter()
                                .filter_map(|obj| {
                                    obj.key.as_ref().map(|key| ObjectIdentifier {
                                        key: key.clone(),
                                        version_id: None,
                                    })
                                })
                                .collect::<Vec<_>>(),
                            quiet: None,
                        },
                        ..Default::default()
                    })
                    .map_ok(drop)
                    .map_err(|e| e.into())
                    .await
                }
            })
    }
    /// Flatten the page stream into a stream of individual `Object`s,
    /// silently skipping pages whose `contents` is `None`.
    pub fn flatten(self) -> impl TryStream<Ok = Object, Error = RusotoError<ListObjectsV2Error>> {
        self.stream
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
    }
    /// Copy every listed object into `dest_bucket` (or back into the source
    /// bucket when `None`), renaming each key via `mapping`.
    ///
    /// Yields the *source* key of each object once its copy has completed.
    /// `default_request` supplies the base `CopyObjectRequest`; `copy_source`,
    /// `bucket` and `key` are overwritten here. Objects missing a key or a
    /// size are skipped.
    ///
    /// NOTE(review): the literal `10` passed to `s3_request` appears to be an
    /// attempt/retry count — confirm against `s3_request`'s signature.
    fn copy_all_stream<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Stream<Item = Result<String, Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let ListObjects {
            s3,
            config,
            bucket,
            stream,
            prefix: _,
        } = self;
        // Timeout/backoff state shared by all copy requests of this stream.
        let timeout = Arc::new(Mutex::new(TimeoutState::new(config.request.clone())));
        let dest_bucket = dest_bucket.unwrap_or_else(|| bucket.clone());
        stream
            // Pages -> individual objects.
            .try_filter_map(|response| ok(response.contents))
            .map_ok(|x| stream::iter(x).map(Ok))
            .try_flatten()
            // Only objects with both a key and a size can be copied.
            .try_filter_map(|obj| {
                let Object { key, size, .. } = obj;
                ok(key.and_then(|key| size.map(|size| (key, size))))
            })
            .context(err::ListObjectsV2)
            .and_then(move |(key, size)| {
                let (s3, timeout) = (s3.clone(), timeout.clone());
                let request = CopyObjectRequest {
                    copy_source: format!("{}/{}", bucket, key),
                    bucket: dest_bucket.clone(),
                    key: mapping(&key),
                    ..default_request()
                };
                s3_request(
                    // The factory may be invoked multiple times (retries),
                    // hence the clone at each nesting level.
                    move || {
                        let (s3, request) = (s3.clone(), request.clone());
                        async move {
                            let (s3, request) = (s3.clone(), request.clone());
                            Ok((async move{s3.copy_object(request).context(err::CopyObject).await}, size as u64))
                        }
                    },
                    10,
                    timeout,
                )
                .map_ok(|_| key)
            })
    }
    /// Copy every listed object (see [`ListObjects::copy_all_stream`]) and
    /// resolve once all copies have finished.
    pub fn copy_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .try_for_each(|_| async { Ok(()) })
    }
    /// Copy every listed object, then delete each source object as soon as
    /// its individual copy has succeeded (copy-then-delete per object, not
    /// copy-everything-then-delete-everything).
    pub fn move_all<F, R>(
        self,
        dest_bucket: Option<String>,
        mapping: F,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        F: Fn(&str) -> String + Clone + Send + Sync + Unpin + 'static,
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let src_bucket = self.bucket.clone();
        let timeout = Arc::new(Mutex::new(TimeoutState::new(self.config.request.clone())));
        let s3 = self.s3.clone();
        self.copy_all_stream(dest_bucket, mapping, default_request)
            .and_then(move |src_key| {
                let delete_request = DeleteObjectRequest {
                    bucket: src_bucket.clone(),
                    key: src_key,
                    ..Default::default()
                };
                let (s3, timeout) = (s3.clone(), timeout.clone());
                s3_request(
                    move || {
                        let (s3, delete_request) = (s3.clone(), delete_request.clone());
                        async move {
                            let (s3, delete_request) = (s3.clone(), delete_request.clone());
                            Ok((
                                async move {
                                    s3.delete_object(delete_request)
                                        .context(err::DeleteObject)
                                        .await
                                },
                                // NOTE(review): presumably a payload size of 0
                                // for timeout estimation — confirm against
                                // `s3_request`.
                                0,
                            ))
                        }
                    },
                    10,
                    timeout,
                )
                .map_ok(drop)
                .boxed()
            })
            .try_for_each(|_| async { Ok(()) })
            .boxed()
    }
    /// Move every object in this listing under `new_prefix`, replacing this
    /// listing's own prefix at the start of each key and keeping the rest.
    pub fn move_to_prefix<R>(
        self,
        dest_bucket: Option<String>,
        new_prefix: String,
        default_request: R,
    ) -> impl Future<Output = Result<(), Error>>
    where
        R: Fn() -> CopyObjectRequest + Clone + Unpin + Sync + Send + 'static,
    {
        let old_prefix = self.prefix.clone();
        let substitute_prefix =
            move |source: &str| format!("{}{}", new_prefix, source.trim_start_matches(&old_prefix));
        self.move_all(dest_bucket, substitute_prefix, default_request)
            .boxed()
    }
}
impl<C, S> Stream for ListObjects<C, S>
where
    S: Stream<Item = Result<ListObjectsV2Output, Error>> + Sized + Send + Unpin,
    C: Unpin,
{
    type Item = Result<ListObjectsV2Output, Error>;

    /// Delegate polling straight to the wrapped page stream.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        // Both `C` and `S` are `Unpin`, so the whole struct is; unwrap the
        // pin and re-pin just the inner stream.
        let this = self.get_mut();
        Pin::new(&mut this.stream).poll_next(cx)
    }
}
impl<S: S3 + Clone + Send + Sync + 'static> S3Algo<S> {
    /// List every object under `prefix` in `bucket`.
    ///
    /// Convenience wrapper around [`S3Algo::list_objects`] that sets the
    /// `prefix` field on every request and records the prefix on the returned
    /// listing (needed by e.g. `move_to_prefix`).
    pub fn list_prefix(
        &self,
        bucket: String,
        prefix: String,
    ) -> ListObjects<S, impl Stream<Item = ListObjectsV2Result> + Sized + Send> {
        let request_bucket = bucket.clone();
        let request_prefix = prefix.clone();
        let mut listing = self.list_objects(bucket, move || ListObjectsV2Request {
            bucket: request_bucket.clone(),
            prefix: Some(request_prefix.clone()),
            ..Default::default()
        });
        listing.prefix = prefix;
        listing
    }

    /// Build a [`ListObjects`] that lazily pages through `ListObjectsV2`
    /// results, issuing one request per page via `request_factory` (with the
    /// continuation token filled in automatically).
    pub fn list_objects<F>(
        &self,
        bucket: String,
        request_factory: F,
    ) -> ListObjects<S, impl Stream<Item = ListObjectsV2Result> + Sized + Send>
    where
        F: Fn() -> ListObjectsV2Request + Send + Sync + Clone,
    {
        let client = self.s3.clone();
        // Unfold state is (continuation_token, is_first_page): stop once a
        // page carried no continuation token, unless no request has been
        // made yet.
        let stream = futures::stream::unfold((None, true), move |(token, first_page)| {
            let (s3, make_request) = (client.clone(), request_factory.clone());
            async move {
                if token.is_none() && !first_page {
                    return None;
                }
                let result = s3
                    .list_objects_v2(ListObjectsV2Request {
                        continuation_token: token,
                        ..make_request()
                    })
                    .await;
                // A failed page yields its error and then ends the stream.
                let next_token = result
                    .as_ref()
                    .ok()
                    .and_then(|response| response.next_continuation_token.clone());
                Some((result, (next_token, false)))
            }
        });
        ListObjects {
            s3: self.s3.clone(),
            config: self.config.clone(),
            stream,
            bucket,
            prefix: String::new(),
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::test::rand_string;

    /// Upload a batch of objects under a random prefix, delete the whole
    /// bucket listing, and verify that a fresh listing comes back empty.
    #[tokio::test]
    async fn test_s3_delete_files() {
        const N_FILES: usize = 11_000;

        let algo = S3Algo::new(testing_s3_client());
        let dir = rand_string(14);
        let sources = (0..N_FILES).map(move |i| ObjectSource::Data {
            data: vec![1, 2, 3],
            key: format!("{}/{}.file", dir, i),
        });

        algo.upload_files(
            "test-bucket".into(),
            sources,
            |result| async move {
                if result.seq % 100 == 0 {
                    println!("{} files uploaded", result.seq);
                }
            },
            PutObjectRequest::default,
        )
        .await
        .unwrap();

        algo.list_prefix("test-bucket".into(), String::new())
            .delete_all()
            .await
            .unwrap();

        // Everything was deleted, so the listing must yield zero objects.
        let remaining = algo
            .list_prefix("test-bucket".into(), String::new())
            .flatten()
            .try_fold(0usize, |n, _| ok(n + 1))
            .await
            .unwrap();
        assert_eq!(remaining, 0);
    }
}
}