use std::collections::HashMap;
use std::sync::Arc;
use aws_sdk_dynamodb::types::AttributeValue;
use serde_json::{Map as JsonMap, Value as Json};
use crate::attribute::{deserialize_item, serialize_json, serialize_string};
use crate::client::{DynamoClient, GetItemInput, Item, QueryInput};
use crate::cursor::{decode_cursor, encode_cursor};
use crate::errors::{GraphDDBError, Result};
use crate::filters::compile_filter;
use crate::hydration::hydrate_item;
use crate::limits::RuntimeLimits;
use crate::middleware::{Middleware, MiddlewareRegistry, ReadOpCtx};
use crate::templates::{resolve_template, resolve_with, validate_params, Params};
use crate::value::indexmap_shim::IndexMap;
use crate::value::Value;
/// One page of results produced by a `Query` read operation.
#[derive(Debug, Clone)]
pub struct Connection {
/// Items of the page, hydrated and already converted to JSON.
pub items: Vec<Json>,
/// Encoded form of the response's last evaluated key; `None` when the
/// response did not carry one.
pub cursor: Option<String>,
}
/// Per-operation scope whose fields are copied into the middleware
/// `ReadOpCtx` when a read operation is dispatched.
#[derive(Debug, Clone)]
pub struct OpScope {
/// Caller-supplied context value (taken from the `context` option of a query).
pub context: Json,
/// Model descriptor; `GraphDDBRuntime::ctx_model` builds it as
/// `{ "entity": ..., "meta": ... }`.
pub model: Json,
/// Relation path of the operation; empty for root operations.
pub relation_path: Vec<String>,
}
impl OpScope {
    /// Creates the scope of a root operation: the given `model`, an empty
    /// JSON object as context and an empty relation path.
    pub fn root(model: Json) -> Self {
        let context = Json::Object(JsonMap::new());
        let relation_path = Vec::new();
        Self {
            context,
            model,
            relation_path,
        }
    }
}
/// Read runtime that executes the queries declared in an operations document
/// against a DynamoDB client.
pub struct GraphDDBRuntime {
// Shared client used for every `get_item` / `query` call.
client: Arc<dyn DynamoClient>,
// Logical table name -> physical table name; unmapped names are used as-is.
table_mapping: HashMap<String, String>,
// Guard rails (`max_operations`, `max_depth`, `max_items`) checked while executing.
limits: RuntimeLimits,
// Parsed manifest document; its `entities` section is read from here.
manifest: Json,
// Parsed operations document; its `queries` and `contracts` sections are read from here.
operations: Json,
// Registered middleware hooks; empty after construction.
middleware: MiddlewareRegistry,
// Initialised to `true` by `new`; not read in this part of the file.
// NOTE(review): presumably toggles sleeping between batch retries — confirm at the use site.
pub(crate) batch_sleep: bool,
}
impl GraphDDBRuntime {
/// Builds a runtime from a manifest file and an operations file on disk.
///
/// Both files are read first and parsed as JSON afterwards, then everything
/// is handed to [`GraphDDBRuntime::new`].
///
/// # Errors
/// Returns a `GraphDDBError` when a file cannot be read or parsed, or when
/// `new` rejects the documents.
pub fn from_paths(
    client: Arc<dyn DynamoClient>,
    manifest_path: &str,
    operations_path: &str,
    table_mapping: Option<HashMap<String, String>>,
    limits: Option<RuntimeLimits>,
) -> Result<Self> {
    let read = |path: &str| {
        std::fs::read_to_string(path)
            .map_err(|e| GraphDDBError::new(format!("cannot read {path}: {e}")))
    };
    let parse = |path: &str, text: &str| -> Result<Json> {
        serde_json::from_str(text)
            .map_err(|e| GraphDDBError::new(format!("cannot parse {path}: {e}")))
    };

    // Keep the original ordering: both reads happen before either parse.
    let manifest_txt = read(manifest_path)?;
    let operations_txt = read(operations_path)?;
    let manifest = parse(manifest_path, &manifest_txt)?;
    let operations = parse(operations_path, &operations_txt)?;

    Self::new(client, manifest, operations, table_mapping, limits)
}
/// Builds a runtime from already-parsed manifest and operations documents.
///
/// The spec version of both documents is validated (manifest first). A
/// missing `table_mapping` or `limits` falls back to its `Default`. The
/// runtime starts with an empty middleware registry and `batch_sleep` set
/// to `true`.
///
/// # Errors
/// Propagates the error of `validate_spec_version` for either document.
pub fn new(
    client: Arc<dyn DynamoClient>,
    manifest: Json,
    operations: Json,
    table_mapping: Option<HashMap<String, String>>,
    limits: Option<RuntimeLimits>,
) -> Result<Self> {
    for (document, label) in [(&manifest, "manifest.json"), (&operations, "operations.json")] {
        crate::spec_version::validate_spec_version(document, label)?;
    }

    let table_mapping = table_mapping.unwrap_or_default();
    let limits = limits.unwrap_or_default();
    let middleware = MiddlewareRegistry::new();
    Ok(Self {
        client,
        table_mapping,
        limits,
        manifest,
        operations,
        middleware,
        batch_sleep: true,
    })
}
/// Registers the hooks of `mw` with this runtime's middleware registry.
pub fn use_middleware(&mut self, mw: Middleware) {
    let registry = &mut self.middleware;
    registry.use_hooks(mw);
}
/// Clears this runtime's middleware registry.
pub fn clear_middleware(&mut self) {
    let registry = &mut self.middleware;
    registry.clear();
}
/// Assembles an [`OpScope`] for an operation on `entity`, deriving the
/// model descriptor through [`Self::ctx_model`].
pub(crate) fn op_scope(
    &self,
    entity: Option<&str>,
    relation_path: Vec<String>,
    context: Json,
) -> OpScope {
    let model = self.ctx_model(entity);
    OpScope {
        context,
        model,
        relation_path,
    }
}
pub(crate) fn ctx_model(&self, entity: Option<&str>) -> Json {
let meta = entity
.and_then(|e| self.entities().get(e))
.cloned()
.unwrap_or_else(|| Json::Object(JsonMap::new()));
serde_json::json!({ "entity": entity, "meta": meta })
}
pub(crate) fn middleware_ref(&self) -> &MiddlewareRegistry {
&self.middleware
}
/// Returns the `queries` section of the operations document, or JSON null
/// when the section is absent.
fn queries(&self) -> &Json {
    match self.operations.get("queries") {
        Some(section) => section,
        None => &Json::Null,
    }
}
/// Returns the `contracts` section of the operations document, or JSON null
/// when the section is absent.
fn contracts(&self) -> &Json {
    match self.operations.get("contracts") {
        Some(section) => section,
        None => &Json::Null,
    }
}
/// Returns the `entities` section of the manifest, or JSON null when the
/// section is absent.
fn entities(&self) -> &Json {
    match self.manifest.get("entities") {
        Some(section) => section,
        None => &Json::Null,
    }
}
/// Looks up the specification of query `id` in the `queries` section.
fn query_spec(&self, id: &str) -> Option<&Json> {
    let catalog = self.queries();
    catalog.get(id)
}
/// Translates a logical table name to its physical name; names without a
/// mapping entry are returned unchanged.
pub(crate) fn physical_table(&self, logical: &str) -> String {
    match self.table_mapping.get(logical) {
        Some(physical) => physical.clone(),
        None => logical.to_string(),
    }
}
pub(crate) fn manifest_ref(&self) -> &Json {
&self.manifest
}
pub(crate) fn operations_ref(&self) -> &Json {
&self.operations
}
/// Borrows the DynamoDB client held by this runtime.
pub(crate) fn client_ref(&self) -> &dyn DynamoClient {
    &*self.client
}
pub(crate) fn limits_ref(&self) -> &RuntimeLimits {
&self.limits
}
/// Crate-visible access to the `contracts` section (JSON null when absent).
pub(crate) fn contracts_ref(&self) -> &Json {
    let section = self.contracts();
    section
}
/// Reports whether the middleware registry considers itself active.
pub(crate) fn middleware_active(&self) -> bool {
    let registry = &self.middleware;
    registry.active()
}
/// Executes `query_id` with `params` and no options (JSON null is passed
/// as the options value).
pub async fn execute_query(&self, query_id: &str, params: &Params) -> Result<Json> {
    let no_options = Json::Null;
    self.execute_query_with_options(query_id, params, &no_options)
        .await
}
/// Executes `query_id` with `params` and per-call `options`.
///
/// `options` may carry `context` (exposed to middleware), `cursor` and
/// `consistentRead` (both consumed by the root `Query` operation).
///
/// Without active middleware the query body runs directly with the caller's
/// params and options. With middleware, the request hooks run around the
/// body: `run_request_before` may rewrite the `params` / `options` entries of
/// the request context, and the body then runs with the rewritten values.
///
/// # Errors
/// * query-not-found error when `query_id` is not declared;
/// * whatever `validate_params` or the operation-limit checks report;
/// * any error raised by the body or by a middleware hook.
pub async fn execute_query_with_options(
    &self,
    query_id: &str,
    params: &Params,
    options: &Json,
) -> Result<Json> {
    let spec = self
        .query_spec(query_id)
        .ok_or_else(|| GraphDDBError::query_not_found(format!("unknown query '{query_id}'")))?;
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    validate_params(params, &param_specs, query_id)?;
    let operations: &[Json] = spec
        .get("operations")
        .and_then(Json::as_array)
        .map(Vec::as_slice)
        .unwrap_or(&[]);
    // Guarantees `operations` is non-empty, so indexing `[0]` below is safe.
    self.enforce_operation_limits(query_id, operations)?;

    if !self.middleware.active() {
        // Forward the caller's options so `cursor` / `consistentRead` are
        // honoured on the fast path exactly as on the middleware path.
        return self
            .execute_query_body(query_id, spec, operations, params, options)
            .await;
    }

    let root_entity = operations[0].get("entity").and_then(Json::as_str);
    let context = options
        .get("context")
        .cloned()
        .unwrap_or_else(|| Json::Object(JsonMap::new()));
    let mut ctx_params = JsonMap::new();
    ctx_params.insert("params".into(), Json::Object(params.clone()));
    ctx_params.insert("options".into(), options.clone());
    let mut req_ctx = crate::middleware::ReadRequestCtx {
        kind: "query".to_string(),
        model: self.ctx_model(root_entity),
        context,
        params: ctx_params,
        state: JsonMap::new(),
    };

    // A failure of the `before` hook is routed through the error hook, just
    // like a failure of the body itself.
    let outcome: Result<Json> = async {
        self.middleware.run_request_before(&mut req_ctx)?;
        // Re-read params/options: the hook may have replaced them.
        let eff_params = req_ctx
            .params
            .get("params")
            .and_then(Json::as_object)
            .cloned()
            .unwrap_or_default();
        let eff_options = req_ctx.params.get("options").cloned().unwrap_or(Json::Null);
        self.execute_query_body(query_id, spec, operations, &eff_params, &eff_options)
            .await
    }
    .await;

    match outcome {
        Ok(result) => Ok(self.middleware.run_request_after(&req_ctx, result)),
        Err(err) => self.middleware.run_request_error(&req_ctx, err),
    }
}
/// Runs the root operation of a query, attaches its relations and strips
/// implicitly-added source fields from the result.
///
/// Resolves to JSON null when the root operation produced nothing.
/// `operations` must be non-empty (the first entry is the root operation).
async fn execute_query_body(
    &self,
    query_id: &str,
    spec: &Json,
    operations: &[Json],
    params: &Params,
    options: &Json,
) -> Result<Json> {
    let root_op = &operations[0];
    let cardinality = spec.get("cardinality").and_then(Json::as_str);
    let context = match options.get("context") {
        Some(given) => given.clone(),
        None => Json::Object(JsonMap::new()),
    };

    let fetched = self
        .run_root(query_id, root_op, params, options, cardinality, &context)
        .await?;
    let mut root = if let Some(value) = fetched {
        value
    } else {
        return Ok(Json::Null);
    };

    self.assemble_relations(query_id, spec, operations, &mut root, &context)
        .await?;
    strip_implicit_sources(operations, &mut root);
    Ok(root)
}
/// Describes what `query_id` would do for `params` without touching the
/// database.
///
/// The result is an object with `queryId`, `cardinality` and `operations`.
/// Each operation lists its `type`, physical `tableName`, `keyCondition`,
/// `projection` and `resultPath` (default `"$"`), plus — when present and
/// non-null — `indexName`, `rangeCondition`, `limit`, `filter`,
/// `sourceField` and `sourceList`. Templates are resolved partially:
/// placeholders that cannot be filled from `params` are left in place.
///
/// # Errors
/// Query-not-found error for an undeclared `query_id`, or whatever
/// `validate_params` reports.
pub fn explain(&self, query_id: &str, params: &Params) -> Result<Json> {
    let spec = self
        .query_spec(query_id)
        .ok_or_else(|| GraphDDBError::query_not_found(format!("unknown query '{query_id}'")))?;
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    validate_params(params, &param_specs, query_id)?;

    let operations: &[Json] = spec
        .get("operations")
        .and_then(Json::as_array)
        .map(Vec::as_slice)
        .unwrap_or(&[]);
    let mut resolved_ops: Vec<Json> = Vec::with_capacity(operations.len());
    for op in operations {
        let mut resolved = JsonMap::new();
        resolved.insert("type".into(), op["type"].clone());
        resolved.insert(
            "tableName".into(),
            Json::String(self.physical_table(op["tableName"].as_str().unwrap_or(""))),
        );

        let mut kc = JsonMap::new();
        if let Some(kco) = op.get("keyCondition").and_then(Json::as_object) {
            for (attr, tmpl) in kco {
                kc.insert(
                    attr.clone(),
                    Json::String(resolve_partial(tmpl.as_str().unwrap_or(""), params)),
                );
            }
        }
        resolved.insert("keyCondition".into(), Json::Object(kc));
        resolved.insert(
            "projection".into(),
            op.get("projection")
                .cloned()
                .unwrap_or_else(|| Json::Array(Vec::new())),
        );
        resolved.insert(
            "resultPath".into(),
            Json::String(
                op.get("resultPath")
                    .and_then(Json::as_str)
                    .unwrap_or("$")
                    .to_string(),
            ),
        );
        if let Some(idx) = op.get("indexName").and_then(Json::as_str) {
            resolved.insert("indexName".into(), Json::String(idx.to_string()));
        }
        if let Some(rc) = op.get("rangeCondition").and_then(Json::as_object) {
            let mut rco = JsonMap::new();
            rco.insert("operator".into(), rc["operator"].clone());
            rco.insert("key".into(), rc["key"].clone());
            rco.insert(
                "value".into(),
                Json::String(resolve_partial(rc["value"].as_str().unwrap_or(""), params)),
            );
            resolved.insert("rangeCondition".into(), Json::Object(rco));
        }
        // Optional fields are copied verbatim, in this order, when non-null.
        for field in ["limit", "filter", "sourceField", "sourceList"] {
            if let Some(value) = op.get(field).filter(|v| !v.is_null()) {
                resolved.insert(field.into(), value.clone());
            }
        }
        resolved_ops.push(Json::Object(resolved));
    }

    let mut out = JsonMap::new();
    out.insert("queryId".into(), Json::String(query_id.to_string()));
    out.insert(
        "cardinality".into(),
        spec.get("cardinality").cloned().unwrap_or(Json::Null),
    );
    out.insert("operations".into(), Json::Array(resolved_ops));
    Ok(Json::Object(out))
}
/// Validates the shape of a query's operation list against the runtime limits.
///
/// # Errors
/// * plain error when the list is empty;
/// * limit-exceeded error when there are more than `max_operations` entries;
/// * limit-exceeded error when the relation depth exceeds `max_depth`.
fn enforce_operation_limits(&self, query_id: &str, operations: &[Json]) -> Result<()> {
    let count = operations.len();
    if count == 0 {
        return Err(GraphDDBError::new(format!(
            "{query_id}: query has no operations"
        )));
    }

    let allowed_ops = self.limits.max_operations;
    if count > allowed_ops {
        return Err(GraphDDBError::limit_exceeded(format!(
            "{query_id}: query needs {} operations, exceeds max_operations {}",
            count, allowed_ops
        )));
    }

    let depth = max_relation_depth(operations);
    let allowed_depth = self.limits.max_depth;
    if depth > allowed_depth {
        return Err(GraphDDBError::limit_exceeded(format!(
            "{query_id}: relation traversal depth {depth} exceeds max_depth {}",
            allowed_depth
        )));
    }
    Ok(())
}
/// Executes the root operation of a query.
///
/// * `GetItem` yields the hydrated item as JSON, or `None` when not found.
/// * `Query` yields the first item when `cardinality` is `"one"` (or `None`
///   for an empty page); otherwise it yields the whole connection as JSON.
///
/// # Errors
/// Any other operation type is rejected; errors of the underlying read are
/// propagated.
#[allow(clippy::too_many_arguments)]
async fn run_root(
    &self,
    query_id: &str,
    op: &Json,
    params: &Params,
    options: &Json,
    cardinality: Option<&str>,
    context: &Json,
) -> Result<Option<Json>> {
    let select = select_from_projection(op);
    let entity_meta = self.resolve_entity(op);
    let entity = op.get("entity").and_then(Json::as_str);
    // Root operations carry an empty relation path.
    let scope = self.op_scope(entity, Vec::new(), context.clone());

    match op["type"].as_str().unwrap_or("") {
        "GetItem" => {
            let found = self
                .run_get_item(query_id, op, params, &select, &entity_meta, false, &scope)
                .await?;
            Ok(found.map(|fields| Value::M(fields).to_json()))
        }
        "Query" => {
            let page = self
                .run_query(query_id, op, params, &select, &entity_meta, options, &scope)
                .await?;
            match cardinality {
                Some("one") => Ok(page.items.into_iter().next()),
                _ => Ok(Some(connection_to_json(&page))),
            }
        }
        other => Err(GraphDDBError::new(format!(
            "{query_id}: unsupported root read operation '{other}'"
        ))),
    }
}
/// Runs one read operation through the per-operation middleware hooks.
///
/// With no active middleware `send` is awaited directly. Otherwise the
/// `before` hook runs first (it receives `ctx` mutably); if it fails, `send`
/// is skipped. A successful result goes through the `after` hook, and any
/// error — from the hook or from `send` — goes through the `error` hook.
pub(crate) async fn run_op<F, Fut>(&self, ctx: &mut ReadOpCtx, send: F) -> Result<Vec<Json>>
where
    F: FnOnce(&ReadOpCtx) -> Fut,
    Fut: std::future::Future<Output = Result<Vec<Json>>>,
{
    if !self.middleware.active() {
        return send(ctx).await;
    }

    let outcome = if let Err(rejected) = self.middleware.run_op_before(ctx) {
        Err(rejected)
    } else {
        send(ctx).await
    };

    match outcome {
        Err(err) => self.middleware.run_op_error(ctx, err),
        Ok(items) => Ok(self.middleware.run_op_after(ctx, items)),
    }
}
/// Executes a `GetItem` operation and hydrates the item it returns.
///
/// The key is built by resolving every `keyCondition` template against
/// `params`. Without active middleware the client is called directly; with
/// middleware the call goes through [`Self::run_op`], using the table name,
/// key and consistency flag found on the hook context at send time.
///
/// Returns `Ok(None)` when the response carries no item.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_get_item(
    &self,
    _query_id: &str,
    op: &Json,
    params: &Params,
    select: &[String],
    entity_meta: &Json,
    consistent_read: bool,
    scope: &OpScope,
) -> Result<Option<IndexMap<Value>>> {
    let mut key: Item = HashMap::new();
    if let Some(conditions) = op["keyCondition"].as_object() {
        for (attr, tmpl) in conditions {
            let resolved = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
            key.insert(attr.clone(), serialize_string(resolved));
        }
    }
    let table_name = self.physical_table(op["tableName"].as_str().unwrap_or(""));
    let (projection_expression, projection_names) = self.projection_request(op, select);

    if !self.middleware.active() {
        // Fast path: no hooks, hydrate straight from the wire representation.
        let request = GetItemInput {
            table_name,
            key,
            consistent_read,
            projection_expression: projection_expression.clone(),
            expression_attribute_names: projection_names.clone(),
        };
        let resp = self.client.get_item(request).await?;
        return if let Some(found) = resp.item {
            let fields = deserialize_item(&found);
            Ok(Some(hydrate_item(&fields, select, entity_meta)?))
        } else {
            Ok(None)
        };
    }

    let mut hook_op = ReadOpCtx::get_item(
        table_name,
        key,
        consistent_read,
        scope.relation_path.clone(),
        scope.model.clone(),
        scope.context.clone(),
    );
    let fetched = self
        .run_op(&mut hook_op, |ctx| {
            // Build the request from the hook context so `before` hooks can
            // influence what is actually sent.
            let request = GetItemInput {
                table_name: ctx.table_name.clone(),
                key: ctx.key.clone(),
                consistent_read: ctx.consistent_read,
                projection_expression: projection_expression.clone(),
                expression_attribute_names: projection_names.clone(),
            };
            async move {
                let resp = self.client.get_item(request).await?;
                Ok(match resp.item {
                    None => vec![],
                    Some(found) => vec![Value::M(deserialize_item(&found)).to_json()],
                })
            }
        })
        .await?;

    // Hooks see (and may rewrite) plain JSON; convert back before hydrating.
    if let Some(raw) = fetched.into_iter().next() {
        let fields = json_to_value_map(&raw);
        Ok(Some(hydrate_item(&fields, select, entity_meta)?))
    } else {
        Ok(None)
    }
}
/// Executes a `Query` operation and returns one page of hydrated items.
///
/// Steps, in order:
/// 1. build the key-condition expression from `keyCondition` (equality
///    clauses) and the optional `rangeCondition` (only `begins_with`);
/// 2. read `indexName`, the `consistentRead` option, `limit`, the
///    declarative filter, the `cursor` option and the projection;
/// 3. send the request — directly when no middleware is active, otherwise
///    through `run_op` using the values found on the hook context;
/// 4. hydrate the items and enforce `max_items` on the returned count.
///
/// Errors: template resolution failures, an unsupported range operator, a
/// `limit` or item count above `max_items`, cursor decoding failures, client
/// errors and middleware errors.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn run_query(
&self,
query_id: &str,
op: &Json,
params: &Params,
select: &[String],
entity_meta: &Json,
options: &Json,
scope: &OpScope,
) -> Result<Connection> {
// Placeholder -> attribute name / value maps shared by every expression.
let mut names: HashMap<String, String> = HashMap::new();
let mut values: HashMap<String, AttributeValue> = HashMap::new();
let mut clauses: Vec<String> = Vec::new();
let kc = op["keyCondition"].as_object().cloned().unwrap_or_default();
// One `#kN = :kN` equality clause per key-condition attribute.
for (i, (attr, tmpl)) in kc.iter().enumerate() {
let n = format!("#k{i}");
let v = format!(":k{i}");
names.insert(n.clone(), attr.clone());
let resolved = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
values.insert(v.clone(), serialize_string(resolved));
clauses.push(format!("{n} = {v}"));
}
// Optional range condition; any operator other than `begins_with` is rejected.
if let Some(rng) = op.get("rangeCondition").and_then(Json::as_object) {
let n = "#kr".to_string();
let v = ":kr".to_string();
names.insert(n.clone(), rng["key"].as_str().unwrap_or("").to_string());
let resolved = resolve_template(rng["value"].as_str().unwrap_or(""), params)?;
values.insert(v.clone(), serialize_string(resolved));
match rng["operator"].as_str() {
Some("begins_with") => clauses.push(format!("begins_with({n}, {v})")),
other => {
return Err(GraphDDBError::new(format!(
"unsupported range operator '{}'",
other.unwrap_or("?")
)))
}
}
}
let index_name = op
.get("indexName")
.and_then(Json::as_str)
.map(str::to_string);
// The `consistentRead` option only takes effect when no index is queried.
let consistent_read = options
.get("consistentRead")
.and_then(Json::as_bool)
.unwrap_or(false)
&& index_name.is_none();
let mut limit: Option<i32> = None;
if let Some(l) = op.get("limit").and_then(Json::as_i64) {
// NOTE(review): `l as usize` wraps for negative limits, so they are
// rejected by this check too, with a "limit exceeds" message.
if l as usize > self.limits.max_items {
return Err(GraphDDBError::limit_exceeded(format!(
"{query_id}: limit {l} exceeds max_items {}",
self.limits.max_items
)));
}
limit = Some(l as i32);
}
// Declarative filter: merge its placeholders into the shared maps.
let mut filter_expression: Option<String> = None;
if let Some(decl) = op.get("filter").and_then(|f| f.get("declarative")) {
if let Some(compiled) = compile_filter(decl)? {
filter_expression = Some(compiled.expression);
for (a, c) in compiled.names.iter() {
names.insert(a.clone(), c.clone());
}
for (a, val) in compiled.values.iter() {
values.insert(a.clone(), val.clone());
}
}
}
// A non-empty `cursor` option becomes the exclusive start key.
let exclusive_start_key = match options.get("cursor").and_then(Json::as_str) {
Some(cursor) if !cursor.is_empty() => Some(self.serialize_cursor_key(cursor)?),
_ => None,
};
let (projection_expression, projection_names) = self.projection_request(op, select);
if let Some(pn) = &projection_names {
for (a, c) in pn.iter() {
names.insert(a.clone(), c.clone());
}
}
let table_name = self.physical_table(op["tableName"].as_str().unwrap_or(""));
let kce = clauses.join(" AND ");
// Fast path: no middleware, call the client directly.
if !self.middleware.active() {
let input = QueryInput {
table_name,
key_condition_expression: kce,
expression_attribute_names: names,
expression_attribute_values: values,
index_name,
filter_expression,
projection_expression: projection_expression.clone(),
limit,
exclusive_start_key,
consistent_read,
};
let resp = self.client.query(input).await?;
let cursor = match resp.last_evaluated_key {
Some(lek) => Some(encode_cursor(&Value::M(deserialize_lek(&lek)))?),
None => None,
};
let mut items: Vec<Json> = Vec::with_capacity(resp.items.len());
for r in &resp.items {
let de = deserialize_item(r);
items.push(Value::M(hydrate_item(&de, select, entity_meta)?).to_json());
}
if items.len() > self.limits.max_items {
return Err(GraphDDBError::limit_exceeded(format!(
"{query_id}: returned {} items, exceeds max_items {}",
items.len(),
self.limits.max_items
)));
}
return Ok(Connection { items, cursor });
}
// Middleware path: the request is rebuilt from the hook context inside
// `run_op`, so table name, names, values and filter come from `ctx`.
let mut hook_op = ReadOpCtx::query(
table_name,
names,
values,
filter_expression,
scope.relation_path.clone(),
scope.model.clone(),
scope.context.clone(),
);
// `run_op` only returns the items, so the cursor is carried out of the
// send closure through this cell.
let captured_cursor: std::cell::RefCell<Option<String>> = std::cell::RefCell::new(None);
let raw_items = self
.run_op(&mut hook_op, |ctx| {
let input = QueryInput {
table_name: ctx.table_name.clone(),
key_condition_expression: kce.clone(),
expression_attribute_names: ctx.names.clone(),
expression_attribute_values: ctx.values.clone(),
index_name: index_name.clone(),
filter_expression: ctx.filter_expression.clone(),
projection_expression: projection_expression.clone(),
limit,
exclusive_start_key: exclusive_start_key.clone(),
consistent_read,
};
let captured_cursor = &captured_cursor;
async move {
let resp = self.client.query(input).await?;
if let Some(lek) = resp.last_evaluated_key {
let de = deserialize_lek(&lek);
*captured_cursor.borrow_mut() = Some(encode_cursor(&Value::M(de))?);
}
Ok(resp
.items
.iter()
.map(|r| Value::M(deserialize_item(r)).to_json())
.collect())
}
})
.await?;
// Items come back from the hooks as plain JSON; convert and hydrate them.
let mut items: Vec<Json> = Vec::new();
for raw in &raw_items {
let de = json_to_value_map(raw);
items.push(Value::M(hydrate_item(&de, select, entity_meta)?).to_json());
}
if items.len() > self.limits.max_items {
return Err(GraphDDBError::limit_exceeded(format!(
"{query_id}: returned {} items, exceeds max_items {}",
items.len(),
self.limits.max_items
)));
}
let cursor = captured_cursor.into_inner();
Ok(Connection { items, cursor })
}
/// Decodes `cursor` and serializes each entry of the decoded object into an
/// attribute value, producing a key usable as an exclusive start key.
///
/// # Errors
/// Fails when the cursor cannot be decoded, does not decode to an object,
/// or holds a value that `serialize_json` rejects.
fn serialize_cursor_key(&self, cursor: &str) -> Result<Item> {
    let decoded = decode_cursor(cursor)?;
    let entries = match decoded.as_object() {
        Some(entries) => entries,
        None => return Err(GraphDDBError::new("cursor did not decode to a key object")),
    };

    let mut start_key: Item = HashMap::new();
    for (attr, raw) in entries {
        let serialized = serialize_json(raw)?;
        start_key.insert(attr.clone(), serialized);
    }
    Ok(start_key)
}
/// Resolves the entity metadata for `op` against this runtime's manifest
/// entities (delegates to the free function `resolve_entity`).
fn resolve_entity(&self, op: &Json) -> Json {
    let entities = self.entities();
    resolve_entity(entities, op)
}
}
/// Deserializes a last-evaluated key into an ordered map.
///
/// Attributes are inserted in `lek_rank` order, ties broken by attribute
/// name, so the resulting key order does not depend on hash-map iteration.
fn deserialize_lek(item: &Item) -> IndexMap<Value> {
    let mut entries: Vec<_> = item.iter().collect();
    entries.sort_by(|a, b| {
        lek_rank(a.0)
            .cmp(&lek_rank(b.0))
            .then_with(|| a.0.cmp(b.0))
    });

    let mut ordered = IndexMap::new();
    for (name, attr_value) in entries {
        ordered.insert(name.clone(), crate::attribute::deserialize(attr_value));
    }
    ordered
}
/// Sort rank of a last-evaluated-key attribute.
///
/// * `SK` then `PK` come first (group 0);
/// * `GSI<n>PK` / `GSI<n>SK` follow (group 1), ordered by `<n>` and, within
///   one index, PK before SK; a non-numeric `<n>` counts as 0;
/// * every other attribute is last (group 2).
///
/// NOTE(review): base-table keys rank SK before PK while index keys rank PK
/// before SK — presumably to reproduce a fixed cursor layout; confirm before
/// changing, since it affects the encoded cursor text.
fn lek_rank(attr: &str) -> (u32, u32, u32) {
    if attr == "SK" {
        return (0, 0, 0);
    }
    if attr == "PK" {
        return (0, 1, 0);
    }

    let is_partition = attr.ends_with("PK");
    let is_index_key = attr.starts_with("GSI") && (is_partition || attr.ends_with("SK"));
    if !is_index_key {
        return (2, 0, 0);
    }

    // Between the "GSI" prefix and the two-letter suffix sits the index number.
    let number = attr[3..attr.len() - 2].parse::<u32>().unwrap_or(0);
    let sub = if is_partition { 0 } else { 1 };
    (1, number, sub)
}
/// Converts a JSON object into an ordered map of runtime values.
/// Anything that is not a JSON object yields an empty map.
pub(crate) fn json_to_value_map(raw: &Json) -> IndexMap<Value> {
    let mut fields = IndexMap::new();
    for (name, member) in raw.as_object().into_iter().flatten() {
        fields.insert(name.clone(), json_to_value(member));
    }
    fields
}
/// Recursively converts a JSON value into a runtime [`Value`].
/// Numbers are carried as their textual representation.
fn json_to_value(v: &Json) -> Value {
    match v {
        Json::Null => Value::Null,
        Json::Bool(flag) => Value::Bool(*flag),
        Json::Number(number) => Value::N(number.to_string()),
        Json::String(text) => Value::S(text.clone()),
        Json::Array(elements) => Value::L(elements.iter().map(json_to_value).collect()),
        Json::Object(members) => {
            let mut nested = IndexMap::new();
            members.iter().for_each(|(name, member)| {
                nested.insert(name.clone(), json_to_value(member));
            });
            Value::M(nested)
        }
    }
}
/// Collects the string entries of the operation's `projection` array.
/// A missing or non-array projection yields an empty list; non-string
/// entries are skipped.
pub(crate) fn select_from_projection(op: &Json) -> Vec<String> {
    match op.get("projection").and_then(Json::as_array) {
        Some(entries) => entries
            .iter()
            .filter_map(Json::as_str)
            .map(String::from)
            .collect(),
        None => Vec::new(),
    }
}
/// Renders a connection as `{ "items": [...], "cursor": <string or null> }`.
fn connection_to_json(conn: &Connection) -> Json {
    let cursor = match &conn.cursor {
        Some(token) => Json::String(token.clone()),
        None => Json::Null,
    };
    let mut rendered = JsonMap::new();
    rendered.insert("items".into(), Json::Array(conn.items.clone()));
    rendered.insert("cursor".into(), cursor);
    Json::Object(rendered)
}
/// Renders `items` and `cursor` in the connection JSON shape
/// (`{ "items": [...], "cursor": <string or null> }`).
pub fn connection_json(items: Vec<Json>, cursor: Option<String>) -> Json {
    let conn = Connection { items, cursor };
    connection_to_json(&conn)
}
/// Resolves the placeholders of `template` that can be filled from `params`
/// and leaves the others in place as `{name}`.
///
/// Placeholders starting with `result.` are never substituted, and neither
/// are parameters that are missing or JSON null. If resolution fails, the
/// template is returned unchanged.
fn resolve_partial(template: &str, params: &Params) -> String {
    let substituted = resolve_with(template, |name| {
        let bound = if name.starts_with("result.") {
            None
        } else {
            params.get(name).filter(|v| !v.is_null())
        };
        Some(match bound {
            Some(v) => crate::templates::json_to_template_string(v),
            None => format!("{{{name}}}"),
        })
    });
    match substituted {
        Ok(text) => text,
        Err(_) => template.to_string(),
    }
}
/// Returns the deepest relation nesting among the non-root operations.
///
/// The first operation (the root) is skipped. For every other operation the
/// `resultPath` (default `"$"`) is inspected: `"$"` and the empty path
/// contribute nothing; otherwise the first two bytes (the `$.` prefix) are
/// dropped and the remaining dot-separated tokens are counted, ignoring
/// `items` tokens.
///
/// Never panics: an empty operation list yields 0, and a path too short (or
/// not sliceable at byte 2) is treated as having an empty tail.
fn max_relation_depth(operations: &[Json]) -> usize {
    operations
        .iter()
        .skip(1)
        .filter_map(|op| {
            let path = op.get("resultPath").and_then(Json::as_str).unwrap_or("$");
            if path == "$" || path.is_empty() {
                return None;
            }
            // Checked slicing instead of `path[2..]`, which would panic on
            // malformed paths coming from the operations document.
            let tail = path.get(2..).unwrap_or("");
            Some(tail.split('.').filter(|token| *token != "items").count())
        })
        .max()
        .unwrap_or(0)
}
/// Removes source-list fields that are marked as implicit from the result.
///
/// For each operation whose `sourceList` has `implicit: true` and a string
/// `from`, the `from` field is removed from every parent object addressed by
/// the operation's `resultPath` (default `"$"`). Operations whose result
/// path cannot be parsed are skipped.
fn strip_implicit_sources(operations: &[Json], root: &mut Json) {
    for op in operations {
        let implicit_from = op
            .get("sourceList")
            .and_then(Json::as_object)
            .filter(|list| list.get("implicit").and_then(Json::as_bool).unwrap_or(false))
            .and_then(|list| list.get("from"))
            .and_then(Json::as_str);
        let from = match implicit_from {
            Some(field) => field,
            None => continue,
        };

        let result_path = op.get("resultPath").and_then(Json::as_str).unwrap_or("$");
        let parent_tokens = match crate::relations::parse_result_path(result_path) {
            Ok((tokens, _, _)) => tokens,
            Err(_) => continue,
        };

        for parent in crate::relations::collect_parents_mut(root, &parent_tokens) {
            if let Some(fields) = parent.as_object_mut() {
                fields.remove(from);
            }
        }
    }
}
/// Finds the manifest entity that a read operation targets.
///
/// Candidates are the entities whose `table` equals the operation's
/// `tableName` and whose key templates (table key, or the GSI named by the
/// operation's `indexName`) match the operation's key condition according to
/// `key_match_kind`. Selection then proceeds in tiers:
///
/// 1. a single "exact" match wins;
/// 2. several "exact" matches are narrowed by the projection;
/// 3. otherwise the best non-empty pool (exact, then "partition", then all
///    candidates) is narrowed by the projection.
///
/// When no single entity can be picked, a placeholder `{ "fields": {} }` is
/// returned instead.
pub(crate) fn resolve_entity(entities: &Json, op: &Json) -> Json {
// Fallback metadata used whenever resolution is impossible or ambiguous.
let empty_meta = || {
let mut m = JsonMap::new();
m.insert("fields".into(), Json::Object(JsonMap::new()));
Json::Object(m)
};
let entities = match entities.as_object() {
Some(e) => e,
None => return empty_meta(),
};
let table = op["tableName"].as_str().unwrap_or("");
let index_name = op.get("indexName").and_then(Json::as_str);
let kc = op.get("keyCondition").and_then(Json::as_object);
let projection: Vec<String> = select_from_projection(op);
// (match kind, entity metadata) for every entity that fits the operation.
let mut candidates: Vec<(String, Json)> = Vec::new();
for meta in entities.values() {
if meta.get("table").and_then(Json::as_str) != Some(table) {
continue;
}
let matched = match index_name {
// Base-table access: compare against the entity's primary key templates.
None => {
let key = meta.get("key");
match key {
Some(k) if !k.is_null() => {
let pk = k.get("pkTemplate").and_then(Json::as_str).unwrap_or("");
let sk = k.get("skTemplate").and_then(Json::as_str);
key_match_kind(kc, op, "PK", Some(pk), "SK", sk)
}
_ => None,
}
}
// Index access: compare against the GSI definition(s) with the same index
// name; key attributes are named `<indexName>PK` / `<indexName>SK`.
Some(idx) => {
let mut m = None;
if let Some(gsis) = meta.get("gsis").and_then(Json::as_array) {
for gsi in gsis {
if gsi.get("indexName").and_then(Json::as_str) != Some(idx) {
continue;
}
let pk_attr = format!("{idx}PK");
let sk_attr = format!("{idx}SK");
m = key_match_kind(
kc,
op,
&pk_attr,
gsi.get("pkTemplate").and_then(Json::as_str),
&sk_attr,
gsi.get("skTemplate").and_then(Json::as_str),
);
if m.is_some() {
break;
}
}
}
m
}
};
if let Some(kind) = matched {
candidates.push((kind, meta.clone()));
}
}
if candidates.is_empty() {
return empty_meta();
}
// Tier 1: exactly one exact match.
let exact: Vec<Json> = candidates
.iter()
.filter(|(k, _)| k == "exact")
.map(|(_, m)| m.clone())
.collect();
if exact.len() == 1 {
return exact.into_iter().next().unwrap();
}
// Tier 2: several exact matches, narrowed by projected fields.
if exact.len() > 1 {
let narrowed = disambiguate_by_projection(&exact, &projection);
if narrowed.len() == 1 {
return narrowed.into_iter().next().unwrap();
}
}
let partition: Vec<Json> = candidates
.iter()
.filter(|(k, _)| k == "partition")
.map(|(_, m)| m.clone())
.collect();
// Tier 3: pick the most specific non-empty pool and narrow it.
let pool = if !exact.is_empty() {
exact
} else if !partition.is_empty() {
partition
} else {
candidates.iter().map(|(_, m)| m.clone()).collect()
};
let narrowed = disambiguate_by_projection(&pool, &projection);
if narrowed.len() == 1 {
return narrowed.into_iter().next().unwrap();
}
// Still ambiguous: give up rather than guess.
empty_meta()
}
/// Narrows `candidates` to the entities whose `fields` object declares every
/// projected attribute.
///
/// With an empty projection, or when no candidate covers the projection, all
/// candidates are returned unchanged.
fn disambiguate_by_projection(candidates: &[Json], projection: &[String]) -> Vec<Json> {
    if projection.is_empty() {
        return candidates.to_vec();
    }

    let mut covering: Vec<Json> = Vec::new();
    for meta in candidates {
        let declares_all = meta
            .get("fields")
            .and_then(Json::as_object)
            .map_or(false, |fields| {
                projection.iter().all(|attr| fields.contains_key(attr))
            });
        if declares_all {
            covering.push(meta.clone());
        }
    }

    if covering.is_empty() {
        return candidates.to_vec();
    }
    covering
}
/// Classifies how an operation's key condition matches a key definition.
///
/// Templates are compared after `normalize_template`, i.e. ignoring
/// placeholder names.
///
/// * `None` — the partition-key templates differ, or a sort-key condition /
///   range condition is present but incompatible with `sk_tmpl`;
/// * `Some("exact")` — the sort-key template matches too, or a range
///   condition on `sk_attr` has a literal prefix compatible with `sk_tmpl`;
/// * `Some("partition")` — only the partition key is constrained.
fn key_match_kind(
    kc: Option<&JsonMap<String, Json>>,
    op: &Json,
    pk_attr: &str,
    pk_tmpl: Option<&str>,
    sk_attr: &str,
    sk_tmpl: Option<&str>,
) -> Option<String> {
    let kc_get = |k: &str| kc.and_then(|m| m.get(k)).and_then(Json::as_str);

    if normalize_template(kc_get(pk_attr)) != normalize_template(pk_tmpl) {
        return None;
    }

    // An explicit sort-key equality must match the definition's template.
    if let Some(op_sk) = kc_get(sk_attr) {
        let same_shape = normalize_template(Some(op_sk)) == normalize_template(sk_tmpl);
        return same_shape.then(|| "exact".to_string());
    }

    let range_on_sk = op
        .get("rangeCondition")
        .and_then(Json::as_object)
        .filter(|rng| rng.get("key").and_then(Json::as_str) == Some(sk_attr));
    match range_on_sk {
        None => Some("partition".to_string()),
        Some(rng) => {
            // A range condition on a key without a sort-key template cannot match.
            let sk_tmpl = sk_tmpl?;
            let value = rng.get("value").and_then(Json::as_str).unwrap_or("");
            let literal_prefix = placeholder_split_first(value);
            if literal_prefix.is_empty() || sk_tmpl.starts_with(&literal_prefix) {
                Some("exact".to_string())
            } else {
                None
            }
        }
    }
}
/// Normalizes a key template so that templates differing only in
/// placeholder names compare equal.
///
/// Every `{name}` placeholder — non-empty `name` without a nested `{` — is
/// replaced by a single NUL character; all other text is copied unchanged,
/// including non-ASCII characters. `None` stays `None`.
fn normalize_template(t: Option<&str>) -> Option<String> {
    t.map(|s| {
        let mut out = String::with_capacity(s.len());
        let mut rest = s;
        // Walk by characters, not bytes, so multi-byte UTF-8 text is preserved.
        while let Some(ch) = rest.chars().next() {
            if ch == '{' {
                let after = &rest[1..];
                if let Some(close) = after.find('}') {
                    let name = &after[..close];
                    if !name.is_empty() && !name.contains('{') {
                        out.push('\u{0}');
                        rest = &after[close + 1..];
                        continue;
                    }
                }
            }
            out.push(ch);
            rest = &rest[ch.len_utf8()..];
        }
        out
    })
}
/// Returns the literal text preceding the first placeholder of `value`.
///
/// If `value` has a `{` that is followed somewhere by a `}`, the text before
/// that `{` is returned; otherwise the whole value is returned.
fn placeholder_split_first(value: &str) -> String {
    let literal = match value.split_once('{') {
        Some((prefix, tail)) if tail.contains('}') => prefix,
        _ => value,
    };
    literal.to_string()
}