use std::collections::HashMap;
use aws_sdk_dynamodb::types::AttributeValue;
use serde_json::{Map as JsonMap, Value as Json};
use crate::attribute::{deserialize_item, serialize_string};
use crate::batch::{batch_get, serialize_key, BatchOptions};
use crate::concurrency::RELATION_TRAVERSAL_CONCURRENCY;
use crate::errors::Result;
use crate::hydration::hydrate_item;
use crate::middleware::ReadOpCtx;
use crate::relations::{collect_parents_mut, parse_result_path, relation_path};
use crate::runtime::{connection_json, resolve_entity, select_from_projection, GraphDDBRuntime};
use crate::templates::{resolve_template, Params};
use crate::value::Value;
/// Outcome of fetching one relation operation, ready to be written back into the response tree.
struct RelationMerge {
/// Path tokens locating the parent nodes, as parsed from the operation's `resultPath`.
parent_tokens: Vec<String>,
/// Key under which each value is inserted on its parent object.
write_key: String,
/// Values to insert; `apply_merge` pairs them positionally with the collected parents.
values: Vec<Json>,
}
impl GraphDDBRuntime {
/// Runs every relation operation of the plan and writes the fetched values into `root`.
///
/// Operations are grouped into stages by `relation_stages`. Stages are processed one after
/// another. Within a stage each operation is dispatched through `map_with_concurrency` (using
/// the value from `plan_concurrency`) against a copy of `root` taken when the stage starts;
/// the results are merged into `root` only once the whole stage has completed.
///
/// # Errors
/// Propagates the error returned by `map_with_concurrency` for a stage.
pub(crate) async fn assemble_relations(
    &self,
    query_id: &str,
    spec: &Json,
    operations: &[Json],
    root: &mut Json,
    context: &Json,
) -> Result<()> {
    let concurrency = plan_concurrency(spec);
    for stage in relation_stages(spec, operations) {
        // Every operation of the stage reads the same frozen view of the tree, so results
        // written by one operation are only visible to later stages.
        let frozen = root.clone();
        let fetched =
            crate::concurrency::map_with_concurrency(stage.len(), concurrency, |slot| {
                let op = operations[stage[slot]].clone();
                let frozen = &frozen;
                let context = context.clone();
                async move { self.fetch_relation(query_id, &op, frozen, context).await }
            })
            .await?;
        for merge in fetched {
            apply_merge(root, &merge);
        }
    }
    Ok(())
}
/// Fetches the values of a single relation operation, evaluated against `snapshot`.
///
/// The strategy is picked from the operation's `type` (treated as `"Query"` when absent):
/// * `BatchGetItem` with a non-null `sourceList` -> `fetch_refs`
/// * `BatchGetItem` otherwise -> `fetch_batch_get`
/// * anything else -> `fetch_query`
///
/// `resultPath` defaults to `"$"` and `sourceField` to the empty string when missing.
///
/// # Errors
/// Fails if `resultPath` cannot be parsed or if the selected fetch strategy fails.
async fn fetch_relation(
    &self,
    query_id: &str,
    op: &Json,
    snapshot: &Json,
    context: Json,
) -> Result<RelationMerge> {
    let result_path = op.get("resultPath").and_then(Json::as_str).unwrap_or("$");
    let (parent_tokens, write_key, _is_connection) = parse_result_path(result_path)?;

    let source_field = op.get("sourceField").and_then(Json::as_str).unwrap_or("");
    let is_batch_get =
        op.get("type").and_then(Json::as_str).unwrap_or("Query") == "BatchGetItem";
    let has_source_list = matches!(op.get("sourceList"), Some(list) if !list.is_null());

    let values = match (is_batch_get, has_source_list) {
        (true, true) => {
            self.fetch_refs(query_id, op, snapshot, &parent_tokens, &context)
                .await?
        }
        (true, false) => {
            self.fetch_batch_get(
                query_id,
                op,
                snapshot,
                &parent_tokens,
                source_field,
                &context,
            )
            .await?
        }
        (false, _) => {
            self.fetch_query(
                query_id,
                op,
                snapshot,
                &parent_tokens,
                source_field,
                &context,
            )
            .await?
        }
    };

    Ok(RelationMerge {
        parent_tokens,
        write_key,
        values,
    })
}
async fn fetch_batch_get(
&self,
query_id: &str,
op: &Json,
snapshot: &Json,
parent_tokens: &[String],
source_field: &str,
context: &Json,
) -> Result<Vec<Json>> {
let parents = collect_parents_ro(snapshot, parent_tokens);
let mut source_values: Vec<Json> = Vec::new();
let mut seen = std::collections::HashSet::new();
for p in &parents {
if let Some(v) = source_value(p, source_field) {
let marker = self.relation_key_marker(op, &v)?;
if seen.insert(marker) {
source_values.push(v);
}
}
}
let key_to_item = self
.run_relation_batch_get(query_id, op, &source_values, context)
.await?;
let mut out = Vec::with_capacity(parents.len());
for p in &parents {
match source_value(p, source_field) {
None => out.push(Json::Null),
Some(v) => {
let marker = self.relation_key_marker(op, &v)?;
out.push(key_to_item.get(&marker).cloned().unwrap_or(Json::Null));
}
}
}
Ok(out)
}
/// Resolves a list-of-references relation: each parent holds an array of references, and the
/// referenced items are loaded with one de-duplicated batch read.
///
/// Configuration comes from the operation's `sourceList` object: `from` names the array
/// attribute on the parent, `key` names the field to read from object elements (falling back
/// to `sourceField`). Non-object elements are used as the reference value directly; null
/// values are ignored.
///
/// Returns one `connection_json(items, None)` per parent, in parent order. Within a parent,
/// a reference is emitted at most once and references without a non-null fetched item are
/// dropped. No batch read is issued when there are no references at all.
///
/// # Errors
/// Fails if a key marker cannot be resolved or if the batch read fails.
async fn fetch_refs(
    &self,
    query_id: &str,
    op: &Json,
    snapshot: &Json,
    parent_tokens: &[String],
    context: &Json,
) -> Result<Vec<Json>> {
    let source_field = op.get("sourceField").and_then(Json::as_str).unwrap_or("");
    let source_list = op.get("sourceList").and_then(Json::as_object);
    let from_attr = source_list
        .and_then(|list| list.get("from"))
        .and_then(Json::as_str)
        .unwrap_or("");
    let element_key = source_list
        .and_then(|list| list.get("key"))
        .and_then(Json::as_str)
        .unwrap_or(source_field);

    // Extracts the non-null reference values stored on one parent, in array order.
    let referenced = |parent: &Json| -> Vec<Json> {
        let mut found = Vec::new();
        if let Some(elements) = parent.get(from_attr).and_then(Json::as_array) {
            for element in elements {
                let candidate = match element {
                    Json::Object(fields) => {
                        fields.get(element_key).cloned().unwrap_or(Json::Null)
                    }
                    scalar => scalar.clone(),
                };
                if !candidate.is_null() {
                    found.push(candidate);
                }
            }
        }
        found
    };

    let parents = collect_parents_ro(snapshot, parent_tokens);

    // Pass 1: gather the distinct keys referenced by any parent.
    let mut source_values: Vec<Json> = Vec::new();
    let mut seen_global = std::collections::HashSet::new();
    for parent in &parents {
        for reference in referenced(parent) {
            let wrapped = single_field(source_field, reference);
            let marker = self.relation_key_marker(op, &wrapped)?;
            if seen_global.insert(marker) {
                source_values.push(wrapped);
            }
        }
    }

    let key_to_item = if source_values.is_empty() {
        HashMap::new()
    } else {
        self.run_relation_batch_get(query_id, op, &source_values, context)
            .await?
    };

    // Pass 2: rebuild each parent's list from the fetched items.
    let mut connections = Vec::with_capacity(parents.len());
    for parent in &parents {
        let mut items = Vec::new();
        let mut seen_local = std::collections::HashSet::new();
        for reference in referenced(parent) {
            let wrapped = single_field(source_field, reference);
            let marker = self.relation_key_marker(op, &wrapped)?;
            if seen_local.contains(&marker) {
                continue;
            }
            match key_to_item.get(&marker) {
                Some(item) if !item.is_null() => items.push(item.clone()),
                _ => {}
            }
            seen_local.insert(marker);
        }
        connections.push(connection_json(items, None));
    }
    Ok(connections)
}
/// Resolves a to-many relation by running one query per parent, in parent order.
///
/// A parent without a (non-null) `source_field` value yields an empty connection
/// (`connection_json(vec![], None)`); every other parent yields the result of
/// `run_relation_query`. The queries are awaited one after another.
///
/// # Errors
/// Stops at, and returns, the first query failure.
async fn fetch_query(
    &self,
    query_id: &str,
    op: &Json,
    snapshot: &Json,
    parent_tokens: &[String],
    source_field: &str,
    context: &Json,
) -> Result<Vec<Json>> {
    let parents = collect_parents_ro(snapshot, parent_tokens);
    let mut connections = Vec::with_capacity(parents.len());
    for parent in &parents {
        let connection = if let Some(source) = source_value(parent, source_field) {
            self.run_relation_query(query_id, op, &source, context)
                .await?
        } else {
            connection_json(vec![], None)
        };
        connections.push(connection);
    }
    Ok(connections)
}
/// Executes the relation's query for one parent and wraps the result as a connection.
///
/// The parent's `source_values` are exposed to the query as `result.<field>` parameters (see
/// `result_context`). The returned JSON is `connection_json` built from the query's `items`
/// and `cursor`.
///
/// # Errors
/// Propagates the failure of `run_query`.
async fn run_relation_query(
    &self,
    query_id: &str,
    op: &Json,
    source_values: &Json,
    context: &Json,
) -> Result<Json> {
    let scope = self.op_scope(
        op.get("entity").and_then(Json::as_str),
        relation_path(op),
        context.clone(),
    );
    let params = result_context(source_values);
    let entity_meta = resolve_entity(
        self.manifest_ref().get("entities").unwrap_or(&Json::Null),
        op,
    );
    let select = select_from_projection(op);

    let page = self
        .run_query(
            query_id,
            op,
            &params,
            &select,
            &entity_meta,
            &Json::Null,
            &scope,
        )
        .await?;
    Ok(connection_json(page.items, page.cursor))
}
/// Loads the items referenced by `source_values` with a batch read and indexes them by key.
///
/// Steps:
/// 1. Resolve each `keyCondition` template against every source value (`result.<field>`
///    parameters) and drop duplicate keys.
/// 2. Reject the request if the number of distinct keys exceeds `limits.max_items`.
/// 3. Run `batch_get` through `run_op` with a `ReadOpCtx::batch_get` hook.
/// 4. Hydrate every returned item and store it under the marker built from its key
///    attributes, so callers can look it up with `relation_key_marker`.
///
/// # Errors
/// Fails on template resolution errors, when the key limit is exceeded, when the read
/// fails, or when an item cannot be hydrated.
async fn run_relation_batch_get(
    &self,
    query_id: &str,
    op: &Json,
    source_values: &[Json],
    context: &Json,
) -> Result<HashMap<String, Json>> {
    let select = select_from_projection(op);
    let entity_meta = resolve_entity(
        self.manifest_ref().get("entities").unwrap_or(&Json::Null),
        op,
    );
    let key_condition = op
        .get("keyCondition")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();

    // Resolve the key of every source value, keeping only the first occurrence of each key.
    let mut unique_keys: Vec<Vec<(String, String)>> = Vec::new();
    let mut seen_markers = std::collections::HashSet::new();
    for source in source_values {
        let params = result_context(source);
        let mut resolved: Vec<(String, String)> = Vec::new();
        let mut as_json: HashMap<String, Json> = HashMap::new();
        for (attr, template) in &key_condition {
            let text = resolve_template(template.as_str().unwrap_or(""), &params)?;
            as_json.insert(attr.clone(), Json::String(text.clone()));
            resolved.push((attr.clone(), text));
        }
        if seen_markers.insert(serialize_key(&as_json)) {
            unique_keys.push(resolved);
        }
    }

    if unique_keys.len() > self.limits_ref().max_items {
        return Err(crate::errors::GraphDDBError::limit_exceeded(format!(
            "{query_id}: BatchGet needs {} keys, exceeds max_items {}",
            unique_keys.len(),
            self.limits_ref().max_items
        )));
    }

    // Key attributes are always sent as string attribute values.
    let mut serialized_keys: Vec<HashMap<String, AttributeValue>> =
        Vec::with_capacity(unique_keys.len());
    for key in &unique_keys {
        let mut attributes = HashMap::with_capacity(key.len());
        for (attr, text) in key {
            attributes.insert(attr.clone(), serialize_string(text.clone()));
        }
        serialized_keys.push(attributes);
    }

    let (projection_expression, names) = self.projection_request(op, &select);
    let physical_table =
        self.physical_table(op.get("tableName").and_then(Json::as_str).unwrap_or(""));
    let key_attrs: Vec<String> = key_condition.keys().cloned().collect();
    let scope = self.op_scope(
        op.get("entity").and_then(Json::as_str),
        relation_path(op),
        context.clone(),
    );
    let mut hook = ReadOpCtx::batch_get(
        physical_table.clone(),
        names.clone().unwrap_or_default(),
        scope.relation_path.clone(),
        scope.model.clone(),
        scope.context.clone(),
    );

    let raw_json = self
        .run_op(&mut hook, |_ctx| {
            let opts = BatchOptions {
                max_batch_get_items: self.limits_ref().max_batch_get_items,
                sleep: self.batch_sleep,
            };
            // Fresh owned copies per invocation so the future owns everything it needs.
            let table = physical_table.clone();
            let keys = serialized_keys.clone();
            let projection = projection_expression.clone();
            let attribute_names = names.clone();
            async move {
                let raw = batch_get(
                    self.client_ref(),
                    &table,
                    keys,
                    projection,
                    attribute_names,
                    opts,
                )
                .await?;
                Ok(raw
                    .iter()
                    .map(|r| Value::M(deserialize_item(r)).to_json())
                    .collect())
            }
        })
        .await?;

    // Index the hydrated items by the marker derived from their own key attributes.
    let mut key_to_item: HashMap<String, Json> = HashMap::new();
    for item_json in &raw_json {
        let key_map: HashMap<String, Json> = key_attrs
            .iter()
            .map(|attr| {
                (
                    attr.clone(),
                    item_json.get(attr).cloned().unwrap_or(Json::Null),
                )
            })
            .collect();
        let marker = serialize_key(&key_map);
        let de = crate::runtime::json_to_value_map(item_json);
        let hydrated = hydrate_item(&de, &select, &entity_meta)?;
        key_to_item.insert(marker, Value::M(hydrated).to_json());
    }
    Ok(key_to_item)
}
/// Builds the lookup marker for the key that `source_values` resolves to.
///
/// Each `keyCondition` template of `op` is resolved against the `result.<field>` parameters
/// derived from `source_values`; the resulting attribute -> string map is passed to
/// `serialize_key`. A missing or non-object `keyCondition` yields the marker of an empty key,
/// and a non-string template is resolved as the empty template.
///
/// # Errors
/// Fails if a template cannot be resolved.
fn relation_key_marker(&self, op: &Json, source_values: &Json) -> Result<String> {
    let params = result_context(source_values);
    let mut key: HashMap<String, Json> = HashMap::new();
    if let Some(conditions) = op.get("keyCondition").and_then(Json::as_object) {
        for (attr, template) in conditions {
            let resolved = resolve_template(template.as_str().unwrap_or(""), &params)?;
            key.insert(attr.clone(), Json::String(resolved));
        }
    }
    Ok(serialize_key(&key))
}
/// Builds the projection expression and attribute-name aliases for a read of `op`.
///
/// The projected fields are the entries of `select` followed by any `keyCondition`
/// attribute not already selected, so key attributes are always returned. Every field is
/// projected once: repeated names are dropped (first occurrence wins), because aliasing the
/// same attribute twice makes DynamoDB reject the expression for overlapping document paths.
///
/// Returns `(None, None)` when there is nothing to project; otherwise
/// `(Some("#p0, #p1, ..."), Some({"#p0": field0, ...}))`.
// NOTE(review): an empty `select` combined with a non-empty `keyCondition` projects the key
// attributes only, whereas an empty `select` without key attributes requests no projection
// at all. Confirm that the key-only projection is intended.
pub(crate) fn projection_request(
    &self,
    op: &Json,
    select: &[String],
) -> (Option<String>, Option<HashMap<String, String>>) {
    let mut fields: Vec<String> = Vec::with_capacity(select.len());
    for field in select {
        if !fields.contains(field) {
            fields.push(field.clone());
        }
    }
    if let Some(key_condition) = op.get("keyCondition").and_then(Json::as_object) {
        for attr in key_condition.keys() {
            if !fields.contains(attr) {
                fields.push(attr.clone());
            }
        }
    }
    if fields.is_empty() {
        return (None, None);
    }

    let mut names = HashMap::with_capacity(fields.len());
    let mut aliases: Vec<String> = Vec::with_capacity(fields.len());
    for (i, field) in fields.into_iter().enumerate() {
        let alias = format!("#p{i}");
        names.insert(alias.clone(), field);
        aliases.push(alias);
    }
    (Some(aliases.join(", ")), Some(names))
}
}
/// Writes the values of `merge` into the tree rooted at `root`.
///
/// Parents are collected with `collect_parents_mut` and paired positionally with
/// `merge.values`; pairing stops at the shorter of the two. Each value is inserted under
/// `merge.write_key`, and parents that are not JSON objects are left untouched.
fn apply_merge(root: &mut Json, merge: &RelationMerge) {
    let parents = collect_parents_mut(root, &merge.parent_tokens);
    for (parent, value) in parents.into_iter().zip(&merge.values) {
        if let Some(fields) = parent.as_object_mut() {
            fields.insert(merge.write_key.clone(), value.clone());
        }
    }
}
/// Collects owned copies of the parent objects reached by following `parent_tokens` from `root`.
///
/// Token handling at each level:
/// * `"items"` fans out into every element of the node's `items` array (nodes without such
///   an array contribute nothing);
/// * any other token descends into that child unless it is missing or null.
///
/// Only JSON objects are returned. The walk itself works on borrowed nodes; just the final
/// parents are cloned, so the root and intermediate levels are never copied.
fn collect_parents_ro(root: &Json, parent_tokens: &[String]) -> Vec<Json> {
    let mut nodes: Vec<&Json> = vec![root];
    for token in parent_tokens {
        let mut next: Vec<&Json> = Vec::new();
        for node in nodes {
            if token == "items" {
                if let Some(items) = node.get("items").and_then(Json::as_array) {
                    next.extend(items.iter());
                }
            } else if let Some(child) = node.get(token) {
                if !child.is_null() {
                    next.push(child);
                }
            }
        }
        nodes = next;
    }
    nodes
        .into_iter()
        .filter(|node| node.is_object())
        .cloned()
        .collect()
}
/// Reads `source_field` from `parent` and wraps it as a single-field object.
///
/// Returns `None` when the field is absent or null.
fn source_value(parent: &Json, source_field: &str) -> Option<Json> {
    parent
        .get(source_field)
        .filter(|found| !found.is_null())
        .map(|found| single_field(source_field, found.clone()))
}
fn single_field(field: &str, value: Json) -> Json {
let mut m = JsonMap::new();
m.insert(field.to_string(), value);
Json::Object(m)
}
/// Turns the fields of `source_values` into template parameters named `result.<field>`.
///
/// A value that is not a JSON object produces an empty parameter set.
fn result_context(source_values: &Json) -> Params {
    let mut params = Params::new();
    let fields = source_values.as_object().into_iter().flatten();
    for (field, value) in fields {
        params.insert(format!("result.{field}"), value.clone());
    }
    params
}
/// Splits the relation operations into execution stages (lists of indices into `operations`).
///
/// When `spec.executionPlan.groups` is an array, each group becomes one stage made of its
/// unsigned-integer entries. Index 0 is never scheduled (the fallback below also starts at 1;
/// presumably operation 0 is the root operation handled by the caller), and indices that do
/// not refer to an existing operation are dropped so a malformed plan cannot trigger an
/// out-of-bounds access later. Groups that end up empty are omitted.
///
/// Without a plan, every operation from index 1 onwards gets its own stage, in order.
fn relation_stages(spec: &Json, operations: &[Json]) -> Vec<Vec<usize>> {
    let groups = match spec
        .get("executionPlan")
        .and_then(|plan| plan.get("groups"))
        .and_then(Json::as_array)
    {
        Some(groups) => groups,
        None => return (1..operations.len()).map(|i| vec![i]).collect(),
    };

    // Compare in u64 space so oversized indices are rejected before any narrowing cast.
    let op_count = operations.len() as u64;
    groups
        .iter()
        .map(|group| -> Vec<usize> {
            group
                .as_array()
                .map(|indices| {
                    indices
                        .iter()
                        .filter_map(Json::as_u64)
                        .filter(|&i| i != 0 && i < op_count)
                        // Lossless: `i` is below `operations.len()`.
                        .map(|i| i as usize)
                        .collect()
                })
                .unwrap_or_default()
        })
        .filter(|stage| !stage.is_empty())
        .collect()
}
/// Returns the concurrency to use for relation traversal.
///
/// Uses `spec.executionPlan.concurrency` when it is a positive unsigned integer, otherwise
/// `RELATION_TRAVERSAL_CONCURRENCY`. Values larger than `usize::MAX` saturate instead of
/// being truncated, so a huge request can never wrap around to a small (or zero) value on
/// targets where `usize` is narrower than 64 bits.
fn plan_concurrency(spec: &Json) -> usize {
    let requested = spec
        .get("executionPlan")
        .and_then(|plan| plan.get("concurrency"))
        .and_then(Json::as_u64)
        .filter(|&c| c > 0);
    match requested {
        // The cast is lossless after clamping to the `usize` range.
        Some(c) => c.min(usize::MAX as u64) as usize,
        None => RELATION_TRAVERSAL_CONCURRENCY,
    }
}