use std::future::Future;
pub async fn join_all<T, F>(futures: Vec<F>) -> Vec<T>
where
F: Future<Output = T> + Send,
T: Send,
{
futures::future::join_all(futures).await
}
pub async fn try_join_all<T, E, F>(futures: Vec<F>) -> Result<Vec<T>, E>
where
F: Future<Output = Result<T, E>> + Send,
T: Send,
E: Send,
{
futures::future::try_join_all(futures).await
}
pub async fn batch_process<T, R, F, Fut>(items: Vec<T>, batch_size: usize, processor: F) -> Vec<R>
where
T: Send + Sync + Clone,
R: Send,
F: Fn(T) -> Fut + Send + Sync,
Fut: Future<Output = R> + Send,
{
let mut results = Vec::with_capacity(items.len());
for chunk in items.chunks(batch_size) {
let futures: Vec<_> = chunk.iter().map(|item| processor(item.clone())).collect();
let chunk_results = join_all(futures).await;
results.extend(chunk_results);
}
results
}
pub async fn batch_process_result<T, R, E, F, Fut>(
items: Vec<T>,
batch_size: usize,
processor: F,
) -> Result<Vec<R>, E>
where
T: Send + Sync + Clone,
R: Send,
E: Send,
F: Fn(T) -> Fut + Send + Sync,
Fut: Future<Output = Result<R, E>> + Send,
{
let mut results = Vec::with_capacity(items.len());
for chunk in items.chunks(batch_size) {
let futures: Vec<_> = chunk.iter().map(|item| processor(item.clone())).collect();
let chunk_results = try_join_all(futures).await?;
results.extend(chunk_results);
}
Ok(results)
}
pub async fn spawn_all<T, F>(futures: Vec<F>) -> Vec<Result<T, tokio::task::JoinError>>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
let handles: Vec<_> = futures.into_iter().map(|f| tokio::spawn(f)).collect();
join_all(handles).await
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_join_all() {
let futures: Vec<_> = (1..=3).map(|i| async move { i }).collect();
let results = join_all(futures).await;
assert_eq!(results, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_try_join_all() {
let futures: Vec<_> = (1..=3).map(|i| async move { Ok::<i32, &str>(i) }).collect();
let results = try_join_all(futures).await.unwrap();
assert_eq!(results, vec![1, 2, 3]);
}
#[tokio::test]
async fn test_batch_process() {
let items = vec![1, 2, 3, 4, 5];
let results = batch_process(items, 2, |item| async move { item * 2 }).await;
assert_eq!(results, vec![2, 4, 6, 8, 10]);
}
#[tokio::test]
async fn test_batch_process_result() {
let items = vec![1, 2, 3];
let results =
batch_process_result(items, 2, |item| async move { Ok::<i32, &str>(item * 2) })
.await
.unwrap();
assert_eq!(results, vec![2, 4, 6]);
}
}