use futures::StreamExt;
use std::future::Future;
/// Maps `input` through an async `callback` one chunk at a time and concatenates
/// the per-chunk results into a single `Vec`.
///
/// `input` is split into consecutive chunks of at most `chunk_size` elements (the
/// final chunk may be shorter). Each chunk is handed to `callback`, and up to
/// `concurrent_futs` of the returned futures are driven concurrently. Results are
/// appended in the order of the input chunks, regardless of the order in which the
/// futures complete.
///
/// A `concurrent_futs` of `0` is treated as `1`, because a zero-capacity buffer
/// would never start any future and the call would never finish.
///
/// # Errors
///
/// Returns the first `Err` encountered in chunk order. Futures that are still
/// pending at that point are dropped.
///
/// # Panics
///
/// Panics if `chunk_size` is `0` (this comes from `slice::chunks`).
pub async fn map_async_chunked<
    'a,
    T: Send + Sync,
    U,
    E,
    F: Future<Output = Result<Vec<U>, E>>,
    C: Send + Sync + FnMut(&'a [T]) -> F,
>(
    input: &'a [T],
    callback: C,
    chunk_size: usize,
    concurrent_futs: usize,
) -> Result<Vec<U>, E> {
    // `buffered(0)` never pulls a future out of the underlying stream, so it would
    // stay pending forever; always allow at least one future in flight.
    let concurrency = concurrent_futs.max(1);

    // Capacity is only a hint: the callback may return any number of items per chunk.
    let mut output = Vec::with_capacity(input.len());

    // `buffered` (as opposed to `buffer_unordered`) yields results in the order the
    // futures were created, which is what keeps the output aligned with the input.
    let mut stream = futures::stream::iter(input.chunks(chunk_size).map(callback))
        .boxed()
        .buffered(concurrency);

    while let Some(result) = stream.next().await {
        output.extend(result?);
    }
    Ok(output)
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Later chunks sleep for less time than earlier ones, so they complete first;
    /// the output must still follow the input order.
    #[tokio::test]
    async fn test_keeps_the_same_order() {
        let input = vec![200, 100, 50, 25, 20, 15, 10, 5, 4, 3, 2, 1];
        let output = map_async_chunked(
            &input,
            |x| async {
                // Sleep for the chunk's own first element so that completion order
                // differs from submission order. Chunks are never empty, so `x[0]`
                // is always in bounds.
                tokio::time::sleep(Duration::from_millis(x[0])).await;
                Result::<_, ()>::Ok(x.to_vec())
            },
            2,
            10,
        )
        .await
        .unwrap();
        assert_eq!(input, output);
    }

    /// The last chunk is shorter than `chunk_size` and must still be processed.
    #[tokio::test]
    async fn test_works_when_chunks_dont_divide_nicely() {
        let input = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let output = map_async_chunked(&input, |x| async { Result::<_, ()>::Ok(x.to_vec()) }, 3, 1)
            .await
            .unwrap();
        assert_eq!(input, output);
    }

    /// An error returned by any chunk's callback is surfaced to the caller.
    #[tokio::test]
    async fn test_propagates_callback_error() {
        let input = vec![1, 2, 3, 4, 5];
        let output = map_async_chunked(
            &input,
            |x| async {
                if x.contains(&3) {
                    Err("boom")
                } else {
                    Ok(x.to_vec())
                }
            },
            2,
            2,
        )
        .await;
        assert_eq!(output, Err("boom"));
    }
}