graphddb_runtime 0.7.5

Rust runtime for GraphDDB — interprets the language-neutral IR (manifest.json + operations.json) and executes the validated access patterns against DynamoDB.
Documentation
//! Bounded-concurrency staged execution — the async counterpart of
//! `python/graphddb_runtime/concurrency.py` / the TS `mapWithConcurrency`.
//!
//! Runs the independent work of one stage with at most `limit` in-flight futures,
//! preserving INPUT order in the output (`output[i]` is `worker(items[i])`)
//! regardless of completion order, via `futures::stream::buffered`.

use futures::stream::{self, StreamExt};
use std::future::Future;

use crate::errors::Result;

/// The default declared in-flight bound, mirroring the TypeScript
/// `RELATION_TRAVERSAL_CONCURRENCY`. A spec's `executionPlan.concurrency`
/// overrides it.
pub const RELATION_TRAVERSAL_CONCURRENCY: usize = 16;

/// Map `items` to results by invoking `worker(index)` with at most `limit`
/// futures in flight, preserving input order in the output. An empty input
/// returns `Ok(vec![])`; the first worker error short-circuits (mirroring
/// `Promise.all` rejecting on the first failure).
pub async fn map_with_concurrency<T, Fut>(
    count: usize,
    limit: usize,
    worker: impl Fn(usize) -> Fut,
) -> Result<Vec<T>>
where
    Fut: Future<Output = Result<T>>,
{
    if count == 0 {
        return Ok(vec![]);
    }
    let effective = limit.clamp(1, count);
    // `buffered(n)` polls up to n futures concurrently and yields results in the
    // ORDER the futures were created (input order), exactly like the TS helper.
    let results: Vec<Result<T>> = stream::iter(0..count)
        .map(worker)
        .buffered(effective)
        .collect()
        .await;
    results.into_iter().collect()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn preserves_input_order() {
        let out = map_with_concurrency(5, 3, |i| async move { Ok::<usize, _>(i * 10) })
            .await
            .unwrap();
        assert_eq!(out, vec![0, 10, 20, 30, 40]);
    }

    #[tokio::test]
    async fn empty_input() {
        let out: Vec<usize> = map_with_concurrency(0, 4, |_| async { Ok(0) })
            .await
            .unwrap();
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn first_error_propagates() {
        let r = map_with_concurrency(3, 2, |i| async move {
            if i == 1 {
                Err(crate::errors::GraphDDBError::new("boom"))
            } else {
                Ok(i)
            }
        })
        .await;
        assert!(r.is_err());
    }
}