use std::collections::HashMap;
use aws_sdk_dynamodb::types::{AttributeValue, DeleteRequest, PutRequest, WriteRequest};
use serde_json::{Map as JsonMap, Value as Json};
use crate::attribute::{deserialize_item, serialize_string};
use crate::batch::{batch_get, batch_write, serialize_key, BatchOptions};
use crate::entity::add_key_attributes;
use crate::errors::{GraphDDBError, Result};
use crate::hydration::hydrate_item;
use crate::per_key_cursor::{decode_per_key_cursor, encode_per_key_cursor, serialize_contract_key};
use crate::runtime::{connection_json, resolve_entity, select_from_projection, GraphDDBRuntime};
use crate::templates::{resolve_template, validate_params, Params};
use crate::value::Value;
impl GraphDDBRuntime {
/// Returns an owned copy of the contract registered under `name`, or `None`
/// when the contract table has no such entry.
fn contract(&self, name: &str) -> Option<Json> {
    let found = self.contracts_ref().get(name)?;
    Some(found.clone())
}
/// Borrows the manifest's `"entities"` value, falling back to `Json::Null`
/// when the manifest has no such key.
fn entities_json(&self) -> &Json {
    match self.manifest_ref().get("entities") {
        Some(entities) => entities,
        None => &Json::Null,
    }
}
pub async fn execute_query_method(
&self,
contract_name: &str,
method_name: &str,
key_or_keys: &Json,
params: Option<&Params>,
) -> Result<Json> {
let method = self.resolve_contract_query_method(contract_name, method_name)?;
let resolution = method
.get("resolution")
.and_then(Json::as_str)
.unwrap_or("");
let input_arity = method.get("inputArity").and_then(Json::as_str);
let op_spec = self.contract_query_op(contract_name, method_name, &method)?;
let compose = method
.get("compose")
.and_then(Json::as_array)
.cloned()
.unwrap_or_default();
let composition_plan = method.get("compositionPlan").cloned();
let opts: Params = params.cloned().unwrap_or_default();
if let Some(keys) = key_or_keys.as_array() {
if input_arity == Some("single") || (input_arity.is_none() && resolution == "range") {
let (kind, per_key) = if resolution == "range" {
("range", "partition")
} else {
("unique-GSI point", "GSI")
};
return Err(GraphDDBError::contract_arity(format!(
"{contract_name}.{method_name}: {kind} method called with an array of keys, \
but its inputArity is 'single'. A {kind} resolution is one {per_key} Query \
per key, so an array would be an N+1 fan-out — forbidden by the contract's \
N+1 rule. Loop in application code for N independent reads."
)));
}
let mut keyed = self
.execute_point_batch(contract_name, method_name, &op_spec, keys)
.await?;
let present_ids: Vec<String> = keyed
.keys()
.filter(|k| !keyed[*k].is_null())
.cloned()
.collect();
self.resolve_compositions_map(
contract_name,
method_name,
&compose,
&mut keyed,
&present_ids,
composition_plan.as_ref(),
)
.await?;
let mut obj = JsonMap::new();
for (k, v) in keyed {
obj.insert(k, v);
}
return Ok(Json::Object(obj));
}
let key = key_or_keys;
if resolution == "point" {
let item = self
.execute_point_single(contract_name, method_name, &op_spec, key, &opts)
.await?;
match item {
Json::Null => Ok(Json::Null),
mut it => {
let mut list = vec![std::mem::take(&mut it)];
self.resolve_compositions_list(
contract_name,
method_name,
&compose,
&mut list,
composition_plan.as_ref(),
)
.await?;
Ok(list.into_iter().next().unwrap())
}
}
} else {
let (mut items, cursor) = self
.execute_range_single(contract_name, method_name, &op_spec, key, &opts)
.await?;
self.resolve_compositions_list(
contract_name,
method_name,
&compose,
&mut items,
composition_plan.as_ref(),
)
.await?;
Ok(connection_json(items, cursor))
}
}
/// Fetches the JSON definition of `method_name` from the query contract
/// `contract_name`.
///
/// # Errors
///
/// Returns a contract-not-found error when the contract is unknown, when its
/// `kind` is not `"query"`, or when it declares no method of that name.
fn resolve_contract_query_method(
    &self,
    contract_name: &str,
    method_name: &str,
) -> Result<Json> {
    let contract = match self.contract(contract_name) {
        Some(contract) => contract,
        None => {
            return Err(GraphDDBError::contract_not_found(format!(
                "unknown contract '{contract_name}'."
            )))
        }
    };

    let kind = contract.get("kind").and_then(Json::as_str);
    if kind != Some("query") {
        return Err(GraphDDBError::contract_not_found(format!(
            "contract '{contract_name}' is a '{}' contract; execute_query_method only runs \
             query contracts.",
            kind.unwrap_or("?")
        )));
    }

    match contract
        .get("methods")
        .and_then(|methods| methods.get(method_name))
    {
        Some(method) => Ok(method.clone()),
        None => Err(GraphDDBError::contract_not_found(format!(
            "contract '{contract_name}' has no query method '{method_name}'."
        ))),
    }
}
/// Resolves the query spec named by the method's `operation` field and checks
/// that it holds exactly one entry in its `operations` array.
///
/// # Errors
///
/// Returns a contract-not-found error when the referenced operation is absent
/// from `queries`, and a generic error when the spec does not contain exactly
/// one read operation.
fn contract_query_op(
    &self,
    contract_name: &str,
    method_name: &str,
    method: &Json,
) -> Result<Json> {
    let op_name = method.get("operation").and_then(Json::as_str).unwrap_or("");
    let spec = match self.query_spec_clone(op_name) {
        Some(spec) => spec,
        None => {
            return Err(GraphDDBError::contract_not_found(format!(
                "{contract_name}.{method_name}: referenced operation '{op_name}' is not present in `queries`."
            )))
        }
    };

    // A missing or non-array `operations` counts as zero operations.
    let n = spec
        .get("operations")
        .and_then(Json::as_array)
        .map_or(0, Vec::len);
    if n == 1 {
        Ok(spec)
    } else {
        Err(GraphDDBError::new(format!(
            "{contract_name}.{method_name}: a single-contract query method must resolve to \
             exactly one read operation, found {n}."
        )))
    }
}
/// Returns an owned copy of the entry `id` under the operations table's
/// `"queries"` section, or `None` when either level is missing.
fn query_spec_clone(&self, id: &str) -> Option<Json> {
    let queries = self.operations_ref().get("queries")?;
    queries.get(id).cloned()
}
/// Runs a point read for a single key and returns the item, or `Json::Null`
/// when nothing was found.
///
/// The key object is validated against the spec's `params` before the read.
/// The first entry of the spec's `operations` array decides how the read is
/// performed:
///
/// * `"GetItem"` — delegated to `run_get_item`;
/// * `"Query"` — delegated to `run_query`, keeping only the first returned item.
///
/// `opts["consistentRead"]` (a boolean, default `false`) is forwarded to
/// whichever read is used.
///
/// # Errors
///
/// Propagates validation and read errors, and returns a generic error for any
/// other operation type.
async fn execute_point_single(
    &self,
    contract_name: &str,
    method_name: &str,
    spec: &Json,
    key: &Json,
    opts: &Params,
) -> Result<Json> {
    // Built once and shared by validation, the reads and the error message.
    let label = format!("{contract_name}.{method_name}");
    let op = &spec["operations"][0];
    // A key that is not a JSON object is treated as having no parameters.
    let key_params = key.as_object().cloned().unwrap_or_default();
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    validate_params(&key_params, &param_specs, &label)?;

    let select = select_from_projection(op);
    let entity_meta = resolve_entity(self.entities_json(), op);
    let consistent_read = opts
        .get("consistentRead")
        .and_then(Json::as_bool)
        .unwrap_or(false);
    let scope = self.op_scope(
        op.get("entity").and_then(Json::as_str),
        vec![],
        Json::Object(JsonMap::new()),
    );

    match op["type"].as_str().unwrap_or("") {
        "GetItem" => {
            let item = self
                .run_get_item(
                    &label,
                    op,
                    &key_params,
                    &select,
                    &entity_meta,
                    consistent_read,
                    &scope,
                )
                .await?;
            Ok(item.map(|m| Value::M(m).to_json()).unwrap_or(Json::Null))
        }
        "Query" => {
            let mut options = JsonMap::new();
            options.insert("consistentRead".into(), Json::Bool(consistent_read));
            let conn = self
                .run_query(
                    &label,
                    op,
                    &key_params,
                    &select,
                    &entity_meta,
                    &Json::Object(options),
                    &scope,
                )
                .await?;
            // A point read yields at most one item; extra items are dropped.
            Ok(conn.items.into_iter().next().unwrap_or(Json::Null))
        }
        other => Err(GraphDDBError::new(format!(
            "{label}: unsupported point read operation '{other}'"
        ))),
    }
}
async fn execute_range_single(
&self,
contract_name: &str,
method_name: &str,
spec: &Json,
key: &Json,
opts: &Params,
) -> Result<(Vec<Json>, Option<String>)> {
let mut op = spec["operations"][0].clone();
let key_params = key.as_object().cloned().unwrap_or_default();
let param_specs = spec
.get("params")
.and_then(Json::as_object)
.cloned()
.unwrap_or_default();
validate_params(
&key_params,
¶m_specs,
&format!("{contract_name}.{method_name}"),
)?;
let select = select_from_projection(&op);
let entity_meta = resolve_entity(self.entities_json(), &op);
let mut run_options = JsonMap::new();
if let Some(after) = opts.get("after").filter(|v| !v.is_null()) {
let after = after.as_str().unwrap_or("");
let inner = decode_per_key_cursor(after, key)?;
run_options.insert("cursor".into(), Json::String(inner));
}
if let Some(limit) = opts.get("limit").filter(|v| !v.is_null()) {
op.as_object_mut()
.unwrap()
.insert("limit".into(), limit.clone());
}
let label = format!("{contract_name}.{method_name}");
let scope = self.op_scope(
op.get("entity").and_then(Json::as_str),
vec![],
Json::Object(JsonMap::new()),
);
let conn = self
.run_query(
&label,
&op,
&key_params,
&select,
&entity_meta,
&Json::Object(run_options),
&scope,
)
.await?;
let cursor = encode_per_key_cursor(key, conn.cursor.as_deref());
Ok((conn.items, cursor))
}
async fn execute_point_batch(
&self,
contract_name: &str,
method_name: &str,
spec: &Json,
keys: &[Json],
) -> Result<HashMap<String, Json>> {
let op = &spec["operations"][0];
if op["type"].as_str() != Some("GetItem") {
return Err(GraphDDBError::new(format!(
"{contract_name}.{method_name}: a batched point read requires a GetItem op."
)));
}
let select = select_from_projection(op);
let entity_meta = resolve_entity(self.entities_json(), op);
let param_specs = spec
.get("params")
.and_then(Json::as_object)
.cloned()
.unwrap_or_default();
let kc = op["keyCondition"].as_object().cloned().unwrap_or_default();
let key_attrs: Vec<String> = kc.keys().cloned().collect();
let label = format!("{contract_name}.{method_name}");
let mut resolved: Vec<(String, String)> = Vec::new(); let mut unique_by_marker: Vec<(String, HashMap<String, AttributeValue>)> = Vec::new();
let mut seen_markers = std::collections::HashSet::new();
for key in keys {
let key_params = key.as_object().cloned().unwrap_or_default();
validate_params(&key_params, ¶m_specs, &label)?;
let mut plain: Vec<(String, Json)> = Vec::new();
for (attr, tmpl) in &kc {
plain.push((
attr.clone(),
Json::String(resolve_template(tmpl.as_str().unwrap_or(""), &key_params)?),
));
}
let marker = serialize_key(&plain.iter().cloned().collect());
resolved.push((serialize_contract_key(key), marker.clone()));
if seen_markers.insert(marker.clone()) {
let av: HashMap<String, AttributeValue> = plain
.iter()
.map(|(k, v)| (k.clone(), serialize_string(v.as_str().unwrap_or(""))))
.collect();
unique_by_marker.push((marker, av));
}
}
if unique_by_marker.len() > self.limits_ref().max_items {
return Err(GraphDDBError::limit_exceeded(format!(
"{label}: BatchGet needs {} keys, exceeds max_items {}",
unique_by_marker.len(),
self.limits_ref().max_items
)));
}
let mut raw_by_marker: HashMap<String, Value> = HashMap::new();
if !unique_by_marker.is_empty() {
let serialized_keys: Vec<HashMap<String, AttributeValue>> =
unique_by_marker.iter().map(|(_, av)| av.clone()).collect();
let (pe, names) = self.projection_request(op, &select);
let opts = BatchOptions {
max_batch_get_items: self.limits_ref().max_batch_get_items,
sleep: self.batch_sleep,
};
let raw = batch_get(
self.client_ref(),
&self.physical_table(op["tableName"].as_str().unwrap_or("")),
serialized_keys,
pe,
names,
opts,
)
.await?;
for item in &raw {
let de = deserialize_item(item);
let mut key_map: HashMap<String, Json> = HashMap::new();
for attr in &key_attrs {
let v = de.get(attr).map(|v| v.to_json()).unwrap_or(Json::Null);
key_map.insert(attr.clone(), v);
}
let marker = serialize_key(&key_map);
raw_by_marker.insert(marker, Value::M(de));
}
}
let mut out: HashMap<String, Json> = HashMap::new();
for (key_id, marker) in resolved {
if out.contains_key(&key_id) {
continue;
}
let item = match raw_by_marker.get(&marker) {
Some(Value::M(m)) => Value::M(hydrate_item(m, &select, &entity_meta)?).to_json(),
_ => Json::Null,
};
out.insert(key_id, item);
}
Ok(out)
}
/// Resolves every composition edge in `compose` and writes the fetched child
/// values onto `parents` in place.
///
/// Edges are processed stage by stage, as given by `composition_stages`.
/// Within a stage the edges are fetched through `map_with_concurrency` against
/// a snapshot of the parents taken at the start of that stage, then each
/// result is inserted under its `as` property. Parents that are not JSON
/// objects are left untouched.
///
/// Does nothing when there are no edges or no parents.
async fn resolve_compositions_list(
    &self,
    contract_name: &str,
    method_name: &str,
    compose: &[Json],
    parents: &mut [Json],
    plan: Option<&Json>,
) -> Result<()> {
    if compose.is_empty() || parents.is_empty() {
        return Ok(());
    }

    let concurrency = composition_concurrency(plan);
    for stage in composition_stages(plan, compose.len()) {
        // Later stages must see the properties added by earlier ones, so the
        // snapshot is retaken for every stage.
        let frozen: Vec<Json> = parents.to_vec();
        let fetched = crate::concurrency::map_with_concurrency(stage.len(), concurrency, |slot| {
            let edge = compose[stage[slot]].clone();
            let frozen = &frozen;
            async move {
                self.resolve_composition_fetch(contract_name, method_name, &edge, frozen)
                    .await
            }
        })
        .await?;

        for apply in fetched {
            // `values` holds one entry per parent, in parent order.
            for (parent, value) in parents.iter_mut().zip(&apply.values) {
                if let Some(fields) = parent.as_object_mut() {
                    fields.insert(apply.as_prop.clone(), value.clone());
                }
            }
        }
    }
    Ok(())
}
/// Map-shaped variant of `resolve_compositions_list`: resolves the composition
/// edges for the entries of `keyed` named by `present_ids` and stores the
/// composed items back under the same ids.
///
/// Does nothing when there are no edges or no present ids.
///
/// # Panics
///
/// Panics if an id in `present_ids` is not a key of `keyed`.
async fn resolve_compositions_map(
    &self,
    contract_name: &str,
    method_name: &str,
    compose: &[Json],
    keyed: &mut HashMap<String, Json>,
    present_ids: &[String],
    plan: Option<&Json>,
) -> Result<()> {
    if compose.is_empty() || present_ids.is_empty() {
        return Ok(());
    }

    // Copy the present items out in id order so they can be composed as a list.
    let mut parents: Vec<Json> = Vec::with_capacity(present_ids.len());
    for id in present_ids {
        parents.push(keyed[id].clone());
    }

    self.resolve_compositions_list(contract_name, method_name, compose, &mut parents, plan)
        .await?;

    for (id, composed) in present_ids.iter().zip(parents) {
        keyed.insert(id.clone(), composed);
    }
    Ok(())
}
/// Fetches the child items for one composition `edge` across all `parents`.
///
/// For every parent a child key is built from the edge's `bind` map, which
/// maps a child key field to a path read from the parent with
/// `read_from_path`. A parent is skipped (its value stays `Json::Null`) when
/// any bound path is missing, resolves to `null`, or is not a string. All
/// complete child keys are then fetched with a single `execute_point_batch`
/// call against the child contract method.
///
/// Returns the edge's `as` property name together with one value per parent,
/// in parent order.
///
/// # Errors
///
/// Returns a generic error when the edge's `resolution` is not `"point"`, and
/// propagates lookup and read errors for the child method.
async fn resolve_composition_fetch(
    &self,
    contract_name: &str,
    method_name: &str,
    edge: &Json,
    parents: &[Json],
) -> Result<CompositionApply> {
    let as_prop = edge
        .get("as")
        .and_then(Json::as_str)
        .unwrap_or("")
        .to_string();
    if edge.get("resolution").and_then(Json::as_str) != Some("point") {
        return Err(GraphDDBError::new(format!(
            "{contract_name}.{method_name}: composed child '{as_prop}' resolves to a 'range' \
             method, but an External Query child must be 'point'."
        )));
    }
    let child_contract = edge.get("contract").and_then(Json::as_str).unwrap_or("");
    let child_method_name = edge.get("method").and_then(Json::as_str).unwrap_or("");
    let child_method = self.resolve_contract_query_method(child_contract, child_method_name)?;
    let child_op_spec =
        self.contract_query_op(child_contract, child_method_name, &child_method)?;
    let bind = edge
        .get("bind")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();

    let mut values: Vec<Json> = vec![Json::Null; parents.len()];
    // (parent index, serialized child key) for every parent with a complete key.
    let mut bound: Vec<(usize, String)> = Vec::new();
    let mut child_keys: Vec<Json> = Vec::new();
    for (i, parent) in parents.iter().enumerate() {
        let mut child_key = JsonMap::new();
        let mut complete = true;
        for (field, path) in &bind {
            // A non-string path is treated as unbound rather than being passed
            // on as "", which `read_from_path` cannot slice.
            let bound_value = path
                .as_str()
                .and_then(|p| read_from_path(parent, p))
                .filter(|v| !v.is_null());
            match bound_value {
                Some(v) => {
                    child_key.insert(field.clone(), v);
                }
                None => {
                    complete = false;
                    break;
                }
            }
        }
        if !complete {
            continue;
        }
        let child_key = Json::Object(child_key);
        bound.push((i, serialize_contract_key(&child_key)));
        child_keys.push(child_key);
    }

    if !child_keys.is_empty() {
        let child_results = self
            .execute_point_batch(
                child_contract,
                child_method_name,
                &child_op_spec,
                &child_keys,
            )
            .await?;
        for (idx, key_id) in bound {
            values[idx] = child_results.get(&key_id).cloned().unwrap_or(Json::Null);
        }
    }
    Ok(CompositionApply { as_prop, values })
}
/// Executes one method of a command contract.
///
/// * When `key_or_keys` is a JSON array the call is handed to
///   `execute_command_batch`; its outcome is returned, or `Json::Null` when it
///   produces none.
/// * Otherwise the key object and `params` are merged (entries in `params`
///   overwrite same-named key fields) and the method's `single` form is run:
///   as a transaction when `single.mode` is `"transaction"`, as a plain
///   command in every other case.
///
/// After a single-key write, a method with a non-null `returnSelection`
/// returns the result of `read_back_projection`; every other method returns
/// `Json::Null`.
///
/// # Errors
///
/// Propagates errors from method lookup, the write and the read-back.
pub async fn execute_command_method(
    &self,
    contract_name: &str,
    method_name: &str,
    key_or_keys: &Json,
    params: Option<&Params>,
) -> Result<Json> {
    let method = self.resolve_contract_command_method(contract_name, method_name)?;
    let shared: Params = params.cloned().unwrap_or_default();

    if let Some(keys) = key_or_keys.as_array() {
        let outcome = self
            .execute_command_batch(contract_name, method_name, &method, keys, &shared)
            .await?;
        return Ok(match outcome {
            Some(value) => value,
            None => Json::Null,
        });
    }

    // Key fields first, then shared params on top.
    let mut merged = key_or_keys.as_object().cloned().unwrap_or_default();
    merged.extend(shared.iter().map(|(name, value)| (name.clone(), value.clone())));

    let single = method.get("single");
    let mode = single
        .and_then(|s| s.get("mode"))
        .and_then(Json::as_str)
        .unwrap_or("op");
    match mode {
        "transaction" => {
            let tx = single
                .and_then(|s| s.get("transaction"))
                .and_then(Json::as_str)
                .unwrap_or("");
            self.execute_transaction(tx, &merged).await?;
        }
        _ => {
            let command_id = single
                .and_then(|s| s.get("operation"))
                .and_then(Json::as_str)
                .unwrap_or("");
            self.execute_command(command_id, &merged).await?;
        }
    }

    let wants_readback = match method.get("returnSelection") {
        Some(selection) => !selection.is_null(),
        None => false,
    };
    if !wants_readback {
        return Ok(Json::Null);
    }
    self.read_back_projection(contract_name, method_name, &merged)
        .await
}
/// Reads back the item written by a command method, using the query named
/// `<contract>__<method>__readback`.
///
/// The read is a `run_get_item` call with consistent read enabled. Returns
/// the item as JSON, or `Json::Null` when the item is not found.
///
/// # Errors
///
/// Returns a generic error when the read-back query is missing from `queries`
/// or its first operation is not a `GetItem`, and propagates read errors.
async fn read_back_projection(
    &self,
    contract_name: &str,
    method_name: &str,
    params: &Params,
) -> Result<Json> {
    let readback_id = format!("{contract_name}__{method_name}__readback");
    let spec = match self.query_spec_clone(&readback_id) {
        Some(spec) => spec,
        None => {
            return Err(GraphDDBError::new(format!(
                "{contract_name}.{method_name}: declares a return projection but the read-back \
                 query '{readback_id}' is not present in `queries`."
            )))
        }
    };

    let op = &spec["operations"][0];
    let is_get_item = matches!(op["type"].as_str(), Some("GetItem"));
    if !is_get_item {
        return Err(GraphDDBError::new(format!(
            "{contract_name}.{method_name}: the read-back op '{readback_id}' must be a \
             base-table GetItem."
        )));
    }

    let select = select_from_projection(op);
    let entity_meta = resolve_entity(self.entities_json(), op);
    let scope = self.op_scope(
        op.get("entity").and_then(Json::as_str),
        vec![],
        Json::Object(JsonMap::new()),
    );
    let run_label = format!("{contract_name}.{method_name}__readback");
    let fetched = self
        .run_get_item(
            &run_label,
            op,
            params,
            &select,
            &entity_meta,
            // Read-back always uses a consistent read.
            true,
            &scope,
        )
        .await?;
    Ok(match fetched {
        Some(attrs) => Value::M(attrs).to_json(),
        None => Json::Null,
    })
}
async fn execute_command_batch(
&self,
contract_name: &str,
method_name: &str,
method: &Json,
keys: &[Json],
shared: &Params,
) -> Result<Option<Json>> {
let label = format!("{contract_name}.{method_name}");
let batch = method
.get("batch")
.filter(|b| !b.is_null())
.ok_or_else(|| {
GraphDDBError::contract_arity(format!(
"command method '{label}' was called with an array of keys, but it declares no \
key-array bulk form."
))
})?;
let mode = batch.get("mode").and_then(Json::as_str).unwrap_or("");
match mode {
"transaction" => {
let tx = batch
.get("transaction")
.and_then(Json::as_str)
.unwrap_or("");
let mut tx_params = shared.clone();
tx_params.insert("keys".into(), Json::Array(keys.to_vec()));
self.execute_transaction(tx, &tx_params).await?;
Ok(None)
}
"parallel" => {
let op = batch.get("operation").and_then(Json::as_str).unwrap_or("");
let results = self
.execute_command_parallel(&label, op, keys, shared)
.await?;
let mut m = JsonMap::new();
m.insert("results".into(), Json::Array(results));
Ok(Some(Json::Object(m)))
}
other => Err(GraphDDBError::new(format!(
"{label}: unknown batch resolution mode '{other}'"
))),
}
}
/// Applies the write command `command_id` once per key and returns one result
/// object per key, in key order.
///
/// * An unconditional `PutItem` / `DeleteItem` command is coalesced into a
///   single `execute_batch_write` call; every key then reports `{"ok": true}`
///   and any failure of the batch is returned as an error for the whole call.
/// * Every other command is executed key by key, one at a time. A failing key
///   does not stop the loop; it reports `{"ok": false, "error": <message>}`.
///
/// For each key, `shared` entries are merged over the key's own fields.
/// Returns an empty list when `keys` is empty.
///
/// # Errors
///
/// Returns a contract-not-found error when `command_id` is absent from
/// `commands`, and propagates errors from the coalesced batch write.
async fn execute_command_parallel(
    &self,
    label: &str,
    command_id: &str,
    keys: &[Json],
    shared: &Params,
) -> Result<Vec<Json>> {
    if keys.is_empty() {
        return Ok(vec![]);
    }
    let spec = self
        .operations_ref()
        .get("commands")
        .and_then(|c| c.get(command_id))
        .cloned()
        .ok_or_else(|| {
            GraphDDBError::contract_not_found(format!(
                "{label}: referenced write op '{command_id}' is not present in `commands`."
            ))
        })?;
    let op_type = spec["type"].as_str().unwrap_or("");
    let has_condition = spec.get("condition").map(|c| !c.is_null()).unwrap_or(false);
    // Only unconditional puts/deletes are sent as one coalesced batch write.
    let coalescible = !has_condition && (op_type == "PutItem" || op_type == "DeleteItem");
    if coalescible {
        self.execute_batch_write(label, command_id, keys, shared)
            .await?;
        return Ok(keys.iter().map(|_| ok_result()).collect());
    }

    let mut results = Vec::with_capacity(keys.len());
    for key in keys {
        let mut params = key.as_object().cloned().unwrap_or_default();
        for (k, v) in shared {
            params.insert(k.clone(), v.clone());
        }
        match self.execute_command(command_id, &params).await {
            Ok(()) => results.push(ok_result()),
            Err(e) => {
                let mut m = JsonMap::new();
                m.insert("ok".into(), Json::Bool(false));
                m.insert("error".into(), Json::String(e.message));
                results.push(Json::Object(m));
            }
        }
    }
    Ok(results)
}
/// Builds one put or delete `WriteRequest` per key for the command
/// `command_id` and submits them all through `batch_write`.
///
/// For each key, `shared` entries are merged over the key's own fields and
/// the result is validated against the command's `params`.
///
/// * `PutItem` — every template in the command's `item` map is resolved, the
///   entity's key attributes are added with `add_key_attributes`, and the
///   item is serialized with `serialize_json`.
/// * `DeleteItem` — every template in the command's `keyCondition` map is
///   resolved and serialized as a string attribute.
///
/// # Errors
///
/// Returns a contract-not-found error when `command_id` is absent from
/// `commands`, a generic error when the command is neither a `PutItem` nor a
/// `DeleteItem`, and propagates validation, template, serialization and
/// write errors.
async fn execute_batch_write(
    &self,
    label: &str,
    command_id: &str,
    keys: &[Json],
    shared: &Params,
) -> Result<()> {
    let spec = self
        .operations_ref()
        .get("commands")
        .and_then(|c| c.get(command_id))
        .cloned()
        .ok_or_else(|| {
            GraphDDBError::contract_not_found(format!(
                "{label}: referenced write op '{command_id}' is not present in `commands`."
            ))
        })?;
    let op_type = spec["type"].as_str().unwrap_or("");
    // Reject everything that is not a put or delete up front; otherwise an
    // unknown or missing type would fall through and be sent as a delete.
    if op_type != "PutItem" && op_type != "DeleteItem" {
        return Err(GraphDDBError::new(format!(
            "{label}: BatchWriteItem only supports put / delete."
        )));
    }
    let param_specs = spec
        .get("params")
        .and_then(Json::as_object)
        .cloned()
        .unwrap_or_default();
    let entity_name = spec["entity"].as_str().unwrap_or("");

    let mut requests: Vec<WriteRequest> = Vec::with_capacity(keys.len());
    for key in keys {
        let mut params = key.as_object().cloned().unwrap_or_default();
        for (k, v) in shared {
            params.insert(k.clone(), v.clone());
        }
        validate_params(&params, &param_specs, command_id)?;

        if op_type == "PutItem" {
            let mut plain: Vec<(String, Json)> = Vec::new();
            if let Some(item) = spec.get("item").and_then(Json::as_object) {
                for (field, tmpl) in item {
                    plain.push((
                        field.clone(),
                        Json::String(resolve_template(tmpl.as_str().unwrap_or(""), &params)?),
                    ));
                }
            }
            add_key_attributes(self.manifest_ref(), entity_name, &mut plain);
            let mut it: HashMap<String, AttributeValue> = HashMap::new();
            for (k, v) in &plain {
                it.insert(k.clone(), crate::attribute::serialize_json(v)?);
            }
            requests.push(
                WriteRequest::builder()
                    .put_request(
                        PutRequest::builder()
                            .set_item(Some(it))
                            .build()
                            .expect("PutRequest"),
                    )
                    .build(),
            );
        } else {
            let mut dynamo_key: HashMap<String, AttributeValue> = HashMap::new();
            if let Some(kc) = spec.get("keyCondition").and_then(Json::as_object) {
                for (attr, tmpl) in kc {
                    dynamo_key.insert(
                        attr.clone(),
                        serialize_string(resolve_template(
                            tmpl.as_str().unwrap_or(""),
                            &params,
                        )?),
                    );
                }
            }
            requests.push(
                WriteRequest::builder()
                    .delete_request(
                        DeleteRequest::builder()
                            .set_key(Some(dynamo_key))
                            .build()
                            .expect("DeleteRequest"),
                    )
                    .build(),
            );
        }
    }

    batch_write(
        self.client_ref(),
        &self.physical_table(spec["tableName"].as_str().unwrap_or("")),
        requests,
        self.batch_sleep,
    )
    .await
}
/// Fetches the JSON definition of `method_name` from the command contract
/// `contract_name`.
///
/// # Errors
///
/// Returns a contract-not-found error when the contract is unknown, when its
/// `kind` is not `"command"`, or when it declares no method of that name.
fn resolve_contract_command_method(
    &self,
    contract_name: &str,
    method_name: &str,
) -> Result<Json> {
    let contract = match self.contract(contract_name) {
        Some(contract) => contract,
        None => {
            return Err(GraphDDBError::contract_not_found(format!(
                "unknown contract '{contract_name}'."
            )))
        }
    };

    let kind = contract.get("kind").and_then(Json::as_str);
    if kind != Some("command") {
        return Err(GraphDDBError::contract_not_found(format!(
            "contract '{contract_name}' is a '{}' contract; execute_command_method only runs \
             command contracts.",
            kind.unwrap_or("?")
        )));
    }

    match contract
        .get("methods")
        .and_then(|methods| methods.get(method_name))
    {
        Some(method) => Ok(method.clone()),
        None => Err(GraphDDBError::contract_not_found(format!(
            "contract '{contract_name}' has no command method '{method_name}'."
        ))),
    }
}
}
fn ok_result() -> Json {
let mut m = JsonMap::new();
m.insert("ok".into(), Json::Bool(true));
Json::Object(m)
}
/// Reads the value at `path` inside `record` and returns an owned copy.
///
/// * `"$"` selects the whole record.
/// * For any other path the first two bytes are skipped (presumably a `"$."`
///   prefix — the prefix itself is not checked) and the remainder is split on
///   `.`; each segment is looked up in turn with `Json::get`.
///
/// Returns `None` when a segment is missing, or when `path` is too short (or
/// not sliceable at byte 2) to have anything after the prefix. The latter
/// case previously panicked.
fn read_from_path(record: &Json, path: &str) -> Option<Json> {
    if path == "$" {
        return Some(record.clone());
    }
    // `str::get` yields `None` where `path[2..]` would panic: paths shorter
    // than two bytes, or with byte 2 inside a multi-byte character.
    let segments = path.get(2..)?;
    let mut current = record;
    for segment in segments.split('.') {
        current = current.get(segment)?;
    }
    Some(current.clone())
}
/// Result of resolving one composition edge against a list of parents.
struct CompositionApply {
    /// Property name under which each value is inserted into its parent.
    as_prop: String,
    /// One entry per parent, in parent order; `Json::Null` where no child
    /// was resolved for that parent.
    values: Vec<Json>,
}
/// Returns the composition edges grouped into stages, each stage being a list
/// of indices into the `compose` array.
///
/// Stages are taken from the plan's `stages` array (an array of arrays of
/// indices). Entries that are not unsigned integers or that are
/// `>= compose_count` are dropped, as are stages left empty after that. When
/// the plan is absent, has no `stages` array, or yields no usable stage, a
/// single stage holding every index `0..compose_count` is returned.
fn composition_stages(plan: Option<&Json>, compose_count: usize) -> Vec<Vec<usize>> {
    if let Some(stages) = plan.and_then(|p| p.get("stages")).and_then(Json::as_array) {
        // Bounds-check in u64 before narrowing, so an oversized index cannot
        // be truncated into range on targets where usize is under 64 bits.
        let limit = compose_count as u64;
        let out: Vec<Vec<usize>> = stages
            .iter()
            .map(|stage| {
                stage
                    .as_array()
                    .map(|entries| {
                        entries
                            .iter()
                            .filter_map(Json::as_u64)
                            .filter(|&i| i < limit)
                            .map(|i| i as usize)
                            .collect::<Vec<usize>>()
                    })
                    .unwrap_or_default()
            })
            .filter(|indices| !indices.is_empty())
            .collect();
        if !out.is_empty() {
            return out;
        }
    }
    vec![(0..compose_count).collect()]
}
/// Returns the plan's `concurrency` setting, or `1` when the plan is absent
/// or the setting is missing, not an unsigned integer, or zero.
///
/// Values larger than `usize::MAX` are clamped to `usize::MAX` rather than
/// truncated, so a positive setting can never become `0`.
fn composition_concurrency(plan: Option<&Json>) -> usize {
    plan.and_then(|p| p.get("concurrency"))
        .and_then(Json::as_u64)
        .filter(|&c| c > 0)
        // Clamp before narrowing; the cast is then lossless on every target.
        .map(|c| c.min(usize::MAX as u64) as usize)
        .unwrap_or(1)
}