use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use aws_sdk_dynamodb::types::{AttributeValue, TransactWriteItem, WriteRequest};
use graphddb_runtime::client::{
DynamoClient, GetItemInput, GetItemOutput, Item, QueryInput, QueryOutput, WriteOutput,
};
use graphddb_runtime::GraphDDBRuntime;
use serde_json::{json, Value as Json};
use tokio::sync::Barrier;
/// Stub [`DynamoClient`] used by the concurrency test in this file.
///
/// Every `query` call awaits the shared barrier before returning, so a query
/// can only complete once enough other `query` calls are waiting on the same
/// barrier at the same time.
struct BarrierClient {
    /// Rendezvous point awaited by each `query` call.
    barrier: Arc<Barrier>,
}
// Canned `DynamoClient` implementation: `get_item` returns a fixed item,
// `query` blocks on the barrier and then returns an empty page, and every
// other operation succeeds immediately with an empty/default result.
#[async_trait]
impl DynamoClient for BarrierClient {
    /// Ignores the input and returns a single-attribute item
    /// `{ "userId": S("root") }`.
    async fn get_item(&self, _input: GetItemInput) -> graphddb_runtime::Result<GetItemOutput> {
        let mut root = HashMap::new();
        root.insert(
            String::from("userId"),
            AttributeValue::S(String::from("root")),
        );
        Ok(GetItemOutput { item: Some(root) })
    }

    /// Waits on the shared barrier, then returns an empty result page with no
    /// continuation key.
    async fn query(&self, _input: QueryInput) -> graphddb_runtime::Result<QueryOutput> {
        // This is the rendezvous the test relies on: the call does not return
        // until the barrier has been released.
        self.barrier.wait().await;
        let empty_page = QueryOutput {
            items: Vec::new(),
            last_evaluated_key: None,
        };
        Ok(empty_page)
    }

    /// No-op write; returns the default `WriteOutput`.
    async fn put_item(
        &self,
        _t: &str,
        _i: Item,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(Default::default())
    }

    /// No-op update; returns the default `WriteOutput`.
    async fn update_item(
        &self,
        _t: &str,
        _k: Item,
        _u: Option<String>,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(Default::default())
    }

    /// No-op delete; returns the default `WriteOutput`.
    async fn delete_item(
        &self,
        _t: &str,
        _k: Item,
        _c: Option<String>,
        _n: Option<HashMap<String, String>>,
        _v: Option<HashMap<String, AttributeValue>>,
        _o: bool,
    ) -> graphddb_runtime::Result<WriteOutput> {
        Ok(Default::default())
    }

    /// Returns a pair of empty item lists regardless of the requested keys.
    async fn batch_get_item(
        &self,
        _t: &str,
        _k: Vec<Item>,
        _p: Option<String>,
        _n: Option<HashMap<String, String>>,
    ) -> graphddb_runtime::Result<(Vec<Item>, Vec<Item>)> {
        let found = Vec::new();
        let remaining = Vec::new();
        Ok((found, remaining))
    }

    /// Accepts the requests and returns an empty list of write requests.
    async fn batch_write_item(
        &self,
        _t: &str,
        _r: Vec<WriteRequest>,
    ) -> graphddb_runtime::Result<Vec<WriteRequest>> {
        Ok(Vec::new())
    }

    /// Accepts any transaction and reports success.
    async fn transact_write_items(
        &self,
        _i: Vec<TransactWriteItem>,
    ) -> graphddb_runtime::Result<()> {
        Ok(())
    }
}
/// Builds the manifest JSON handed to the runtime: version `1.1` with a
/// single `UserModel` entity on table `T`.
fn manifest() -> Json {
    // The only entity: keyed by `USER#{userId}` / `PROFILE`, with one field.
    let user_model = json!({
        "table": "T",
        "key": {
            "pkTemplate": "USER#{userId}",
            "skTemplate": "PROFILE"
        },
        "fields": {
            "userId": {}
        }
    });

    json!({
        "version": "1.1",
        "entities": {
            "UserModel": user_model
        }
    })
}
/// Builds the operations JSON handed to the runtime.
///
/// It declares one query, `userWithTwoRels`, made of a `GetItem` followed by
/// two `Query` operations (`posts` and `comments`). The execution plan lists
/// the groups `[[0], [1, 2]]`, i.e. the two `Query` operations share a group.
fn operations() -> Json {
    // Operation 0: fetch the root user item.
    let root_lookup = json!({
        "type": "GetItem",
        "tableName": "T",
        "keyCondition": { "PK": "USER#{userId}", "SK": "PROFILE" },
        "projection": ["userId"],
        "resultPath": "$",
        "entity": "UserModel"
    });

    // Operation 1: relation query whose result lands under `$.posts.items`.
    let posts_query = json!({
        "type": "Query",
        "tableName": "T",
        "keyCondition": { "PK": "POSTS#{result.userId}" },
        "projection": ["userId"],
        "resultPath": "$.posts.items",
        "sourceField": "userId",
        "entity": "UserModel"
    });

    // Operation 2: relation query whose result lands under `$.comments.items`.
    let comments_query = json!({
        "type": "Query",
        "tableName": "T",
        "keyCondition": { "PK": "COMMENTS#{result.userId}" },
        "projection": ["userId"],
        "resultPath": "$.comments.items",
        "sourceField": "userId",
        "entity": "UserModel"
    });

    json!({
        "version": "1.1",
        "queries": {
            "userWithTwoRels": {
                "cardinality": "one",
                "params": {
                    "userId": { "type": "string", "required": true }
                },
                "executionPlan": {
                    "groups": [[0], [1, 2]],
                    "concurrency": 8
                },
                "operations": [root_lookup, posts_query, comments_query]
            }
        },
        "commands": {},
        "transactions": {},
        "contracts": {}
    })
}
/// Checks that the two relation `Query` operations of `userWithTwoRels` are
/// in flight at the same time.
///
/// The stub client's `query` awaits a two-party barrier, so neither query can
/// return until both have been started. If the runtime issued them one after
/// the other, the first would wait on the barrier forever; the 5-second
/// timeout turns that hang into a test failure.
#[tokio::test]
async fn relation_stage_ops_run_concurrently() {
    // Two parties: one per relation query in the plan.
    let barrier = Arc::new(Barrier::new(2));
    let client = Arc::new(BarrierClient { barrier });
    let rt = GraphDDBRuntime::new(
        client,
        manifest(),
        operations(),
        None,
        Some(graphddb_runtime::RuntimeLimits {
            max_depth: 5,
            ..Default::default()
        }),
    )
    .unwrap();

    let params = json!({"userId": "u1"}).as_object().unwrap().clone();

    // Bound the whole query so a barrier deadlock fails the test instead of
    // hanging it.
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(5),
        rt.execute_query("userWithTwoRels", &params),
    )
    .await
    .expect(
        "relation stage ops did NOT run concurrently (barrier deadlocked → sequential execution)",
    )
    .unwrap();

    // Both relation slots must exist, and both are empty because the stub
    // returns no items from `query`.
    assert!(result.get("posts").is_some(), "posts relation slot missing");
    assert!(
        result.get("comments").is_some(),
        "comments relation slot missing"
    );
    assert_eq!(result["posts"]["items"].as_array().unwrap().len(), 0);
    assert_eq!(result["comments"]["items"].as_array().unwrap().len(), 0);
}