use futures::stream::Stream;
use futures::stream::StreamExt;
use futures::Future;
use rusoto_s3::Object;
use crate::command::FindStat;
const CHUNK: usize = 1000;
pub async fn list_filter_execute<P, F, Fut, Fut2>(
iterator: impl Stream<Item = Vec<Object>>,
limit: Option<usize>,
stats: Option<FindStat>,
p: P,
f: &mut F,
) -> Option<FindStat>
where
P: FnMut(&Object) -> Fut,
Fut: Future<Output = bool>,
F: FnMut(Option<FindStat>, Vec<Object>) -> Fut2,
Fut2: Future<Output = Option<FindStat>>,
{
match limit {
Some(limit) => list_filter_limit_execute(iterator, limit, stats, p, f).await,
None => list_filter_unlimited_execute(iterator, stats, p, f).await,
}
}
#[inline]
async fn list_filter_limit_execute<P, F, Fut, Fut2>(
iterator: impl Stream<Item = Vec<Object>>,
limit: usize,
stats: Option<FindStat>,
p: P,
f: &mut F,
) -> Option<FindStat>
where
P: FnMut(&Object) -> Fut,
Fut: Future<Output = bool>,
F: FnMut(Option<FindStat>, Vec<Object>) -> Fut2,
Fut2: Future<Output = Option<FindStat>>,
{
iterator
.map(|x| futures::stream::iter(x.into_iter()))
.flatten()
.filter(p)
.take(limit)
.chunks(CHUNK)
.fold(stats, f)
.await
}
#[inline]
async fn list_filter_unlimited_execute<P, F, Fut, Fut2>(
iterator: impl Stream<Item = Vec<Object>>,
stats: Option<FindStat>,
p: P,
f: &mut F,
) -> Option<FindStat>
where
P: FnMut(&Object) -> Fut,
Fut: Future<Output = bool>,
F: FnMut(Option<FindStat>, Vec<Object>) -> Fut2,
Fut2: Future<Output = Option<FindStat>>,
{
iterator
.map(|x| futures::stream::iter(x.into_iter()))
.flatten()
.filter(p)
.chunks(CHUNK)
.fold(stats, f)
.await
}