use futures::stream::{self, StreamExt};
use std::future::Future;
use crate::errors::Result;
pub const RELATION_TRAVERSAL_CONCURRENCY: usize = 16;
pub async fn map_with_concurrency<T, Fut>(
count: usize,
limit: usize,
worker: impl Fn(usize) -> Fut,
) -> Result<Vec<T>>
where
Fut: Future<Output = Result<T>>,
{
if count == 0 {
return Ok(vec![]);
}
let effective = limit.clamp(1, count);
let results: Vec<Result<T>> = stream::iter(0..count)
.map(worker)
.buffered(effective)
.collect()
.await;
results.into_iter().collect()
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn preserves_input_order() {
let out = map_with_concurrency(5, 3, |i| async move { Ok::<usize, _>(i * 10) })
.await
.unwrap();
assert_eq!(out, vec![0, 10, 20, 30, 40]);
}
#[tokio::test]
async fn empty_input() {
let out: Vec<usize> = map_with_concurrency(0, 4, |_| async { Ok(0) })
.await
.unwrap();
assert!(out.is_empty());
}
#[tokio::test]
async fn first_error_propagates() {
let r = map_with_concurrency(3, 2, |i| async move {
if i == 1 {
Err(crate::errors::GraphDDBError::new("boom"))
} else {
Ok(i)
}
})
.await;
assert!(r.is_err());
}
}