graphddb_runtime 0.7.7

Rust runtime for GraphDDB — interprets the language-neutral IR (manifest.json + operations.json) and executes the validated access patterns against DynamoDB.
Documentation
//! Proves the relation-stage and composition executors ACTUALLY invoke
//! `map_with_concurrency` — i.e. independent ops within one stage run concurrently
//! (bounded, tokio), not sequentially. A barrier client releases only once TWO
//! fetches are simultaneously in flight; if the executor ran the stage's ops
//! sequentially the barrier would never release and the test would time out.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use aws_sdk_dynamodb::types::{AttributeValue, TransactWriteItem, WriteRequest};
use graphddb_runtime::client::{
    DynamoClient, GetItemInput, GetItemOutput, Item, QueryInput, QueryOutput, WriteOutput,
};
use graphddb_runtime::GraphDDBRuntime;
use serde_json::{json, Value as Json};
use tokio::sync::Barrier;

/// A client whose Query blocks on a 2-party barrier: the call returns only after
/// TWO queries have both reached the barrier — so it completes iff the two
/// relation-stage fetches overlap.
struct BarrierClient {
    barrier: Arc<Barrier>,
}

#[async_trait]
impl DynamoClient for BarrierClient {
    async fn get_item(&self, _i: GetItemInput) -> graphddb_runtime::Result<GetItemOutput> {
        // The root GetItem returns one parent row that both relations hang off.
        Ok(GetItemOutput {
            item: Some(HashMap::from([(
                "userId".to_string(),
                AttributeValue::S("root".into()),
            )])),
        })
    }
    async fn query(&self, _i: QueryInput) -> graphddb_runtime::Result<QueryOutput> {
        // Wait until the sibling relation Query is also in flight.
        self.barrier.wait().await;
        Ok(QueryOutput {
            items: vec![],
            last_evaluated_key: None,
        })
    }
    async fn put_item(
        &self,
        _t: &str,
        _i: Item,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(WriteOutput::default())
    }
    async fn update_item(
        &self,
        _t: &str,
        _k: Item,
        _u: Option<String>,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(WriteOutput::default())
    }
    async fn delete_item(
        &self,
        _t: &str,
        _k: Item,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(WriteOutput::default())
    }
    async fn batch_get_item(
        &self,
        _t: &str,
        _k: Vec<Item>,
        _p: Option<String>,
        _n: Option<HashMap<String, String>>,
    ) -> graphddb_runtime::Result<(Vec<Item>, Vec<Item>)> {
        Ok((vec![], vec![]))
    }
    async fn batch_write_item(
        &self,
        _t: &str,
        _r: Vec<WriteRequest>,
    ) -> graphddb_runtime::Result<Vec<WriteRequest>> {
        Ok(vec![])
    }
    async fn transact_write_items(
        &self,
        _i: Vec<TransactWriteItem>,
    ) -> graphddb_runtime::Result<()> {
        Ok(())
    }
}

fn manifest() -> Json {
    json!({
        "version": "1.1",
        "entities": {
            "UserModel": { "table": "T", "key": { "pkTemplate": "USER#{userId}", "skTemplate": "PROFILE" }, "fields": { "userId": {} } }
        }
    })
}

/// A query whose ONE stage holds TWO independent hasMany relations off the root —
/// the executionPlan groups them together so they run concurrently.
fn operations() -> Json {
    json!({
        "version": "1.1",
        "queries": {
            "userWithTwoRels": {
                "cardinality": "one",
                "params": { "userId": { "type": "string", "required": true } },
                "executionPlan": { "groups": [[0], [1, 2]], "concurrency": 8 },
                "operations": [
                    {
                        "type": "GetItem", "tableName": "T",
                        "keyCondition": { "PK": "USER#{userId}", "SK": "PROFILE" },
                        "projection": ["userId"], "resultPath": "$", "entity": "UserModel"
                    },
                    {
                        "type": "Query", "tableName": "T",
                        "keyCondition": { "PK": "POSTS#{result.userId}" },
                        "projection": ["userId"], "resultPath": "$.posts.items",
                        "sourceField": "userId", "entity": "UserModel"
                    },
                    {
                        "type": "Query", "tableName": "T",
                        "keyCondition": { "PK": "COMMENTS#{result.userId}" },
                        "projection": ["userId"], "resultPath": "$.comments.items",
                        "sourceField": "userId", "entity": "UserModel"
                    }
                ]
            }
        },
        "commands": {}, "transactions": {}, "contracts": {}
    })
}

#[tokio::test]
async fn relation_stage_ops_run_concurrently() {
    let barrier = Arc::new(Barrier::new(2));
    let client = Arc::new(BarrierClient { barrier });
    let rt = GraphDDBRuntime::new(
        client,
        manifest(),
        operations(),
        None,
        Some(graphddb_runtime::RuntimeLimits {
            max_depth: 5,
            ..Default::default()
        }),
    )
    .unwrap();

    let params = json!({"userId": "u1"}).as_object().unwrap().clone();
    // If the two stage-1 relation Queries ran sequentially, the 2-party barrier
    // would deadlock. A 5s timeout turns that into a clear failure.
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        rt.execute_query("userWithTwoRels", &params),
    )
    .await
    .expect(
        "relation stage ops did NOT run concurrently (barrier deadlocked → sequential execution)",
    )
    .unwrap();

    // Both relations resolved (empty connections) and order/slots preserved.
    assert!(result.get("posts").is_some(), "posts relation slot missing");
    assert!(
        result.get("comments").is_some(),
        "comments relation slot missing"
    );
    assert_eq!(result["posts"]["items"].as_array().unwrap().len(), 0);
    assert_eq!(result["comments"]["items"].as_array().unwrap().len(), 0);
}