use std::collections::HashMap;
use aws_sdk_dynamodb::types::AttributeValue;
use serde_json::Value as Json;
use crate::attribute::{serialize_json, serialize_string};
use crate::entity::add_key_attributes;
use crate::errors::{GraphDDBError, Result};
use crate::filters::compile_filter;
use crate::runtime::GraphDDBRuntime;
use crate::templates::{resolve_template, validate_params, Params};
use crate::transactions::{expand_transaction, MAX_TRANSACT_ITEMS_LIMIT};
/// A fully resolved single-item write, as produced by the `build_*_request`
/// helpers and consumed by `dispatch_write`.
#[derive(Default)]
pub(crate) struct WriteBody {
/// Table name as returned by `physical_table` for the spec's `tableName`.
pub table_name: String,
/// Which operation to send, together with its item or key payload.
pub kind: WriteKind,
/// Optional condition expression; `None` when the spec declares no condition.
pub condition_expression: Option<String>,
/// Expression name placeholders (e.g. `#c0`, `#pk`) mapped to attribute names.
/// An empty map is sent to the client as `None`.
pub names: HashMap<String, String>,
/// Expression value placeholders (e.g. `:c0`) mapped to serialized values.
/// An empty map is sent to the client as `None`.
pub values: HashMap<String, AttributeValue>,
}
/// The operation carried by a `WriteBody`.
#[derive(Default)]
pub(crate) enum WriteKind {
/// Default variant; `dispatch_write` sends nothing for it. This is also what
/// `std::mem::take` leaves behind when a body is moved out in
/// `run_persist_single`.
#[default]
None,
/// Put the given serialized item.
Put(HashMap<String, AttributeValue>),
/// Update the item identified by `key`.
Update {
/// Serialized key attributes of the target item.
key: HashMap<String, AttributeValue>,
/// `SET ...` expression, or `None` when there is nothing to set.
update_expression: Option<String>,
},
/// Delete the item identified by the given serialized key.
Delete(HashMap<String, AttributeValue>),
}
/// Item images captured around a write. Both fields stay `None` unless
/// `dispatch_write` is asked to capture them (`force_old_image`).
#[derive(Default)]
pub(crate) struct Cap {
/// JSON form of the `attributes` returned by the client for the write, if any.
pub old_image: Option<Json>,
/// JSON form of the item that was written; only set for put operations.
pub new_image: Option<Json>,
}
/// Translates a command spec `type` (`"PutItem"`, `"UpdateItem"`,
/// `"DeleteItem"`) into the short kind name used by the middleware layer
/// (`"put"`, `"update"`, `"delete"`).
///
/// Matching is exact and case-sensitive; any other input yields `None`.
pub(crate) fn type_to_kind(op_type: &str) -> Option<&'static str> {
    const KINDS: [(&str, &str); 3] = [
        ("PutItem", "put"),
        ("UpdateItem", "update"),
        ("DeleteItem", "delete"),
    ];
    KINDS
        .iter()
        .find(|(name, _)| *name == op_type)
        .map(|&(_, kind)| kind)
}
/// Builds the `input` map handed to write middleware for one logical write.
///
/// The map always carries a copy of `params` under `"params"`. A `"put"` also
/// gets that copy under `"item"`; every other kind gets it under `"key"`, and
/// an `"update"` additionally starts with an empty `"changes"` object.
pub(crate) fn write_input_for(kind: &str, params: &Params) -> serde_json::Map<String, Json> {
    let snapshot = Json::Object(params.clone());
    let mut input = serde_json::Map::new();
    input.insert("params".into(), snapshot.clone());
    match kind {
        "put" => {
            input.insert("item".into(), snapshot);
        }
        other => {
            input.insert("key".into(), snapshot);
            if other == "update" {
                input.insert("changes".into(), Json::Object(serde_json::Map::new()));
            }
        }
    }
    input
}
/// Converts a raw attribute-value item into JSON by deserializing it with
/// `crate::attribute::deserialize_item` and rendering the result as a map value.
fn image_to_json(item: &HashMap<String, AttributeValue>) -> Json {
    let fields = crate::attribute::deserialize_item(item);
    let as_map = crate::value::Value::M(fields);
    as_map.to_json()
}
impl GraphDDBRuntime {
/// Returns a clone of the command spec stored under `id` in the `"commands"`
/// section of the operations document, or `None` if either level is missing.
fn commands_get(&self, id: &str) -> Option<Json> {
    let operations = self.operations_ref();
    let commands = operations.get("commands")?;
    commands.get(id).cloned()
}
/// Returns a clone of the transaction spec stored under `id` in the
/// `"transactions"` section of the operations document, or `None` if either
/// level is missing.
fn transactions_get(&self, id: &str) -> Option<Json> {
    let operations = self.operations_ref();
    let transactions = operations.get("transactions")?;
    transactions.get(id).cloned()
}
/// Runs the command registered as `command_id` with `params` and no options.
///
/// Equivalent to calling `execute_command_with_options` with `Json::Null`.
pub async fn execute_command(&self, command_id: &str, params: &Params) -> Result<()> {
    let no_options = Json::Null;
    self.execute_command_with_options(command_id, params, &no_options)
        .await
}
/// Runs the command registered as `command_id`.
///
/// `params` are validated against the spec's `params` section. When no
/// middleware is active the write is built and sent directly and `options`
/// is not consulted. Otherwise the write passes through the write hooks:
/// `run_write_before` may rewrite the input (and kind), the effective write
/// is re-derived and dispatched, and then either `run_write_after` or
/// `run_write_error` is invoked.
///
/// `options["context"]` (defaulting to an empty object) is forwarded to the
/// middleware contexts.
///
/// # Errors
///
/// * command-not-found error when `command_id` is not registered;
/// * whatever `validate_params` reports for invalid `params`;
/// * an error when the spec's `type` is not a known command type;
/// * any error from building or sending the write. With middleware active,
///   that error is first passed to `run_write_error`, whose result decides
///   whether it is returned or swallowed.
pub async fn execute_command_with_options(
    &self,
    command_id: &str,
    params: &Params,
    options: &Json,
) -> Result<()> {
    let spec = self.commands_get(command_id).ok_or_else(|| {
        GraphDDBError::command_not_found(format!("unknown command '{command_id}'"))
    })?;
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    validate_params(params, &param_specs, command_id)?;
    let op_type = spec["type"].as_str().unwrap_or("");

    // Fast path: without middleware there are no hooks to run and no images
    // to capture.
    if !self.middleware_active() {
        let body = self.build_write_body(command_id, &spec, op_type, params)?;
        self.send_write_body(command_id, body, &Json::Null).await?;
        return Ok(());
    }

    let context = options
        .get("context")
        .cloned()
        .unwrap_or_else(|| Json::Object(serde_json::Map::new()));
    let kind = type_to_kind(op_type).ok_or_else(|| {
        GraphDDBError::new(format!("{command_id}: unknown command type '{op_type}'"))
    })?;
    let mut w_ctx = crate::middleware::WriteCtx {
        kind: kind.to_string(),
        model: self.ctx_model(spec.get("entity").and_then(Json::as_str)),
        context: context.clone(),
        input: write_input_for(kind, params),
        state: serde_json::Map::new(),
        transaction: None,
    };

    // Everything that can fail after the context exists runs inside this
    // block so that a single error path can hand the failure to the hooks.
    let outcome: Result<Json> = async {
        self.middleware_ref().run_write_before(&mut w_ctx)?;
        // The before-hook may have edited the input or switched the kind.
        let (eff_command_id, eff_spec, eff_type, eff_params) =
            self.rederive_write(command_id, &spec, &w_ctx)?;
        let change = self
            .dispatch_command(&eff_command_id, &eff_spec, &eff_type, &eff_params, &context)
            .await?;
        Ok(change)
    }
    .await;

    match outcome {
        Ok(change) => {
            self.middleware_ref().run_write_after(&w_ctx, &change);
            Ok(())
        }
        Err(err) => self
            .middleware_ref()
            .run_write_error(&w_ctx, err)
            .map(|_| ()),
    }
}
async fn dispatch_command(
&self,
command_id: &str,
spec: &Json,
op_type: &str,
params: &Params,
context: &Json,
) -> Result<Json> {
let body = self.build_write_body(command_id, spec, op_type, params)?;
let force_old = self.middleware_ref().has_write_after();
let cap = self
.send_write_body_capturing(command_id, body, context, force_old)
.await?;
if !force_old {
return Ok(Json::Object(serde_json::Map::new()));
}
let kind = type_to_kind(op_type).unwrap_or("put");
let mut change = serde_json::Map::new();
if let Some(old) = cap.old_image.clone() {
change.insert("oldImage".into(), old);
}
match kind {
"put" => {
if let Some(new) = cap.new_image.clone() {
change.insert("newImage".into(), new);
}
}
"update" => {
let mut base = cap
.old_image
.clone()
.and_then(|v| v.as_object().cloned())
.unwrap_or_default();
for (field, tmpl) in spec
.get("changes")
.and_then(Json::as_object)
.cloned()
.unwrap_or_default()
{
base.insert(
field,
Json::String(resolve_template(tmpl.as_str().unwrap_or(""), params)?),
);
}
change.insert("newImage".into(), Json::Object(base));
}
_ => {}
}
Ok(Json::Object(change))
}
/// Recomputes the effective write after the write-before hooks have run.
///
/// The effective parameters are `input["params"]` overlaid with
/// `input["item"]` and then `input["key"]`. What happens next depends on
/// whether the hooks changed `ctx.kind` relative to the spec's own type:
///
/// * kind unchanged — the spec is reused. For an update with non-empty
///   `input["changes"]`, each changed field is added to the parameters and
///   to the spec's `changes` as a `{field}` template.
/// * rewritten to `"update"` — a fresh `UpdateItem` spec is synthesised from
///   the original `tableName`, `entity` and `keyCondition`, with `changes`
///   built from `input["changes"]` only.
/// * rewritten to `"put"` / `"delete"` — the spec is cloned with its `type`
///   replaced.
///
/// Returns `(command_id, spec, op_type, params)` for the effective write.
///
/// # Errors
///
/// Fails when the hooks rewrote the kind to anything other than
/// `"put"`, `"update"` or `"delete"`.
fn rederive_write(
    &self,
    command_id: &str,
    spec: &Json,
    ctx: &crate::middleware::WriteCtx,
) -> Result<(String, Json, String, Params)> {
    /// Copies every change value into `params` and registers a `{field}`
    /// template for it in `templates`.
    fn bind_changes(
        changes: &serde_json::Map<String, Json>,
        params: &mut serde_json::Map<String, Json>,
        templates: &mut serde_json::Map<String, Json>,
    ) {
        for (field, value) in changes {
            params.insert(field.clone(), value.clone());
            templates.insert(field.clone(), Json::String(format!("{{{field}}}")));
        }
    }

    let input = &ctx.input;
    let mut merged = input
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    for bucket in ["item", "key"] {
        if let Some(fields) = input.get(bucket).and_then(Json::as_object) {
            merged.extend(fields.iter().map(|(k, v)| (k.clone(), v.clone())));
        }
    }
    let changes = input
        .get("changes")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();

    let declared_type = spec["type"].as_str().unwrap_or("");
    let declared_kind = type_to_kind(declared_type).unwrap_or("");
    let requested = ctx.kind.as_str();

    if requested == declared_kind {
        let effective_spec = if requested == "update" && !changes.is_empty() {
            let mut templates = spec
                .get("changes")
                .and_then(Json::as_object)
                .cloned()
                .unwrap_or_default();
            bind_changes(&changes, &mut merged, &mut templates);
            let mut patched = spec.clone();
            patched["changes"] = Json::Object(templates);
            patched
        } else {
            spec.clone()
        };
        return Ok((
            command_id.to_string(),
            effective_spec,
            declared_type.to_string(),
            merged,
        ));
    }

    // The hooks switched the kind: derive a spec of the requested type.
    let (effective_type, effective_spec) = match requested {
        "update" => {
            let mut synth_changes = serde_json::Map::new();
            bind_changes(&changes, &mut merged, &mut synth_changes);
            let synth = serde_json::json!({
                "type": "UpdateItem",
                "tableName": spec["tableName"],
                "entity": spec.get("entity").cloned().unwrap_or(Json::Null),
                "keyCondition": spec.get("keyCondition").cloned().unwrap_or(Json::Object(serde_json::Map::new())),
                "changes": Json::Object(synth_changes),
            });
            ("UpdateItem", synth)
        }
        "put" => {
            let mut retyped = spec.clone();
            retyped["type"] = Json::String("PutItem".into());
            ("PutItem", retyped)
        }
        "delete" => {
            let mut retyped = spec.clone();
            retyped["type"] = Json::String("DeleteItem".into());
            ("DeleteItem", retyped)
        }
        other => {
            return Err(GraphDDBError::new(format!(
                "{command_id}: W1 rewrote to unknown kind '{other}'"
            )))
        }
    };
    Ok((
        command_id.to_string(),
        effective_spec,
        effective_type.to_string(),
        merged,
    ))
}
/// Builds the `WriteBody` for a command spec by delegating to the builder
/// matching `op_type` (`"PutItem"`, `"UpdateItem"` or `"DeleteItem"`).
///
/// # Errors
///
/// Fails for any other `op_type`, or when the selected builder fails.
pub(crate) fn build_write_body(
    &self,
    command_id: &str,
    spec: &Json,
    op_type: &str,
    params: &Params,
) -> Result<WriteBody> {
    if op_type == "PutItem" {
        return self.build_put_request(spec, params);
    }
    if op_type == "UpdateItem" {
        return self.build_update_request(command_id, spec, params);
    }
    if op_type == "DeleteItem" {
        return self.build_delete_request(spec, params);
    }
    let other = op_type;
    Err(GraphDDBError::new(format!(
        "{command_id}: unknown command type '{other}'"
    )))
}
/// Builds a put request from a command spec.
///
/// Each template under the spec's `item` is resolved against `params` into a
/// string attribute. `add_key_attributes` then gets a chance to extend the
/// attribute list for the spec's `entity`, the list is serialized, and the
/// spec's condition (if any) is attached.
///
/// # Errors
///
/// Propagates template resolution, serialization and condition errors.
fn build_put_request(&self, spec: &Json, params: &Params) -> Result<WriteBody> {
    let mut attributes: Vec<(String, Json)> = Vec::new();
    let templates = spec.get("item").and_then(Json::as_object);
    for (field, tmpl) in templates.into_iter().flatten() {
        let rendered = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
        attributes.push((field.clone(), Json::String(rendered)));
    }
    add_key_attributes(
        self.manifest_ref(),
        spec["entity"].as_str().unwrap_or(""),
        &mut attributes,
    );

    let mut item: HashMap<String, AttributeValue> = HashMap::new();
    for (name, value) in attributes {
        let serialized = serialize_json(&value)?;
        item.insert(name, serialized);
    }

    let mut names = HashMap::new();
    let mut values = HashMap::new();
    let condition_expression = self.apply_condition(spec, params, &mut names, &mut values)?;
    let table_name = self.physical_table(spec["tableName"].as_str().unwrap_or(""));
    Ok(WriteBody {
        table_name,
        kind: WriteKind::Put(item),
        condition_expression,
        names,
        values,
    })
}
/// Builds a delete request: the key comes from the spec's `keyCondition`
/// templates and the spec's condition (if any) is attached.
///
/// # Errors
///
/// Propagates key resolution and condition errors.
fn build_delete_request(&self, spec: &Json, params: &Params) -> Result<WriteBody> {
    let target = self.resolve_key(spec, params)?;
    let (mut names, mut values) = (HashMap::new(), HashMap::new());
    let condition_expression = self.apply_condition(spec, params, &mut names, &mut values)?;
    let table_name = self.physical_table(spec["tableName"].as_str().unwrap_or(""));
    Ok(WriteBody {
        table_name,
        kind: WriteKind::Delete(target),
        condition_expression,
        names,
        values,
    })
}
/// Builds an update request from a command spec.
///
/// Every entry of the spec's `changes` becomes a `#cN = :cN` assignment whose
/// value is the template resolved against `params` (as a string).
/// `append_gsi_rederive_sets` may add further assignments for affected
/// index keys. The assignments are joined into a single `SET` expression;
/// with no assignments there is no update expression. The spec's condition
/// (if any) is attached last.
///
/// # Errors
///
/// Propagates key resolution, template resolution, index re-derivation and
/// condition errors.
fn build_update_request(
    &self,
    command_id: &str,
    spec: &Json,
    params: &Params,
) -> Result<WriteBody> {
    let key = self.resolve_key(spec, params)?;
    let mut names: HashMap<String, String> = HashMap::new();
    let mut values: HashMap<String, AttributeValue> = HashMap::new();
    let mut sets: Vec<String> = Vec::new();

    let templates = spec.get("changes").and_then(Json::as_object);
    for (i, (field, tmpl)) in templates.into_iter().flatten().enumerate() {
        let n = format!("#c{i}");
        let v = format!(":c{i}");
        names.insert(n.clone(), field.clone());
        let rendered = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
        values.insert(v.clone(), serialize_string(rendered));
        sets.push(format!("{n} = {v}"));
    }
    self.append_gsi_rederive_sets(
        command_id,
        spec,
        params,
        &mut names,
        &mut values,
        &mut sets,
    )?;

    let update_expression = match sets.as_slice() {
        [] => None,
        assignments => Some(format!("SET {}", assignments.join(", "))),
    };
    if update_expression.is_none() {
        // Nothing to set: make sure no stray placeholders are sent along.
        names.clear();
        values.clear();
    }

    let condition_expression = self.apply_condition(spec, params, &mut names, &mut values)?;
    let table_name = self.physical_table(spec["tableName"].as_str().unwrap_or(""));
    Ok(WriteBody {
        table_name,
        kind: WriteKind::Update {
            key,
            update_expression,
        },
        condition_expression,
        names,
        values,
    })
}
/// Resolves the spec's `keyCondition` templates against `params` into a
/// serialized key map (every value is serialized as a string).
///
/// A spec without a `keyCondition` object yields an empty key.
///
/// # Errors
///
/// Propagates template resolution errors.
fn resolve_key(&self, spec: &Json, params: &Params) -> Result<HashMap<String, AttributeValue>> {
    let templates = spec.get("keyCondition").and_then(Json::as_object);
    let mut key = HashMap::new();
    for (attr, tmpl) in templates.into_iter().flatten() {
        let rendered = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
        key.insert(attr.clone(), serialize_string(rendered));
    }
    Ok(key)
}
/// Adds `SET` assignments that recompute index key attributes affected by an
/// update.
///
/// For the spec's entity, every entry of the manifest's `gsis` whose
/// `inputFields` overlap the spec's `changes` has its `pkTemplate` /
/// `skTemplate` re-rendered from the non-null `params` and written to the
/// `<indexName>PK` / `<indexName>SK` attributes. Nothing is added when the
/// entity has no indexes or the spec has no changes.
///
/// # Errors
///
/// Fails when an affected index depends on an input field for which
/// `params` holds no non-null value, since its key cannot be recomputed.
fn append_gsi_rederive_sets(
    &self,
    command_id: &str,
    spec: &Json,
    params: &Params,
    names: &mut HashMap<String, String>,
    values: &mut HashMap<String, AttributeValue>,
    sets: &mut Vec<String>,
) -> Result<()> {
    let entity_name = spec["entity"].as_str().unwrap_or("");
    let declared = self
        .manifest_ref()
        .get("entities")
        .and_then(|entities| entities.get(entity_name))
        .and_then(|entity| entity.get("gsis"))
        .and_then(Json::as_array);
    let gsis = match declared {
        Some(list) if !list.is_empty() => list,
        _ => return Ok(()),
    };

    let changed: Vec<&str> = spec
        .get("changes")
        .and_then(Json::as_object)
        .map(|c| c.keys().map(String::as_str).collect())
        .unwrap_or_default();
    if changed.is_empty() {
        return Ok(());
    }

    // Only parameters with a real value can feed a key template.
    let available: Vec<(String, Json)> = params
        .iter()
        .filter(|(_, v)| !v.is_null())
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();

    let mut gsi_index = 0;
    for gsi in gsis {
        let input_fields: Vec<&str> = gsi
            .get("inputFields")
            .and_then(Json::as_array)
            .map(|a| a.iter().filter_map(Json::as_str).collect())
            .unwrap_or_default();
        let changed_in_gsi: Vec<&str> = input_fields
            .iter()
            .copied()
            .filter(|f| changed.contains(f))
            .collect();
        if changed_in_gsi.is_empty() {
            continue;
        }

        let index = gsi.get("indexName").and_then(Json::as_str).unwrap_or("");
        let missing: Vec<&str> = input_fields
            .iter()
            .copied()
            .filter(|&f| !available.iter().any(|(k, _)| k == f))
            .collect();
        if !missing.is_empty() {
            return Err(GraphDDBError::new(format!(
                "{command_id}: updating {} affects index '{}' (also depends on {}); provide {}, or use read-modify-write.",
                changed_in_gsi.iter().map(|f| format!("'{f}'")).collect::<Vec<_>>().join(", "),
                index,
                missing.iter().map(|f| format!("'{f}'")).collect::<Vec<_>>().join(", "),
                if missing.len() > 1 { "them" } else { "it" },
            )));
        }

        if let Some(pk_tmpl) = gsi.get("pkTemplate").and_then(Json::as_str) {
            let n = format!("#gsi{gsi_index}pk");
            let v = format!(":gsi{gsi_index}pk");
            sets.push(format!("{n} = {v}"));
            names.insert(n, format!("{index}PK"));
            values.insert(v, serialize_string(fill_available(pk_tmpl, &available)));
        }
        if let Some(sk_tmpl) = gsi.get("skTemplate").and_then(Json::as_str) {
            let n = format!("#gsi{gsi_index}sk");
            let v = format!(":gsi{gsi_index}sk");
            sets.push(format!("{n} = {v}"));
            names.insert(n, format!("{index}SK"));
            values.insert(v, serialize_string(fill_available(sk_tmpl, &available)));
        }
        gsi_index += 1;
    }
    Ok(())
}
/// Translates the spec's `condition` into a condition expression, adding any
/// placeholders it needs to `names` and `values`.
///
/// Supported `kind`s:
///
/// * `notExists` — `attribute_not_exists(#pk)` with `#pk` bound to `PK`;
/// * `attributeExists` / `attributeNotExists` — the matching function applied
///   to `#ce`, bound to the condition's `field`;
/// * `equals` — `#eN = :eN` clauses joined with ` AND `, one per entry of
///   `fields`, values resolved from templates as strings;
/// * `expr` — the `declarative` tree with parameters resolved, compiled by
///   `compile_filter`;
/// * `raw` — the given `expression` with its `names` and `values` copied in
///   (values may reference parameters via `$param`).
///
/// Returns `Ok(None)` when there is no condition (missing or null), when the
/// kind is not recognised, or when `compile_filter` produces nothing.
///
/// # Errors
///
/// Propagates template/parameter resolution, filter compilation and
/// serialization errors.
fn apply_condition(
    &self,
    spec: &Json,
    params: &Params,
    names: &mut HashMap<String, String>,
    values: &mut HashMap<String, AttributeValue>,
) -> Result<Option<String>> {
    let condition = match spec.get("condition") {
        Some(c) if !c.is_null() => c,
        _ => return Ok(None),
    };

    let expression = match condition["kind"].as_str().unwrap_or("") {
        "notExists" => {
            names.insert("#pk".to_string(), "PK".to_string());
            "attribute_not_exists(#pk)".to_string()
        }
        kind @ ("attributeExists" | "attributeNotExists") => {
            let fname = match kind {
                "attributeExists" => "attribute_exists",
                _ => "attribute_not_exists",
            };
            let target = condition["field"].as_str().unwrap_or("").to_string();
            names.insert("#ce".to_string(), target);
            format!("{fname}(#ce)")
        }
        "equals" => {
            let mut clauses = Vec::new();
            let fields = condition.get("fields").and_then(Json::as_object);
            for (i, (field, tmpl)) in fields.into_iter().flatten().enumerate() {
                let n = format!("#e{i}");
                let v = format!(":e{i}");
                names.insert(n.clone(), field.clone());
                let resolved = resolve_template(tmpl.as_str().unwrap_or(""), params)?;
                values.insert(v.clone(), serialize_string(resolved));
                clauses.push(format!("{n} = {v}"));
            }
            clauses.join(" AND ")
        }
        "expr" => {
            let concrete = resolve_condition_tree(&condition["declarative"], params)?;
            let compiled = match compile_filter(&concrete)? {
                Some(compiled) => compiled,
                None => return Ok(None),
            };
            for (alias, column) in compiled.names.iter() {
                names.insert(alias.clone(), column.clone());
            }
            for (alias, bound) in compiled.values.iter() {
                values.insert(alias.clone(), bound.clone());
            }
            compiled.expression
        }
        "raw" => {
            let raw_names = condition.get("names").and_then(Json::as_object);
            for (alias, column) in raw_names.into_iter().flatten() {
                names.insert(alias.clone(), column.as_str().unwrap_or("").to_string());
            }
            let raw_values = condition.get("values").and_then(Json::as_object);
            for (alias, raw_val) in raw_values.into_iter().flatten() {
                let resolved = resolve_condition_leaf(raw_val, params)?;
                values.insert(alias.clone(), serialize_json(&resolved)?);
            }
            condition["expression"].as_str().unwrap_or("").to_string()
        }
        _ => return Ok(None),
    };
    Ok(Some(expression))
}
/// Sends a write without capturing images and discards the capture result.
///
/// The `_context` argument is not used: a null context is always forwarded
/// to `send_write_body_capturing`.
async fn send_write_body(
    &self,
    command_id: &str,
    body: WriteBody,
    _context: &Json,
) -> Result<()> {
    let no_context = Json::Null;
    self.send_write_body_capturing(command_id, body, &no_context, false)
        .await
        .map(|_| ())
}
/// Sends a write, routing it through the persist hooks when middleware is
/// active, and returns whatever images were captured.
///
/// Without middleware the body goes straight to `dispatch_write`. With
/// middleware a one-item `PersistCtx` is built from the body's condition,
/// names and values and handed to `run_persist_single`.
///
/// For a put, a copy of the item is kept aside so it can be reported as the
/// new image.
async fn send_write_body_capturing(
    &self,
    command_id: &str,
    mut body: WriteBody,
    context: &Json,
    force_old_image: bool,
) -> Result<Cap> {
    let (op_kind, written_item) = match &body.kind {
        WriteKind::None => ("None", None),
        WriteKind::Put(item) => ("Put", Some(item.clone())),
        WriteKind::Update { .. } => ("Update", None),
        WriteKind::Delete(_) => ("Delete", None),
    };

    if !self.middleware_active() {
        return self
            .dispatch_write(command_id, body, written_item, force_old_image)
            .await;
    }

    let item_ctx = crate::middleware::PersistItemCtx::new(
        op_kind,
        body.condition_expression.clone(),
        body.names.clone(),
        body.values.clone(),
    );
    let mut pctx = crate::middleware::PersistCtx {
        items: vec![item_ctx],
        origins: vec![Json::Null],
        context: context.clone(),
        state: serde_json::Map::new(),
        transaction: None,
    };
    self.run_persist_single(
        &mut pctx,
        &mut body,
        written_item,
        command_id,
        force_old_image,
    )
    .await
}
/// Runs one write through the persist hooks.
///
/// Sequence: `run_persist_before`, copy the (possibly edited) condition,
/// names and values of the first context item back into `body`, dispatch,
/// then `run_persist_after` on success.
///
/// Any failure — from the before-hook or from the dispatch — is handed to
/// `run_persist_error`. If that call returns an error it is propagated;
/// otherwise the failure is treated as handled and an empty `Cap` is
/// returned.
///
/// `body` is moved out with `std::mem::take`, so it is left in its default
/// state after a dispatch.
async fn run_persist_single(
    &self,
    pctx: &mut crate::middleware::PersistCtx,
    body: &mut WriteBody,
    written_item: Option<HashMap<String, AttributeValue>>,
    command_id: &str,
    force_old_image: bool,
) -> Result<Cap> {
    if let Err(before_err) = self.middleware_ref().run_persist_before(pctx) {
        self.middleware_ref().run_persist_error(pctx, before_err)?;
        return Ok(Cap::default());
    }

    // Pick up whatever the before-hooks changed on the single item.
    if let Some(edited) = pctx.items.first() {
        body.condition_expression = edited.condition_expression.clone();
        body.names = edited.names.clone();
        body.values = edited.values.clone();
    }

    let outgoing = std::mem::take(body);
    let dispatched = self
        .dispatch_write(command_id, outgoing, written_item, force_old_image)
        .await;
    let cap = match dispatched {
        Ok(cap) => {
            let results = Json::Null;
            self.middleware_ref().run_persist_after(pctx, &results);
            cap
        }
        Err(send_err) => {
            self.middleware_ref().run_persist_error(pctx, send_err)?;
            Cap::default()
        }
    };
    Ok(cap)
}
async fn dispatch_write(
&self,
command_id: &str,
body: WriteBody,
written_item: Option<HashMap<String, AttributeValue>>,
force_old_image: bool,
) -> Result<Cap> {
let _ = command_id;
let names = if body.names.is_empty() {
None
} else {
Some(body.names)
};
let values = if body.values.is_empty() {
None
} else {
Some(body.values)
};
let mut cap = Cap::default();
match body.kind {
WriteKind::None => {}
WriteKind::Put(item) => {
let out = self
.client_ref()
.put_item(
&body.table_name,
item,
body.condition_expression,
names,
values,
force_old_image,
)
.await?;
if force_old_image {
cap.old_image = out.attributes.as_ref().map(image_to_json);
cap.new_image = written_item.as_ref().map(image_to_json);
}
}
WriteKind::Update {
key,
update_expression,
} => {
let out = self
.client_ref()
.update_item(
&body.table_name,
key,
update_expression,
body.condition_expression,
names,
values,
force_old_image,
)
.await?;
if force_old_image {
cap.old_image = out.attributes.as_ref().map(image_to_json);
}
}
WriteKind::Delete(key) => {
let out = self
.client_ref()
.delete_item(
&body.table_name,
key,
body.condition_expression,
names,
values,
force_old_image,
)
.await?;
if force_old_image {
cap.old_image = out.attributes.as_ref().map(image_to_json);
}
}
}
Ok(cap)
}
/// Runs the transaction registered as `transaction_id` with `params` and no
/// options.
///
/// Equivalent to calling `execute_transaction_with_options` with `Json::Null`.
pub async fn execute_transaction(&self, transaction_id: &str, params: &Params) -> Result<()> {
    let no_options = Json::Null;
    self.execute_transaction_with_options(transaction_id, params, &no_options)
        .await
}
/// Runs the transaction registered as `transaction_id`.
///
/// `params` are validated against the spec's `params` section. With
/// middleware active, `run_write_before` is invoked once per logical
/// operation before the transaction is expanded. The expanded items are
/// then sent either directly (no middleware) or through the persist hooks,
/// after which `run_write_after` is invoked for every logical operation
/// with an empty change object.
///
/// An expansion that produces no items is a successful no-op; nothing is
/// sent and no after-hooks run.
///
/// `options["context"]` (defaulting to an empty object) is forwarded to the
/// middleware contexts.
///
/// # Errors
///
/// * transaction-not-found error when `transaction_id` is not registered;
/// * whatever `validate_params` reports for invalid `params`;
/// * a limit-exceeded error when the expansion yields more than
///   `MAX_TRANSACT_ITEMS_LIMIT` items;
/// * any error from the hooks, the expansion, or the client.
pub async fn execute_transaction_with_options(
    &self,
    transaction_id: &str,
    params: &Params,
    options: &Json,
) -> Result<()> {
    // Every transaction is currently tagged with this fixed id, both on the
    // per-operation write contexts and on the persist context.
    let tx_id: u64 = 1;

    let spec = self.transactions_get(transaction_id).ok_or_else(|| {
        GraphDDBError::transaction_not_found(format!("unknown transaction '{transaction_id}'"))
    })?;
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    validate_params(params, &param_specs, transaction_id)?;
    let context = options
        .get("context")
        .cloned()
        .unwrap_or_else(|| Json::Object(serde_json::Map::new()));

    let active = self.middleware_active();
    let mut write_ctxs: Vec<crate::middleware::WriteCtx> = Vec::new();
    if active {
        for (kind, op_spec, op_params) in
            crate::transactions::transaction_logical_ops(&spec, params)?
        {
            let mut w_ctx = crate::middleware::WriteCtx {
                kind: kind.clone(),
                model: self.ctx_model(op_spec.get("entity").and_then(Json::as_str)),
                context: context.clone(),
                input: write_input_for(&kind, &op_params),
                state: serde_json::Map::new(),
                transaction: Some(tx_id),
            };
            self.middleware_ref().run_write_before(&mut w_ctx)?;
            write_ctxs.push(w_ctx);
        }
    }

    let items = expand_transaction(self, &spec, params)?;
    if items.len() > MAX_TRANSACT_ITEMS_LIMIT {
        return Err(GraphDDBError::limit_exceeded(format!(
            "{transaction_id}: transaction expanded to {} items, exceeds the DynamoDB \
             TransactWriteItems limit of {MAX_TRANSACT_ITEMS_LIMIT}",
            items.len()
        )));
    }
    if items.is_empty() {
        return Ok(());
    }
    if !active {
        self.client_ref().transact_write_items(items).await?;
        return Ok(());
    }

    let origins: Vec<Json> = write_ctxs
        .iter()
        .map(|c| serde_json::json!({"model": c.model, "kind": c.kind}))
        .collect();
    self.run_persist_batch(items, origins, &context, Some(tx_id))
        .await?;

    let no_change = Json::Object(serde_json::Map::new());
    for c in &write_ctxs {
        self.middleware_ref().run_write_after(c, &no_change);
    }
    Ok(())
}
/// Sends a batch of transactional write items through the persist hooks.
///
/// Sequence: build a `PersistCtx` with one item context per write item, run
/// `run_persist_before`, apply the (possibly edited) item contexts back onto
/// the write items, send them in one `transact_write_items` call, then run
/// `run_persist_after`.
///
/// Any failure — from the before-hook or from the client — is handed to
/// `run_persist_error`. If that call returns an error it is propagated;
/// otherwise the failure is treated as handled and `Ok(())` is returned.
async fn run_persist_batch(
    &self,
    items: Vec<aws_sdk_dynamodb::types::TransactWriteItem>,
    origins: Vec<Json>,
    context: &Json,
    transaction: Option<u64>,
) -> Result<()> {
    let item_ctxs = items.iter().map(crate::persist_hook::to_item_ctx).collect();
    let mut pctx = crate::middleware::PersistCtx {
        items: item_ctxs,
        origins,
        context: context.clone(),
        state: serde_json::Map::new(),
        transaction,
    };

    if let Err(before_err) = self.middleware_ref().run_persist_before(&mut pctx) {
        self.middleware_ref().run_persist_error(&pctx, before_err)?;
        return Ok(());
    }

    // Pair each original item with its context; pairing stops at the
    // shorter of the two sequences.
    let mut rebuilt: Vec<aws_sdk_dynamodb::types::TransactWriteItem> = Vec::new();
    for (item, ctx) in items.into_iter().zip(pctx.items.iter()) {
        rebuilt.push(crate::persist_hook::apply_item_ctx(item, ctx));
    }

    if let Err(send_err) = self.client_ref().transact_write_items(rebuilt).await {
        self.middleware_ref().run_persist_error(&pctx, send_err)?;
        return Ok(());
    }
    self.middleware_ref().run_persist_after(&pctx, &Json::Null);
    Ok(())
}
}
/// Renders `template`, replacing each `{name}` placeholder with the template
/// string form of the value stored under `name` in `available`.
///
/// Rules:
///
/// * a placeholder whose name is not in `available` renders as nothing;
/// * `{}` and a `{` whose span up to the next `}` contains another `{` are
///   not placeholders — that `{` is copied literally and scanning resumes
///   right after it;
/// * a `{` with no closing `}` is copied literally.
///
/// Literal text is copied as whole string slices, so non-ASCII characters
/// in the template are preserved. (Copying byte by byte, as this function
/// once did, would turn each byte of a multi-byte character into a separate
/// character.)
fn fill_available(template: &str, available: &[(String, Json)]) -> String {
    let mut out = String::with_capacity(template.len());
    let mut rest = template;
    while let Some(open) = rest.find('{') {
        // Everything before the brace is literal text.
        out.push_str(&rest[..open]);
        // `{` is one byte, so `open + 1` is always a character boundary.
        let after = &rest[open + 1..];
        match after.find('}') {
            Some(close) if close > 0 && !after[..close].contains('{') => {
                let name = &after[..close];
                if let Some((_, value)) = available.iter().find(|(k, _)| k == name) {
                    out.push_str(&crate::templates::json_to_template_string(value));
                }
                rest = &after[close + 1..];
            }
            _ => {
                // Not a placeholder: keep the brace and continue after it.
                out.push('{');
                rest = after;
            }
        }
    }
    out.push_str(rest);
    out
}
/// Returns a copy of a declarative condition tree with every `$param`
/// reference replaced by the bound parameter value.
///
/// * `and` / `or` — each element of the array is resolved recursively (a
///   non-array value becomes an empty array);
/// * `not` — the nested tree is resolved recursively;
/// * any other key is a field whose value is an object of operators:
///   `between` needs a two-element array, `in` resolves each array element,
///   `attributeExists` is copied unchanged, and every other operator has its
///   operand resolved as a single leaf.
///
/// A node that is not a JSON object resolves to an empty object.
///
/// # Errors
///
/// Fails when `between` is not given exactly two operands or when a
/// referenced parameter is unbound.
fn resolve_condition_tree(node: &Json, params: &Params) -> Result<Json> {
    /// Resolves the operand of a single field operator.
    fn resolve_operand(op: &str, operand: &Json, params: &Params) -> Result<Json> {
        match op {
            "between" => match operand.as_array().map(Vec::as_slice) {
                Some([lo, hi]) => Ok(Json::Array(vec![
                    resolve_condition_leaf(lo, params)?,
                    resolve_condition_leaf(hi, params)?,
                ])),
                _ => Err(GraphDDBError::new(
                    "`between` condition expects a [lo, hi] array of length 2",
                )),
            },
            "in" => {
                let mut members = Vec::new();
                for candidate in operand.as_array().into_iter().flatten() {
                    members.push(resolve_condition_leaf(candidate, params)?);
                }
                Ok(Json::Array(members))
            }
            "attributeExists" => Ok(operand.clone()),
            _ => resolve_condition_leaf(operand, params),
        }
    }

    let mut out = serde_json::Map::new();
    for (key, value) in node.as_object().into_iter().flatten() {
        let resolved = match key.as_str() {
            "and" | "or" => {
                let mut branches = Vec::new();
                for child in value.as_array().into_iter().flatten() {
                    branches.push(resolve_condition_tree(child, params)?);
                }
                Json::Array(branches)
            }
            "not" => resolve_condition_tree(value, params)?,
            _ => {
                let mut ops = serde_json::Map::new();
                for (op, operand) in value.as_object().into_iter().flatten() {
                    ops.insert(op.clone(), resolve_operand(op, operand, params)?);
                }
                Json::Object(ops)
            }
        };
        out.insert(key.clone(), resolved);
    }
    Ok(Json::Object(out))
}
/// Resolves a single condition operand.
///
/// A value of the form `{"$param": "<name>"}` is replaced by a copy of the
/// parameter bound to `<name>`; anything else is returned as a copy of
/// itself.
///
/// # Errors
///
/// Fails when the referenced parameter is missing or null.
fn resolve_condition_leaf(value: &Json, params: &Params) -> Result<Json> {
    let name = match value.get("$param").and_then(Json::as_str) {
        Some(name) => name,
        None => return Ok(value.clone()),
    };
    match params.get(name) {
        Some(bound) if !bound.is_null() => Ok(bound.clone()),
        _ => Err(GraphDDBError::new(format!(
            "condition references unbound parameter '{name}'"
        ))),
    }
}