mod batch;
#[cfg(test)]
mod expression_corpus_tests;
mod global_tables;
mod items;
mod queries;
mod streams;
mod tables;
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use base64::Engine;
use http::StatusCode;
use serde_json::{json, Value};
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
use fakecloud_persistence::{S3Store, SnapshotStore};
use fakecloud_s3::state::SharedS3State;
use crate::state::{
attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoDbSnapshot, DynamoTable,
GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
ProvisionedThroughput, SharedDynamoDbState, DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
};
/// Copy of the table fields needed to fan a change record out to Kinesis.
///
/// Built by `DynamoDbService::kinesis_target` from a `DynamoTable` and consumed
/// by `DynamoDbService::deliver_to_kinesis_destinations`.
pub(super) struct KinesisDeliveryTarget {
/// Every Kinesis destination configured on the table, cloned as-is; the
/// delivery code filters for the ones whose status is `"ACTIVE"`.
pub destinations: Vec<KinesisDestination>,
/// ARN of the source table; emitted as `eventSourceARN` and also used to
/// derive the `awsRegion` field of the record.
pub arn: String,
/// Name of the source table; emitted as `tableName`.
pub name: String,
}
/// Kind of table access reported to the KMS hook by
/// `DynamoDbService::record_table_kms_usage`.
pub(crate) enum TableKmsOp {
/// A read: the hook is asked to encrypt and then decrypt the resulting envelope.
Read,
/// A write: the hook is only asked to encrypt.
Write,
}
/// DynamoDB service front-end: owns the shared table state plus the optional
/// integrations that are attached through the `with_*` builder methods.
pub struct DynamoDbService {
/// Shared DynamoDB state; cloned wholesale when a snapshot is written.
state: SharedDynamoDbState,
/// Optional shared S3 state (not used in this part of the file —
/// presumably for export/import; confirm against the handlers).
pub(crate) s3_state: Option<SharedS3State>,
/// Optional S3 persistence backend (same caveat as `s3_state`).
pub(crate) s3_store: Option<Arc<dyn S3Store>>,
/// Optional delivery bus used to push change records to Kinesis streams.
delivery: Option<Arc<DeliveryBus>>,
/// Optional snapshot sink; when `None`, `save_snapshot` is a no-op.
snapshot_store: Option<Arc<dyn SnapshotStore>>,
/// Optional KMS hook; when `None`, `record_table_kms_usage` is a no-op.
pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
/// Region passed to the KMS hook; defaults to `us-east-1` in `new`.
pub(crate) region: String,
/// Held for the whole of `save_snapshot` so that snapshot writes do not overlap.
snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
impl DynamoDbService {
pub fn new(state: SharedDynamoDbState) -> Self {
Self {
state,
s3_state: None,
s3_store: None,
delivery: None,
snapshot_store: None,
kms_hook: None,
region: "us-east-1".to_string(),
snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
}
}
pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
self.s3_state = Some(s3_state);
self
}
pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
self.s3_store = Some(store);
self
}
pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
self.delivery = Some(delivery);
self
}
pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
self.snapshot_store = Some(store);
self
}
pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
self.kms_hook = Some(hook);
self
}
pub fn with_region(mut self, region: impl Into<String>) -> Self {
self.region = region.into();
self
}
pub(crate) fn record_table_kms_usage(
&self,
account_id: &str,
table_arn: &str,
kms_key_arn: Option<&str>,
operation: TableKmsOp,
) {
let Some(hook) = &self.kms_hook else { return };
let key = kms_key_arn
.filter(|k| !k.is_empty())
.unwrap_or("aws/dynamodb");
let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
let mut ctx = std::collections::HashMap::new();
ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
ctx.insert(
"aws:dynamodb:subscriberId".to_string(),
account_id.to_string(),
);
let envelope = match hook.encrypt(
account_id,
&self.region,
key,
b"ddb-item",
"dynamodb.amazonaws.com",
ctx.clone(),
) {
Ok(env) => env,
Err(_) => return,
};
if matches!(operation, TableKmsOp::Read) {
let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
}
}
async fn save_snapshot(&self) {
let Some(store) = self.snapshot_store.clone() else {
return;
};
let _guard = self.snapshot_lock.lock().await;
let snapshot = DynamoDbSnapshot {
schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
accounts: Some(self.state.read().clone()),
state: None,
};
let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
let bytes = serde_json::to_vec(&snapshot)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
store.save(&bytes)
})
.await;
match join {
Ok(Ok(())) => {}
Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
}
}
fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
if table
.kinesis_destinations
.iter()
.any(|d| d.destination_status == "ACTIVE")
{
Some(KinesisDeliveryTarget {
destinations: table.kinesis_destinations.clone(),
arn: table.arn.clone(),
name: table.name.clone(),
})
} else {
None
}
}
pub(super) fn deliver_to_kinesis_destinations(
&self,
target: &KinesisDeliveryTarget,
event_name: &str,
keys: &HashMap<String, AttributeValue>,
old_image: Option<&HashMap<String, AttributeValue>>,
new_image: Option<&HashMap<String, AttributeValue>>,
) {
let delivery = match &self.delivery {
Some(d) => d,
None => return,
};
let active_destinations: Vec<_> = target
.destinations
.iter()
.filter(|d| d.destination_status == "ACTIVE")
.collect();
if active_destinations.is_empty() {
return;
}
let mut record = json!({
"eventID": uuid::Uuid::new_v4().to_string(),
"eventName": event_name,
"eventVersion": "1.1",
"eventSource": "aws:dynamodb",
"awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
"dynamodb": {
"Keys": keys,
"SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
"SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
"eventSourceARN": &target.arn,
"tableName": &target.name,
});
if let Some(old) = old_image {
record["dynamodb"]["OldImage"] = json!(old);
}
if let Some(new) = new_image {
record["dynamodb"]["NewImage"] = json!(new);
}
let record_str = serde_json::to_string(&record).unwrap_or_default();
let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
let partition_key = serde_json::to_string(keys).unwrap_or_default();
for dest in active_destinations {
delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
}
}
fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
serde_json::from_slice(&req.body).map_err(|e| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"SerializationException",
format!("Invalid JSON: {e}"),
)
})
}
fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
Ok(AwsResponse::ok_json(body))
}
}
// `AwsService` implementation: routes each request to the handler for its
// action name and persists a snapshot after successful mutating calls.
#[async_trait]
impl AwsService for DynamoDbService {
/// Service identifier under which this implementation is registered.
fn service_name(&self) -> &str {
"dynamodb"
}
/// Dispatches `req` to the handler matching `req.action`.
///
/// Unknown actions yield `AwsServiceError::action_not_implemented`. When the
/// action is classified as mutating by `is_mutating_action` and the handler
/// returned a response with a success status, a snapshot is saved before the
/// result is returned.
async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
// Decide up front whether a snapshot may be needed; `req` is only borrowed below.
let mutates = is_mutating_action(req.action.as_str());
let result = match req.action.as_str() {
"CreateTable" => self.create_table(&req),
"DeleteTable" => self.delete_table(&req),
"DescribeTable" => self.describe_table(&req),
"ListTables" => self.list_tables(&req),
"UpdateTable" => self.update_table(&req),
"PutItem" => self.put_item(&req),
"GetItem" => self.get_item(&req),
"DeleteItem" => self.delete_item(&req),
"UpdateItem" => self.update_item(&req),
"Query" => self.query(&req),
"Scan" => self.scan(&req),
"BatchGetItem" => self.batch_get_item(&req),
"BatchWriteItem" => self.batch_write_item(&req),
"TagResource" => self.tag_resource(&req),
"UntagResource" => self.untag_resource(&req),
"ListTagsOfResource" => self.list_tags_of_resource(&req),
"TransactGetItems" => self.transact_get_items(&req),
"TransactWriteItems" => self.transact_write_items(&req),
"ExecuteStatement" => self.execute_statement(&req),
"BatchExecuteStatement" => self.batch_execute_statement(&req),
"ExecuteTransaction" => self.execute_transaction(&req),
"UpdateTimeToLive" => self.update_time_to_live(&req),
"DescribeTimeToLive" => self.describe_time_to_live(&req),
"PutResourcePolicy" => self.put_resource_policy(&req),
"GetResourcePolicy" => self.get_resource_policy(&req),
"DeleteResourcePolicy" => self.delete_resource_policy(&req),
"DescribeEndpoints" => self.describe_endpoints(&req),
"DescribeLimits" => self.describe_limits(&req),
"CreateBackup" => self.create_backup(&req),
"DeleteBackup" => self.delete_backup(&req),
"DescribeBackup" => self.describe_backup(&req),
"ListBackups" => self.list_backups(&req),
"RestoreTableFromBackup" => self.restore_table_from_backup(&req),
"RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
"UpdateContinuousBackups" => self.update_continuous_backups(&req),
"DescribeContinuousBackups" => self.describe_continuous_backups(&req),
"CreateGlobalTable" => self.create_global_table(&req),
"DescribeGlobalTable" => self.describe_global_table(&req),
"DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
"ListGlobalTables" => self.list_global_tables(&req),
"UpdateGlobalTable" => self.update_global_table(&req),
"UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
"DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
"UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
"EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
"DisableKinesisStreamingDestination" => {
self.disable_kinesis_streaming_destination(&req)
}
"DescribeKinesisStreamingDestination" => {
self.describe_kinesis_streaming_destination(&req)
}
"UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
"DescribeContributorInsights" => self.describe_contributor_insights(&req),
"UpdateContributorInsights" => self.update_contributor_insights(&req),
"ListContributorInsights" => self.list_contributor_insights(&req),
"ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
"DescribeExport" => self.describe_export(&req),
"ListExports" => self.list_exports(&req),
"ImportTable" => self.import_table(&req),
"DescribeImport" => self.describe_import(&req),
"ListImports" => self.list_imports(&req),
_ => Err(AwsServiceError::action_not_implemented(
"dynamodb",
&req.action,
)),
};
// Persist only after a mutating action that actually succeeded.
if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
self.save_snapshot().await;
}
result
}
/// Action names handled by `handle`; this list mirrors the arms of its
/// dispatch `match` and must be kept in sync with it.
fn supported_actions(&self) -> &[&str] {
&[
"CreateTable",
"DeleteTable",
"DescribeTable",
"ListTables",
"UpdateTable",
"PutItem",
"GetItem",
"DeleteItem",
"UpdateItem",
"Query",
"Scan",
"BatchGetItem",
"BatchWriteItem",
"TagResource",
"UntagResource",
"ListTagsOfResource",
"TransactGetItems",
"TransactWriteItems",
"ExecuteStatement",
"BatchExecuteStatement",
"ExecuteTransaction",
"UpdateTimeToLive",
"DescribeTimeToLive",
"PutResourcePolicy",
"GetResourcePolicy",
"DeleteResourcePolicy",
"DescribeEndpoints",
"DescribeLimits",
"CreateBackup",
"DeleteBackup",
"DescribeBackup",
"ListBackups",
"RestoreTableFromBackup",
"RestoreTableToPointInTime",
"UpdateContinuousBackups",
"DescribeContinuousBackups",
"CreateGlobalTable",
"DescribeGlobalTable",
"DescribeGlobalTableSettings",
"ListGlobalTables",
"UpdateGlobalTable",
"UpdateGlobalTableSettings",
"DescribeTableReplicaAutoScaling",
"UpdateTableReplicaAutoScaling",
"EnableKinesisStreamingDestination",
"DisableKinesisStreamingDestination",
"DescribeKinesisStreamingDestination",
"UpdateKinesisStreamingDestination",
"DescribeContributorInsights",
"UpdateContributorInsights",
"ListContributorInsights",
"ExportTableToPointInTime",
"DescribeExport",
"ListExports",
"ImportTable",
"DescribeImport",
"ListImports",
]
}
}
/// Returns `true` when `action` is one of the DynamoDB actions after which a
/// snapshot should be saved. The comparison is exact and case-sensitive.
fn is_mutating_action(action: &str) -> bool {
    const MUTATING_ACTIONS: &[&str] = &[
        "CreateTable",
        "DeleteTable",
        "UpdateTable",
        "PutItem",
        "DeleteItem",
        "UpdateItem",
        "BatchWriteItem",
        "TagResource",
        "UntagResource",
        "TransactWriteItems",
        "ExecuteStatement",
        "BatchExecuteStatement",
        "ExecuteTransaction",
        "UpdateTimeToLive",
        "PutResourcePolicy",
        "DeleteResourcePolicy",
        "CreateBackup",
        "DeleteBackup",
        "RestoreTableFromBackup",
        "RestoreTableToPointInTime",
        "UpdateContinuousBackups",
        "CreateGlobalTable",
        "UpdateGlobalTable",
        "UpdateGlobalTableSettings",
        "UpdateTableReplicaAutoScaling",
        "EnableKinesisStreamingDestination",
        "DisableKinesisStreamingDestination",
        "UpdateKinesisStreamingDestination",
        "UpdateContributorInsights",
        "ExportTableToPointInTime",
        "ImportTable",
    ];
    MUTATING_ACTIONS.contains(&action)
}
/// Returns `body[field]` as a string slice, or a `ValidationException`
/// (HTTP 400) when the field is missing or not a JSON string.
fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
    match body[field].as_str() {
        Some(text) => Ok(text),
        None => Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            format!("{field} is required"),
        )),
    }
}
/// Returns a copy of the JSON object at `body[field]` as an attribute map, or
/// a `ValidationException` (HTTP 400) when the field is missing or not an object.
fn require_object(
    body: &Value,
    field: &str,
) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
    let Some(fields) = body[field].as_object() else {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            format!("{field} is required"),
        ));
    };
    let mut attributes = HashMap::with_capacity(fields.len());
    for (name, value) in fields {
        attributes.insert(name.clone(), value.clone());
    }
    Ok(attributes)
}
/// Looks up the table called `name`, returning a
/// `ResourceNotFoundException` (HTTP 400) when it does not exist.
fn get_table<'a>(
    tables: &'a HashMap<String, DynamoTable>,
    name: &str,
) -> Result<&'a DynamoTable, AwsServiceError> {
    match tables.get(name) {
        Some(table) => Ok(table),
        None => Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ResourceNotFoundException",
            format!("Requested resource not found: Table: {name} not found"),
        )),
    }
}
/// Mutable counterpart of the by-name table lookup: returns the table called
/// `name`, or a `ResourceNotFoundException` (HTTP 400) when it does not exist.
fn get_table_mut<'a>(
    tables: &'a mut HashMap<String, DynamoTable>,
    name: &str,
) -> Result<&'a mut DynamoTable, AwsServiceError> {
    match tables.get_mut(name) {
        Some(table) => Ok(table),
        None => Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ResourceNotFoundException",
            format!("Requested resource not found: Table: {name} not found"),
        )),
    }
}
/// Scans all tables for one whose ARN equals `arn`, returning a
/// `ResourceNotFoundException` (HTTP 400) when none matches.
fn find_table_by_arn<'a>(
    tables: &'a HashMap<String, DynamoTable>,
    arn: &str,
) -> Result<&'a DynamoTable, AwsServiceError> {
    for table in tables.values() {
        if table.arn == arn {
            return Ok(table);
        }
    }
    Err(AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ResourceNotFoundException",
        format!("Requested resource not found: {arn}"),
    ))
}
/// Mutable counterpart of the by-ARN lookup: scans all tables for one whose
/// ARN equals `arn`, returning a `ResourceNotFoundException` (HTTP 400) when
/// none matches.
fn find_table_by_arn_mut<'a>(
    tables: &'a mut HashMap<String, DynamoTable>,
    arn: &str,
) -> Result<&'a mut DynamoTable, AwsServiceError> {
    for table in tables.values_mut() {
        if table.arn == arn {
            return Ok(table);
        }
    }
    Err(AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ResourceNotFoundException",
        format!("Requested resource not found: {arn}"),
    ))
}
/// Parses a `KeySchema` JSON array.
///
/// Returns a `ValidationException` (HTTP 400) when `val` is not an array.
/// Within each element a missing `AttributeName` becomes the empty string and
/// a missing `KeyType` defaults to `"HASH"`.
fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
    let Some(elements) = val.as_array() else {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "KeySchema is required",
        ));
    };
    let mut schema = Vec::with_capacity(elements.len());
    for element in elements {
        let attribute_name = element["AttributeName"]
            .as_str()
            .unwrap_or_default()
            .to_string();
        let key_type = element["KeyType"].as_str().unwrap_or("HASH").to_string();
        schema.push(KeySchemaElement {
            attribute_name,
            key_type,
        });
    }
    Ok(schema)
}
/// Parses an `AttributeDefinitions` JSON array.
///
/// Returns a `ValidationException` (HTTP 400) when `val` is not an array.
/// Within each element a missing `AttributeName` becomes the empty string and
/// a missing `AttributeType` defaults to `"S"`.
fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
    let Some(elements) = val.as_array() else {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "AttributeDefinitions is required",
        ));
    };
    let mut definitions = Vec::with_capacity(elements.len());
    for element in elements {
        let attribute_name = element["AttributeName"]
            .as_str()
            .unwrap_or_default()
            .to_string();
        let attribute_type = element["AttributeType"]
            .as_str()
            .unwrap_or("S")
            .to_string();
        definitions.push(AttributeDefinition {
            attribute_name,
            attribute_type,
        });
    }
    Ok(definitions)
}
/// Reads `ReadCapacityUnits` / `WriteCapacityUnits` from `val`, defaulting
/// each to 5 when absent or not an integer. Never returns `Err`.
fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
    let read_capacity_units = val["ReadCapacityUnits"].as_i64().unwrap_or(5);
    let write_capacity_units = val["WriteCapacityUnits"].as_i64().unwrap_or(5);
    Ok(ProvisionedThroughput {
        read_capacity_units,
        write_capacity_units,
    })
}
/// Parses a `GlobalSecondaryIndexes` JSON array.
///
/// A non-array `val` yields an empty list. Entries without a string
/// `IndexName` or without an array `KeySchema` are skipped silently.
fn parse_gsi(val: &Value, billing_mode: &str) -> Vec<GlobalSecondaryIndex> {
    let mut indexes = Vec::new();
    for entry in val.as_array().into_iter().flatten() {
        let Some(index_name) = entry["IndexName"].as_str() else {
            continue;
        };
        let Ok(key_schema) = parse_key_schema(&entry["KeySchema"]) else {
            continue;
        };
        let throughput = parse_gsi_throughput(&entry["ProvisionedThroughput"], billing_mode);
        indexes.push(GlobalSecondaryIndex {
            index_name: index_name.to_string(),
            key_schema,
            projection: parse_projection(&entry["Projection"]),
            provisioned_throughput: Some(throughput),
            on_demand_throughput: parse_on_demand_throughput(&entry["OnDemandThroughput"]),
        });
    }
    indexes
}
/// Parses an `OnDemandThroughput` object; returns `None` when `val` is not a
/// JSON object. Missing or non-integer limits are stored as `-1`.
pub(super) fn parse_on_demand_throughput(val: &Value) -> Option<crate::state::OnDemandThroughput> {
    val.is_object().then(|| crate::state::OnDemandThroughput {
        max_read_request_units: val["MaxReadRequestUnits"].as_i64().unwrap_or(-1),
        max_write_request_units: val["MaxWriteRequestUnits"].as_i64().unwrap_or(-1),
    })
}
/// Throughput for a global secondary index: zero capacity for
/// `PAY_PER_REQUEST` tables, otherwise the requested units (default 5 each).
fn parse_gsi_throughput(val: &Value, billing_mode: &str) -> ProvisionedThroughput {
    let (read_capacity_units, write_capacity_units) = if billing_mode == "PAY_PER_REQUEST" {
        (0, 0)
    } else {
        (
            val["ReadCapacityUnits"].as_i64().unwrap_or(5),
            val["WriteCapacityUnits"].as_i64().unwrap_or(5),
        )
    };
    ProvisionedThroughput {
        read_capacity_units,
        write_capacity_units,
    }
}
/// Parses a `LocalSecondaryIndexes` JSON array.
///
/// A non-array `val` yields an empty list. Entries without a string
/// `IndexName` or without an array `KeySchema` are skipped silently.
fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
    let mut indexes = Vec::new();
    for entry in val.as_array().into_iter().flatten() {
        let Some(index_name) = entry["IndexName"].as_str() else {
            continue;
        };
        let Ok(key_schema) = parse_key_schema(&entry["KeySchema"]) else {
            continue;
        };
        indexes.push(LocalSecondaryIndex {
            index_name: index_name.to_string(),
            key_schema,
            projection: parse_projection(&entry["Projection"]),
        });
    }
    indexes
}
/// Parses a `Projection` object. `ProjectionType` defaults to `"ALL"`;
/// `NonKeyAttributes` keeps only the string entries and defaults to empty.
pub(super) fn parse_projection(val: &Value) -> Projection {
    let projection_type = val["ProjectionType"]
        .as_str()
        .unwrap_or("ALL")
        .to_string();
    let non_key_attributes = match val["NonKeyAttributes"].as_array() {
        Some(names) => names
            .iter()
            .filter_map(|entry| entry.as_str())
            .map(|name| name.to_string())
            .collect(),
        None => Default::default(),
    };
    Projection {
        projection_type,
        non_key_attributes,
    }
}
/// Collects `{"Key": ..., "Value": ...}` entries of a JSON array into a map.
///
/// Entries lacking a string `Key` or `Value` are ignored; a repeated key keeps
/// the last value. A non-array `val` yields an empty map.
fn parse_tags(val: &Value) -> HashMap<String, String> {
    val.as_array()
        .into_iter()
        .flatten()
        .filter_map(|tag| {
            let key = tag["Key"].as_str()?;
            let value = tag["Value"].as_str()?;
            Some((key.to_string(), value.to_string()))
        })
        .collect()
}
/// Extracts `ExpressionAttributeNames` from the request body as a
/// placeholder-to-name map. Non-string values are dropped; a missing or
/// non-object field yields an empty map.
fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
    match body["ExpressionAttributeNames"].as_object() {
        Some(entries) => entries
            .iter()
            .filter_map(|(placeholder, name)| {
                name.as_str()
                    .map(|name| (placeholder.clone(), name.to_string()))
            })
            .collect(),
        None => HashMap::new(),
    }
}
/// Extracts `ExpressionAttributeValues` from the request body as a
/// placeholder-to-value map. A missing or non-object field yields an empty map.
fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
    match body["ExpressionAttributeValues"].as_object() {
        Some(entries) => entries
            .iter()
            .map(|(placeholder, value)| (placeholder.clone(), value.clone()))
            .collect(),
        None => HashMap::new(),
    }
}
/// Resolves a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#`, and placeholders with no mapping, are
/// returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substituted = if name.starts_with('#') {
        expr_attr_names.get(name).map(String::as_str)
    } else {
        None
    };
    substituted.unwrap_or(name).to_string()
}
/// Looks up the value addressed by `path` inside `item`.
///
/// A plain attribute name (no `.` or `[`) is resolved through
/// `expr_attr_names` and fetched directly; anything else is treated as a
/// nested document path.
fn resolve_path(
    path: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
) -> Option<Value> {
    let is_nested = path.contains('.') || path.contains('[');
    if is_nested {
        let resolved = resolve_projection_path(path, expr_attr_names);
        resolve_nested_path(item, &resolved)
    } else {
        let attribute = resolve_attr_name(path, expr_attr_names);
        item.get(&attribute).cloned()
    }
}
/// Copies the table's hash-key attribute and, when the table has one, its
/// range-key attribute out of `item`. Attributes absent from `item` are
/// simply left out of the result.
fn extract_key(
    table: &DynamoTable,
    item: &HashMap<String, AttributeValue>,
) -> HashMap<String, AttributeValue> {
    let mut key = HashMap::new();

    let hash_key = table.hash_key_name();
    if let Some(value) = item.get(hash_key) {
        key.insert(hash_key.to_string(), value.clone());
    }

    let range_entry = table
        .range_key_name()
        .and_then(|range_key| item.get(range_key).map(|value| (range_key, value)));
    if let Some((range_key, value)) = range_entry {
        key.insert(range_key.to_string(), value.clone());
    }

    key
}
/// Converts a JSON object into a key map; returns `None` when `value` is not
/// an object or is an empty object.
fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
    match value.as_object() {
        Some(fields) if !fields.is_empty() => Some(
            fields
                .iter()
                .map(|(name, attr)| (name.clone(), attr.clone()))
                .collect(),
        ),
        _ => None,
    }
}
/// Tells whether `item` is identified by `key`.
///
/// The hash-key attribute must be present on both sides and equal. When a
/// range key is defined, both sides must agree on it: either both carry an
/// equal value or both lack it.
fn item_matches_key(
    item: &HashMap<String, AttributeValue>,
    key: &HashMap<String, AttributeValue>,
    hash_key_name: &str,
    range_key_name: Option<&str>,
) -> bool {
    let hash_equal = matches!(
        (item.get(hash_key_name), key.get(hash_key_name)),
        (Some(stored), Some(wanted)) if stored == wanted
    );
    if !hash_equal {
        return false;
    }
    let Some(range_key) = range_key_name else {
        return true;
    };
    // Option equality: Some/Some compares values, None/None matches,
    // a one-sided value does not.
    item.get(range_key) == key.get(range_key)
}
/// Copies the named hash-key and optional range-key attributes out of `item`.
/// Attributes that `item` does not contain are omitted from the result.
fn extract_key_for_schema(
    item: &HashMap<String, AttributeValue>,
    hash_key_name: &str,
    range_key_name: Option<&str>,
) -> HashMap<String, AttributeValue> {
    std::iter::once(hash_key_name)
        .chain(range_key_name)
        .filter_map(|name| {
            item.get(name)
                .map(|value| (name.to_string(), value.clone()))
        })
        .collect()
}
/// Ensures `item` carries the table's hash key and, if the table defines one,
/// its range key. The first missing attribute is reported as a
/// `ValidationException` (HTTP 400).
fn validate_key_in_item(
    table: &DynamoTable,
    item: &HashMap<String, AttributeValue>,
) -> Result<(), AwsServiceError> {
    let hash_key = table.hash_key_name();
    let problem = if !item.contains_key(hash_key) {
        Some(format!("Missing the key {hash_key} in the item"))
    } else {
        match table.range_key_name() {
            Some(range_key) if !item.contains_key(range_key) => {
                Some(format!("Missing the key {range_key} in the item"))
            }
            _ => None,
        }
    };
    match problem {
        Some(message) => Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            message,
        )),
        None => Ok(()),
    }
}
/// Ensures `key` carries the table's hash-key attribute, returning a
/// `ValidationException` (HTTP 400) otherwise.
///
/// NOTE(review): only the hash key is checked; a table's range key is not
/// required here. Confirm with callers whether that leniency is intentional.
fn validate_key_attributes_in_key(
    table: &DynamoTable,
    key: &HashMap<String, AttributeValue>,
) -> Result<(), AwsServiceError> {
    let hash_key = table.hash_key_name();
    if key.contains_key(hash_key) {
        return Ok(());
    }
    Err(AwsServiceError::aws_error(
        StatusCode::BAD_REQUEST,
        "ValidationException",
        format!("Missing the key {hash_key} in the item"),
    ))
}
/// Applies the request's `ProjectionExpression` to `item`.
///
/// Without a (non-empty) projection the whole item is returned. Otherwise
/// each comma-separated path is resolved: plain attribute names are copied
/// directly, nested paths are copied into a matching nested structure.
/// Paths that do not resolve are skipped.
fn project_item(
    item: &HashMap<String, AttributeValue>,
    body: &Value,
) -> HashMap<String, AttributeValue> {
    let projection = match body["ProjectionExpression"].as_str() {
        Some(expr) if !expr.is_empty() => expr,
        _ => return item.clone(),
    };
    let expr_attr_names = parse_expression_attribute_names(body);
    let mut projected = HashMap::new();
    for path in projection.split(',').map(|raw| raw.trim()) {
        let is_nested = path.contains('.') || path.contains('[');
        if is_nested {
            let resolved = resolve_projection_path(path, &expr_attr_names);
            if let Some(value) = resolve_nested_path(item, &resolved) {
                insert_nested_value(&mut projected, &resolved, value);
            }
        } else {
            let attribute = resolve_attr_name(path, &expr_attr_names);
            if let Some(value) = item.get(&attribute) {
                projected.insert(attribute, value.clone());
            }
        }
    }
    projected
}
fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
let mut result = String::new();
for (i, segment) in path.split('.').enumerate() {
if i > 0 {
result.push('.');
}
if let Some(bracket_pos) = segment.find('[') {
let key_part = &segment[..bracket_pos];
let index_part = &segment[bracket_pos..];
result.push_str(&resolve_attr_name(key_part, expr_attr_names));
result.push_str(index_part);
} else {
result.push_str(&resolve_attr_name(segment, expr_attr_names));
}
}
result
}
/// Follows an already-resolved document path (`a.b[2].c`) through `item`.
///
/// The first segment must be an attribute name. Each later name descends into
/// an `M` map and each index into an `L` list. Returns `None` as soon as a
/// step cannot be taken.
fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
    let segments = parse_path_segments(path);
    let (first, rest) = segments.split_first()?;
    let PathSegment::Key(top_key) = first else {
        return None;
    };
    // Walk by reference and clone only the value that is finally returned.
    let mut current = item.get(top_key)?;
    for segment in rest {
        current = match segment {
            PathSegment::Key(name) => current.get("M")?.get(name)?,
            PathSegment::Index(position) => current.get("L")?.get(*position)?,
        };
    }
    Some(current.clone())
}
/// One step of a parsed document path, as produced by `parse_path_segments`.
#[derive(Debug)]
enum PathSegment {
/// An attribute or map-member name (the text between `.` / `[` separators).
Key(String),
/// A list position taken from a `[n]` suffix.
Index(usize),
}
/// Splits a document path such as `a.b[3].c` into name and index segments.
///
/// `.` and `[` end the name being collected. The text between `[` and the
/// next `]` (or end of input) becomes an index when it parses as `usize` and
/// is dropped otherwise. Empty names are never emitted.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut name = String::new();
    let mut chars = path.chars();
    while let Some(ch) = chars.next() {
        match ch {
            '.' | '[' => {
                if !name.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut name)));
                }
                if ch == '[' {
                    // `take_while` also consumes the closing ']'.
                    let digits: String = chars.by_ref().take_while(|c| *c != ']').collect();
                    if let Ok(index) = digits.parse::<usize>() {
                        segments.push(PathSegment::Index(index));
                    }
                }
            }
            other => name.push(other),
        }
    }
    if !name.is_empty() {
        segments.push(PathSegment::Key(name));
    }
    segments
}
/// Stores `value` in `result` at the location described by `path`.
///
/// A plain name is inserted directly. For a nested path the value is wrapped
/// in the `M` / `L` containers the path implies and merged with whatever is
/// already stored under the top-level attribute. Paths that are empty or do
/// not start with a name are ignored.
fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
    let is_plain = !(path.contains('.') || path.contains('['));
    if is_plain {
        result.insert(path.to_string(), value);
        return;
    }
    let segments = parse_path_segments(path);
    let Some((PathSegment::Key(top_key), rest)) = segments.split_first() else {
        return;
    };
    if rest.is_empty() {
        result.insert(top_key.clone(), value);
        return;
    }
    let wrapped = wrap_value_in_path(rest, value);
    let merged = match result.remove(top_key) {
        Some(existing) => merge_attribute_values(existing, wrapped),
        None => wrapped,
    };
    result.insert(top_key.clone(), merged);
}
/// Wraps `value` in the containers implied by `segments`, innermost last.
///
/// A name segment becomes `{"M": {name: inner}}`. An index segment becomes an
/// `L` list of `index + 1` entries that is `null` everywhere except at
/// `index`. With no segments the value is returned as-is.
fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
    // Build from the last segment outwards so the first segment ends up outermost.
    segments
        .iter()
        .rev()
        .fold(value, |inner, segment| match segment {
            PathSegment::Key(name) => json!({"M": {name.clone(): inner}}),
            PathSegment::Index(index) => {
                let mut list = vec![Value::Null; index + 1];
                list[*index] = inner;
                json!({"L": list})
            }
        })
}
/// Merges two attribute values.
///
/// When both are `M` maps the result is their recursive union, with `b`
/// winning on conflicts that are not themselves two maps. In every other case
/// `b` replaces `a`.
fn merge_attribute_values(a: Value, b: Value) -> Value {
    let merged = match (
        a.get("M").and_then(|v| v.as_object()),
        b.get("M").and_then(|v| v.as_object()),
    ) {
        (Some(base), Some(overlay)) => {
            let mut merged = base.clone();
            for (name, incoming) in overlay {
                let combined = match merged.get(name) {
                    Some(current) => merge_attribute_values(current.clone(), incoming.clone()),
                    None => incoming.clone(),
                };
                merged.insert(name.clone(), combined);
            }
            merged
        }
        _ => return b,
    };
    json!({"M": merged})
}
/// Evaluates a condition expression against the existing item (an empty item
/// when there is none) and maps a failed check to
/// `ConditionalCheckFailedException` (HTTP 400).
fn evaluate_condition(
    condition: &str,
    existing: Option<&HashMap<String, AttributeValue>>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    let no_item = HashMap::new();
    let target = existing.unwrap_or(&no_item);
    let passed = evaluate_filter_expression(condition, target, expr_attr_names, expr_attr_values);
    if !passed {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ConditionalCheckFailedException",
            "The conditional request failed",
        ));
    }
    Ok(())
}
/// Returns the trimmed argument text of a call `func_name(...)` or
/// `func_name (...)` that spans all of `expr`.
///
/// Yields `None` when `expr` does not start with the function name followed
/// by `(` (optionally after exactly one space) or does not end with `)`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    let after_paren = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    after_paren.strip_suffix(')').map(|inner| inner.trim())
}
/// Evaluates a key condition expression against `item`.
///
/// Top-level `AND` clauses must all hold. A clause wrapped in outer
/// parentheses is unwrapped and re-evaluated; anything else is treated as a
/// single condition.
fn evaluate_key_condition(
    expr: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let trimmed = expr.trim();
    let clauses = split_on_and(trimmed);
    if clauses.len() > 1 {
        for clause in &clauses {
            if !evaluate_key_condition(clause.trim(), item, expr_attr_names, expr_attr_values) {
                return false;
            }
        }
        return true;
    }
    let unwrapped = strip_outer_parens(trimmed);
    if unwrapped == trimmed {
        evaluate_single_key_condition(trimmed, item, expr_attr_names, expr_attr_values)
    } else {
        evaluate_key_condition(unwrapped, item, expr_attr_names, expr_attr_values)
    }
}
/// Splits `expr` at every occurrence of `keyword` that sits outside
/// parentheses, returning the pieces between the occurrences (untrimmed).
///
/// When splitting on `AND`, each top-level `BETWEEN` seen so far absorbs the
/// next `AND`, so `a BETWEEN :x AND :y` is not cut in two. The result always
/// holds at least one piece.
fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
    let bytes = expr.as_bytes();
    let keyword_bytes = keyword.as_bytes();
    let splitting_on_and = keyword.eq_ignore_ascii_case("AND");

    let mut pieces: Vec<&str> = Vec::new();
    let mut piece_start = 0usize;
    let mut paren_depth: i32 = 0;
    // Number of BETWEENs still waiting for their own AND.
    let mut pending_betweens: u32 = 0;
    let mut pos = 0usize;

    while pos < bytes.len() {
        match bytes[pos] {
            b'(' => {
                paren_depth += 1;
                pos += 1;
            }
            b')' => {
                // Unbalanced ')' is tolerated: depth never goes negative.
                if paren_depth > 0 {
                    paren_depth -= 1;
                }
                pos += 1;
            }
            _ if paren_depth != 0 => pos += 1,
            _ => {
                let between_end = if splitting_on_and {
                    match_keyword(bytes, pos, b"BETWEEN")
                } else {
                    None
                };
                if let Some(end) = between_end {
                    pending_betweens = pending_betweens.saturating_add(1);
                    pos = end;
                } else if let Some(end) = match_keyword(bytes, pos, keyword_bytes) {
                    if splitting_on_and && pending_betweens > 0 {
                        // This AND belongs to an earlier BETWEEN.
                        pending_betweens -= 1;
                    } else {
                        pieces.push(&expr[piece_start..pos]);
                        piece_start = end;
                    }
                    pos = end;
                } else {
                    pos += 1;
                }
            }
        }
    }
    pieces.push(&expr[piece_start..]);
    pieces
}
/// Tests whether `keyword` occurs in `bytes` at offset `i`, ignoring ASCII
/// case, and returns the offset just past it.
///
/// A purely alphanumeric keyword must additionally be delimited by ASCII
/// whitespace or the ends of the input on both sides.
fn match_keyword(bytes: &[u8], i: usize, keyword: &[u8]) -> Option<usize> {
    let end = i + keyword.len();
    let candidate = bytes.get(i..end)?;
    if !candidate.eq_ignore_ascii_case(keyword) {
        return None;
    }
    if keyword.iter().all(u8::is_ascii_alphanumeric) {
        let preceded_ok = i == 0 || bytes[i - 1].is_ascii_whitespace();
        let followed_ok = match bytes.get(end) {
            Some(next) => next.is_ascii_whitespace(),
            None => true,
        };
        if !(preceded_ok && followed_ok) {
            return None;
        }
    }
    Some(end)
}
/// Splits `expr` on `AND` keywords found outside parentheses; see
/// `split_on_top_level_keyword` for how `BETWEEN ... AND ...` is kept intact.
fn split_on_and(expr: &str) -> Vec<&str> {
split_on_top_level_keyword(expr, "AND")
}
/// Splits `expr` on `OR` keywords found outside parentheses.
fn split_on_or(expr: &str) -> Vec<&str> {
split_on_top_level_keyword(expr, "OR")
}
/// Evaluates one key condition clause (no top-level `AND`) against `item`.
///
/// Recognizes, in this order: `begins_with(attr, :v)`, `attr BETWEEN :lo AND
/// :hi`, and the simple comparisons `=`, `<>`, `<`, `<=`, `>`, `>=`.
///
/// `BETWEEN` is only recognized as a standalone word. A plain substring
/// search would misroute attribute names that merely contain the letters
/// (for example `between_ts = :v`) to the BETWEEN evaluator, which would then
/// always report `false`.
fn evaluate_single_key_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let part = part.trim();
    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
    }
    if let Some(between_pos) = find_between_keyword(part) {
        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
    }
    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
}

/// Returns the byte offset of the first case-insensitive `BETWEEN` in `part`
/// that is delimited by ASCII whitespace (or the string ends) on both sides.
fn find_between_keyword(part: &str) -> Option<usize> {
    const KEYWORD: &str = "BETWEEN";
    // ASCII upper-casing keeps byte offsets identical to those of `part`.
    let upper = part.to_ascii_uppercase();
    let bytes = upper.as_bytes();
    upper
        .match_indices(KEYWORD)
        .map(|(pos, _)| pos)
        .find(|&pos| {
            let end = pos + KEYWORD.len();
            let preceded_ok = pos == 0 || bytes[pos - 1].is_ascii_whitespace();
            let followed_ok = end == bytes.len() || bytes[end].is_ascii_whitespace();
            preceded_ok && followed_ok
        })
}
/// Evaluates the tail of a `begins_with(` clause: `rest` is everything after
/// the opening parenthesis, i.e. `attr, :value)`.
///
/// True only when both the item's attribute and the referenced value are `S`
/// strings and the former starts with the latter. Malformed input yields `false`.
fn key_cond_begins_with(
    rest: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let arguments = rest
        .strip_suffix(')')
        .and_then(|inner| inner.split_once(','));
    let Some((attr_ref, val_ref)) = arguments else {
        return false;
    };
    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
    let actual = item
        .get(&attr_name)
        .and_then(|v| v.get("S"))
        .and_then(|v| v.as_str());
    let prefix = expr_attr_values
        .get(val_ref.trim())
        .and_then(|v| v.get("S"))
        .and_then(|v| v.as_str());
    match (actual, prefix) {
        (Some(actual), Some(prefix)) => actual.starts_with(prefix),
        _ => false,
    }
}
/// Evaluates `attr BETWEEN :lo AND :hi`, where `between_pos` is the byte
/// offset of the `BETWEEN` keyword inside `part`.
///
/// Both bounds are inclusive. Returns `false` when the ` AND ` separator, the
/// attribute, or either referenced value is missing.
fn key_cond_between(
    part: &str,
    between_pos: usize,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let attr_name = resolve_attr_name(part[..between_pos].trim(), expr_attr_names);
    let bounds = &part[between_pos + "BETWEEN".len()..];
    let Some(and_pos) = bounds.to_ascii_uppercase().find(" AND ") else {
        return false;
    };
    let lower = expr_attr_values.get(bounds[..and_pos].trim());
    let upper = expr_attr_values.get(bounds[and_pos + " AND ".len()..].trim());
    let (Some(actual), Some(lower), Some(upper)) = (item.get(&attr_name), lower, upper) else {
        return false;
    };
    compare_attribute_values(Some(actual), Some(lower)) != std::cmp::Ordering::Less
        && compare_attribute_values(Some(actual), Some(upper)) != std::cmp::Ordering::Greater
}
/// Evaluates a clause of the form `path <op> :value` where `<op>` is one of
/// `<=`, `>=`, `<>`, `=`, `<`, `>`.
///
/// Operators are probed in that order, so two-character operators win over
/// their one-character prefixes. A clause containing none of them is `false`.
fn key_cond_simple_comparison(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    use std::cmp::Ordering;

    const OPERATORS: [&str; 6] = ["<=", ">=", "<>", "=", "<", ">"];
    let located = OPERATORS
        .iter()
        .find_map(|op| part.find(*op).map(|pos| (*op, pos)));
    let Some((op, pos)) = located else {
        return false;
    };

    let actual_owned = resolve_path(part[..pos].trim(), item, expr_attr_names);
    let actual = actual_owned.as_ref();
    let expected = expr_attr_values.get(part[pos + op.len()..].trim());
    match op {
        "=" => actual == expected,
        "<>" => actual != expected,
        "<" => compare_attribute_values(actual, expected) == Ordering::Less,
        ">" => compare_attribute_values(actual, expected) == Ordering::Greater,
        "<=" => matches!(
            compare_attribute_values(actual, expected),
            Ordering::Less | Ordering::Equal
        ),
        ">=" => matches!(
            compare_attribute_values(actual, expected),
            Ordering::Greater | Ordering::Equal
        ),
        _ => false,
    }
}
/// Computes the value that the `size()` function reports for an attribute.
///
/// * `S` — byte length of the string.
/// * `B` — length of the base64-decoded data (length of the raw text when it
///   does not decode).
/// * `SS` / `NS` / `BS` / `L` — number of elements.
/// * `M` — number of entries.
///
/// Any other shape yields `None`.
fn attribute_size(val: &Value) -> Option<usize> {
    if let Some(text) = val.get("S").and_then(|v| v.as_str()) {
        return Some(text.len());
    }
    if let Some(encoded) = val.get("B").and_then(|v| v.as_str()) {
        let size = match base64::engine::general_purpose::STANDARD.decode(encoded) {
            Ok(raw) => raw.len(),
            Err(_) => encoded.len(),
        };
        return Some(size);
    }
    for tag in ["SS", "NS", "BS", "L"] {
        if let Some(elements) = val.get(tag).and_then(|v| v.as_array()) {
            return Some(elements.len());
        }
    }
    val.get("M")
        .and_then(|v| v.as_object())
        .map(|entries| entries.len())
}
/// Evaluates `size(path) <op> :value`.
///
/// Returns `None` when the clause is not of that shape, the path does not
/// resolve, the attribute has no size, or the referenced value is not a
/// number; otherwise `Some(result)`.
fn evaluate_size_comparison(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Option<bool> {
    // Two-character operators come first so "<=" is not read as "<".
    const OPERATORS: [&str; 6] = ["<=", ">=", "<>", "<", ">", "="];

    let open = part.find('(')?;
    let close = open + part[open..].find(')')?;
    let path = part[open + 1..close].trim();
    let remainder = part[close + 1..].trim();
    let (op, val_ref) = OPERATORS.iter().find_map(|op| {
        remainder
            .strip_prefix(*op)
            .map(|rest| (*op, rest.trim()))
    })?;

    let actual_owned = resolve_path(path, item, expr_attr_names)?;
    let size = attribute_size(&actual_owned)? as f64;
    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
    let gap = (size - expected).abs();
    let outcome = match op {
        "=" => gap < f64::EPSILON,
        "<>" => gap >= f64::EPSILON,
        "<" => size < expected,
        ">" => size > expected,
        "<=" => size <= expected,
        ">=" => size >= expected,
        _ => false,
    };
    Some(outcome)
}
fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
match (a, b) {
(None, None) => std::cmp::Ordering::Equal,
(None, Some(_)) => std::cmp::Ordering::Less,
(Some(_), None) => std::cmp::Ordering::Greater,
(Some(a), Some(b)) => {
let a_type = attribute_type_and_value(a);
let b_type = attribute_type_and_value(b);
match (a_type, b_type) {
(Some(("S", a_val)), Some(("S", b_val))) => {
let a_str = a_val.as_str().unwrap_or("");
let b_str = b_val.as_str().unwrap_or("");
a_str.cmp(b_str)
}
(Some(("N", a_val)), Some(("N", b_val))) => {
let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
a_num
.partial_cmp(&b_num)
.unwrap_or(std::cmp::Ordering::Equal)
}
(Some(("B", a_val)), Some(("B", b_val))) => {
let a_str = a_val.as_str().unwrap_or("");
let b_str = b_val.as_str().unwrap_or("");
a_str.cmp(b_str)
}
_ => std::cmp::Ordering::Equal,
}
}
}
}
/// Evaluates a filter/condition expression against `item`.
///
/// The expression is decomposed recursively, in this order:
/// 1. top-level `OR` – true if any branch is true;
/// 2. top-level `AND` – true only if every branch is true;
/// 3. one pair of parentheses enclosing the whole expression is removed;
/// 4. a leading `NOT ` (case-insensitive) negates the remainder;
/// 5. anything else is a single condition and is passed to
///    `evaluate_single_filter_condition`.
fn evaluate_filter_expression(
    expr: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let trimmed = expr.trim();
    let or_parts = split_on_or(trimmed);
    if or_parts.len() > 1 {
        return or_parts.iter().any(|part| {
            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
        });
    }
    let and_parts = split_on_and(trimmed);
    if and_parts.len() > 1 {
        return and_parts.iter().all(|part| {
            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
        });
    }
    let stripped = strip_outer_parens(trimmed);
    if stripped != trimmed {
        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
    }
    // `str::get` returns `None` instead of panicking when byte 4 falls inside
    // a multi-byte character (e.g. an expression starting with non-ASCII text).
    let is_negated = trimmed.len() > 4
        && matches!(trimmed.get(..4), Some(prefix) if prefix.eq_ignore_ascii_case("NOT "));
    if is_negated {
        // Safe: the checked slice above proved that 4 is a char boundary.
        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
    }
    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
}
/// Removes one pair of parentheses that encloses the *entire* expression.
///
/// The input is trimmed first. If it does not start with `(` and end with
/// `)`, or if those two characters do not belong to the same pair (as in
/// `(a) AND (b)`), the trimmed input is returned unchanged.
fn strip_outer_parens(expr: &str) -> &str {
    let candidate = expr.trim();
    let Some(body) = candidate
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return candidate;
    };

    // Walk the interior; a `)` seen while nothing is open would close the
    // leading `(`, meaning the outer characters are not a matching pair.
    let mut open = 0usize;
    for byte in body.bytes() {
        if byte == b'(' {
            open += 1;
        } else if byte == b')' {
            match open.checked_sub(1) {
                Some(remaining) => open = remaining,
                None => return candidate,
            }
        }
    }

    if open == 0 {
        body
    } else {
        candidate
    }
}
/// Evaluates one atomic filter condition (no `AND`/`OR`/`NOT`).
///
/// The condition is tried against each supported form in turn:
/// `attribute_exists`, `attribute_not_exists`, `begins_with`, `contains`,
/// `size(...)` comparisons, `attribute_type`, and `<attr> IN (...)`. If none
/// applies (or a `size(...)` comparison cannot be parsed) the text is treated
/// as a plain comparison and delegated to `evaluate_single_key_condition`.
fn evaluate_single_filter_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    /// Returns what follows the function name and opening parenthesis,
    /// accepting both `name(` and `name (` spellings.
    fn after_call<'a>(part: &'a str, compact: &str, spaced: &str) -> Option<&'a str> {
        part.strip_prefix(compact)
            .or_else(|| part.strip_prefix(spaced))
    }

    if let Some(path) = extract_function_arg(part, "attribute_exists") {
        return resolve_path(path, item, expr_attr_names).is_some();
    }
    if let Some(path) = extract_function_arg(part, "attribute_not_exists") {
        return resolve_path(path, item, expr_attr_names).is_none();
    }
    if let Some(args) = after_call(part, "begins_with(", "begins_with (") {
        return eval_begins_with(args, item, expr_attr_names, expr_attr_values);
    }
    if let Some(args) = after_call(part, "contains(", "contains (") {
        return eval_contains(args, item, expr_attr_names, expr_attr_values);
    }
    if after_call(part, "size(", "size (").is_some() {
        // An unparseable size comparison falls through to the generic
        // comparison at the bottom rather than failing outright.
        if let Some(verdict) =
            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
        {
            return verdict;
        }
    }
    if let Some(args) = after_call(part, "attribute_type(", "attribute_type (") {
        return eval_attribute_type(args, item, expr_attr_names, expr_attr_values);
    }
    if let Some((attr_ref, candidates)) = parse_in_expression(part) {
        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
        return evaluate_in_match(item.get(&attr_name), &candidates, expr_attr_values);
    }
    evaluate_single_key_condition(part, item, expr_attr_names, expr_attr_values)
}
fn eval_begins_with(
rest: &str,
item: &HashMap<String, AttributeValue>,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> bool {
let Some(inner) = rest.strip_suffix(')') else {
return false;
};
let mut split = inner.splitn(2, ',');
let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
return false;
};
let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
let expected = expr_attr_values.get(val_ref.trim());
match (actual.as_ref(), expected) {
(Some(a), Some(e)) => {
let a_str = a.get("S").and_then(|v| v.as_str());
let e_str = e.get("S").and_then(|v| v.as_str());
matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
}
_ => false,
}
}
fn eval_contains(
rest: &str,
item: &HashMap<String, AttributeValue>,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> bool {
let Some(inner) = rest.strip_suffix(')') else {
return false;
};
let mut split = inner.splitn(2, ',');
let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
return false;
};
let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
let expected = expr_attr_values.get(val_ref.trim());
let (Some(a), Some(e)) = (actual.as_ref(), expected) else {
return false;
};
if let (Some(a_s), Some(e_s)) = (
a.get("S").and_then(|v| v.as_str()),
e.get("S").and_then(|v| v.as_str()),
) {
return a_s.contains(e_s);
}
if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
if let Some(val) = e.get("S") {
return set.contains(val);
}
}
if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
if let Some(val) = e.get("N") {
return set.contains(val);
}
}
if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
if let Some(val) = e.get("B") {
return set.contains(val);
}
}
if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
return list.contains(e);
}
false
}
fn eval_attribute_type(
rest: &str,
item: &HashMap<String, AttributeValue>,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> bool {
let Some(inner) = rest.strip_suffix(')') else {
return false;
};
let mut split = inner.splitn(2, ',');
let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
return false;
};
let actual = resolve_path(attr_ref.trim(), item, expr_attr_names);
let expected_type = expr_attr_values
.get(val_ref.trim())
.and_then(|v| v.get("S"))
.and_then(|v| v.as_str());
let (Some(val), Some(t)) = (actual.as_ref(), expected_type) else {
return false;
};
match t {
"S" => val.get("S").is_some(),
"N" => val.get("N").is_some(),
"B" => val.get("B").is_some(),
"BOOL" => val.get("BOOL").is_some(),
"NULL" => val.get("NULL").is_some(),
"SS" => val.get("SS").is_some(),
"NS" => val.get("NS").is_some(),
"BS" => val.get("BS").is_some(),
"L" => val.get("L").is_some(),
"M" => val.get("M").is_some(),
_ => false,
}
}
/// Splits `<attr> IN (<v1>, <v2>, ...)` into the attribute reference and the
/// list of value references.
///
/// The ` IN ` keyword is matched case-insensitively. Returns `None` when the
/// keyword is absent, the attribute reference is empty, the right-hand side
/// is not wrapped in `(` ... `)`, or no non-empty value reference remains.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing keeps byte offsets identical to the original text.
    let keyword_at = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (head, tail) = expr.split_at(keyword_at);

    let attr_ref = head.trim();
    if attr_ref.is_empty() {
        return None;
    }

    let list = tail[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let values: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .collect();

    (!values.is_empty()).then_some((attr_ref, values))
}
/// Returns true when `actual` is present and equals the value bound to at
/// least one of `value_refs` in `expr_attr_values`.
///
/// References that are not bound are ignored; a missing attribute never
/// matches.
fn evaluate_in_match(
    actual: Option<&AttributeValue>,
    value_refs: &[&str],
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let Some(actual) = actual else {
        return false;
    };
    value_refs
        .iter()
        .filter_map(|value_ref| expr_attr_values.get(*value_ref))
        .any(|candidate| actual == candidate)
}
/// The four clause kinds that can appear in an `UpdateExpression`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateAction {
    /// `SET` clause.
    Set,
    /// `REMOVE` clause.
    Remove,
    /// `ADD` clause.
    Add,
    /// `DELETE` clause.
    Delete,
}

impl UpdateAction {
    /// Upper-case clause keywords paired with the action each one introduces.
    const KEYWORDS: &'static [(&'static str, Self)] = &[
        ("SET", Self::Set),
        ("REMOVE", Self::Remove),
        ("ADD", Self::Add),
        ("DELETE", Self::Delete),
    ];

    /// Returns the upper-case keyword that introduces this action's clause.
    fn keyword(self) -> &'static str {
        match self {
            Self::Set => "SET",
            Self::Remove => "REMOVE",
            Self::Add => "ADD",
            Self::Delete => "DELETE",
        }
    }
}
/// Applies an `UpdateExpression` to `item` in place.
///
/// The expression is split into clauses by `parse_update_clauses`; each
/// clause's entries are then applied in order of appearance. `REMOVE` entries
/// are resolved as attribute names and removed from the top level of the
/// item; the other clause kinds delegate to their `apply_*_assignment`
/// helper.
///
/// # Errors
///
/// Returns a `ValidationException` when a non-blank expression contains no
/// recognisable clause, and propagates any error raised while applying an
/// individual assignment.
fn apply_update_expression(
    item: &mut HashMap<String, AttributeValue>,
    expr: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    let clauses = parse_update_clauses(expr);
    let has_text = !expr.trim().is_empty();
    if has_text && clauses.is_empty() {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
        ));
    }

    for (action, entries) in &clauses {
        for entry in entries {
            match action {
                UpdateAction::Set => {
                    apply_set_assignment(item, entry, expr_attr_names, expr_attr_values)?
                }
                UpdateAction::Remove => {
                    let attr = resolve_attr_name(entry.trim(), expr_attr_names);
                    item.remove(&attr);
                }
                UpdateAction::Add => {
                    apply_add_assignment(item, entry, expr_attr_names, expr_attr_values)?
                }
                UpdateAction::Delete => {
                    apply_delete_assignment(item, entry, expr_attr_names, expr_attr_values)?
                }
            }
        }
    }
    Ok(())
}
/// Splits an `UpdateExpression` into its clauses.
///
/// Each returned entry pairs the clause kind with the comma-separated
/// entries that follow its keyword, in the order the clauses appear in
/// `expr`. Keywords are matched case-insensitively.
///
/// A keyword only counts when it is a whole token. It is ignored when it is
/// embedded in an attribute name (`add_on`, `my_set`), when it is the body of
/// a `#name` or `:value` placeholder (`#set`, `:delete`), or when it is a
/// path segment following a `.`; otherwise such names would be mistaken for
/// the start of a new clause.
fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
    /// Bytes that may appear inside a bare attribute name.
    fn is_name_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    let bytes = expr.as_bytes();
    // ASCII upper-casing keeps byte offsets identical to `expr`.
    let upper = expr.to_ascii_uppercase();

    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
    for &(kw, action) in UpdateAction::KEYWORDS {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let after_pos = abs_pos + kw.len();
            let before_ok = match abs_pos.checked_sub(1).map(|i| bytes[i]) {
                None => true,
                // `#`/`:` introduce placeholders and `.` continues a path, so
                // a keyword right after them is part of an operand.
                Some(prev) => !is_name_byte(prev) && !matches!(prev, b'#' | b':' | b'.'),
            };
            let after_ok = bytes
                .get(after_pos)
                .map_or(true, |&next| !is_name_byte(next));
            if before_ok && after_ok {
                positions.push((abs_pos, action));
            }
            search_from = after_pos;
        }
    }
    positions.sort_by_key(|(pos, _)| *pos);

    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::with_capacity(positions.len());
    for (i, &(pos, action)) in positions.iter().enumerate() {
        let start = pos + action.keyword().len();
        // A clause runs until the next keyword or the end of the expression.
        let end = positions
            .get(i + 1)
            .map_or(expr.len(), |&(next_pos, _)| next_pos);
        let content = expr[start..end].trim();
        let assignments: Vec<String> = split_on_top_level_keyword(content, ",")
            .into_iter()
            .map(|s| s.trim().to_string())
            .collect();
        clauses.push((action, assignments));
    }
    clauses
}
/// Applies one `SET` assignment of the form `<target> = <expression>`.
///
/// The right-hand side is evaluated first. When it produces no value the
/// item is left untouched. Otherwise the value is written to:
/// * a nested map entry when the target is a dotted path without indexes;
/// * a list slot when the target ends in `[n]`;
/// * a top-level attribute in every other case.
///
/// Text without an `=` is ignored.
///
/// # Errors
///
/// Propagates errors from evaluating the right-hand side and from the nested
/// path / list index writers.
fn apply_set_assignment(
    item: &mut HashMap<String, AttributeValue>,
    assignment: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    let Some((target, rhs)) = assignment.split_once('=') else {
        return Ok(());
    };
    let target = target.trim();

    let evaluated = evaluate_set_rhs(rhs.trim(), item, expr_attr_names, expr_attr_values)?;
    let Some(value) = evaluated else {
        return Ok(());
    };

    if is_dotted_path(target) {
        return assign_nested_path(item, target, expr_attr_names, value);
    }

    if let Some((list_ref, index)) = parse_list_index_suffix(target) {
        let attr = resolve_attr_name(list_ref, expr_attr_names);
        return assign_list_index(item, &attr, index, value);
    }

    item.insert(resolve_attr_name(target, expr_attr_names), value);
    Ok(())
}
/// Evaluates the right-hand side of a `SET` assignment.
///
/// Forms are recognised in this order: `if_not_exists(...)`,
/// `list_append(...)`, a top-level `+`/`-` arithmetic expression, and finally
/// a plain value placeholder or attribute path. `Ok(None)` means the
/// expression produced no value.
///
/// NOTE(review): because the function-call checks run before the arithmetic
/// check, an expression such as `if_not_exists(a, :zero) + :one` is routed to
/// the `if_not_exists` branch, whose argument text then does not end in `)`,
/// so it evaluates to `None`. Confirm whether that combination needs support.
///
/// # Errors
///
/// Propagates the `ValidationException` raised by arithmetic on non-numeric
/// operands.
fn evaluate_set_rhs(
    right: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<Option<Value>, AwsServiceError> {
    /// Returns what follows the function name and opening parenthesis,
    /// accepting both `name(` and `name (` spellings.
    fn after_call<'a>(text: &'a str, compact: &str, spaced: &str) -> Option<&'a str> {
        text.strip_prefix(compact)
            .or_else(|| text.strip_prefix(spaced))
    }

    if let Some(args) = after_call(right, "if_not_exists(", "if_not_exists (") {
        let value = evaluate_if_not_exists_rhs(args, item, expr_attr_names, expr_attr_values);
        return Ok(value);
    }

    if let Some(args) = after_call(right, "list_append(", "list_append (") {
        let value = evaluate_list_append_rhs(args, item, expr_attr_names, expr_attr_values);
        return Ok(value);
    }

    match parse_arithmetic(right) {
        Some((lhs, rhs, is_add)) => {
            evaluate_arithmetic_rhs(lhs, rhs, is_add, item, expr_attr_names, expr_attr_values)
        }
        None => Ok(resolve_ref_or_path(
            right,
            item,
            expr_attr_names,
            expr_attr_values,
        )),
    }
}
/// Evaluates the argument list of `if_not_exists(path, value)`.
///
/// `rest` is the text after the opening parenthesis and must end with `)`.
/// When `path` resolves, the function evaluates to the value found there;
/// otherwise it evaluates to `value`. Returning the existing value (rather
/// than nothing) matters when the assignment target differs from `path`,
/// e.g. `SET b = if_not_exists(a, :v)` must copy `a` into `b` when `a`
/// exists.
///
/// Returns `None` when the argument list is malformed or when neither
/// operand resolves.
fn evaluate_if_not_exists_rhs(
    rest: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Option<Value> {
    let inner = rest.strip_suffix(')')?;
    let (path, fallback) = inner.split_once(',')?;
    resolve_ref_or_path(path.trim(), item, expr_attr_names, expr_attr_values).or_else(|| {
        resolve_ref_or_path(fallback.trim(), item, expr_attr_names, expr_attr_values)
    })
}
/// Evaluates the argument list of `list_append(a, b)`.
///
/// `rest` is the text after the opening parenthesis and must end with `)`.
/// Both operands are resolved as placeholders or paths; the elements of each
/// operand that is an `L` value are concatenated in argument order. Operands
/// that do not resolve or are not lists contribute nothing, so the result can
/// be an empty list. Returns `None` only when the argument list is malformed.
fn evaluate_list_append_rhs(
    rest: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Option<Value> {
    let (first_ref, second_ref) = rest.strip_suffix(')')?.split_once(',')?;
    let first = resolve_ref_or_path(first_ref.trim(), item, expr_attr_names, expr_attr_values);
    let second = resolve_ref_or_path(second_ref.trim(), item, expr_attr_names, expr_attr_values);

    let mut combined: Vec<Value> = Vec::new();
    for operand in first.iter().chain(second.iter()) {
        if let Some(elements) = operand.get("L").and_then(Value::as_array) {
            combined.extend(elements.iter().cloned());
        }
    }
    Some(json!({ "L": combined }))
}
fn evaluate_arithmetic_rhs(
arith_left: &str,
arith_right: &str,
is_add: bool,
item: &HashMap<String, AttributeValue>,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> Result<Option<Value>, AwsServiceError> {
let left_val = resolve_ref_or_path(arith_left.trim(), item, expr_attr_names, expr_attr_values);
let right_val =
resolve_ref_or_path(arith_right.trim(), item, expr_attr_names, expr_attr_values);
let left_num = match extract_number(&left_val) {
Some(n) => n,
None if left_val.is_some() => {
return Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"ValidationException",
"An operand in the update expression has an incorrect data type",
));
}
None => 0.0,
};
let right_num = extract_number(&right_val).ok_or_else(|| {
AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"ValidationException",
"An operand in the update expression has an incorrect data type",
)
})?;
let result = if is_add {
left_num + right_num
} else {
left_num - right_num
};
let num_str = if result == result.trunc() {
format!("{}", result as i64)
} else {
format!("{result}")
};
Ok(Some(json!({ "N": num_str })))
}
/// Splits a target of the form `name[n]` into the attribute reference and the
/// list index.
///
/// Only a single trailing index on a plain (non-nested) reference is
/// accepted: the name must be non-empty and must not itself contain `[`, `]`
/// or `.`. Returns `None` for anything else.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let without_close = path.trim().strip_suffix(']')?;
    let (name, digits) = without_close.rsplit_once('[')?;
    let index = digits.parse::<usize>().ok()?;

    let malformed_name =
        name.is_empty() || name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    if malformed_name {
        None
    } else {
        Some((name, index))
    }
}
fn assign_list_index(
item: &mut HashMap<String, AttributeValue>,
attr: &str,
idx: usize,
value: Value,
) -> Result<(), AwsServiceError> {
let Some(existing) = item.get_mut(attr) else {
return Err(invalid_document_path());
};
let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
return Err(invalid_document_path());
};
if idx < list.len() {
list[idx] = value;
} else if idx == list.len() {
list.push(value);
} else {
return Err(invalid_document_path());
}
Ok(())
}
/// Builds the `ValidationException` (HTTP 400) reported when an update
/// expression targets a document path that cannot be written.
fn invalid_document_path() -> AwsServiceError {
    let code = "ValidationException";
    let message = "The document path provided in the update expression is invalid for update";
    AwsServiceError::aws_error(StatusCode::BAD_REQUEST, code, message)
}
/// Resolves an operand that is either a value placeholder or an attribute
/// path.
///
/// After trimming, text starting with `:` is looked up in `expr_attr_values`
/// (and cloned); anything else is resolved against `item` via
/// `resolve_path`. Returns `None` when the lookup finds nothing.
fn resolve_ref_or_path(
    reference: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Option<Value> {
    let token = reference.trim();
    match token.strip_prefix(':') {
        Some(_) => expr_attr_values.get(token).cloned(),
        None => resolve_path(token, item, expr_attr_names),
    }
}
/// Returns true for a path that uses `.` map navigation and contains no `[`
/// list index.
fn is_dotted_path(path: &str) -> bool {
    let has_index = path.contains('[');
    let has_dot = path.contains('.');
    has_dot && !has_index
}
fn assign_nested_path(
item: &mut HashMap<String, AttributeValue>,
path: &str,
expr_attr_names: &HashMap<String, String>,
value: Value,
) -> Result<(), AwsServiceError> {
let mut segments: Vec<String> = path
.split('.')
.map(|seg| resolve_attr_name(seg.trim(), expr_attr_names))
.collect();
if segments.len() < 2 {
return Err(invalid_document_path());
}
let leaf = segments.pop().expect("len >= 2");
let top = segments.remove(0);
let top_attr = item.get_mut(&top).ok_or_else(invalid_document_path)?;
let mut current = top_attr
.get_mut("M")
.and_then(|m| m.as_object_mut())
.ok_or_else(invalid_document_path)?;
for seg in &segments {
current = current
.get_mut(seg)
.and_then(|v| v.get_mut("M"))
.and_then(|m| m.as_object_mut())
.ok_or_else(invalid_document_path)?;
}
current.insert(leaf, value);
Ok(())
}
/// Reads an optional attribute value as a number.
///
/// Returns the parsed `f64` when the value is present, has an `N` entry that
/// is a string, and that string parses as a float; otherwise `None`.
fn extract_number(val: &Option<Value>) -> Option<f64> {
    let digits = val.as_ref()?.get("N")?.as_str()?;
    digits.parse().ok()
}
/// Splits an expression at its first top-level `+` or `-`.
///
/// Operators inside parentheses are ignored, as is an operator in the very
/// first position. Returns the untrimmed left and right operands and a flag
/// that is true for `+` and false for `-`, or `None` when no such operator
/// exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    // Signed so that a stray `)` drives the level negative instead of
    // wrapping; only level zero counts as "top level".
    let mut nesting: i32 = 0;
    // All characters of interest are ASCII, so byte offsets are char
    // boundaries wherever a match occurs.
    for (at, byte) in expr.bytes().enumerate() {
        match byte {
            b'(' => nesting += 1,
            b')' => nesting -= 1,
            b'+' | b'-' if nesting == 0 && at > 0 => {
                let (lhs, rhs) = (&expr[..at], &expr[at + 1..]);
                return Some((lhs, rhs, byte == b'+'));
            }
            _ => {}
        }
    }
    None
}
fn apply_add_assignment(
item: &mut HashMap<String, AttributeValue>,
assignment: &str,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
if parts.len() != 2 {
return Ok(());
}
let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
let val_ref = parts[1].trim();
let add_val = expr_attr_values.get(val_ref);
if let Some(add_val) = add_val {
if let Some(existing) = item.get(&attr) {
if let (Some(existing_num), Some(add_num)) = (
extract_number(&Some(existing.clone())),
extract_number(&Some(add_val.clone())),
) {
let result = existing_num + add_num;
let num_str = if result == result.trunc() {
format!("{}", result as i64)
} else {
format!("{result}")
};
item.insert(attr, json!({"N": num_str}));
} else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
let mut merged: Vec<Value> = existing_set.clone();
for v in add_set {
if !merged.contains(v) {
merged.push(v.clone());
}
}
item.insert(attr, json!({"SS": merged}));
}
} else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
let mut merged: Vec<Value> = existing_set.clone();
for v in add_set {
if !merged.contains(v) {
merged.push(v.clone());
}
}
item.insert(attr, json!({"NS": merged}));
}
} else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
let mut merged: Vec<Value> = existing_set.clone();
for v in add_set {
if !merged.contains(v) {
merged.push(v.clone());
}
}
item.insert(attr, json!({"BS": merged}));
}
}
} else {
item.insert(attr, add_val.clone());
}
}
Ok(())
}
fn apply_delete_assignment(
item: &mut HashMap<String, AttributeValue>,
assignment: &str,
expr_attr_names: &HashMap<String, String>,
expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
if parts.len() != 2 {
return Ok(());
}
let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
let val_ref = parts[1].trim();
let del_val = expr_attr_values.get(val_ref);
if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
if let (Some(existing_set), Some(del_set)) = (
existing.get("SS").and_then(|v| v.as_array()),
del_val.get("SS").and_then(|v| v.as_array()),
) {
let filtered: Vec<Value> = existing_set
.iter()
.filter(|v| !del_set.contains(v))
.cloned()
.collect();
if filtered.is_empty() {
item.remove(&attr);
} else {
item.insert(attr, json!({"SS": filtered}));
}
} else if let (Some(existing_set), Some(del_set)) = (
existing.get("NS").and_then(|v| v.as_array()),
del_val.get("NS").and_then(|v| v.as_array()),
) {
let filtered: Vec<Value> = existing_set
.iter()
.filter(|v| !del_set.contains(v))
.cloned()
.collect();
if filtered.is_empty() {
item.remove(&attr);
} else {
item.insert(attr, json!({"NS": filtered}));
}
} else if let (Some(existing_set), Some(del_set)) = (
existing.get("BS").and_then(|v| v.as_array()),
del_val.get("BS").and_then(|v| v.as_array()),
) {
let filtered: Vec<Value> = existing_set
.iter()
.filter(|v| !del_set.contains(v))
.cloned()
.collect();
if filtered.is_empty() {
item.remove(&attr);
} else {
item.insert(attr, json!({"BS": filtered}));
}
}
}
Ok(())
}
/// Borrowed view of the table properties that `build_table_description_json`
/// renders into a `TableDescription` JSON object.
pub(super) struct TableDescriptionInput<'a> {
/// Table ARN; the table name is taken from the text after its last `/`, and
/// index ARNs are formed by appending `/index/<index name>`.
pub arn: &'a str,
/// Emitted as `TableId`.
pub table_id: &'a str,
/// Emitted as `KeySchema`.
pub key_schema: &'a [KeySchemaElement],
/// Emitted as `AttributeDefinitions`.
pub attribute_definitions: &'a [AttributeDefinition],
/// Emitted as `ProvisionedThroughput` unless the billing mode is
/// `PAY_PER_REQUEST`, in which case zeros are reported instead.
pub provisioned_throughput: &'a ProvisionedThroughput,
/// Global secondary indexes; the `GlobalSecondaryIndexes` key is omitted when empty.
pub gsi: &'a [GlobalSecondaryIndex],
/// Local secondary indexes; the `LocalSecondaryIndexes` key is omitted when empty.
pub lsi: &'a [LocalSecondaryIndex],
/// Emitted inside `BillingModeSummary`.
pub billing_mode: &'a str,
/// Emitted as `CreationDateTime` in seconds with millisecond precision.
pub created_at: chrono::DateTime<chrono::Utc>,
/// Emitted as `ItemCount`.
pub item_count: i64,
/// Emitted as `TableSizeBytes`.
pub size_bytes: i64,
/// Emitted as `TableStatus`; `WarmThroughput` is only added when this is `ACTIVE`.
pub status: &'a str,
/// Emitted as `DeletionProtectionEnabled`.
pub deletion_protection_enabled: bool,
/// Emitted as `OnDemandThroughput` when present.
pub on_demand_throughput: Option<&'a crate::state::OnDemandThroughput>,
}
fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
let TableDescriptionInput {
arn,
table_id,
key_schema,
attribute_definitions,
provisioned_throughput,
gsi,
lsi,
billing_mode,
created_at,
item_count,
size_bytes,
status,
deletion_protection_enabled,
on_demand_throughput,
} = *input;
let table_name = arn.rsplit('/').next().unwrap_or("");
let creation_timestamp =
created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
let ks: Vec<Value> = key_schema
.iter()
.map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
.collect();
let ad: Vec<Value> = attribute_definitions
.iter()
.map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
.collect();
let mut desc = json!({
"TableName": table_name,
"TableArn": arn,
"TableId": table_id,
"TableStatus": status,
"KeySchema": ks,
"AttributeDefinitions": ad,
"CreationDateTime": creation_timestamp,
"ItemCount": item_count,
"TableSizeBytes": size_bytes,
"BillingModeSummary": { "BillingMode": billing_mode },
"DeletionProtectionEnabled": deletion_protection_enabled,
});
if billing_mode != "PAY_PER_REQUEST" {
desc["ProvisionedThroughput"] = json!({
"ReadCapacityUnits": provisioned_throughput.read_capacity_units,
"WriteCapacityUnits": provisioned_throughput.write_capacity_units,
"NumberOfDecreasesToday": 0,
});
} else {
desc["ProvisionedThroughput"] = json!({
"ReadCapacityUnits": 0,
"WriteCapacityUnits": 0,
"NumberOfDecreasesToday": 0,
});
}
if let Some(odt) = on_demand_throughput {
desc["OnDemandThroughput"] = json!({
"MaxReadRequestUnits": odt.max_read_request_units,
"MaxWriteRequestUnits": odt.max_write_request_units,
});
}
if status == "ACTIVE" {
desc["WarmThroughput"] = json!({
"ReadUnitsPerSecond": 0,
"WriteUnitsPerSecond": 0,
"Status": "ACTIVE",
});
}
if !gsi.is_empty() {
let gsi_json: Vec<Value> = gsi
.iter()
.map(|g| {
let gks: Vec<Value> = g
.key_schema
.iter()
.map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
.collect();
let mut idx = json!({
"IndexName": g.index_name,
"KeySchema": gks,
"Projection": { "ProjectionType": g.projection.projection_type },
"IndexStatus": "ACTIVE",
"IndexArn": format!("{arn}/index/{}", g.index_name),
"ItemCount": 0,
"IndexSizeBytes": 0,
});
if !g.projection.non_key_attributes.is_empty() {
idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
}
if let Some(ref pt) = g.provisioned_throughput {
idx["ProvisionedThroughput"] = json!({
"ReadCapacityUnits": pt.read_capacity_units,
"WriteCapacityUnits": pt.write_capacity_units,
"NumberOfDecreasesToday": 0,
});
}
if let Some(ref odt) = g.on_demand_throughput {
idx["OnDemandThroughput"] = json!({
"MaxReadRequestUnits": odt.max_read_request_units,
"MaxWriteRequestUnits": odt.max_write_request_units,
});
}
idx
})
.collect();
desc["GlobalSecondaryIndexes"] = json!(gsi_json);
}
if !lsi.is_empty() {
let lsi_json: Vec<Value> = lsi
.iter()
.map(|l| {
let lks: Vec<Value> = l
.key_schema
.iter()
.map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
.collect();
let mut idx = json!({
"IndexName": l.index_name,
"KeySchema": lks,
"Projection": { "ProjectionType": l.projection.projection_type },
"IndexArn": format!("{arn}/index/{}", l.index_name),
"ItemCount": 0,
"IndexSizeBytes": 0,
});
if !l.projection.non_key_attributes.is_empty() {
idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
}
idx
})
.collect();
desc["LocalSecondaryIndexes"] = json!(lsi_json);
}
desc
}
/// Renders a stored table as a `TableDescription` JSON object.
///
/// Builds the common description from the table's fields and then adds the
/// parts only a stored table has: the latest stream ARN and label (the text
/// after the ARN's last `/`), the `StreamSpecification` when streaming is
/// enabled and a view type is set, and the `SSEDescription` when an SSE type
/// is configured.
fn build_table_description(table: &DynamoTable) -> Value {
    let input = TableDescriptionInput {
        arn: &table.arn,
        table_id: &table.table_id,
        key_schema: &table.key_schema,
        attribute_definitions: &table.attribute_definitions,
        provisioned_throughput: &table.provisioned_throughput,
        gsi: &table.gsi,
        lsi: &table.lsi,
        billing_mode: &table.billing_mode,
        created_at: table.created_at,
        item_count: table.item_count,
        size_bytes: table.size_bytes,
        status: &table.status,
        deletion_protection_enabled: table.deletion_protection_enabled,
        on_demand_throughput: table.on_demand_throughput.as_ref(),
    };
    let mut desc = build_table_description_json(&input);

    if let Some(stream_arn) = table.stream_arn.as_ref() {
        desc["LatestStreamArn"] = json!(stream_arn);
        desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
    }

    if let (true, Some(view_type)) = (table.stream_enabled, table.stream_view_type.as_ref()) {
        desc["StreamSpecification"] = json!({
            "StreamEnabled": true,
            "StreamViewType": view_type,
        });
    }

    if let Some(sse_type) = table.sse_type.as_ref() {
        let mut sse = json!({
            "Status": "ENABLED",
            "SSEType": sse_type,
        });
        if let Some(key_arn) = table.sse_kms_key_arn.as_ref() {
            sse["KMSMasterKeyArn"] = json!(key_arn);
        }
        desc["SSEDescription"] = sse;
    }

    desc
}
fn execute_partiql_statement(
state: &SharedDynamoDbState,
statement: &str,
parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
let trimmed = statement.trim();
let upper = trimmed.to_ascii_uppercase();
if upper.starts_with("SELECT") {
execute_partiql_select(state, trimmed, parameters)
} else if upper.starts_with("INSERT") {
execute_partiql_insert(state, trimmed, parameters)
} else if upper.starts_with("UPDATE") {
execute_partiql_update(state, trimmed, parameters)
} else if upper.starts_with("DELETE") {
execute_partiql_delete(state, trimmed, parameters)
} else {
Err(AwsServiceError::aws_error(
StatusCode::BAD_REQUEST,
"ValidationException",
format!("Unsupported PartiQL statement: {trimmed}"),
))
}
}
/// Executes a PartiQL `SELECT ... FROM <table> [WHERE ...]` statement.
///
/// The projection list is not interpreted: whole items are returned. With a
/// `WHERE` clause only the matching items are returned, otherwise every item
/// in the table. The response body is `{"Items": [...]}`.
///
/// # Errors
///
/// Returns a `ValidationException` when `FROM` is missing, and propagates
/// errors from the table lookup and from evaluating the `WHERE` clause.
fn execute_partiql_select(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    let from_pos = statement
        .to_ascii_uppercase()
        .find("FROM")
        .ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Invalid SELECT statement: missing FROM",
            )
        })?;
    let (table_name, rest) = parse_partiql_table_name(statement[from_pos + 4..].trim());
    let rest = rest.trim();

    let guard = state.read();
    let db = guard.default_ref();
    let table = get_table(&db.tables, &table_name)?;

    let rows: Vec<Value> = if rest.to_ascii_uppercase().starts_with("WHERE") {
        let where_clause = rest[5..].trim();
        evaluate_partiql_where(table, where_clause, parameters)?
            .into_iter()
            .map(|row| json!(row))
            .collect()
    } else {
        table.items.iter().map(|row| json!(row)).collect()
    };
    DynamoDbService::ok_json(json!({ "Items": rows }))
}
/// Executes a PartiQL `INSERT INTO <table> VALUE {...}` statement.
///
/// The item literal is parsed before the state is locked for writing. The
/// item is appended to the table and the table statistics are recalculated.
///
/// # Errors
///
/// Returns a `ValidationException` when `INTO` or `VALUE` is missing, a
/// `DuplicateItemException` when an item with the same key already exists,
/// and propagates errors from parsing the item and from the table lookup.
fn execute_partiql_insert(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    let into_pos = statement
        .to_ascii_uppercase()
        .find("INTO")
        .ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Invalid INSERT statement: missing INTO",
            )
        })?;
    let (table_name, rest) = parse_partiql_table_name(statement[into_pos + 4..].trim());
    let rest = rest.trim();

    let value_pos = rest.to_ascii_uppercase().find("VALUE").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid INSERT statement: missing VALUE",
        )
    })?;
    let item = parse_partiql_value_object(rest[value_pos + 5..].trim(), parameters)?;

    let mut guard = state.write();
    let db = guard.default_mut();
    let table = get_table_mut(&mut db.tables, &table_name)?;

    let key = extract_key(table, &item);
    if table.find_item_index(&key).is_some() {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "DuplicateItemException",
            "Duplicate primary key exists in table",
        ));
    }
    table.items.push(item);
    table.recalculate_stats();
    DynamoDbService::ok_json(json!({}))
}
/// Executes a PartiQL `UPDATE <table> SET a = v[, ...] [WHERE ...]` statement.
///
/// The caller must have verified that `statement` starts with `UPDATE`; the
/// first six bytes are skipped unconditionally. Every item selected by the
/// `WHERE` clause (or every item when there is none) receives each
/// assignment whose right-hand side parses to a value; assignments without
/// `=` are skipped. Table statistics are recalculated afterwards.
///
/// NOTE(review): values for the `SET` clause are read from `parameters`
/// starting at the offset returned by `count_params_in_str(where_clause)`,
/// i.e. after the `WHERE` clause's parameters, even though `SET` precedes
/// `WHERE` in the statement text. Confirm this matches the order callers use.
///
/// # Errors
///
/// Returns a `ValidationException` when `SET` is missing, and propagates
/// errors from the table lookup and from evaluating the `WHERE` clause.
fn execute_partiql_update(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    let (table_name, rest) = parse_partiql_table_name(statement[6..].trim());
    let rest = rest.trim();

    let set_pos = rest.to_ascii_uppercase().find("SET").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid UPDATE statement: missing SET",
        )
    })?;
    let after_set = rest[set_pos + 3..].trim();
    let (set_clause, where_clause) = match after_set.to_ascii_uppercase().find("WHERE") {
        Some(wp) => (&after_set[..wp], after_set[wp + 5..].trim()),
        None => (after_set, ""),
    };

    let mut guard = state.write();
    let db = guard.default_mut();
    let table = get_table_mut(&mut db.tables, &table_name)?;

    let targets: Vec<usize> = if where_clause.is_empty() {
        (0..table.items.len()).collect()
    } else {
        find_partiql_where_indices(table, where_clause, parameters)?
    };

    let param_offset = count_params_in_str(where_clause);
    // (attribute name with surrounding quotes removed, value text)
    let assignments: Vec<(&str, &str)> = set_clause
        .split(',')
        .filter_map(|entry| entry.trim().split_once('='))
        .map(|(attr, value)| (attr.trim().trim_matches('"'), value.trim()))
        .collect();

    for &row in &targets {
        // Each item re-reads the same parameters, so restart the cursor.
        let mut local_offset = param_offset;
        for &(attr, value_text) in &assignments {
            if let Some(value) = parse_partiql_literal(value_text, parameters, &mut local_offset) {
                table.items[row].insert(attr.to_string(), value);
            }
        }
    }

    table.recalculate_stats();
    DynamoDbService::ok_json(json!({}))
}
/// Executes a PartiQL `DELETE FROM <table> WHERE ...` statement.
///
/// Every item matching the `WHERE` clause is removed and the table
/// statistics are recalculated.
///
/// # Errors
///
/// Returns a `ValidationException` when `FROM` or the `WHERE` clause is
/// missing, and propagates errors from the table lookup and from evaluating
/// the `WHERE` clause.
fn execute_partiql_delete(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    let from_pos = statement
        .to_ascii_uppercase()
        .find("FROM")
        .ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Invalid DELETE statement: missing FROM",
            )
        })?;
    let (table_name, rest) = parse_partiql_table_name(statement[from_pos + 4..].trim());
    let rest = rest.trim();

    if !rest.to_ascii_uppercase().starts_with("WHERE") {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "DELETE requires a WHERE clause",
        ));
    }
    let where_clause = rest[5..].trim();

    let mut guard = state.write();
    let db = guard.default_mut();
    let table = get_table_mut(&mut db.tables, &table_name)?;

    // Remove from the highest index down so earlier removals do not shift
    // the positions still to be removed.
    let mut doomed = find_partiql_where_indices(table, where_clause, parameters)?;
    doomed.sort_unstable_by(|a, b| b.cmp(a));
    for row in doomed {
        table.items.remove(row);
    }

    table.recalculate_stats();
    DynamoDbService::ok_json(json!({}))
}
/// Splits a PartiQL fragment into its leading table name and the remaining text.
///
/// Leading and trailing whitespace of `s` is ignored. Three forms are handled:
///
/// * `"name" rest` — the name is the text between the double quotes and the
///   remainder starts right after the closing quote;
/// * `"name rest` (no closing quote) — the name ends at the first whitespace
///   character and surrounding double quotes are stripped from it;
/// * `name rest` — the name ends at the first whitespace character.
///
/// The remainder is returned untrimmed so callers can decide how to treat it.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    match s.strip_prefix('"') {
        Some(after_quote) => match after_quote.find('"') {
            Some(close) => (
                after_quote[..close].to_string(),
                &after_quote[close + 1..],
            ),
            None => {
                // Unterminated quote: fall back to whitespace splitting, using
                // the same "any whitespace" rule as the unquoted form so a tab
                // or newline after the name also ends it.
                let end = s.find(char::is_whitespace).unwrap_or(s.len());
                (s[..end].trim_matches('"').to_string(), &s[end..])
            }
        },
        None => {
            let end = s.find(char::is_whitespace).unwrap_or(s.len());
            (s[..end].to_string(), &s[end..])
        }
    }
}
/// Returns references to the items of `table` selected by `where_clause`.
///
/// Selection is delegated to `find_partiql_where_indices`; the resulting
/// positions are mapped back to borrowed items in the same order.
fn evaluate_partiql_where<'a>(
    table: &'a DynamoTable,
    where_clause: &str,
    parameters: &[Value],
) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
    let selected = find_partiql_where_indices(table, where_clause, parameters)?
        .into_iter()
        .map(|position| &table.items[position])
        .collect();
    Ok(selected)
}
/// Returns the positions of the items in `table` that satisfy `where_clause`.
///
/// The clause is split on `AND` and each part is parsed as an
/// `attribute = value` condition. An item is selected when every parsed
/// condition equals the item's attribute value exactly.
///
/// Note: clauses that cannot be parsed are dropped by
/// `parse_partiql_equality_conditions`, and when no condition remains every
/// item is selected (`all` over an empty list is true).
fn find_partiql_where_indices(
    table: &DynamoTable,
    where_clause: &str,
    parameters: &[Value],
) -> Result<Vec<usize>, AwsServiceError> {
    let clauses = split_partiql_and_clauses(where_clause);
    let wanted = parse_partiql_equality_conditions(&clauses, parameters);
    let positions = table
        .items
        .iter()
        .enumerate()
        .filter(|(_, item)| {
            wanted
                .iter()
                .all(|(attr, expected)| item.get(attr) == Some(expected))
        })
        .map(|(position, _)| position)
        .collect();
    Ok(positions)
}
/// Splits a PartiQL WHERE clause into its `AND`-joined parts.
///
/// The separator is the five-byte sequence space, `AND`, space, matched
/// case-insensitively (ASCII only). Separators inside single-quoted string
/// literals are ignored, so `name = 'salt AND pepper'` stays one clause.
/// Each returned part is trimmed; a clause without a separator yields a
/// single-element vector.
///
/// Matching is done on the original bytes rather than on an upper-cased copy:
/// Unicode upper-casing can change byte lengths, which would make offsets
/// from the copy invalid for slicing the original text.
fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
    const SEPARATOR: &[u8] = b" AND ";
    let bytes = where_clause.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut in_quote = false;
    let mut i = 0;
    while i < bytes.len() {
        if bytes[i] == b'\'' {
            // A doubled quote (`''`) toggles twice and leaves the state unchanged.
            in_quote = !in_quote;
        } else if !in_quote
            && bytes
                .get(i..i + SEPARATOR.len())
                .map_or(false, |window| window.eq_ignore_ascii_case(SEPARATOR))
        {
            // The separator is pure ASCII, so `i` and `i + 5` are char boundaries.
            parts.push(where_clause[start..i].trim());
            i += SEPARATOR.len();
            start = i;
            continue;
        }
        i += 1;
    }
    parts.push(where_clause[start..].trim());
    parts
}
/// Parses `attribute = literal` clauses into `(attribute name, value)` pairs.
///
/// Each clause is split at its first `=`. The left side, trimmed and with
/// surrounding double quotes removed, is the attribute name; the right side
/// is parsed with `parse_partiql_literal`. `?` placeholders consume
/// `parameters` in order, starting from index 0.
///
/// Clauses without `=` or with an unparseable literal are skipped.
fn parse_partiql_equality_conditions(
    conditions: &[&str],
    parameters: &[Value],
) -> Vec<(String, Value)> {
    let mut next_param = 0usize;
    conditions
        .iter()
        .filter_map(|clause| {
            let (lhs, rhs) = clause.trim().split_once('=')?;
            let name = lhs.trim().trim_matches('"').to_string();
            let value = parse_partiql_literal(rhs.trim(), parameters, &mut next_param)?;
            Some((name, value))
        })
        .collect()
}
/// Parses a single PartiQL literal into a DynamoDB attribute-value JSON object.
///
/// Supported forms:
///
/// * `?` — takes the value at `parameters[*param_idx]` and advances
///   `param_idx` (the index advances even when the parameter is missing, in
///   which case `None` is returned);
/// * `'text'` — a string, returned as `{"S": ...}`; a doubled quote (`''`)
///   inside the literal is unescaped to a single quote;
/// * a number — returned as `{"N": ...}` with a normalized decimal string.
///
/// Returns `None` for anything else.
fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
    let s = s.trim();
    if s == "?" {
        let idx = *param_idx;
        *param_idx += 1;
        return parameters.get(idx).cloned();
    }
    if s.len() >= 2 && s.starts_with('\'') && s.ends_with('\'') {
        let inner = s[1..s.len() - 1].replace("''", "'");
        return Some(json!({"S": inner}));
    }
    normalize_partiql_number(s).map(|num_str| json!({"N": num_str}))
}

/// Normalizes numeric literal text into the string stored under `"N"`.
///
/// Integer literals are parsed as `i128` so their digits are preserved
/// exactly instead of being rounded through `f64`. Other numeric text goes
/// through `f64`; integral values that fit in `i64` print without a
/// fractional part. Non-finite results (`inf`, `nan`, overflowing exponents)
/// are rejected.
fn normalize_partiql_number(s: &str) -> Option<String> {
    // 2^63: integral f64 values in [-2^63, 2^63) convert to i64 without saturating.
    const I64_LIMIT: f64 = 9_223_372_036_854_775_808.0;
    if let Ok(int) = s.parse::<i128>() {
        return Some(int.to_string());
    }
    let n = s.parse::<f64>().ok()?;
    if !n.is_finite() {
        return None;
    }
    if n.fract() == 0.0 && (-I64_LIMIT..I64_LIMIT).contains(&n) {
        Some((n as i64).to_string())
    } else {
        Some(n.to_string())
    }
}
/// Parses a PartiQL object literal such as `{'pk': 'a', 'n': 1}` into an item map.
///
/// The text must be wrapped in `{` and `}`. The inside is split into
/// `key: value` pairs with `split_partiql_pairs`; each key has surrounding
/// single and double quotes removed and each value is parsed with
/// `parse_partiql_literal`, with `?` placeholders consuming `parameters` from
/// index 0.
///
/// Empty pairs, pairs without `:` and pairs whose value cannot be parsed are
/// skipped.
///
/// # Errors
///
/// Returns a `ValidationException` when the text is not wrapped in braces.
fn parse_partiql_value_object(
    s: &str,
    parameters: &[Value],
) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
    let body = match s.trim().strip_prefix('{').and_then(|t| t.strip_suffix('}')) {
        Some(body) => body,
        None => {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "Invalid VALUE: expected object literal",
            ))
        }
    };
    let mut next_param = 0usize;
    let mut attributes = HashMap::new();
    let entries = split_partiql_pairs(body)
        .into_iter()
        .map(str::trim)
        .filter(|entry| !entry.is_empty());
    for entry in entries {
        let parsed = entry.split_once(':').and_then(|(raw_key, raw_value)| {
            parse_partiql_literal(raw_value.trim(), parameters, &mut next_param)
                .map(|value| (raw_key, value))
        });
        if let Some((raw_key, value)) = parsed {
            let name = raw_key
                .trim()
                .trim_matches('\'')
                .trim_matches('"')
                .to_string();
            attributes.insert(name, value);
        }
    }
    Ok(attributes)
}
/// Splits the inside of an object literal at its top-level commas.
///
/// Commas inside single-quoted text or inside nested `{ ... }` are not split
/// points. Pieces are returned untrimmed, and the text after the last comma
/// is always included, so an empty input yields one empty piece.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut begin = 0;
    let mut nesting = 0i32;
    let mut quoted = false;
    for (pos, ch) in s.char_indices() {
        if ch == '\'' {
            quoted = !quoted;
            continue;
        }
        if quoted {
            continue;
        }
        match ch {
            '{' => nesting += 1,
            '}' => nesting -= 1,
            ',' if nesting == 0 => {
                pieces.push(&s[begin..pos]);
                begin = pos + 1;
            }
            _ => {}
        }
    }
    pieces.push(&s[begin..]);
    pieces
}
/// Counts the `?` parameter placeholders in a PartiQL fragment.
///
/// Question marks inside single-quoted string literals are text, not
/// placeholders, and are not counted.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    s.chars()
        .filter(|&c| {
            if c == '\'' {
                // A doubled quote (`''`) toggles twice and leaves the state unchanged.
                in_quote = !in_quote;
                false
            } else {
                c == '?' && !in_quote
            }
        })
        .count()
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
// A SET expression with two comma-separated assignments parses into one
// `Set` clause holding two entries.
#[test]
fn test_parse_update_clauses_set() {
let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
assert_eq!(clauses.len(), 1);
assert_eq!(clauses[0].0, UpdateAction::Set);
assert_eq!(clauses[0].1.len(), 2);
}
// An expression containing both SET and REMOVE parses into two clauses, in
// the order they were written.
#[test]
fn test_parse_update_clauses_set_and_remove() {
let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
assert_eq!(clauses.len(), 2);
assert_eq!(clauses[0].0, UpdateAction::Set);
assert_eq!(clauses[1].0, UpdateAction::Remove);
}
// The comma inside `list_append(...)` must not be treated as an assignment
// separator: the SET clause holds exactly one entry.
#[test]
fn test_parse_update_clauses_list_append_single_assignment() {
let clauses = parse_update_clauses("SET #0 = list_append(#0, :0)");
assert_eq!(clauses.len(), 1);
assert_eq!(clauses[0].0, UpdateAction::Set);
assert_eq!(
clauses[0].1.len(),
1,
"list_append(a, b) must be kept as a single assignment, not split at the inner comma"
);
}
// A `list_append(...)` assignment followed by a plain assignment yields two
// entries: only the top-level comma separates assignments.
#[test]
fn test_parse_update_clauses_list_append_mixed_with_plain_set() {
let clauses = parse_update_clauses("SET #0 = list_append(#0, :new), #1 = :other");
assert_eq!(clauses.len(), 1);
assert_eq!(clauses[0].0, UpdateAction::Set);
assert_eq!(
clauses[0].1.len(),
2,
"two SET assignments: one list_append and one plain"
);
}
// A simple `pk = :pk` key condition evaluates to true when the item's `pk`
// equals the supplied expression value; no attribute-name aliases are used.
#[test]
fn test_evaluate_key_condition_simple() {
let mut item = HashMap::new();
item.insert("pk".to_string(), json!({"S": "user1"}));
item.insert("sk".to_string(), json!({"S": "order1"}));
let mut expr_values = HashMap::new();
expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
assert!(evaluate_key_condition(
"pk = :pk",
&item,
&HashMap::new(),
&expr_values,
));
}
// Two `N` attribute values compare as `Less` when the first is 10 and the
// second is 20.
#[test]
fn test_compare_attribute_values_numbers() {
let a = json!({"N": "10"});
let b = json!({"N": "20"});
assert_eq!(
compare_attribute_values(Some(&a), Some(&b)),
std::cmp::Ordering::Less
);
}
// Two `S` attribute values compare as `Less` for "apple" versus "banana".
#[test]
fn test_compare_attribute_values_strings() {
let a = json!({"S": "apple"});
let b = json!({"S": "banana"});
assert_eq!(
compare_attribute_values(Some(&a), Some(&b)),
std::cmp::Ordering::Less
);
}
// `split_on_and` splits a two-condition expression into its two sides.
#[test]
fn test_split_on_and() {
let parts = split_on_and("pk = :pk AND sk > :sk");
assert_eq!(parts.len(), 2);
assert_eq!(parts[0].trim(), "pk = :pk");
assert_eq!(parts[1].trim(), "sk > :sk");
}
// An AND nested inside parentheses is not a split point: the whole
// expression comes back as a single part.
#[test]
fn test_split_on_and_respects_parentheses() {
let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
assert_eq!(parts.len(), 1);
assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
}
// Evaluates `(x AND y) OR z` against two items: one where only `z` matches
// (expected true) and one where nothing matches (expected false).
#[test]
fn test_evaluate_filter_expression_parenthesized_and_with_or() {
let mut item = HashMap::new();
item.insert("x".to_string(), json!({"S": "no"}));
item.insert("y".to_string(), json!({"S": "no"}));
item.insert("z".to_string(), json!({"S": "yes"}));
let mut expr_values = HashMap::new();
expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
let result = evaluate_filter_expression(
"(x = :yes AND y = :yes) OR z = :yes",
&item,
&HashMap::new(),
&expr_values,
);
assert!(result, "should match because z = :yes is true");
// Second item: every attribute is "no", so neither side of the OR holds.
let mut item2 = HashMap::new();
item2.insert("x".to_string(), json!({"S": "no"}));
item2.insert("y".to_string(), json!({"S": "no"}));
item2.insert("z".to_string(), json!({"S": "no"}));
let result2 = evaluate_filter_expression(
"(x = :yes AND y = :yes) OR z = :yes",
&item2,
&HashMap::new(),
&expr_values,
);
assert!(!result2, "should not match because nothing is true");
}
// Projecting `data[0].name` keeps only the `name` field of the first list
// element; the sibling `age` field must be absent from the projection.
#[test]
fn test_project_item_nested_path() {
let mut item = HashMap::new();
item.insert("pk".to_string(), json!({"S": "key1"}));
item.insert(
"data".to_string(),
json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
);
let body = json!({
"ProjectionExpression": "data[0].name"
});
let projected = project_item(&item, &body);
// Walk data -> L[0] -> M -> name -> S in the projected item.
let name = projected
.get("data")
.and_then(|v| v.get("L"))
.and_then(|v| v.get(0))
.and_then(|v| v.get("M"))
.and_then(|v| v.get("name"))
.and_then(|v| v.get("S"))
.and_then(|v| v.as_str());
assert_eq!(name, Some("Alice"));
let age = projected
.get("data")
.and_then(|v| v.get("L"))
.and_then(|v| v.get(0))
.and_then(|v| v.get("M"))
.and_then(|v| v.get("age"));
assert!(age.is_none(), "age should not be present in projection");
}
// A dotted path through two nested maps resolves to the innermost value.
#[test]
fn test_resolve_nested_path_map() {
let mut item = HashMap::new();
item.insert(
"info".to_string(),
json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
);
let result = resolve_nested_path(&item, "info.address.city");
assert_eq!(result, Some(json!({"S": "NYC"})));
}
// A path with a list index followed by a map key (`items[0].sku`) resolves
// to the value inside the first list element.
#[test]
fn test_resolve_nested_path_list_then_map() {
let mut item = HashMap::new();
item.insert(
"items".to_string(),
json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
);
let result = resolve_nested_path(&item, "items[0].sku");
assert_eq!(result, Some(json!({"S": "ABC"})));
}
// When an alias (`#sw`) maps to a name containing a dot, the name is looked
// up as a single top-level attribute rather than as a nested path.
#[test]
fn test_resolve_path_alias_with_dot_is_top_level_attr() {
let mut item = HashMap::new();
item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
let mut names = HashMap::new();
names.insert("#sw".to_string(), "Safety.Warning".to_string());
let result = resolve_path("#sw", &item, &names);
assert_eq!(result, Some(json!({"S": "high"})));
}
// Without aliases, a dotted expression (`profile.email`) is still walked as
// a nested path into the map attribute.
#[test]
fn test_resolve_path_dotted_expression_still_walks_nested() {
let mut item = HashMap::new();
item.insert("profile".to_string(), json!({"M": {"email": {"S": "x@y"}}}));
let names = HashMap::new();
let result = resolve_path("profile.email", &item, &names);
assert_eq!(result, Some(json!({"S": "x@y"})));
}
// Projecting through an alias whose target name contains a dot returns the
// top-level attribute with that exact name and drops unrelated attributes.
#[test]
fn test_project_item_alias_with_dot_is_top_level_attr() {
let mut item = HashMap::new();
item.insert("Safety.Warning".to_string(), json!({"S": "high"}));
item.insert("other".to_string(), json!({"S": "ignored"}));
let body = json!({
"ProjectionExpression": "#sw",
"ExpressionAttributeNames": {"#sw": "Safety.Warning"},
});
let projected = project_item(&item, &body);
assert_eq!(projected.get("Safety.Warning"), Some(&json!({"S": "high"})));
assert!(!projected.contains_key("other"));
}
use crate::state::SharedDynamoDbState;
use parking_lot::RwLock;
use std::sync::Arc;
// Builds a `DynamoDbService` over fresh multi-account state created for
// account "123456789012" in "us-east-1"; no optional collaborators are set.
fn make_service() -> DynamoDbService {
let state: SharedDynamoDbState = Arc::new(RwLock::new(
fakecloud_core::multi_account::MultiAccountState::new("123456789012", "us-east-1", ""),
));
DynamoDbService::new(state)
}
// Builds a POST `AwsRequest` for the "dynamodb" service with the given
// action and `body` serialized as JSON. Every other field is a fixed test
// value (same account and region as `make_service`, empty headers/query).
fn make_request(action: &str, body: Value) -> AwsRequest {
AwsRequest {
service: "dynamodb".to_string(),
action: action.to_string(),
region: "us-east-1".to_string(),
account_id: "123456789012".to_string(),
request_id: "test-id".to_string(),
headers: http::HeaderMap::new(),
query_params: HashMap::new(),
body: serde_json::to_vec(&body).unwrap().into(),
body_stream: parking_lot::Mutex::new(None),
path_segments: vec![],
raw_path: "/".to_string(),
raw_query: String::new(),
method: http::Method::POST,
is_query_protocol: false,
access_key_id: None,
principal: None,
}
}
// Creates the shared fixture table "test-table": a single string HASH key
// named "pk", billed PAY_PER_REQUEST. Panics if table creation fails.
fn create_test_table(svc: &DynamoDbService) {
let req = make_request(
"CreateTable",
json!({
"TableName": "test-table",
"KeySchema": [
{ "AttributeName": "pk", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "pk", "AttributeType": "S" }
],
"BillingMode": "PAY_PER_REQUEST"
}),
);
svc.create_table(&req).unwrap();
}
// CreateTable and DescribeTable both report TableStatus and
// WarmThroughput.Status as ACTIVE, and the TableId returned at creation is
// the same one returned by two consecutive DescribeTable calls.
#[test]
fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
let svc = make_service();
let req = make_request(
"CreateTable",
json!({
"TableName": "warm-throughput-table",
"KeySchema": [
{ "AttributeName": "pk", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "pk", "AttributeType": "S" }
],
"BillingMode": "PAY_PER_REQUEST"
}),
);
let create_resp = svc.create_table(&req).unwrap();
let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
let create_table = &create_body["TableDescription"];
assert_eq!(create_table["TableStatus"], "ACTIVE");
assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
let table_id = create_table["TableId"].as_str().unwrap().to_string();
assert!(!table_id.is_empty());
// First describe: same status fields and the same TableId as at creation.
let describe_req = make_request(
"DescribeTable",
json!({ "TableName": "warm-throughput-table" }),
);
let describe_resp = svc.describe_table(&describe_req).unwrap();
let describe_body: Value =
serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
let described_table = &describe_body["Table"];
assert_eq!(described_table["TableStatus"], "ACTIVE");
assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
assert_eq!(described_table["TableId"], table_id);
// Second describe: the TableId must not change between calls.
let describe_resp_again = svc.describe_table(&describe_req).unwrap();
let describe_body_again: Value =
serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
assert_eq!(describe_body_again["Table"]["TableId"], table_id);
}
// DeleteItem with ReturnValues=ALL_OLD returns every attribute of the
// deleted item, and a following GetItem finds no item.
#[test]
fn delete_item_return_values_all_old() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"PutItem",
json!({
"TableName": "test-table",
"Item": {
"pk": { "S": "key1" },
"name": { "S": "Alice" },
"age": { "N": "30" }
}
}),
);
svc.put_item(&req).unwrap();
let req = make_request(
"DeleteItem",
json!({
"TableName": "test-table",
"Key": { "pk": { "S": "key1" } },
"ReturnValues": "ALL_OLD"
}),
);
let resp = svc.delete_item(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let attrs = &body["Attributes"];
assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
// The item must be gone after the delete.
let req = make_request(
"GetItem",
json!({
"TableName": "test-table",
"Key": { "pk": { "S": "key1" } }
}),
);
let resp = svc.get_item(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert!(body.get("Item").is_none(), "item should be deleted");
}
// TransactGetItems returns one response entry per requested key, in order:
// the existing key carries an `Item`, the missing key has none.
#[test]
fn transact_get_items_returns_existing_and_missing() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"PutItem",
json!({
"TableName": "test-table",
"Item": {
"pk": { "S": "exists" },
"val": { "S": "hello" }
}
}),
);
svc.put_item(&req).unwrap();
let req = make_request(
"TransactGetItems",
json!({
"TransactItems": [
{ "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
{ "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
]
}),
);
let resp = svc.transact_get_items(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let responses = body["Responses"].as_array().unwrap();
assert_eq!(responses.len(), 2);
assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
assert!(responses[1].get("Item").is_none());
}
// A TransactWriteItems call with one Put and one Delete succeeds with 200;
// afterwards the put item is readable and the deleted item is gone.
#[test]
fn transact_write_items_put_and_delete() {
let svc = make_service();
create_test_table(&svc);
// Seed the item that the transaction will delete.
let req = make_request(
"PutItem",
json!({
"TableName": "test-table",
"Item": {
"pk": { "S": "to-delete" },
"val": { "S": "bye" }
}
}),
);
svc.put_item(&req).unwrap();
let req = make_request(
"TransactWriteItems",
json!({
"TransactItems": [
{
"Put": {
"TableName": "test-table",
"Item": {
"pk": { "S": "new-item" },
"val": { "S": "hi" }
}
}
},
{
"Delete": {
"TableName": "test-table",
"Key": { "pk": { "S": "to-delete" } }
}
}
]
}),
);
let resp = svc.transact_write_items(&req).unwrap();
assert_eq!(resp.status, StatusCode::OK);
// The Put took effect.
let req = make_request(
"GetItem",
json!({
"TableName": "test-table",
"Key": { "pk": { "S": "new-item" } }
}),
);
let resp = svc.get_item(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
// The Delete took effect.
let req = make_request(
"GetItem",
json!({
"TableName": "test-table",
"Key": { "pk": { "S": "to-delete" } }
}),
);
let resp = svc.get_item(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert!(body.get("Item").is_none());
}
// A ConditionCheck requiring `attribute_exists(pk)` on a key that was never
// written makes the transaction respond with 400,
// `TransactionCanceledException` and a `CancellationReasons` array. The
// call itself returns `Ok`, so the failure is carried in the response.
#[test]
fn transact_write_items_condition_check_failure() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"TransactWriteItems",
json!({
"TransactItems": [
{
"ConditionCheck": {
"TableName": "test-table",
"Key": { "pk": { "S": "nonexistent" } },
"ConditionExpression": "attribute_exists(pk)"
}
}
]
}),
);
let resp = svc.transact_write_items(&req).unwrap();
assert_eq!(resp.status, StatusCode::BAD_REQUEST);
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["__type"].as_str().unwrap(),
"TransactionCanceledException"
);
assert!(body["CancellationReasons"].as_array().is_some());
}
// Enables TTL on attribute "ttl", checks UpdateTimeToLive echoes the
// specification and DescribeTimeToLive reports ENABLED, then disables TTL
// and checks DescribeTimeToLive reports DISABLED.
#[test]
fn update_and_describe_time_to_live() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"UpdateTimeToLive",
json!({
"TableName": "test-table",
"TimeToLiveSpecification": {
"AttributeName": "ttl",
"Enabled": true
}
}),
);
let resp = svc.update_time_to_live(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["TimeToLiveSpecification"]["AttributeName"]
.as_str()
.unwrap(),
"ttl"
);
assert!(body["TimeToLiveSpecification"]["Enabled"]
.as_bool()
.unwrap());
let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
let resp = svc.describe_time_to_live(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["TimeToLiveDescription"]["TimeToLiveStatus"]
.as_str()
.unwrap(),
"ENABLED"
);
assert_eq!(
body["TimeToLiveDescription"]["AttributeName"]
.as_str()
.unwrap(),
"ttl"
);
// Disable TTL again and confirm the described status flips.
let req = make_request(
"UpdateTimeToLive",
json!({
"TableName": "test-table",
"TimeToLiveSpecification": {
"AttributeName": "ttl",
"Enabled": false
}
}),
);
svc.update_time_to_live(&req).unwrap();
let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
let resp = svc.describe_time_to_live(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["TimeToLiveDescription"]["TimeToLiveStatus"]
.as_str()
.unwrap(),
"DISABLED"
);
}
// Put / Get / Delete of a table resource policy: Put returns a RevisionId,
// Get returns the stored document verbatim, and after Delete the `Policy`
// field of Get is null.
#[test]
fn resource_policy_lifecycle() {
let svc = make_service();
create_test_table(&svc);
// Read the table ARN straight from state; the read guard is dropped at
// the end of this block, before any service call.
let table_arn = {
let __mas = svc.state.read();
let state = __mas.default_ref();
state.tables.get("test-table").unwrap().arn.clone()
};
let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
let req = make_request(
"PutResourcePolicy",
json!({
"ResourceArn": table_arn,
"Policy": policy_doc
}),
);
let resp = svc.put_resource_policy(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert!(body["RevisionId"].as_str().is_some());
let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
let resp = svc.get_resource_policy(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
svc.delete_resource_policy(&req).unwrap();
let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
let resp = svc.get_resource_policy(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert!(body["Policy"].is_null());
}
// DescribeEndpoints returns a first endpoint whose CachePeriodInMinutes is 1440.
#[test]
fn describe_endpoints() {
let svc = make_service();
let req = make_request("DescribeEndpoints", json!({}));
let resp = svc.describe_endpoints(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
}
// DescribeLimits reports TableMaxReadCapacityUnits as 40000.
#[test]
fn describe_limits() {
let svc = make_service();
let req = make_request("DescribeLimits", json!({}));
let resp = svc.describe_limits(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
}
// Full backup lifecycle: create (status AVAILABLE), describe by ARN, list
// (one entry), restore into a new ACTIVE table, delete, list again (empty).
#[test]
fn backup_lifecycle() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"CreateBackup",
json!({ "TableName": "test-table", "BackupName": "my-backup" }),
);
let resp = svc.create_backup(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let backup_arn = body["BackupDetails"]["BackupArn"]
.as_str()
.unwrap()
.to_string();
assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
let resp = svc.describe_backup(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["BackupDescription"]["BackupDetails"]["BackupName"],
"my-backup"
);
let req = make_request("ListBackups", json!({}));
let resp = svc.list_backups(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
// Restore the backup into a differently named table and check it is ACTIVE.
let req = make_request(
"RestoreTableFromBackup",
json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
);
svc.restore_table_from_backup(&req).unwrap();
let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
let resp = svc.describe_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
// Deleting the backup removes it from the listing.
let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
svc.delete_backup(&req).unwrap();
let req = make_request("ListBackups", json!({}));
let resp = svc.list_backups(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
}
// Point-in-time recovery is reported DISABLED on a new table and ENABLED
// after UpdateContinuousBackups turns it on.
#[test]
fn continuous_backups() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"DescribeContinuousBackups",
json!({ "TableName": "test-table" }),
);
let resp = svc.describe_continuous_backups(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
["PointInTimeRecoveryStatus"],
"DISABLED"
);
let req = make_request(
"UpdateContinuousBackups",
json!({
"TableName": "test-table",
"PointInTimeRecoverySpecification": {
"PointInTimeRecoveryEnabled": true
}
}),
);
svc.update_continuous_backups(&req).unwrap();
let req = make_request(
"DescribeContinuousBackups",
json!({ "TableName": "test-table" }),
);
let resp = svc.describe_continuous_backups(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
["PointInTimeRecoveryStatus"],
"ENABLED"
);
}
// RestoreTableToPointInTime creates the target table, which DescribeTable
// then reports as ACTIVE.
#[test]
fn restore_table_to_point_in_time() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"RestoreTableToPointInTime",
json!({
"SourceTableName": "test-table",
"TargetTableName": "pitr-restored"
}),
);
svc.restore_table_to_point_in_time(&req).unwrap();
let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
let resp = svc.describe_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
}
// Global table lifecycle: create with two replicas (status ACTIVE),
// describe, list, add a third replica via UpdateGlobalTable, then check
// the settings describe three replicas and that updating settings succeeds.
#[test]
fn global_table_lifecycle() {
let svc = make_service();
let req = make_request(
"CreateGlobalTable",
json!({
"GlobalTableName": "my-global",
"ReplicationGroup": [
{ "RegionName": "us-east-1" },
{ "RegionName": "eu-west-1" }
]
}),
);
let resp = svc.create_global_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["GlobalTableDescription"]["GlobalTableStatus"],
"ACTIVE"
);
let req = make_request(
"DescribeGlobalTable",
json!({ "GlobalTableName": "my-global" }),
);
let resp = svc.describe_global_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["GlobalTableDescription"]["ReplicationGroup"]
.as_array()
.unwrap()
.len(),
2
);
let req = make_request("ListGlobalTables", json!({}));
let resp = svc.list_global_tables(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
// Adding one replica grows the replication group from two to three.
let req = make_request(
"UpdateGlobalTable",
json!({
"GlobalTableName": "my-global",
"ReplicaUpdates": [
{ "Create": { "RegionName": "ap-southeast-1" } }
]
}),
);
let resp = svc.update_global_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["GlobalTableDescription"]["ReplicationGroup"]
.as_array()
.unwrap()
.len(),
3
);
let req = make_request(
"DescribeGlobalTableSettings",
json!({ "GlobalTableName": "my-global" }),
);
let resp = svc.describe_global_table_settings(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
let req = make_request(
"UpdateGlobalTableSettings",
json!({ "GlobalTableName": "my-global" }),
);
svc.update_global_table_settings(&req).unwrap();
}
// DescribeTableReplicaAutoScaling echoes the table name, and
// UpdateTableReplicaAutoScaling succeeds with only a table name supplied.
#[test]
fn table_replica_auto_scaling() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"DescribeTableReplicaAutoScaling",
json!({ "TableName": "test-table" }),
);
let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["TableAutoScalingDescription"]["TableName"],
"test-table"
);
let req = make_request(
"UpdateTableReplicaAutoScaling",
json!({ "TableName": "test-table" }),
);
svc.update_table_replica_auto_scaling(&req).unwrap();
}
// Kinesis streaming destination lifecycle on one stream ARN: enable
// (ACTIVE), describe (one destination), update its configuration, then
// disable (DISABLED).
#[test]
fn kinesis_streaming_lifecycle() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"EnableKinesisStreamingDestination",
json!({
"TableName": "test-table",
"StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
}),
);
let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["DestinationStatus"], "ACTIVE");
let req = make_request(
"DescribeKinesisStreamingDestination",
json!({ "TableName": "test-table" }),
);
let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["KinesisDataStreamDestinations"]
.as_array()
.unwrap()
.len(),
1
);
// Only success of the update call is checked here.
let req = make_request(
"UpdateKinesisStreamingDestination",
json!({
"TableName": "test-table",
"StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
"UpdateKinesisStreamingConfiguration": {
"ApproximateCreationDateTimePrecision": "MICROSECOND"
}
}),
);
svc.update_kinesis_streaming_destination(&req).unwrap();
let req = make_request(
"DisableKinesisStreamingDestination",
json!({
"TableName": "test-table",
"StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
}),
);
let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["DestinationStatus"], "DISABLED");
}
// Contributor insights start DISABLED, become ENABLED after an ENABLE
// action, and ListContributorInsights then returns one summary.
#[test]
fn contributor_insights_lifecycle() {
let svc = make_service();
create_test_table(&svc);
let req = make_request(
"DescribeContributorInsights",
json!({ "TableName": "test-table" }),
);
let resp = svc.describe_contributor_insights(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
let req = make_request(
"UpdateContributorInsights",
json!({
"TableName": "test-table",
"ContributorInsightsAction": "ENABLE"
}),
);
let resp = svc.update_contributor_insights(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
let req = make_request("ListContributorInsights", json!({}));
let resp = svc.list_contributor_insights(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(
body["ContributorInsightsSummaries"]
.as_array()
.unwrap()
.len(),
1
);
}
// ExportTableToPointInTime returns a COMPLETED export; DescribeExport
// echoes the S3 bucket and ListExports returns one summary. The service is
// built without S3 state or an S3 store (see `make_service`).
#[test]
fn export_lifecycle() {
let svc = make_service();
create_test_table(&svc);
let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
let req = make_request(
"ExportTableToPointInTime",
json!({
"TableArn": table_arn,
"S3Bucket": "my-bucket"
}),
);
let resp = svc.export_table_to_point_in_time(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let export_arn = body["ExportDescription"]["ExportArn"]
.as_str()
.unwrap()
.to_string();
assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
let resp = svc.describe_export(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
let req = make_request("ListExports", json!({}));
let resp = svc.list_exports(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
}
// ImportTable returns a COMPLETED import; DescribeImport and ListImports
// see it, and the table named in TableCreationParameters exists and is
// ACTIVE afterwards.
#[test]
fn import_lifecycle() {
let svc = make_service();
let req = make_request(
"ImportTable",
json!({
"InputFormat": "DYNAMODB_JSON",
"S3BucketSource": { "S3Bucket": "import-bucket" },
"TableCreationParameters": {
"TableName": "imported-table",
"KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
"AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
}
}),
);
let resp = svc.import_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let import_arn = body["ImportTableDescription"]["ImportArn"]
.as_str()
.unwrap()
.to_string();
assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
let resp = svc.describe_import(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
let req = make_request("ListImports", json!({}));
let resp = svc.list_imports(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
// The import also created the target table.
let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
let resp = svc.describe_table(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
}
// A backup captures the items present when it was taken: after writing
// three items, backing up, and deleting all three from the source table,
// restoring the backup into a new table yields the three items again.
#[test]
fn backup_restore_preserves_items() {
let svc = make_service();
create_test_table(&svc);
for i in 1..=3 {
let req = make_request(
"PutItem",
json!({
"TableName": "test-table",
"Item": {
"pk": { "S": format!("key{i}") },
"data": { "S": format!("value{i}") }
}
}),
);
svc.put_item(&req).unwrap();
}
let req = make_request(
"CreateBackup",
json!({
"TableName": "test-table",
"BackupName": "my-backup"
}),
);
let resp = svc.create_backup(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
let backup_arn = body["BackupDetails"]["BackupArn"]
.as_str()
.unwrap()
.to_string();
// Empty the source table after the backup was taken.
for i in 1..=3 {
let req = make_request(
"DeleteItem",
json!({
"TableName": "test-table",
"Key": { "pk": { "S": format!("key{i}") } }
}),
);
svc.delete_item(&req).unwrap();
}
let req = make_request("Scan", json!({ "TableName": "test-table" }));
let resp = svc.scan(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Count"], 0);
// The restored table must contain the three backed-up items.
let req = make_request(
"RestoreTableFromBackup",
json!({
"BackupArn": backup_arn,
"TargetTableName": "restored-table"
}),
);
svc.restore_table_from_backup(&req).unwrap();
let req = make_request("Scan", json!({ "TableName": "restored-table" }));
let resp = svc.scan(&req).unwrap();
let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
assert_eq!(body["Count"], 3);
assert_eq!(body["Items"].as_array().unwrap().len(), 3);
}
/// After registering the table as a global table with two replica regions,
/// an item written with `PutItem` must be readable through `GetItem`.
#[test]
fn global_table_replicates_writes() {
    let svc = make_service();
    create_test_table(&svc);

    let create_req = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": "test-table",
            "ReplicationGroup": [
                { "RegionName": "us-east-1" },
                { "RegionName": "eu-west-1" }
            ]
        }),
    );
    let create_resp = svc.create_global_table(&create_req).unwrap();
    let created: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
    assert_eq!(
        created["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    let put_req = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "replicated-key" },
                "data": { "S": "replicated-value" }
            }
        }),
    );
    svc.put_item(&put_req).unwrap();

    let get_req = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "replicated-key" } }
        }),
    );
    let get_resp = svc.get_item(&get_req).unwrap();
    let fetched: Value = serde_json::from_slice(get_resp.body.expect_bytes()).unwrap();
    let stored = &fetched["Item"];
    assert_eq!(stored["pk"]["S"], "replicated-key");
    assert_eq!(stored["data"]["S"], "replicated-value");
}
/// With contributor insights enabled, item writes and reads must show up as
/// non-empty `TopContributors` with a positive count, and the rule list must
/// not be empty.
#[test]
fn contributor_insights_tracks_access() {
    let svc = make_service();
    create_test_table(&svc);

    let enable_req = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    svc.update_contributor_insights(&enable_req).unwrap();

    // Write a skewed key distribution: "alpha" three times, "beta" twice.
    for key in ["alpha", "beta", "alpha", "alpha", "beta"].iter() {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": key },
                    "data": { "S": "value" }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    // Read "alpha" three more times.
    (0..3).for_each(|_| {
        let get_req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "alpha" } }
            }),
        );
        svc.get_item(&get_req).unwrap();
    });

    let describe_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
    let insights: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(insights["ContributorInsightsStatus"], "ENABLED");

    let contributors = insights["TopContributors"].as_array().unwrap();
    assert!(
        !contributors.is_empty(),
        "TopContributors should not be empty"
    );
    let top_count = contributors[0]["Count"].as_u64().unwrap();
    assert!(top_count > 0);

    let rules = insights["ContributorInsightsRuleList"].as_array().unwrap();
    assert!(!rules.is_empty());
}
/// When contributor insights was never enabled, the status is `DISABLED` and
/// a write must not produce any `TopContributors` entries.
#[test]
fn contributor_insights_not_tracked_when_disabled() {
    let svc = make_service();
    create_test_table(&svc);

    let put_req = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "key1" },
                "data": { "S": "value" }
            }
        }),
    );
    svc.put_item(&put_req).unwrap();

    let describe_req = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
    let insights: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
    assert_eq!(insights["ContributorInsightsStatus"], "DISABLED");

    let contributors = insights["TopContributors"].as_array().unwrap();
    assert!(contributors.is_empty());
}
/// A scan while insights is enabled yields contributor counters; after
/// disabling insights and scanning again, `TopContributors` must be empty.
#[test]
fn contributor_insights_disabled_table_no_counters_after_scan() {
    let svc = make_service();
    create_test_table(&svc);

    // Switches contributor insights on or off for the test table.
    let set_insights = |action: &str| {
        let toggle_req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": action
            }),
        );
        svc.update_contributor_insights(&toggle_req).unwrap();
    };
    // Runs a full scan of the test table, discarding the result.
    let scan_table = || {
        let scan_req = make_request("Scan", json!({ "TableName": "test-table" }));
        svc.scan(&scan_req).unwrap();
    };
    // Returns the current `TopContributors` list reported for the test table.
    let top_contributors = || -> Vec<Value> {
        let describe_req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let describe_resp = svc.describe_contributor_insights(&describe_req).unwrap();
        let insights: Value = serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
        insights["TopContributors"].as_array().unwrap().clone()
    };

    for key in ["alpha", "beta"].iter() {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": { "pk": { "S": key } }
            }),
        );
        svc.put_item(&put_req).unwrap();
    }

    set_insights("ENABLE");
    scan_table();
    assert!(
        !top_contributors().is_empty(),
        "counters should be non-empty while enabled"
    );

    set_insights("DISABLE");
    scan_table();
    assert!(
        top_contributors().is_empty(),
        "counters should be empty after disabling insights"
    );
}
/// Scanning five items with `Limit: 2` must return a `LastEvaluatedKey`, and
/// following that key page by page must eventually yield all five items.
#[test]
fn scan_pagination_with_limit() {
    let svc = make_service();
    create_test_table(&svc);

    (0..5).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("item{i}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    // First page: no start key.
    let first_req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
    let first_resp = svc.scan(&first_req).unwrap();
    let first_page: Value = serde_json::from_slice(first_resp.body.expect_bytes()).unwrap();
    assert_eq!(first_page["Count"], 2);
    assert!(
        first_page["LastEvaluatedKey"].is_object(),
        "should have LastEvaluatedKey when limit < total items"
    );
    assert!(first_page["LastEvaluatedKey"]["pk"].is_object());

    // Remaining pages: keep feeding the returned key back in until it is absent.
    let mut collected: Vec<Value> = first_page["Items"].as_array().unwrap().clone();
    let mut cursor = first_page["LastEvaluatedKey"].clone();
    loop {
        if !cursor.is_object() {
            break;
        }
        let page_req = make_request(
            "Scan",
            json!({
                "TableName": "test-table",
                "Limit": 2,
                "ExclusiveStartKey": cursor
            }),
        );
        let page_resp = svc.scan(&page_req).unwrap();
        let page: Value = serde_json::from_slice(page_resp.body.expect_bytes()).unwrap();
        collected.extend_from_slice(page["Items"].as_array().unwrap());
        cursor = page["LastEvaluatedKey"].clone();
    }

    assert_eq!(
        collected.len(),
        5,
        "should retrieve all 5 items via pagination"
    );
}
/// When every item fits in one page (a limit above the item count, or no
/// limit at all) the scan response must not carry a `LastEvaluatedKey`.
#[test]
fn scan_no_pagination_when_all_fit() {
    let svc = make_service();
    create_test_table(&svc);

    (0..3).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("item{i}") }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    // Limit larger than the number of items.
    let limited_req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
    let limited_resp = svc.scan(&limited_req).unwrap();
    let limited: Value = serde_json::from_slice(limited_resp.body.expect_bytes()).unwrap();
    assert_eq!(limited["Count"], 3);
    assert!(
        limited["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );

    // No limit at all.
    let unlimited_req = make_request("Scan", json!({ "TableName": "test-table" }));
    let unlimited_resp = svc.scan(&unlimited_req).unwrap();
    let unlimited: Value = serde_json::from_slice(unlimited_resp.body.expect_bytes()).unwrap();
    assert_eq!(unlimited["Count"], 3);
    assert!(unlimited["LastEvaluatedKey"].is_null());
}
/// Creates `composite-table`, a pay-per-request table keyed by a string hash
/// key `pk` and a string range key `sk`. Panics if table creation fails.
fn create_composite_table(svc: &DynamoDbService) {
    let definition = json!({
        "TableName": "composite-table",
        "KeySchema": [
            { "AttributeName": "pk", "KeyType": "HASH" },
            { "AttributeName": "sk", "KeyType": "RANGE" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "pk", "AttributeType": "S" },
            { "AttributeName": "sk", "AttributeType": "S" }
        ],
        "BillingMode": "PAY_PER_REQUEST"
    });
    let create_req = make_request("CreateTable", definition);
    svc.create_table(&create_req).unwrap();
}
/// Querying one partition of a composite-key table with `Limit: 2` must
/// paginate via a `LastEvaluatedKey` holding both `pk` and `sk`, return all
/// five items across pages, and keep them ordered by sort key.
#[test]
fn query_pagination_with_composite_key() {
    let svc = make_service();
    create_composite_table(&svc);

    (0..5).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": format!("item{i:03}") },
                    "data": { "S": format!("value{i}") }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    // First page: no start key.
    let first_req = make_request(
        "Query",
        json!({
            "TableName": "composite-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
            "Limit": 2
        }),
    );
    let first_resp = svc.query(&first_req).unwrap();
    let first_page: Value = serde_json::from_slice(first_resp.body.expect_bytes()).unwrap();
    assert_eq!(first_page["Count"], 2);
    assert!(first_page["LastEvaluatedKey"].is_object());
    assert!(first_page["LastEvaluatedKey"]["pk"].is_object());
    assert!(first_page["LastEvaluatedKey"]["sk"].is_object());

    // Follow the returned key until the service stops handing one back.
    let mut collected: Vec<Value> = first_page["Items"].as_array().unwrap().clone();
    let mut cursor = first_page["LastEvaluatedKey"].clone();
    loop {
        if !cursor.is_object() {
            break;
        }
        let page_req = make_request(
            "Query",
            json!({
                "TableName": "composite-table",
                "KeyConditionExpression": "pk = :pk",
                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
                "Limit": 2,
                "ExclusiveStartKey": cursor
            }),
        );
        let page_resp = svc.query(&page_req).unwrap();
        let page: Value = serde_json::from_slice(page_resp.body.expect_bytes()).unwrap();
        collected.extend_from_slice(page["Items"].as_array().unwrap());
        cursor = page["LastEvaluatedKey"].clone();
    }

    assert_eq!(
        collected.len(),
        5,
        "should retrieve all 5 items via pagination"
    );

    // The sort keys, in the order received, must already be sorted.
    let mut sks: Vec<String> = Vec::with_capacity(collected.len());
    for entry in &collected {
        sks.push(entry["sk"]["S"].as_str().unwrap().to_string());
    }
    let mut sorted = sks.clone();
    sorted.sort();
    assert_eq!(sks, sorted, "items should be sorted by sort key");
}
/// A query whose limit exceeds the number of matching items must return them
/// all without a `LastEvaluatedKey`.
#[test]
fn query_no_pagination_when_all_fit() {
    let svc = make_service();
    create_composite_table(&svc);

    (0..2).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "composite-table",
                "Item": {
                    "pk": { "S": "user1" },
                    "sk": { "S": format!("item{i}") }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    let query_req = make_request(
        "Query",
        json!({
            "TableName": "composite-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
            "Limit": 10
        }),
    );
    let query_resp = svc.query(&query_req).unwrap();
    let result: Value = serde_json::from_slice(query_resp.body.expect_bytes()).unwrap();
    assert_eq!(result["Count"], 2);
    assert!(
        result["LastEvaluatedKey"].is_null(),
        "should not have LastEvaluatedKey when all items fit"
    );
}
/// Creates `gsi-table`, a pay-per-request table with hash key `pk` and one
/// global secondary index `gsi-index` keyed by `gsi_pk` (hash) and `gsi_sk`
/// (range) that projects all attributes. Panics if table creation fails.
fn create_gsi_table(svc: &DynamoDbService) {
    let definition = json!({
        "TableName": "gsi-table",
        "KeySchema": [
            { "AttributeName": "pk", "KeyType": "HASH" }
        ],
        "AttributeDefinitions": [
            { "AttributeName": "pk", "AttributeType": "S" },
            { "AttributeName": "gsi_pk", "AttributeType": "S" },
            { "AttributeName": "gsi_sk", "AttributeType": "S" }
        ],
        "BillingMode": "PAY_PER_REQUEST",
        "GlobalSecondaryIndexes": [
            {
                "IndexName": "gsi-index",
                "KeySchema": [
                    { "AttributeName": "gsi_pk", "KeyType": "HASH" },
                    { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
                ],
                "Projection": { "ProjectionType": "ALL" }
            }
        ]
    });
    let create_req = make_request("CreateTable", definition);
    svc.create_table(&create_req).unwrap();
}
/// The `LastEvaluatedKey` of a paginated GSI query must contain the index
/// keys (`gsi_pk`, `gsi_sk`) as well as the base table's hash key (`pk`).
#[test]
fn gsi_query_last_evaluated_key_includes_table_pk() {
    let svc = make_service();
    create_gsi_table(&svc);

    // Three items that share identical index keys and differ only in `pk`.
    (0..3).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "gsi-table",
                "Item": {
                    "pk": { "S": format!("item{i}") },
                    "gsi_pk": { "S": "shared" },
                    "gsi_sk": { "S": "sort" }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    let query_req = make_request(
        "Query",
        json!({
            "TableName": "gsi-table",
            "IndexName": "gsi-index",
            "KeyConditionExpression": "gsi_pk = :v",
            "ExpressionAttributeValues": { ":v": { "S": "shared" } },
            "Limit": 1
        }),
    );
    let query_resp = svc.query(&query_req).unwrap();
    let page: Value = serde_json::from_slice(query_resp.body.expect_bytes()).unwrap();
    assert_eq!(page["Count"], 1);

    let last_key = &page["LastEvaluatedKey"];
    assert!(last_key.is_object(), "should have LastEvaluatedKey");
    assert!(last_key["gsi_pk"].is_object(), "LEK must contain gsi_pk");
    assert!(last_key["gsi_sk"].is_object(), "LEK must contain gsi_sk");
    assert!(
        last_key["pk"].is_object(),
        "LEK must contain table PK for GSI queries"
    );
}
/// Paginating a GSI query over four items that share the same index keys
/// must return each item exactly once.
#[test]
fn gsi_query_pagination_returns_all_items() {
    let svc = make_service();
    create_gsi_table(&svc);

    (0..4).for_each(|i| {
        let put_req = make_request(
            "PutItem",
            json!({
                "TableName": "gsi-table",
                "Item": {
                    "pk": { "S": format!("item{i:03}") },
                    "gsi_pk": { "S": "shared" },
                    "gsi_sk": { "S": "sort" }
                }
            }),
        );
        svc.put_item(&put_req).unwrap();
    });

    let mut all_pks = Vec::new();
    let mut cursor: Option<Value> = None;
    loop {
        let mut payload = json!({
            "TableName": "gsi-table",
            "IndexName": "gsi-index",
            "KeyConditionExpression": "gsi_pk = :v",
            "ExpressionAttributeValues": { ":v": { "S": "shared" } },
            "Limit": 2
        });
        // Only pages after the first carry a start key.
        if let Some(start_key) = cursor.as_ref() {
            payload["ExclusiveStartKey"] = start_key.clone();
        }
        let query_req = make_request("Query", payload);
        let query_resp = svc.query(&query_req).unwrap();
        let page: Value = serde_json::from_slice(query_resp.body.expect_bytes()).unwrap();

        all_pks.extend(
            page["Items"]
                .as_array()
                .unwrap()
                .iter()
                .map(|entry| entry["pk"]["S"].as_str().unwrap().to_string()),
        );

        if !page["LastEvaluatedKey"].is_object() {
            break;
        }
        cursor = Some(page["LastEvaluatedKey"].clone());
    }

    all_pks.sort();
    assert_eq!(
        all_pks,
        vec!["item000", "item001", "item002", "item003"],
        "pagination should return all items without duplicates"
    );
}
/// Builds an item from `(attribute name, string value)` pairs, storing each
/// value as a DynamoDB `S` attribute.
fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
    let mut item = HashMap::with_capacity(pairs.len());
    for (attribute, text) in pairs {
        item.insert(attribute.to_string(), json!({"S": text}));
    }
    item
}
/// Builds an expression-attribute-names map from `(placeholder, attribute
/// name)` pairs. A repeated placeholder keeps the last attribute name given.
fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
    let mut names = HashMap::with_capacity(pairs.len());
    for &(placeholder, attribute) in pairs {
        names.insert(placeholder.to_owned(), attribute.to_owned());
    }
    names
}
/// Builds an expression-attribute-values map from `(placeholder, string
/// value)` pairs, storing each value as a DynamoDB `S` attribute.
fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
    let mut bound = HashMap::with_capacity(pairs.len());
    for (placeholder, text) in pairs {
        bound.insert(placeholder.to_string(), json!({"S": text}));
    }
    bound
}
/// An unparenthesized `<>` condition passes when the stored value differs
/// from the operand and fails when they are equal.
#[test]
fn test_evaluate_condition_bare_not_equal() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);

    let active = cond_item(&[("state", "active")]);
    let differing = evaluate_condition("#s <> :c", Some(&active), &names, &values);
    assert!(differing.is_ok());

    let complete = cond_item(&[("state", "complete")]);
    let equal = evaluate_condition("#s <> :c", Some(&complete), &names, &values);
    assert!(equal.is_err());
}
/// A `<>` condition wrapped in parentheses passes when the values differ.
#[test]
fn test_evaluate_condition_parenthesized_not_equal() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let active = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("(#s <> :c)", Some(&active), &names, &values);
    assert!(outcome.is_ok());
}
/// A parenthesized `=` condition fails when the stored value differs from
/// the operand.
#[test]
fn test_evaluate_condition_parenthesized_equal_mismatch() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let active = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("(#s = :c)", Some(&active), &names, &values);
    assert!(outcome.is_err());
}
/// An `AND` of two parenthesized `<>` clauses passes when both hold.
#[test]
fn test_evaluate_condition_compound_and() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
    let active = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition(
        "(#s <> :c) AND (#s <> :f)",
        Some(&active),
        &names,
        &values,
    );
    assert!(outcome.is_ok());
}
/// An `AND` of two `=` clauses fails when the stored value matches neither
/// operand.
#[test]
fn test_evaluate_condition_compound_and_mismatch() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":b", "active")]);
    let inactive = cond_item(&[("state", "inactive")]);

    let outcome = evaluate_condition(
        "(#s = :a) AND (#s = :b)",
        Some(&inactive),
        &names,
        &values,
    );
    assert!(outcome.is_err());
}
/// An `OR` of two `=` clauses fails when neither operand matches and passes
/// as soon as one of them does.
#[test]
fn test_evaluate_condition_compound_or() {
    let names = cond_names(&[("#s", "state")]);
    let running = cond_item(&[("state", "running")]);
    let expr = "(#s = :a) OR (#s = :b)";

    // Neither operand equals "running".
    let no_match = cond_values(&[(":a", "active"), (":b", "idle")]);
    let rejected = evaluate_condition(expr, Some(&running), &names, &no_match);
    assert!(rejected.is_err());

    // The second operand equals "running".
    let one_match = cond_values(&[(":a", "active"), (":b", "running")]);
    let accepted = evaluate_condition(expr, Some(&running), &names, &one_match);
    assert!(accepted.is_ok());
}
/// `NOT` inverts comparisons and `attribute_exists`, both for an existing
/// item and when no item is present.
#[test]
fn test_evaluate_condition_not_operator() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":c", "complete")]);
    let active = cond_item(&[("state", "active")]);

    let not_equal = evaluate_condition("NOT (#s = :c)", Some(&active), &names, &values);
    assert!(not_equal.is_ok());

    let not_differs = evaluate_condition("NOT (#s <> :c)", Some(&active), &names, &values);
    assert!(not_differs.is_err());

    let present = evaluate_condition(
        "NOT attribute_exists(#s)",
        Some(&active),
        &names,
        &values,
    );
    assert!(present.is_err());

    let absent = evaluate_condition("NOT attribute_exists(#s)", None, &names, &values);
    assert!(absent.is_ok());
}
/// `begins_with` passes for a matching prefix and fails for a non-matching one.
#[test]
fn test_evaluate_condition_begins_with() {
    let names = cond_names(&[("#n", "name")]);
    let item = cond_item(&[("name", "fakecloud-dynamodb")]);
    let expr = "begins_with(#n, :p)";

    let matching_prefix = cond_values(&[(":p", "fakecloud")]);
    let accepted = evaluate_condition(expr, Some(&item), &names, &matching_prefix);
    assert!(accepted.is_ok());

    let other_prefix = cond_values(&[(":p", "realcloud")]);
    let rejected = evaluate_condition(expr, Some(&item), &names, &other_prefix);
    assert!(rejected.is_err());
}
/// `contains` on a string attribute passes for a substring that is present
/// and fails for one that is not.
#[test]
fn test_evaluate_condition_contains() {
    let names = cond_names(&[("#t", "tags")]);
    let item = cond_item(&[("tags", "alpha,beta,gamma")]);
    let expr = "contains(#t, :v)";

    let present = cond_values(&[(":v", "beta")]);
    let accepted = evaluate_condition(expr, Some(&item), &names, &present);
    assert!(accepted.is_ok());

    let missing = cond_values(&[(":v", "delta")]);
    let rejected = evaluate_condition(expr, Some(&item), &names, &missing);
    assert!(rejected.is_err());
}
/// With no existing item, `attribute_not_exists` and `<>` pass while
/// `attribute_exists` and `=` fail.
#[test]
fn test_evaluate_condition_no_existing_item() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":v", "active")]);
    // Evaluates `expr` against "no item" and reports whether it passed.
    let passes = |expr: &str| evaluate_condition(expr, None, &names, &values).is_ok();

    assert!(passes("attribute_not_exists(#s)"));
    assert!(!passes("attribute_exists(#s)"));
    assert!(passes("#s <> :v"));
    assert!(!passes("#s = :v"));
}
/// In filter expressions, `NOT` inverts both `=` and `<>` comparisons.
#[test]
fn test_evaluate_filter_not_operator() {
    let names = cond_names(&[("#s", "status")]);
    let values = cond_values(&[(":v", "pending")]);
    let pending = cond_item(&[("status", "pending")]);

    let negated_equal = evaluate_filter_expression("NOT (#s = :v)", &pending, &names, &values);
    assert!(!negated_equal);

    let negated_differs = evaluate_filter_expression("NOT (#s <> :v)", &pending, &names, &values);
    assert!(negated_differs);
}
/// A filter `IN` list matches when the attribute equals one of its members.
#[test]
fn test_evaluate_filter_expression_in_match() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let active = cond_item(&[("state", "active")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &active, &names, &values);
    assert!(matched, "state=active should match IN (active, pending)");
}
/// A filter `IN` list does not match when the attribute equals none of its
/// members.
#[test]
fn test_evaluate_filter_expression_in_no_match() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let complete = cond_item(&[("state", "complete")]);

    let matched = evaluate_filter_expression("#s IN (:a, :p)", &complete, &names, &values);
    assert!(
        !matched,
        "state=complete should not match IN (active, pending)"
    );
}
/// An `IN` list written without spaces after the commas is still parsed.
#[test]
fn test_evaluate_filter_expression_in_no_spaces() {
    let names = cond_names(&[("#s", "status")]);
    let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
    let shipped = cond_item(&[("status", "shipped")]);

    let matched = evaluate_filter_expression("#s IN (:a,:b,:c)", &shipped, &names, &values);
    assert!(matched, "no-space IN list should still parse");
}
/// An `IN` filter on an attribute the item does not have never matches.
#[test]
fn test_evaluate_filter_expression_in_missing_attribute() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active")]);
    let empty_item: HashMap<String, AttributeValue> = HashMap::new();

    let matched = evaluate_filter_expression("#s IN (:a)", &empty_item, &names, &values);
    assert!(!matched, "missing attribute should not match any IN list");
}
/// A parenthesized `IN` clause combined with `=` via `AND` matches only when
/// both clauses hold.
#[test]
fn test_evaluate_filter_expression_compound_in_and_eq() {
    let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
    let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
    let expr = "(#s IN (:a, :pe)) AND (#p = :h)";

    let active_high = cond_item(&[("state", "active"), ("priority", "high")]);
    let both_hold = evaluate_filter_expression(expr, &active_high, &names, &values);
    assert!(
        both_hold,
        "(active IN (active, pending)) AND (high = high) should match"
    );

    let complete_high = cond_item(&[("state", "complete"), ("priority", "high")]);
    let in_fails = evaluate_filter_expression(expr, &complete_high, &names, &values);
    assert!(
        !in_fails,
        "(complete IN (active, pending)) AND (high = high) should not match"
    );
}
/// Function calls written with a space before the parenthesis
/// (`attribute_exists (#0)`) must be parsed, here inside a compound
/// "claim lease" condition checked against four item states.
#[test]
fn test_evaluate_condition_attribute_exists_with_space() {
    let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
    let values = cond_values(&[(":0", "tab-A")]);
    let claim = "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))";
    // Evaluates the claim condition against `existing` and reports whether it passed.
    let claim_passes = |existing: Option<&HashMap<String, AttributeValue>>| {
        evaluate_condition(claim, existing, &names, &values).is_ok()
    };

    // Item exists and nobody holds the lease.
    let free = cond_item(&[("store_id", "s-1")]);
    assert!(
        claim_passes(Some(&free)),
        "claim-lease compound on free item should succeed"
    );

    // No item at all.
    assert!(
        !claim_passes(None),
        "claim-lease compound on missing item must fail attribute_exists branch"
    );

    // Lease is held by a different tab.
    let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
    assert!(
        !claim_passes(Some(&held)),
        "claim-lease compound on item held by another tab must fail"
    );

    // Lease is already held by the claiming tab.
    let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
    assert!(
        claim_passes(Some(&self_held)),
        "same-tab re-claim must succeed"
    );
}
/// A condition `IN` list passes when the attribute equals one of its members.
#[test]
fn test_evaluate_condition_in_match() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let active = cond_item(&[("state", "active")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&active), &names, &values);
    assert!(
        outcome.is_ok(),
        "IN should succeed when actual value is in the list"
    );
}
/// A condition `IN` list fails when the attribute equals none of its members.
#[test]
fn test_evaluate_condition_in_no_match() {
    let names = cond_names(&[("#s", "state")]);
    let values = cond_values(&[(":a", "active"), (":p", "pending")]);
    let complete = cond_item(&[("state", "complete")]);

    let outcome = evaluate_condition("#s IN (:a, :p)", Some(&complete), &names, &values);
    assert!(
        outcome.is_err(),
        "IN should fail when actual value is not in the list"
    );
}
/// `SET #items[0] = :item` must overwrite element 0 of the list in place,
/// leave element 1 untouched, and not create a top-level attribute named
/// after the raw path.
#[test]
fn test_apply_update_set_list_index_replaces_existing() {
    /// Reads the `sku` string out of a map-typed list element, if present.
    fn sku_of(entry: &Value) -> Option<&str> {
        entry.get("M")?.get("sku")?.get("S")?.as_str()
    }

    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "OLD-A"}}},
            {"M": {"sku": {"S": "OLD-B"}}},
        ]}),
    )]);
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}))]);

    apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr["L"].as_array())
        .expect("items should still be a list");
    assert_eq!(items_list.len(), 2, "list length should be unchanged");
    assert_eq!(
        sku_of(&items_list[0]),
        Some("NEW-A"),
        "index 0 should be replaced"
    );
    assert_eq!(
        sku_of(&items_list[1]),
        Some("OLD-B"),
        "index 1 should be untouched"
    );

    // The indexed path must not have been stored as a literal attribute name.
    assert!(!item.contains_key("items[0]"));
    assert!(!item.contains_key("#items[0]"));
}
/// `SET #items[1] = :item` must replace only the middle element of a
/// three-element list.
#[test]
fn test_apply_update_set_list_index_second_slot() {
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [
            {"M": {"sku": {"S": "A"}}},
            {"M": {"sku": {"S": "B"}}},
            {"M": {"sku": {"S": "C"}}},
        ]}),
    )]);
    let names = cond_names(&[("#items", "items")]);
    let values = HashMap::from([(
        ":item".to_string(),
        json!({"M": {"sku": {"S": "B-PRIME"}}}),
    )]);

    apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();

    let items_list = item
        .get("items")
        .and_then(|attr| attr["L"].as_array())
        .unwrap();
    let mut skus: Vec<&str> = Vec::with_capacity(items_list.len());
    for entry in items_list {
        skus.push(entry["M"]["sku"]["S"].as_str().unwrap());
    }
    assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
}
/// List-index `SET` must also work when the path uses the literal attribute
/// name instead of a `#name` placeholder.
#[test]
fn test_apply_update_set_list_index_without_name_ref() {
    let mut item = HashMap::from([(
        "tags".to_string(),
        json!({"L": [{"S": "red"}, {"S": "blue"}]}),
    )]);
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "green"}))]);

    apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();

    let tags = item
        .get("tags")
        .and_then(|attr| attr["L"].as_array())
        .unwrap();
    assert_eq!(tags[0]["S"].as_str(), Some("red"));
    assert_eq!(tags[1]["S"].as_str(), Some("green"));
}
/// `list_append` onto an empty list must yield a list holding just the
/// appended element.
#[test]
fn test_list_append_into_empty_list() {
    let mut item = HashMap::from([("files".to_string(), json!({"L": []}))]);
    let names = cond_names(&[("#0", "files")]);
    let values = HashMap::from([(
        ":0".to_string(),
        json!({"L": [{"M": {"field": {"S": "value"}}}]}),
    )]);

    apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
        .unwrap();

    let appended = item
        .get("files")
        .and_then(|attr| attr["L"].as_array())
        .expect("files should be an L-typed attribute");
    assert_eq!(appended.len(), 1, "one element should have been appended");
}
/// `list_append` onto a one-element list must yield a two-element list.
#[test]
fn test_list_append_into_nonempty_list() {
    let mut item = HashMap::from([(
        "files".to_string(),
        json!({"L": [{"M": {"field": {"S": "existing"}}}]}),
    )]);
    let names = cond_names(&[("#0", "files")]);
    let values = HashMap::from([(
        ":0".to_string(),
        json!({"L": [{"M": {"field": {"S": "new"}}}]}),
    )]);

    apply_update_expression(&mut item, "SET #0 = list_append(#0, :0)", &names, &values)
        .unwrap();

    let appended = item
        .get("files")
        .and_then(|attr| attr["L"].as_array())
        .expect("files should be an L-typed attribute");
    assert_eq!(appended.len(), 2, "existing element plus one new element");
}
/// A single `SET` clause may mix a `list_append` assignment with a plain
/// value assignment; both must be applied.
#[test]
fn test_list_append_combined_with_plain_set() {
    let mut item = HashMap::from([
        ("logs".to_string(), json!({"L": []})),
        ("count".to_string(), json!({"N": "0"})),
    ]);
    let names = cond_names(&[("#a", "logs"), ("#b", "count")]);
    let values = HashMap::from([
        (":v".to_string(), json!({"L": [{"S": "entry"}]})),
        (":other".to_string(), json!({"N": "1"})),
    ]);

    apply_update_expression(
        &mut item,
        "SET #a = list_append(#a, :v), #b = :other",
        &names,
        &values,
    )
    .unwrap();

    let logs = item
        .get("logs")
        .and_then(|attr| attr["L"].as_array())
        .expect("logs should be an L-typed attribute");
    assert_eq!(logs.len(), 1, "one log entry appended");

    let count = item
        .get("count")
        .and_then(|attr| attr["N"].as_str())
        .expect("count should be an N-typed attribute");
    assert_eq!(count, "1", "count updated to 1");
}
/// A key condition that cannot be parsed must evaluate to `false`.
#[test]
fn test_unrecognized_expression_returns_false() {
    let names: HashMap<String, String> = HashMap::new();
    let values: HashMap<String, Value> = HashMap::new();
    let item = cond_item(&[("x", "1")]);

    let matched = evaluate_single_key_condition("GARBAGE NONSENSE", &item, &names, &values);
    assert!(!matched, "unrecognized expression must return false");
}
/// Assigning to a list index beyond the end of the list must fail and must
/// leave the list at its original length.
#[test]
fn test_set_list_index_out_of_range_returns_error() {
    let mut item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"S": "a"}, {"S": "b"}]}),
    )]);
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);

    let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
    assert!(
        result.is_err(),
        "out-of-range list index must return an error"
    );

    let untouched = item
        .get("items")
        .and_then(|attr| attr["L"].as_array())
        .unwrap();
    assert_eq!(untouched.len(), 2);
}
/// Using a list index on a string attribute must be rejected.
#[test]
fn test_set_list_index_on_non_list_returns_error() {
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names: HashMap<String, String> = HashMap::new();
    let values = HashMap::from([(":v".to_string(), json!({"S": "z"}))]);

    let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
    assert!(
        result.is_err(),
        "list index on non-list attribute must return an error"
    );
}
/// An update expression that starts with an unknown action keyword must be
/// rejected, and the error text must identify it as an invalid or
/// syntactically wrong update expression.
#[test]
fn test_unrecognized_update_action_returns_error() {
    let mut item = HashMap::new();
    item.insert("name".to_string(), json!({"S": "hello"}));
    let names: HashMap<String, String> = HashMap::new();
    let mut values = HashMap::new();
    values.insert(":bar".to_string(), json!({"S": "baz"}));

    let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
    assert!(
        result.is_err(),
        "unrecognized UpdateExpression action must return an error"
    );

    let err_msg = result.unwrap_err().to_string();
    // Either phrasing is accepted, so the failure message names both.
    assert!(
        err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
        "error should mention Invalid UpdateExpression or Syntax error, got: {err_msg}"
    );
}
/// `size()` of the string `"hello"` equals 5 and is greater than 4.
#[test]
fn test_size_string() {
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names = HashMap::new();

    let five = HashMap::from([(":limit".to_string(), json!({"N": "5"}))]);
    let equals_five = evaluate_single_filter_condition("size(name) = :limit", &item, &names, &five);
    assert!(equals_five);

    let four = HashMap::from([(":limit".to_string(), json!({"N": "4"}))]);
    let above_four = evaluate_single_filter_condition("size(name) > :limit", &item, &names, &four);
    assert!(above_four);
}
/// `size()` of a three-element list equals 3.
#[test]
fn test_size_list() {
    let item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
    )]);
    let names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let matched = evaluate_single_filter_condition("size(items) = :limit", &item, &names, &values);
    assert!(matched);
}
/// `size()` of a map with two entries equals 2.
#[test]
fn test_size_map() {
    let item = HashMap::from([(
        "data".to_string(),
        json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
    )]);
    let names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "2"}))]);

    let matched = evaluate_single_filter_condition("size(data) = :limit", &item, &names, &values);
    assert!(matched);
}
/// `size()` of a four-member string set is greater than 3.
#[test]
fn test_size_set() {
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":limit".to_string(), json!({"N": "3"}))]);

    let matched = evaluate_single_filter_condition("size(tags) > :limit", &item, &names, &values);
    assert!(matched);
}
/// `attribute_type` on a string attribute matches type `S` and rejects `N`.
#[test]
fn test_attribute_type_string() {
    let item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names = HashMap::new();
    let expr = "attribute_type(name, :t)";

    let string_type = HashMap::from([(":t".to_string(), json!({"S": "S"}))]);
    let is_string = evaluate_single_filter_condition(expr, &item, &names, &string_type);
    assert!(is_string);

    let number_type = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);
    let is_number = evaluate_single_filter_condition(expr, &item, &names, &number_type);
    assert!(!is_number);
}
/// `attribute_type` on a number attribute matches type `N`.
#[test]
fn test_attribute_type_number() {
    let item = HashMap::from([("age".to_string(), json!({"N": "42"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "N"}))]);

    let matched = evaluate_single_filter_condition("attribute_type(age, :t)", &item, &names, &values);
    assert!(matched);
}
/// `attribute_type` on a list attribute matches type `L`.
#[test]
fn test_attribute_type_list() {
    let item = HashMap::from([("items".to_string(), json!({"L": [{"S": "a"}]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "L"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(items, :t)", &item, &names, &values);
    assert!(matched);
}
/// `attribute_type` on a map attribute matches type `M`.
#[test]
fn test_attribute_type_map() {
    let item = HashMap::from([("data".to_string(), json!({"M": {"key": {"S": "val"}}}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "M"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(data, :t)", &item, &names, &values);
    assert!(matched);
}
/// `attribute_type` on a boolean attribute matches type `BOOL`.
#[test]
fn test_attribute_type_bool() {
    let item = HashMap::from([("active".to_string(), json!({"BOOL": true}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":t".to_string(), json!({"S": "BOOL"}))]);

    let matched =
        evaluate_single_filter_condition("attribute_type(active, :t)", &item, &names, &values);
    assert!(matched);
}
/// `begins_with` must not match a number attribute even when its digits
/// start with the given string prefix.
#[test]
fn test_begins_with_rejects_number_type() {
    let item = HashMap::from([("code".to_string(), json!({"N": "12345"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "123"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values);
    assert!(
        !matched,
        "begins_with must return false for N-type attributes"
    );
}
/// `begins_with` matches a string attribute that starts with the prefix.
#[test]
fn test_begins_with_works_on_string_type() {
    let item = HashMap::from([("code".to_string(), json!({"S": "abc123"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":prefix".to_string(), json!({"S": "abc"}))]);

    let matched =
        evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values);
    assert!(matched);
}
/// `contains` on a string set is true for a member and false for a non-member.
#[test]
fn test_contains_string_set() {
    let item = HashMap::from([("tags".to_string(), json!({"SS": ["red", "blue", "green"]}))]);
    let names = HashMap::new();
    let mut values = HashMap::from([(":val".to_string(), json!({"S": "blue"}))]);
    let member_found =
        evaluate_single_filter_condition("contains(tags, :val)", &item, &names, &values);
    assert!(member_found);

    // Re-bind the placeholder to a value that is not in the set.
    values.insert(":val".to_string(), json!({"S": "yellow"}));
    let stranger_found =
        evaluate_single_filter_condition("contains(tags, :val)", &item, &names, &values);
    assert!(!stranger_found);
}
/// `contains` on a number set is true for a member.
#[test]
fn test_contains_number_set() {
    let item = HashMap::from([("scores".to_string(), json!({"NS": ["1", "2", "3"]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "2"}))]);
    let matched =
        evaluate_single_filter_condition("contains(scores, :val)", &item, &names, &values);
    assert!(matched);
}
/// `SET a = a + :n` must fail when the existing attribute is a string.
#[test]
fn test_set_arithmetic_rejects_string_operand() {
    let mut item = HashMap::from([("name".to_string(), json!({"S": "hello"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "1"}))]);
    let outcome = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic on S-type attribute must return a ValidationException"
    );
}
/// `SET a = a + :v` must fail when the supplied value is a string.
#[test]
fn test_set_arithmetic_rejects_string_value() {
    let mut item = HashMap::from([("count".to_string(), json!({"N": "5"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"S": "notanumber"}))]);
    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
    assert!(
        outcome.is_err(),
        "arithmetic with S-type value must return a ValidationException"
    );
}
/// `SET count = count + :val` adds two numeric operands and stores the sum.
#[test]
fn test_set_arithmetic_valid_numbers() {
    let mut item = HashMap::from([("count".to_string(), json!({"N": "10"}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"N": "3"}))]);
    let outcome = apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
    assert!(outcome.is_ok());
    assert_eq!(item["count"], json!({"N": "13"}));
}
/// `ADD` on a binary set merges the new members without duplicating existing ones.
#[test]
fn test_add_binary_set() {
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}))]);
    let outcome = apply_update_expression(&mut item, "ADD data :val", &names, &values);
    assert!(outcome.is_ok());

    let members = item["data"]["BS"].as_array().unwrap();
    assert_eq!(members.len(), 3, "should merge sets without duplicates");
    for expected in ["YQ==", "Yg==", "Yw=="] {
        assert!(members.contains(&json!(expected)));
    }
}
/// `DELETE` on a binary set removes only the listed member.
#[test]
fn test_delete_binary_set() {
    let mut item = HashMap::from([(
        "data".to_string(),
        json!({"BS": ["YQ==", "Yg==", "Yw=="]}),
    )]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["Yg=="]}))]);
    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let members = item["data"]["BS"].as_array().unwrap();
    assert_eq!(members.len(), 2);
    assert!(!members.contains(&json!("Yg==")));
}
/// Deleting the last member of a binary set drops the attribute from the item.
#[test]
fn test_delete_binary_set_removes_attr_when_empty() {
    let mut item = HashMap::from([("data".to_string(), json!({"BS": ["YQ=="]}))]);
    let names = HashMap::new();
    let values = HashMap::from([(":val".to_string(), json!({"BS": ["YQ=="]}))]);
    let outcome = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
    assert!(outcome.is_ok());

    let still_present = item.contains_key("data");
    assert!(
        !still_present,
        "attribute should be removed when set becomes empty"
    );
}
/// Parses a response body into a JSON [`Value`] for assertions.
///
/// # Panics
/// Panics when the body bytes are not valid JSON. `#[track_caller]` attributes
/// that panic to the calling test instead of this helper.
#[track_caller]
fn body_json(resp: &AwsResponse) -> Value {
    serde_json::from_slice(resp.body.expect_bytes()).expect("response body should be valid JSON")
}
/// Unwraps the error side of a service call result.
///
/// # Panics
/// Panics when `result` is `Ok`. `#[track_caller]` makes the panic location
/// point at the test that called this helper rather than at the helper itself.
#[track_caller]
fn expect_err(result: Result<AwsResponse, AwsServiceError>) -> AwsServiceError {
    let Err(error) = result else {
        panic!("expected error, got Ok");
    };
    error
}
/// `CreateTable` with a single hash key returns an `ACTIVE` description that carries an ARN.
#[test]
fn create_table_basic() {
    let service = make_service();
    let request = make_request(
        "CreateTable",
        json!({
            "TableName": "my-table",
            "KeySchema": [{"AttributeName": "id", "KeyType": "HASH"}],
            "AttributeDefinitions": [{"AttributeName": "id", "AttributeType": "S"}],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    let body = body_json(&service.create_table(&request).unwrap());
    let description = &body["TableDescription"];
    assert_eq!(description["TableName"], "my-table");
    assert_eq!(description["TableStatus"], "ACTIVE");
    assert!(description["TableArn"].as_str().is_some());
}
/// `CreateTable` with a composite key and one GSI echoes that GSI in the description.
#[test]
fn create_table_with_sort_key_and_gsi() {
    let service = make_service();
    let request = make_request(
        "CreateTable",
        json!({
            "TableName": "gsi-table",
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "S"},
                {"AttributeName": "gsi_key", "AttributeType": "N"},
            ],
            "GlobalSecondaryIndexes": [{
                "IndexName": "gsi1",
                "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
                "Projection": {"ProjectionType": "ALL"},
            }],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    let body = body_json(&service.create_table(&request).unwrap());
    let indexes = body["TableDescription"]["GlobalSecondaryIndexes"]
        .as_array()
        .unwrap();
    assert_eq!(indexes.len(), 1);
    assert_eq!(indexes[0]["IndexName"], "gsi1");
}
/// Creating a table whose name already exists fails with `ResourceInUseException`.
#[test]
fn create_table_duplicate_fails() {
    let service = make_service();
    create_test_table(&service);
    let duplicate = make_request(
        "CreateTable",
        json!({
            "TableName": "test-table",
            "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
            "AttributeDefinitions": [{"AttributeName": "pk", "AttributeType": "S"}],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    let message = expect_err(service.create_table(&duplicate)).to_string();
    assert!(message.contains("ResourceInUseException"));
}
/// A key attribute absent from `AttributeDefinitions` yields a `ValidationException`.
#[test]
fn create_table_missing_key_attr_in_definitions() {
    let service = make_service();
    let request = make_request(
        "CreateTable",
        json!({
            "TableName": "bad",
            "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
            "AttributeDefinitions": [{"AttributeName": "other", "AttributeType": "S"}],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    let message = expect_err(service.create_table(&request)).to_string();
    assert!(message.contains("ValidationException"));
}
/// `DescribeTable` on an existing table reports its name and `ACTIVE` status.
#[test]
fn describe_table_found() {
    let service = make_service();
    create_test_table(&service);
    let request = make_request("DescribeTable", json!({"TableName": "test-table"}));
    let body = body_json(&service.describe_table(&request).unwrap());
    let table = &body["Table"];
    assert_eq!(table["TableName"], "test-table");
    assert_eq!(table["TableStatus"], "ACTIVE");
}
/// `DescribeTable` on a missing table fails with `ResourceNotFoundException`.
#[test]
fn describe_table_not_found() {
    let service = make_service();
    let request = make_request("DescribeTable", json!({"TableName": "nope"}));
    let message = expect_err(service.describe_table(&request)).to_string();
    assert!(message.contains("ResourceNotFoundException"));
}
/// `DeleteTable` returns the deleted table's description and the table is gone afterwards.
///
/// The follow-up `DescribeTable` must fail specifically with
/// `ResourceNotFoundException`, so an unrelated failure cannot satisfy the test.
#[test]
fn delete_table_removes_table() {
    let svc = make_service();
    create_test_table(&svc);
    let req = make_request("DeleteTable", json!({"TableName": "test-table"}));
    let resp = svc.delete_table(&req).unwrap();
    let b = body_json(&resp);
    assert_eq!(b["TableDescription"]["TableName"], "test-table");

    let req = make_request("DescribeTable", json!({"TableName": "test-table"}));
    let err = expect_err(svc.describe_table(&req));
    assert!(err.to_string().contains("ResourceNotFoundException"));
}
/// `ListTables` includes the name of a table that was just created.
#[test]
fn list_tables_returns_names() {
    let service = make_service();
    create_test_table(&service);
    let request = make_request("ListTables", json!({}));
    let body = body_json(&service.list_tables(&request).unwrap());
    let listed = body["TableNames"].as_array().unwrap();
    let found = listed.iter().any(|name| name == "test-table");
    assert!(found);
}
/// An item written with `PutItem` is returned unchanged by `GetItem`.
#[test]
fn put_and_get_item() {
    let service = make_service();
    create_test_table(&service);

    let put = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": {"S": "key1"},
                "name": {"S": "Alice"},
                "age": {"N": "30"},
            },
        }),
    );
    service.put_item(&put).unwrap();

    let get = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": {"pk": {"S": "key1"}},
        }),
    );
    let body = body_json(&service.get_item(&get).unwrap());
    let item = &body["Item"];
    assert_eq!(item["name"]["S"], "Alice");
    assert_eq!(item["age"]["N"], "30");
}
/// `GetItem` for an absent key succeeds with a missing or null `Item`.
#[test]
fn get_item_not_found() {
    let service = make_service();
    create_test_table(&service);
    let request = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": {"pk": {"S": "nonexistent"}},
        }),
    );
    let body = body_json(&service.get_item(&request).unwrap());
    assert!(body.get("Item").is_none_or(Value::is_null));
}
/// After `DeleteItem`, `GetItem` for the same key returns no item.
#[test]
fn delete_item_removes_item() {
    let service = make_service();
    create_test_table(&service);

    service
        .put_item(&make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {"pk": {"S": "del-me"}},
            }),
        ))
        .unwrap();

    service
        .delete_item(&make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": {"pk": {"S": "del-me"}},
            }),
        ))
        .unwrap();

    let lookup = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": {"pk": {"S": "del-me"}},
        }),
    );
    let body = body_json(&service.get_item(&lookup).unwrap());
    assert!(body.get("Item").is_none_or(Value::is_null));
}
/// Overwriting an item with `ReturnValues: ALL_OLD` returns the previous attributes.
#[test]
fn put_item_returns_old_item() {
    let service = make_service();
    create_test_table(&service);

    let first_write = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {"pk": {"S": "overwrite"}, "v": {"N": "1"}},
        }),
    );
    service.put_item(&first_write).unwrap();

    let overwrite = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {"pk": {"S": "overwrite"}, "v": {"N": "2"}},
            "ReturnValues": "ALL_OLD",
        }),
    );
    let body = body_json(&service.put_item(&overwrite).unwrap());
    assert_eq!(body["Attributes"]["v"]["N"], "1");
}
/// `UpdateItem` with a `SET` expression and `ALL_NEW` returns the updated attribute.
#[test]
fn update_item_set_attribute() {
    let service = make_service();
    create_test_table(&service);

    let seed = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {"pk": {"S": "upd"}, "count": {"N": "0"}},
        }),
    );
    service.put_item(&seed).unwrap();

    let update = make_request(
        "UpdateItem",
        json!({
            "TableName": "test-table",
            "Key": {"pk": {"S": "upd"}},
            "UpdateExpression": "SET #c = :val",
            "ExpressionAttributeNames": {"#c": "count"},
            "ExpressionAttributeValues": {":val": {"N": "42"}},
            "ReturnValues": "ALL_NEW",
        }),
    );
    let body = body_json(&service.update_item(&update).unwrap());
    assert_eq!(body["Attributes"]["count"]["N"], "42");
}
/// `Query` on a partition key returns only that partition's items.
#[test]
fn query_returns_matching_items() {
    let service = make_service();
    let create = make_request(
        "CreateTable",
        json!({
            "TableName": "query-table",
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "S"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    service.create_table(&create).unwrap();

    // Three items under "user1" ...
    for i in 0..3 {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "query-table",
                "Item": {
                    "pk": {"S": "user1"},
                    "sk": {"S": format!("item-{i}")},
                },
            }),
        );
        service.put_item(&put).unwrap();
    }
    // ... and one under "user2" that the query must not return.
    let other_partition = make_request(
        "PutItem",
        json!({
            "TableName": "query-table",
            "Item": {"pk": {"S": "user2"}, "sk": {"S": "item-0"}},
        }),
    );
    service.put_item(&other_partition).unwrap();

    let query = make_request(
        "Query",
        json!({
            "TableName": "query-table",
            "KeyConditionExpression": "pk = :pk",
            "ExpressionAttributeValues": {":pk": {"S": "user1"}},
        }),
    );
    let body = body_json(&service.query(&query).unwrap());
    assert_eq!(body["Count"], 3);
    assert_eq!(body["Items"].as_array().unwrap().len(), 3);
}
/// An unfiltered `Scan` counts every item that was written.
#[test]
fn scan_returns_all_items() {
    let service = make_service();
    create_test_table(&service);
    for i in 0..5 {
        service
            .put_item(&make_request(
                "PutItem",
                json!({
                    "TableName": "test-table",
                    "Item": {"pk": {"S": format!("scan-{i}")}},
                }),
            ))
            .unwrap();
    }
    let scan = make_request("Scan", json!({"TableName": "test-table"}));
    let body = body_json(&service.scan(&scan).unwrap());
    assert_eq!(body["Count"], 5);
}
/// Items written with `BatchWriteItem` are all returned by `BatchGetItem`.
#[test]
fn batch_write_and_get_items() {
    let service = make_service();
    create_test_table(&service);

    let write = make_request(
        "BatchWriteItem",
        json!({
            "RequestItems": {
                "test-table": [
                    {"PutRequest": {"Item": {"pk": {"S": "b1"}, "val": {"S": "v1"}}}},
                    {"PutRequest": {"Item": {"pk": {"S": "b2"}, "val": {"S": "v2"}}}},
                    {"PutRequest": {"Item": {"pk": {"S": "b3"}, "val": {"S": "v3"}}}},
                ]
            }
        }),
    );
    let write_body = body_json(&service.batch_write_item(&write).unwrap());
    // Either the map is empty, or the table entry is absent/empty.
    let unprocessed = &write_body["UnprocessedItems"];
    let nothing_left = unprocessed.as_object().unwrap().is_empty()
        || unprocessed["test-table"]
            .as_array()
            .is_none_or(|pending| pending.is_empty());
    assert!(nothing_left);

    let read = make_request(
        "BatchGetItem",
        json!({
            "RequestItems": {
                "test-table": {
                    "Keys": [
                        {"pk": {"S": "b1"}},
                        {"pk": {"S": "b2"}},
                        {"pk": {"S": "b3"}},
                    ]
                }
            }
        }),
    );
    let read_body = body_json(&service.batch_get_item(&read).unwrap());
    let fetched = read_body["Responses"]["test-table"].as_array().unwrap();
    assert_eq!(fetched.len(), 3);
}
/// Two puts via `TransactWriteItems` are both readable via `TransactGetItems`.
#[test]
fn transact_write_and_get() {
    let service = make_service();
    create_test_table(&service);

    let write = make_request(
        "TransactWriteItems",
        json!({
            "TransactItems": [
                {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx1"}}}},
                {"Put": {"TableName": "test-table", "Item": {"pk": {"S": "tx2"}}}},
            ]
        }),
    );
    service.transact_write_items(&write).unwrap();

    let read = make_request(
        "TransactGetItems",
        json!({
            "TransactItems": [
                {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}},
                {"Get": {"TableName": "test-table", "Key": {"pk": {"S": "tx2"}}}},
            ]
        }),
    );
    let body = body_json(&service.transact_get_items(&read).unwrap());
    let entries = body["Responses"].as_array().unwrap();
    assert_eq!(entries.len(), 2);
}
/// Round-trips a tag through `TagResource`, `ListTagsOfResource` and `UntagResource`.
#[test]
fn tag_operations() {
    let service = make_service();
    create_test_table(&service);

    // Read the ARN inside a block so the state guard is released before further calls.
    let arn = {
        let guard = service.state.read();
        let table = guard.default_ref().tables.get("test-table").unwrap();
        table.arn.clone()
    };

    let tag = make_request(
        "TagResource",
        json!({
            "ResourceArn": arn,
            "Tags": [{"Key": "env", "Value": "test"}],
        }),
    );
    service.tag_resource(&tag).unwrap();

    let list = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
    let listed = body_json(&service.list_tags_of_resource(&list).unwrap());
    let tags = listed["Tags"].as_array().unwrap();
    assert_eq!(tags.len(), 1);
    assert_eq!(tags[0]["Key"], "env");

    let untag = make_request(
        "UntagResource",
        json!({
            "ResourceArn": arn,
            "TagKeys": ["env"],
        }),
    );
    service.untag_resource(&untag).unwrap();

    let list_again = make_request("ListTagsOfResource", json!({"ResourceArn": arn}));
    let after = body_json(&service.list_tags_of_resource(&list_again).unwrap());
    assert!(after["Tags"].as_array().unwrap().is_empty());
}
/// `UpdateTable` with a GSI `Create` action adds that index to the description.
#[test]
fn update_table_add_gsi() {
    let service = make_service();
    let create = make_request(
        "CreateTable",
        json!({
            "TableName": "upd-table",
            "KeySchema": [{"AttributeName": "pk", "KeyType": "HASH"}],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "gk", "AttributeType": "S"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    service.create_table(&create).unwrap();

    let update = make_request(
        "UpdateTable",
        json!({
            "TableName": "upd-table",
            "GlobalSecondaryIndexUpdates": [{
                "Create": {
                    "IndexName": "new-gsi",
                    "KeySchema": [{"AttributeName": "gk", "KeyType": "HASH"}],
                    "Projection": {"ProjectionType": "ALL"},
                }
            }],
        }),
    );
    let body = body_json(&service.update_table(&update).unwrap());
    let indexes = body["TableDescription"]["GlobalSecondaryIndexes"]
        .as_array()
        .unwrap();
    assert_eq!(indexes.len(), 1);
    assert_eq!(indexes[0]["IndexName"], "new-gsi");
}
/// `Scan` with a `FilterExpression` counts only the matching items.
#[test]
fn scan_with_filter_expression() {
    let service = make_service();
    create_test_table(&service);
    // Even indices (0, 2, 4) are "active", so three of the five items match.
    for i in 0..5 {
        let status = if i % 2 == 0 { "active" } else { "inactive" };
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": {"S": format!("f-{i}")},
                    "status": {"S": status},
                },
            }),
        );
        service.put_item(&put).unwrap();
    }
    let scan = make_request(
        "Scan",
        json!({
            "TableName": "test-table",
            "FilterExpression": "#s = :val",
            "ExpressionAttributeNames": {"#s": "status"},
            "ExpressionAttributeValues": {":val": {"S": "active"}},
        }),
    );
    let body = body_json(&service.scan(&scan).unwrap());
    assert_eq!(body["Count"], 3);
}
/// A PartiQL `SELECT` through `ExecuteStatement` returns the stored item.
#[test]
fn execute_statement_select() {
    let service = make_service();
    create_test_table(&service);
    service
        .put_item(&make_request(
            "PutItem",
            json!({"TableName": "test-table", "Item": {"pk": {"S": "qs1"}, "val": {"S": "hello"}}}),
        ))
        .unwrap();
    let select = make_request(
        "ExecuteStatement",
        json!({"Statement": "SELECT * FROM \"test-table\" WHERE pk='qs1'"}),
    );
    let body = body_json(&service.execute_statement(&select).unwrap());
    let rows = body["Items"].as_array().unwrap();
    assert!(!rows.is_empty());
}
/// A PartiQL `INSERT` through `ExecuteStatement` stores an item readable via `GetItem`.
#[test]
fn execute_statement_insert() {
    let service = make_service();
    create_test_table(&service);
    let insert = make_request(
        "ExecuteStatement",
        json!({"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'ins1', 'data': 'val'}"}),
    );
    service.execute_statement(&insert).unwrap();

    let lookup = make_request(
        "GetItem",
        json!({"TableName": "test-table", "Key": {"pk": {"S": "ins1"}}}),
    );
    let body = body_json(&service.get_item(&lookup).unwrap());
    assert_eq!(body["Item"]["data"]["S"], "val");
}
/// `BatchExecuteStatement` answers with a `Responses` array.
#[test]
fn batch_execute_statement() {
    let service = make_service();
    create_test_table(&service);
    service
        .put_item(&make_request(
            "PutItem",
            json!({"TableName": "test-table", "Item": {"pk": {"S": "be1"}}}),
        ))
        .unwrap();
    let batch = make_request(
        "BatchExecuteStatement",
        json!({
            "Statements": [
                {"Statement": "SELECT * FROM \"test-table\" WHERE pk='be1'"},
            ]
        }),
    );
    let body = body_json(&service.batch_execute_statement(&batch).unwrap());
    assert!(body["Responses"].as_array().is_some());
}
/// Inserts issued through `ExecuteTransaction` are visible to a later `GetItem`.
#[test]
fn execute_transaction() {
    let service = make_service();
    create_test_table(&service);
    let transaction = make_request(
        "ExecuteTransaction",
        json!({
            "TransactStatements": [
                {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx1'}"},
                {"Statement": "INSERT INTO \"test-table\" VALUE {'pk': 'tx2'}"},
            ]
        }),
    );
    service.execute_transaction(&transaction).unwrap();

    let lookup = make_request(
        "GetItem",
        json!({"TableName": "test-table", "Key": {"pk": {"S": "tx1"}}}),
    );
    let body = body_json(&service.get_item(&lookup).unwrap());
    assert!(body["Item"].is_object());
}
/// `BatchWriteItem` delete requests remove the listed keys and leave the others in place.
#[test]
fn batch_write_with_delete_requests() {
    let service = make_service();
    create_test_table(&service);
    for key in ["bwd1", "bwd2", "bwd3"] {
        service
            .put_item(&make_request(
                "PutItem",
                json!({"TableName": "test-table", "Item": {"pk": {"S": key}}}),
            ))
            .unwrap();
    }

    let deletes = make_request(
        "BatchWriteItem",
        json!({
            "RequestItems": {
                "test-table": [
                    {"DeleteRequest": {"Key": {"pk": {"S": "bwd1"}}}},
                    {"DeleteRequest": {"Key": {"pk": {"S": "bwd2"}}}},
                ]
            }
        }),
    );
    service.batch_write_item(&deletes).unwrap();

    // The key that was not deleted is still present.
    let survivor = make_request(
        "GetItem",
        json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd3"}}}),
    );
    let survivor_body = body_json(&service.get_item(&survivor).unwrap());
    assert!(survivor_body["Item"].is_object());

    // A deleted key is gone.
    let removed = make_request(
        "GetItem",
        json!({"TableName": "test-table", "Key": {"pk": {"S": "bwd1"}}}),
    );
    let removed_body = body_json(&service.get_item(&removed).unwrap());
    assert!(removed_body.get("Item").is_none_or(Value::is_null));
}
/// `begins_with` on the sort key restricts a `Query` to the matching prefix.
#[test]
fn query_with_sort_key_begins_with() {
    let service = make_service();
    let create = make_request(
        "CreateTable",
        json!({
            "TableName": "sk-table",
            "KeySchema": [
                {"AttributeName": "pk", "KeyType": "HASH"},
                {"AttributeName": "sk", "KeyType": "RANGE"},
            ],
            "AttributeDefinitions": [
                {"AttributeName": "pk", "AttributeType": "S"},
                {"AttributeName": "sk", "AttributeType": "S"},
            ],
            "BillingMode": "PAY_PER_REQUEST",
        }),
    );
    service.create_table(&create).unwrap();

    // Two of the three sort keys start with "order#".
    for sk in ["order#001", "order#002", "profile#main"] {
        service
            .put_item(&make_request(
                "PutItem",
                json!({"TableName": "sk-table", "Item": {"pk": {"S": "u1"}, "sk": {"S": sk}}}),
            ))
            .unwrap();
    }

    let query = make_request(
        "Query",
        json!({
            "TableName": "sk-table",
            "KeyConditionExpression": "pk = :pk AND begins_with(sk, :prefix)",
            "ExpressionAttributeValues": {":pk": {"S": "u1"}, ":prefix": {"S": "order#"}},
        }),
    );
    let body = body_json(&service.query(&query).unwrap());
    assert_eq!(body["Count"], 2);
}
/// `Scan` with `Limit` caps the count and returns a `LastEvaluatedKey`.
#[test]
fn scan_with_limit() {
    let service = make_service();
    create_test_table(&service);
    for i in 0..10 {
        service
            .put_item(&make_request(
                "PutItem",
                json!({"TableName": "test-table", "Item": {"pk": {"S": format!("lim{i}")}}}),
            ))
            .unwrap();
    }
    let scan = make_request("Scan", json!({"TableName": "test-table", "Limit": 3}));
    let body = body_json(&service.scan(&scan).unwrap());
    assert_eq!(body["Count"], 3);
    assert!(body["LastEvaluatedKey"].is_object());
}
/// `BatchGetItem` against a table that does not exist returns an error.
#[test]
fn batch_get_item_table_not_found() {
    let service = make_service();
    let request = make_request(
        "BatchGetItem",
        json!({"RequestItems": {"ghost": {"Keys": [{"pk": {"S": "k"}}]}}}),
    );
    let outcome = service.batch_get_item(&request);
    assert!(outcome.is_err());
}
/// `BatchWriteItem` against a table that does not exist returns an error.
#[test]
fn batch_write_item_table_not_found() {
    let service = make_service();
    let request = make_request(
        "BatchWriteItem",
        json!({"RequestItems": {"ghost": [{"PutRequest": {"Item": {"pk": {"S": "k"}}}}]}}),
    );
    let outcome = service.batch_write_item(&request);
    assert!(outcome.is_err());
}
/// A global table created over an existing table can be described afterwards.
#[test]
fn create_and_describe_global_table() {
    let service = make_service();
    create_test_table(&service);
    let create = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": "test-table",
            "ReplicationGroup": [{"RegionName": "us-east-1"}, {"RegionName": "eu-west-1"}],
        }),
    );
    service.create_global_table(&create).unwrap();

    let describe = make_request(
        "DescribeGlobalTable",
        json!({"GlobalTableName": "test-table"}),
    );
    let body = body_json(&service.describe_global_table(&describe).unwrap());
    assert!(body["GlobalTableDescription"].is_object());
}
/// `ListGlobalTables` responds with a `GlobalTables` array.
#[test]
fn list_global_tables() {
    let service = make_service();
    let request = make_request("ListGlobalTables", json!({}));
    let body = body_json(&service.list_global_tables(&request).unwrap());
    let listed = body["GlobalTables"].as_array();
    assert!(listed.is_some());
}
/// `CreateBackup` returns a backup ARN and the backup shows up in `ListBackups`.
#[test]
fn create_and_list_backups() {
    let service = make_service();
    create_test_table(&service);
    let create = make_request(
        "CreateBackup",
        json!({"TableName": "test-table", "BackupName": "bak1"}),
    );
    let created = body_json(&service.create_backup(&create).unwrap());
    assert!(created["BackupDetails"]["BackupArn"].as_str().is_some());

    let list = make_request("ListBackups", json!({}));
    let listed = body_json(&service.list_backups(&list).unwrap());
    let summaries = listed["BackupSummaries"].as_array().unwrap();
    assert!(!summaries.is_empty());
}
/// `DescribeImport` for an ARN that was never created returns an error.
#[test]
fn describe_import_not_found() {
    let service = make_service();
    let request = make_request(
        "DescribeImport",
        json!({"ImportArn": "arn:aws:dynamodb:us-east-1:123:table/t/import/ghost"}),
    );
    let outcome = service.describe_import(&request);
    assert!(outcome.is_err());
}
/// `DescribeExport` for an ARN that was never created returns an error.
#[test]
fn describe_export_not_found() {
    let service = make_service();
    let request = make_request(
        "DescribeExport",
        json!({"ExportArn": "arn:aws:dynamodb:us-east-1:123:table/t/export/ghost"}),
    );
    let outcome = service.describe_export(&request);
    assert!(outcome.is_err());
}
/// `CreateTable` without a `TableName` returns an error.
#[test]
fn create_table_missing_name_errors() {
    let service = make_service();
    let request = make_request(
        "CreateTable",
        json!({
            "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST"
        }),
    );
    let outcome = service.create_table(&request);
    assert!(outcome.is_err());
}
/// Issuing the same `CreateTable` request twice fails the second time.
///
/// The error is pinned to `ResourceInUseException` so the test cannot pass on
/// an unrelated failure.
#[test]
fn create_table_duplicate_errors() {
    let svc = make_service();
    let req = make_request(
        "CreateTable",
        json!({
            "TableName": "dup",
            "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST"
        }),
    );
    svc.create_table(&req).unwrap();
    let err = expect_err(svc.create_table(&req));
    assert!(err.to_string().contains("ResourceInUseException"));
}
/// `DeleteTable` with an empty request body returns an error.
#[test]
fn delete_table_missing_name_errors() {
    let service = make_service();
    let outcome = service.delete_table(&make_request("DeleteTable", json!({})));
    assert!(outcome.is_err());
}
/// `DeleteTable` for a table that does not exist returns an error.
#[test]
fn delete_table_not_found_errors() {
    let service = make_service();
    let outcome = service.delete_table(&make_request("DeleteTable", json!({"TableName": "ghost"})));
    assert!(outcome.is_err());
}
/// `DescribeTable` with an empty request body returns an error.
#[test]
fn describe_table_missing_name_errors() {
    let service = make_service();
    let outcome = service.describe_table(&make_request("DescribeTable", json!({})));
    assert!(outcome.is_err());
}
/// `DescribeTable` for a table that does not exist fails with `ResourceNotFoundException`.
///
/// Checking the error type keeps the test from passing on an unrelated failure.
#[test]
fn describe_table_not_found_errors() {
    let svc = make_service();
    let req = make_request("DescribeTable", json!({"TableName": "ghost"}));
    let err = expect_err(svc.describe_table(&req));
    assert!(err.to_string().contains("ResourceNotFoundException"));
}
/// `UpdateTable` with an empty request body returns an error.
#[test]
fn update_table_missing_name_errors() {
    let service = make_service();
    let outcome = service.update_table(&make_request("UpdateTable", json!({})));
    assert!(outcome.is_err());
}
/// `UpdateTable` for a table that does not exist returns an error.
#[test]
fn update_table_not_found_errors() {
    let service = make_service();
    let outcome = service.update_table(&make_request("UpdateTable", json!({"TableName": "ghost"})));
    assert!(outcome.is_err());
}
/// `ListTables` with `Limit` returns one page plus a `LastEvaluatedTableName` cursor.
#[test]
fn list_tables_pagination() {
    let service = make_service();
    for i in 0..5 {
        service
            .create_table(&make_request(
                "CreateTable",
                json!({
                    "TableName": format!("pt{i}"),
                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
                    "BillingMode": "PAY_PER_REQUEST"
                }),
            ))
            .unwrap();
    }
    let list = make_request("ListTables", json!({"Limit": 2}));
    let page = body_json(&service.list_tables(&list).unwrap());
    assert_eq!(page["TableNames"].as_array().unwrap().len(), 2);
    assert!(page["LastEvaluatedTableName"].is_string());
}
/// `ExclusiveStartTableName` leaves the named table out of the listing.
#[test]
fn list_tables_start_exclusive() {
    let service = make_service();
    for i in 0..3 {
        service
            .create_table(&make_request(
                "CreateTable",
                json!({
                    "TableName": format!("pt{i}"),
                    "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
                    "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
                    "BillingMode": "PAY_PER_REQUEST"
                }),
            ))
            .unwrap();
    }
    let list = make_request("ListTables", json!({"ExclusiveStartTableName": "pt0"}));
    let page = body_json(&service.list_tables(&list).unwrap());
    let listed = page["TableNames"].as_array().unwrap();
    let start_included = listed.iter().any(|name| name == "pt0");
    assert!(!start_included);
}
/// `UpdateTimeToLive` for a table that does not exist returns an error.
#[test]
fn update_time_to_live_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "UpdateTimeToLive",
        json!({
            "TableName": "ghost",
            "TimeToLiveSpecification": {"Enabled": true, "AttributeName": "ttl"}
        }),
    );
    let outcome = service.update_time_to_live(&request);
    assert!(outcome.is_err());
}
/// `DescribeTimeToLive` for a table that does not exist returns an error.
#[test]
fn describe_time_to_live_unknown_table_errors() {
    let service = make_service();
    let request = make_request("DescribeTimeToLive", json!({"TableName": "ghost"}));
    let outcome = service.describe_time_to_live(&request);
    assert!(outcome.is_err());
}
/// `PutResourcePolicy` without a `Policy` document returns an error.
///
/// The ARN comes from the `CreateTable` response rather than a hard-coded
/// account/region, so the request always targets the table created here.
#[test]
fn put_resource_policy_missing_policy_errors() {
    let svc = make_service();
    let req = make_request(
        "CreateTable",
        json!({
            "TableName": "rp",
            "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST"
        }),
    );
    let created = svc.create_table(&req).unwrap();
    let arn = body_json(&created)["TableDescription"]["TableArn"]
        .as_str()
        .expect("CreateTable response should include TableArn")
        .to_string();
    let req = make_request("PutResourcePolicy", json!({"ResourceArn": arn}));
    assert!(svc.put_resource_policy(&req).is_err());
}
/// `GetResourcePolicy` for an ARN with no matching table returns an error.
#[test]
fn get_resource_policy_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "GetResourcePolicy",
        json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
    );
    let outcome = service.get_resource_policy(&request);
    assert!(outcome.is_err());
}
/// `TagResource` for an ARN with no matching table returns an error.
#[test]
fn tag_resource_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "TagResource",
        json!({
            "ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost",
            "Tags": [{"Key": "k", "Value": "v"}]
        }),
    );
    let outcome = service.tag_resource(&request);
    assert!(outcome.is_err());
}
/// `ListTagsOfResource` for an ARN with no matching table returns an error.
#[test]
fn list_tags_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "ListTagsOfResource",
        json!({"ResourceArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ghost"}),
    );
    let outcome = service.list_tags_of_resource(&request);
    assert!(outcome.is_err());
}
/// `CreateBackup` for a table that does not exist returns an error.
#[test]
fn create_backup_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "CreateBackup",
        json!({"TableName": "ghost", "BackupName": "b1"}),
    );
    let outcome = service.create_backup(&request);
    assert!(outcome.is_err());
}
/// `DeleteBackup` for a backup ARN that was never created returns an error.
#[test]
fn delete_backup_not_found_errors() {
    let service = make_service();
    let request = make_request(
        "DeleteBackup",
        json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
    );
    let outcome = service.delete_backup(&request);
    assert!(outcome.is_err());
}
/// `DescribeBackup` for a backup ARN that was never created returns an error.
#[test]
fn describe_backup_not_found_errors() {
    let service = make_service();
    let request = make_request(
        "DescribeBackup",
        json!({"BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"}),
    );
    let outcome = service.describe_backup(&request);
    assert!(outcome.is_err());
}
/// `RestoreTableFromBackup` with a backup ARN that was never created returns an error.
#[test]
fn restore_table_from_backup_not_found_errors() {
    let service = make_service();
    let request = make_request(
        "RestoreTableFromBackup",
        json!({
            "TargetTableName": "restored",
            "BackupArn": "arn:aws:dynamodb:us-east-1:123:table/t/backup/ghost"
        }),
    );
    let outcome = service.restore_table_from_backup(&request);
    assert!(outcome.is_err());
}
/// `UpdateContinuousBackups` for a table that does not exist returns an error.
#[test]
fn update_continuous_backups_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "UpdateContinuousBackups",
        json!({
            "TableName": "ghost",
            "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": true}
        }),
    );
    let outcome = service.update_continuous_backups(&request);
    assert!(outcome.is_err());
}
/// `PutItem` into a table that does not exist returns an error.
#[test]
fn put_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "PutItem",
        json!({
            "TableName": "ghost",
            "Item": {"k": {"S": "v"}}
        }),
    );
    let outcome = service.put_item(&request);
    assert!(outcome.is_err());
}
/// `PutItem` whose item lacks the table's hash key attribute returns an error.
#[test]
fn put_item_missing_key_attribute_errors() {
    let service = make_service();
    let create = make_request(
        "CreateTable",
        json!({
            "TableName": "pmk",
            "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST"
        }),
    );
    service.create_table(&create).unwrap();

    // The item carries "other" but not the key attribute "k".
    let put = make_request(
        "PutItem",
        json!({
            "TableName": "pmk",
            "Item": {"other": {"S": "v"}}
        }),
    );
    let outcome = service.put_item(&put);
    assert!(outcome.is_err());
}
/// `GetItem` from a table that does not exist returns an error.
#[test]
fn get_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "GetItem",
        json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
    );
    let outcome = service.get_item(&request);
    assert!(outcome.is_err());
}
/// `DeleteItem` on a table that does not exist returns an error.
#[test]
fn delete_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "DeleteItem",
        json!({"TableName": "ghost", "Key": {"k": {"S": "1"}}}),
    );
    let outcome = service.delete_item(&request);
    assert!(outcome.is_err());
}
/// `UpdateItem` on a table that does not exist returns an error.
#[test]
fn update_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "UpdateItem",
        json!({
            "TableName": "ghost",
            "Key": {"k": {"S": "1"}},
            "UpdateExpression": "SET x = :v",
            "ExpressionAttributeValues": {":v": {"S": "val"}}
        }),
    );
    let outcome = service.update_item(&request);
    assert!(outcome.is_err());
}
/// `Query` on a table that does not exist returns an error.
#[test]
fn query_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "Query",
        json!({
            "TableName": "ghost",
            "KeyConditionExpression": "k = :v",
            "ExpressionAttributeValues": {":v": {"S": "x"}}
        }),
    );
    let outcome = service.query(&request);
    assert!(outcome.is_err());
}
/// `Scan` on a table that does not exist returns an error.
#[test]
fn scan_unknown_table_errors() {
    let service = make_service();
    let outcome = service.scan(&make_request("Scan", json!({"TableName": "ghost"})));
    assert!(outcome.is_err());
}
/// `Scan` with `Limit: 2` over five items reports a count of two.
#[test]
fn scan_with_limit_returns_ok() {
    let service = make_service();
    let create = make_request(
        "CreateTable",
        json!({
            "TableName": "slt",
            "AttributeDefinitions": [{"AttributeName": "k", "AttributeType": "S"}],
            "KeySchema": [{"AttributeName": "k", "KeyType": "HASH"}],
            "BillingMode": "PAY_PER_REQUEST"
        }),
    );
    service.create_table(&create).unwrap();

    for i in 0..5 {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "slt",
                "Item": {"k": {"S": format!("key-{i}")}}
            }),
        );
        service.put_item(&put).unwrap();
    }

    let scan = make_request("Scan", json!({"TableName": "slt", "Limit": 2}));
    let page = body_json(&service.scan(&scan).unwrap());
    assert_eq!(page["Count"], 2);
}
/// `BatchGetItem` naming a table that does not exist returns an error.
#[test]
fn batch_get_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "BatchGetItem",
        json!({
            "RequestItems": {
                "ghost": {"Keys": [{"k": {"S": "1"}}]}
            }
        }),
    );
    let outcome = service.batch_get_item(&request);
    assert!(outcome.is_err());
}
/// `BatchWriteItem` naming a table that does not exist returns an error.
#[test]
fn batch_write_item_unknown_table_errors() {
    let service = make_service();
    let request = make_request(
        "BatchWriteItem",
        json!({
            "RequestItems": {
                "ghost": [{"PutRequest": {"Item": {"k": {"S": "1"}}}}]
            }
        }),
    );
    let outcome = service.batch_write_item(&request);
    assert!(outcome.is_err());
}
/// `TransactWriteItems` with a single `Put` into "ghost", a table this test
/// never creates, must come back as `Err`.
#[test]
fn transact_write_items_unknown_table_errors() {
    let svc = make_service();
    let payload = json!({
        "TransactItems": [{
            "Put": {"TableName": "ghost", "Item": {"k": {"S": "1"}}}
        }]
    });
    let req = make_request("TransactWriteItems", payload);
    let outcome = svc.transact_write_items(&req);
    assert!(outcome.is_err());
}
/// `TransactGetItems` with a single `Get` from "ghost", a table this test
/// never creates, must come back as `Err`.
#[test]
fn transact_get_items_unknown_table_errors() {
    let svc = make_service();
    let payload = json!({
        "TransactItems": [{
            "Get": {"TableName": "ghost", "Key": {"k": {"S": "1"}}}
        }]
    });
    let req = make_request("TransactGetItems", payload);
    let outcome = svc.transact_get_items(&req);
    assert!(outcome.is_err());
}
/// `DescribeGlobalTable` for the global table name "ghost", which this test
/// never creates, must come back as `Err`.
#[test]
fn describe_global_table_not_found_b() {
    let svc = make_service();
    let payload = json!({"GlobalTableName": "ghost"});
    let req = make_request("DescribeGlobalTable", payload);
    let outcome = svc.describe_global_table(&req);
    assert!(outcome.is_err());
}
/// `ListGlobalTables` with an empty request body succeeds and its JSON
/// response carries `GlobalTables` as an array.
#[test]
fn list_global_tables_empty_ok() {
    let svc = make_service();
    let list_req = make_request("ListGlobalTables", json!({}));
    let listed = svc.list_global_tables(&list_req).unwrap();
    let parsed: Value = serde_json::from_slice(listed.body.expect_bytes()).unwrap();
    let global_tables = &parsed["GlobalTables"];
    assert!(global_tables.is_array());
}
/// The `AND` that belongs to `BETWEEN :lo AND :hi` is not a split point:
/// splitting the whole expression on `AND` must give exactly two parts.
#[test]
fn split_on_top_level_keyword_between_swallows_inner_and() {
    let expression = "x = :a AND y BETWEEN :lo AND :hi";
    let parts = split_on_top_level_keyword(expression, "AND");
    let part_count = parts.len();
    assert_eq!(
        part_count, 2,
        "BETWEEN's inner AND must not split; got parts = {parts:?}"
    );
}
/// Two parenthesised operands, the second containing a `BETWEEN ... AND ...`,
/// must split on `AND` into exactly two parts.
#[test]
fn split_on_top_level_keyword_between_nested_parens() {
    let expression = "(x = :a) AND (y BETWEEN :lo AND :hi)";
    let part_count = split_on_top_level_keyword(expression, "AND").len();
    assert_eq!(part_count, 2);
}
/// The same two-clause condition, written with spaces, no spaces, padding,
/// tabs, and newlines around the tokens, must always split into two parts.
#[test]
fn split_on_top_level_keyword_whitespace_variants() {
    let variants = [
        "x = :a AND y = :b",
        "x=:a AND y=:b",
        "  x  =  :a   AND   y  =  :b  ",
        "x\t=\t:a\tAND\ty\t=\t:b",
        "x = :a\nAND\ny = :b",
    ];
    for expr in variants {
        let part_count = split_on_top_level_keyword(expr, "AND").len();
        assert_eq!(part_count, 2, "whitespace variant failed: {expr:?}");
    }
}
/// A lowercase `and` in the expression is matched by the keyword `"AND"`;
/// an uppercase `OR` split with keyword `"OR"` is checked alongside it.
/// Both cases must produce two parts.
#[test]
fn split_on_top_level_keyword_case_insensitive() {
    let cases = [("x = :a and y = :b", "AND"), ("x = :a OR y = :b", "OR")];
    for (expression, keyword) in cases {
        let part_count = split_on_top_level_keyword(expression, keyword).len();
        assert_eq!(part_count, 2);
    }
}
/// The letters "and" inside the identifier `land` are not the keyword, so
/// the expression stays in one piece.
#[test]
fn split_on_top_level_keyword_does_not_match_inside_identifiers() {
    let expression = "land = :a";
    let part_count = split_on_top_level_keyword(expression, "AND").len();
    assert_eq!(part_count, 1);
}
}