use aws_sdk_s3::Client;
use futures::Future;
use futures::stream::Stream;
use futures::stream::StreamExt;
use std::sync::Arc;
use crate::command::{FindStat, StreamObject};
use crate::filter::TagFilterList;
use crate::tag_fetcher::{TagFetchConfig, TagFetchStats, fetch_tags_for_objects};
/// Number of filtered objects grouped into each `Vec` handed to the fold callback
/// by the non-tag listing pipelines (used as the `chunks` capacity).
const CHUNK: usize = 1000;
/// Number of objects that passed the cheap filter to accumulate before their tags
/// are fetched and evaluated as a single batch.
const TAG_FETCH_BATCH_SIZE: usize = 100;
/// Runs the list → filter → fold pipeline, choosing the capped or uncapped variant.
///
/// * `iterator` - stream of listing pages, each page being a `Vec<StreamObject>`.
/// * `limit` - when `Some(n)`, at most `n` objects accepted by `p` are forwarded.
/// * `stats` - initial accumulator value passed to the first call of `f`.
/// * `p` - async predicate deciding whether an object is kept.
/// * `f` - async fold step receiving the accumulator and a chunk of kept objects.
///
/// Returns the accumulator produced by the last call of `f`, or `stats` unchanged
/// when no chunk was produced.
pub async fn list_filter_execute<P, F, Fut, Fut2>(
    iterator: impl Stream<Item = Vec<StreamObject>>,
    limit: Option<usize>,
    stats: Option<FindStat>,
    p: P,
    f: &mut F,
) -> Option<FindStat>
where
    P: FnMut(&StreamObject) -> Fut,
    Fut: Future<Output = bool>,
    F: FnMut(Option<FindStat>, Vec<StreamObject>) -> Fut2,
    Fut2: Future<Output = Option<FindStat>>,
{
    if let Some(max_items) = limit {
        return list_filter_limit_execute(iterator, max_items, stats, p, f).await;
    }
    list_filter_unlimited_execute(iterator, stats, p, f).await
}
/// Everything the tag-aware listing pipeline needs to fetch object tags and
/// evaluate tag filters against them.
pub struct TagFilterContext {
/// S3 client handed to the tag fetcher for every batch.
pub client: Client,
/// Bucket name handed to the tag fetcher alongside the objects.
pub bucket: String,
/// Tag filters each object is matched against once its tags are available.
pub filters: TagFilterList,
/// Tag-fetch configuration, cloned and passed to the tag fetcher per batch.
pub config: TagFetchConfig,
/// Shared tag-fetch statistics; the same `Arc` is passed to every batch fetch.
pub stats: Arc<TagFetchStats>,
}
/// Lists objects, applies the cheap filter, then fetches tags in batches and applies
/// the tag filters before folding the survivors with `f`.
///
/// Objects accepted by `cheap_filter` are buffered until `TAG_FETCH_BATCH_SIZE` of
/// them are available (or the stream ends); each batch is then passed to
/// `process_tag_batch`, which fetches tags, applies `tag_ctx.filters`, and calls `f`.
///
/// * `iterator` - stream of listing pages, each page being a `Vec<StreamObject>`.
/// * `limit` - when `Some(n)`, at most `n` tag-matching objects are forwarded to `f`
///   in total. `Some(0)` returns `stats` immediately without polling `iterator`.
/// * `stats` - initial accumulator value.
/// * `tag_ctx` - client, bucket, filters, and fetch settings for tag evaluation.
/// * `cheap_filter` - async predicate evaluated before any tag fetch.
/// * `f` - async fold step receiving the accumulator and the matching objects of a batch.
///
/// Returns the final accumulator, or `stats` unchanged when nothing matched.
///
/// Note that a batch is only flushed when full or at end of stream, so tags may be
/// fetched for up to a whole batch even when the remaining limit is smaller.
pub async fn list_filter_execute_with_tags<P, F, Fut, Fut2>(
    iterator: impl Stream<Item = Vec<StreamObject>>,
    limit: Option<usize>,
    stats: Option<FindStat>,
    tag_ctx: TagFilterContext,
    mut cheap_filter: P,
    f: &mut F,
) -> Option<FindStat>
where
    P: FnMut(&StreamObject) -> Fut,
    Fut: Future<Output = bool>,
    F: FnMut(Option<FindStat>, Vec<StreamObject>) -> Fut2,
    Fut2: Future<Output = Option<FindStat>>,
{
    // A zero limit can never produce output. Bail out before polling the listing
    // stream at all, which is also how the `take(limit)`-based path behaves.
    if limit == Some(0) {
        return stats;
    }

    let mut remaining_limit = limit;
    let mut current_stats = stats;
    let mut stream = Box::pin(iterator.flat_map(futures::stream::iter));
    let mut batch: Vec<StreamObject> = Vec::with_capacity(TAG_FETCH_BATCH_SIZE);

    // Invariant: `remaining_limit` is never `Some(0)` at the top of the loop, because
    // the guard above handles the initial case and the loop breaks as soon as the
    // limit is exhausted.
    while let Some(obj) = stream.next().await {
        if !cheap_filter(&obj).await {
            continue;
        }
        batch.push(obj);
        if batch.len() < TAG_FETCH_BATCH_SIZE {
            continue;
        }

        // Hand the full batch over and start a fresh buffer for the next one.
        let full_batch = std::mem::replace(&mut batch, Vec::with_capacity(TAG_FETCH_BATCH_SIZE));
        let (processed, new_stats) =
            process_tag_batch(full_batch, &tag_ctx, remaining_limit, current_stats, f).await;
        current_stats = new_stats;

        if let Some(remaining) = remaining_limit.as_mut() {
            *remaining = remaining.saturating_sub(processed);
            if *remaining == 0 {
                break;
            }
        }
    }

    // Flush the trailing partial batch. It is empty if the loop stopped because the
    // limit was reached, since the buffer had just been replaced.
    if !batch.is_empty() {
        let (_processed, new_stats) =
            process_tag_batch(batch, &tag_ctx, remaining_limit, current_stats, f).await;
        current_stats = new_stats;
    }
    current_stats
}
/// Fetches tags for one batch of objects, keeps those that satisfy the tag filters,
/// and forwards the survivors to `f`.
///
/// When `limit` is `Some(n)`, only the first `n` matching objects are forwarded.
/// `f` is not invoked when nothing is left to forward; `stats` is then returned as is.
///
/// Returns the number of forwarded objects together with the resulting accumulator.
async fn process_tag_batch<F, Fut2>(
    objects: Vec<StreamObject>,
    tag_ctx: &TagFilterContext,
    limit: Option<usize>,
    stats: Option<FindStat>,
    f: &mut F,
) -> (usize, Option<FindStat>)
where
    F: FnMut(Option<FindStat>, Vec<StreamObject>) -> Fut2,
    Fut2: Future<Output = Option<FindStat>>,
{
    let tagged = fetch_tags_for_objects(
        tag_ctx.client.clone(),
        tag_ctx.bucket.clone(),
        objects,
        tag_ctx.config.clone(),
        Arc::clone(&tag_ctx.stats),
    )
    .await;

    // Objects for which `matches` gives no definite verdict fall back to `false`
    // and are dropped.
    let mut selected: Vec<StreamObject> = tagged
        .into_iter()
        .filter(|candidate| tag_ctx.filters.matches(candidate).unwrap_or(false))
        .collect();

    // Keep only as many matches as the caller still has room for.
    if let Some(cap) = limit {
        selected.truncate(cap);
    }

    let forwarded = selected.len();
    if forwarded == 0 {
        return (0, stats);
    }
    (forwarded, f(stats, selected).await)
}
/// Capped pipeline: flattens the listing pages, keeps objects accepted by `p`,
/// stops after `limit` kept objects, and folds them with `f` in chunks of `CHUNK`.
///
/// Returns the accumulator after the last chunk, or `stats` if no chunk was produced.
#[inline]
async fn list_filter_limit_execute<P, F, Fut, Fut2>(
    iterator: impl Stream<Item = Vec<StreamObject>>,
    limit: usize,
    stats: Option<FindStat>,
    p: P,
    f: &mut F,
) -> Option<FindStat>
where
    P: FnMut(&StreamObject) -> Fut,
    Fut: Future<Output = bool>,
    F: FnMut(Option<FindStat>, Vec<StreamObject>) -> Fut2,
    Fut2: Future<Output = Option<FindStat>>,
{
    // One object at a time, regardless of how the listing was paged.
    let objects = iterator.flat_map(futures::stream::iter);
    // Apply the predicate first so the cap counts accepted objects only.
    let capped = objects.filter(p).take(limit);
    capped.chunks(CHUNK).fold(stats, f).await
}
/// Uncapped pipeline: flattens the listing pages, keeps objects accepted by `p`,
/// and folds all of them with `f` in chunks of `CHUNK`.
///
/// Returns the accumulator after the last chunk, or `stats` if no chunk was produced.
#[inline]
async fn list_filter_unlimited_execute<P, F, Fut, Fut2>(
    iterator: impl Stream<Item = Vec<StreamObject>>,
    stats: Option<FindStat>,
    p: P,
    f: &mut F,
) -> Option<FindStat>
where
    P: FnMut(&StreamObject) -> Fut,
    Fut: Future<Output = bool>,
    F: FnMut(Option<FindStat>, Vec<StreamObject>) -> Fut2,
    Fut2: Future<Output = Option<FindStat>>,
{
    // One object at a time, regardless of how the listing was paged.
    let objects = iterator.flat_map(futures::stream::iter);
    let accepted = objects.filter(p);
    accepted.chunks(CHUNK).fold(stats, f).await
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::arg::{TagExistsFilter, TagFilter};
    use aws_config::BehaviorVersion;
    use aws_sdk_s3::types::{Object, Tag};
    use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient};
    use aws_smithy_types::body::SdkBody;
    use futures::stream;
    use http::{HeaderValue, StatusCode};
    use std::future::{Ready, ready};

    // ---------------------------------------------------------------------
    // Shared fixtures
    // ---------------------------------------------------------------------

    /// Fold callback that adds every received chunk to a `FindStat`, creating a
    /// default one on the first call when no accumulator exists yet.
    fn make_stats_accumulator()
    -> impl FnMut(Option<FindStat>, Vec<StreamObject>) -> Ready<Option<FindStat>> {
        |acc, list| {
            let objects: Vec<_> = list.iter().map(|so| so.object.clone()).collect();
            ready(
                acc.map(|stat| stat + &objects)
                    .or_else(|| Some(FindStat::default() + &objects)),
            )
        }
    }

    /// Builds untagged stream objects, one per key.
    fn make_stream_objects(keys: &[&str]) -> Vec<StreamObject> {
        keys.iter()
            .map(|k| StreamObject::from_object(Object::builder().key(*k).build()))
            .collect()
    }

    /// Builds an S3 client whose HTTP layer replays the given canned events.
    fn make_test_client(events: Vec<ReplayEvent>) -> Client {
        let replay_client = StaticReplayClient::new(events);
        Client::from_conf(
            aws_sdk_s3::Config::builder()
                .behavior_version(BehaviorVersion::latest())
                .credentials_provider(aws_sdk_s3::config::Credentials::new(
                    "test", "test", None, None, "test",
                ))
                .region(aws_sdk_s3::config::Region::new("us-east-1"))
                .http_client(replay_client)
                .build(),
        )
    }

    /// Builds a replay event answering a `?tagging` GET for `key` with `tags`.
    fn make_tag_response(key: &str, tags: &[(&str, &str)]) -> ReplayEvent {
        let uri = format!(
            "https://test-bucket.s3.us-east-1.amazonaws.com/{}?tagging",
            key
        );
        let req = http::Request::builder()
            .method("GET")
            .uri(&uri)
            .body(SdkBody::empty())
            .unwrap();
        let tag_xml: String = tags
            .iter()
            .map(|(k, v)| format!("<Tag><Key>{}</Key><Value>{}</Value></Tag>", k, v))
            .collect::<Vec<_>>()
            .join("");
        let resp_body = format!(
            r#"<?xml version="1.0" encoding="UTF-8"?>
<Tagging xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<TagSet>{}</TagSet>
</Tagging>"#,
            tag_xml
        );
        let resp = http::Response::builder()
            .status(StatusCode::OK)
            .header("Content-Type", HeaderValue::from_static("application/xml"))
            .body(SdkBody::from(resp_body))
            .unwrap();
        ReplayEvent::new(req, resp)
    }

    /// Builds stream objects whose `tags` field is already populated.
    fn make_stream_objects_with_tags(keys_and_tags: &[(&str, Vec<Tag>)]) -> Vec<StreamObject> {
        keys_and_tags
            .iter()
            .map(|(k, tags)| {
                let mut obj = StreamObject::from_object(Object::builder().key(*k).build());
                obj.tags = Some(tags.clone());
                obj
            })
            .collect()
    }

    /// Builds an `env=<value>` tag.
    fn env_tag(value: &str) -> Tag {
        Tag::builder().key("env").value(value).build().unwrap()
    }

    /// Builds `count` objects named `file<i>.txt`, each pre-tagged `env=prod`.
    fn make_prod_tagged_objects(count: usize) -> Vec<StreamObject> {
        let tag = env_tag("prod");
        (0..count)
            .map(|i| {
                let mut obj = StreamObject::from_object(
                    Object::builder().key(format!("file{}.txt", i)).build(),
                );
                obj.tags = Some(vec![tag.clone()]);
                obj
            })
            .collect()
    }

    /// Filter list requiring the tag `env=prod`.
    fn env_prod_filters() -> TagFilterList {
        TagFilterList::with_filters(
            vec![TagFilter {
                key: "env".to_string(),
                value: "prod".to_string(),
            }],
            vec![],
        )
    }

    /// Builds a tag context for `test-bucket` with concurrency 1 and fresh stats.
    fn make_tag_ctx(client: Client, filters: TagFilterList) -> TagFilterContext {
        TagFilterContext {
            client,
            bucket: "test-bucket".to_string(),
            filters,
            config: TagFetchConfig::default().with_concurrency(1),
            stats: Arc::new(TagFetchStats::new()),
        }
    }

    /// Tag context backed by a client with no canned responses, for tests whose
    /// objects already carry their tags.
    fn offline_tag_ctx(filters: TagFilterList) -> TagFilterContext {
        make_tag_ctx(make_test_client(vec![]), filters)
    }

    /// Runs `list_filter_execute_with_tags` over `pages` with a cheap filter that
    /// always answers `cheap_accepts` and the standard stats accumulator.
    async fn run_with_tags(
        pages: Vec<Vec<StreamObject>>,
        limit: Option<usize>,
        initial: Option<FindStat>,
        tag_ctx: TagFilterContext,
        cheap_accepts: bool,
    ) -> Option<FindStat> {
        list_filter_execute_with_tags(
            stream::iter(pages),
            limit,
            initial,
            tag_ctx,
            move |_: &StreamObject| ready(cheap_accepts),
            &mut make_stats_accumulator(),
        )
        .await
    }

    // ---------------------------------------------------------------------
    // Pipelines without tag filtering
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_list_filter_execute_with_limit() {
        let stream_objects = make_stream_objects(&["object1", "object2", "object3"]);
        let result = list_filter_execute(
            stream::iter(vec![stream_objects]),
            Some(2),
            None,
            |_: &StreamObject| ready(true),
            &mut make_stats_accumulator(),
        )
        .await;
        assert_eq!(result.unwrap().total_files, 2);
    }

    #[tokio::test]
    async fn test_list_filter_execute_without_limit() {
        let stream_objects = make_stream_objects(&["object1", "object2", "object3"]);
        let result = list_filter_execute(
            stream::iter(vec![stream_objects]),
            None,
            None,
            |_: &StreamObject| ready(true),
            &mut make_stats_accumulator(),
        )
        .await;
        assert_eq!(result.unwrap().total_files, 3);
    }

    #[tokio::test]
    async fn test_list_filter_limit_execute() {
        let stream_objects = make_stream_objects(&["object1", "object2", "object3"]);
        let result = list_filter_limit_execute(
            stream::iter(vec![stream_objects]),
            2,
            None,
            |_: &StreamObject| ready(true),
            &mut make_stats_accumulator(),
        )
        .await;
        assert_eq!(result.unwrap().total_files, 2);
    }

    #[tokio::test]
    async fn test_list_filter_unlimited_execute() {
        let stream_objects = make_stream_objects(&["object1", "object2", "object3"]);
        let result = list_filter_unlimited_execute(
            stream::iter(vec![stream_objects]),
            None,
            |_: &StreamObject| ready(true),
            &mut make_stats_accumulator(),
        )
        .await;
        assert_eq!(result.unwrap().total_files, 3);
    }

    // ---------------------------------------------------------------------
    // Tag-aware pipeline
    // ---------------------------------------------------------------------

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_basic() {
        let tag = env_tag("prod");
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![tag.clone()]),
            ("file2.txt", vec![tag.clone()]),
        ]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 2);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_with_limit() {
        let tag = env_tag("prod");
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![tag.clone()]),
            ("file2.txt", vec![tag.clone()]),
            ("file3.txt", vec![tag.clone()]),
        ]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], Some(2), None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 2);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_cheap_filter_rejects() {
        let tag = env_tag("prod");
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![tag.clone()]),
            ("file2.txt", vec![tag.clone()]),
        ]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        // The cheap filter rejects everything, so nothing reaches the tag stage.
        let result = run_with_tags(vec![objects], None, None, tag_ctx, false).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_tag_filter_rejects() {
        let tag = env_tag("dev");
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![tag.clone()]),
            ("file2.txt", vec![tag.clone()]),
        ]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_with_api_calls() {
        // Objects carry no tags, so they must be fetched through the replay client.
        let objects = make_stream_objects(&["file1.txt", "file2.txt"]);
        let events = vec![
            make_tag_response("file1.txt", &[("env", "prod")]),
            make_tag_response("file2.txt", &[("env", "prod")]),
        ];
        let tag_ctx = make_tag_ctx(make_test_client(events), env_prod_filters());
        // Keep a handle on the shared stats; the context itself is moved into the call.
        let fetch_stats = Arc::clone(&tag_ctx.stats);
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 2);
        assert_eq!(
            fetch_stats.success.load(std::sync::atomic::Ordering::Relaxed),
            2
        );
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_empty_stream() {
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(Vec::new(), None, None, tag_ctx, true).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_limit_zero() {
        let tag = env_tag("prod");
        let objects = make_stream_objects_with_tags(&[("file1.txt", vec![tag.clone()])]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], Some(0), None, tag_ctx, true).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_key_exists_filter() {
        let tag = Tag::builder().key("owner").value("team-a").build().unwrap();
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![tag.clone()]),
            ("file2.txt", vec![]),
        ]);
        let filters = TagFilterList::with_filters(
            vec![],
            vec![TagExistsFilter {
                key: "owner".to_string(),
            }],
        );
        let tag_ctx = offline_tag_ctx(filters);
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 1);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_mixed_results() {
        let prod_tag = env_tag("prod");
        let dev_tag = env_tag("dev");
        let objects = make_stream_objects_with_tags(&[
            ("file1.txt", vec![prod_tag.clone()]),
            ("file2.txt", vec![dev_tag]),
            ("file3.txt", vec![prod_tag.clone()]),
        ]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 2);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_with_initial_stats() {
        let tag = env_tag("prod");
        let objects = make_stream_objects_with_tags(&[("file1.txt", vec![tag.clone()])]);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let mut initial_stats = FindStat::default();
        initial_stats.total_files = 5;
        initial_stats.total_space = 1000;
        let result =
            run_with_tags(vec![objects], None, Some(initial_stats), tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 6);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_large_batch() {
        // 150 objects span one full batch plus a trailing partial batch.
        let objects = make_prod_tagged_objects(150);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], None, None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 150);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_large_batch_with_limit() {
        // The limit is smaller than one batch, so the first batch is truncated.
        let objects = make_prod_tagged_objects(150);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], Some(50), None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 50);
    }

    #[tokio::test]
    async fn test_list_filter_execute_with_tags_batch_limit_exhaustion() {
        // The limit equals the batch size, so the first full batch exhausts it.
        let objects = make_prod_tagged_objects(150);
        let tag_ctx = offline_tag_ctx(env_prod_filters());
        let result = run_with_tags(vec![objects], Some(100), None, tag_ctx, true).await;
        assert!(result.is_some());
        assert_eq!(result.unwrap().total_files, 100);
    }
}