mod batch;
#[cfg(test)]
mod expression_corpus_tests;
mod global_tables;
mod items;
mod queries;
mod streams;
mod tables;
use std::collections::HashMap;
use std::io;
use std::sync::Arc;
use async_trait::async_trait;
use base64::Engine;
use http::StatusCode;
use serde_json::{json, Value};
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
use fakecloud_persistence::{S3Store, SnapshotStore};
use fakecloud_s3::SharedS3State;
use crate::state::{
AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
};
/// Owned copy of the table fields needed to publish change records to
/// Kinesis; built from a `DynamoTable` by `kinesis_target_for`.
#[derive(Clone)]
pub(crate) struct KinesisDeliveryTarget {
/// Clone of the table's full `kinesis_destinations` list. Entries whose
/// status is not "ACTIVE" are included; `deliver_kinesis_change` filters them.
pub destinations: Vec<KinesisDestination>,
/// Clone of the table's `arn`.
pub arn: String,
/// Clone of the table's `name`.
pub name: String,
}
/// Builds the Kinesis delivery target for `table`.
///
/// Returns `None` unless at least one of the table's Kinesis destinations has
/// status `"ACTIVE"`. Otherwise returns a target holding clones of the whole
/// destination list, the table ARN and the table name.
pub(crate) fn kinesis_target_for(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
    let has_active_destination = table
        .kinesis_destinations
        .iter()
        .any(|dest| dest.destination_status == "ACTIVE");
    if !has_active_destination {
        return None;
    }
    Some(KinesisDeliveryTarget {
        destinations: table.kinesis_destinations.clone(),
        arn: table.arn.clone(),
        name: table.name.clone(),
    })
}
/// Cached response for a client request token, stored in the service's
/// idempotency map under an `(account_id, token)` key.
struct TransactIdempotencyEntry {
/// When the entry was stored; `transact_idempotency_lookup` evicts entries
/// whose age has reached `TRANSACT_IDEMPOTENCY_WINDOW`.
stored_at: std::time::Instant,
/// Caller-supplied hash of the request; a lookup for the same key with a
/// different hash is rejected with `IdempotentParameterMismatchException`.
request_hash: u64,
/// JSON body returned again when the same key and hash are looked up.
response: Value,
}
/// Lifetime of a stored idempotency entry (600 seconds). Entries whose age is
/// not strictly less than this are dropped by `transact_idempotency_lookup`.
const TRANSACT_IDEMPOTENCY_WINDOW: std::time::Duration = std::time::Duration::from_secs(600);
/// Kind of table access passed to `DynamoDbService::record_table_kms_usage`.
pub(crate) enum TableKmsOp {
/// The KMS hook's `encrypt` is called, then `decrypt` on the resulting envelope.
Read,
/// Only the KMS hook's `encrypt` is called.
Write,
}
/// DynamoDB service implementation; handles requests through its `AwsService` impl.
pub struct DynamoDbService {
/// Shared DynamoDB state; cloned into snapshots by `save_dynamodb_snapshot`.
state: SharedDynamoDbState,
/// Optional shared S3 state, set via `with_s3`.
/// NOTE(review): presumably used by export/import handlers in submodules — confirm.
pub(crate) s3_state: Option<SharedS3State>,
/// Optional S3 store, set via `with_s3_store`.
pub(crate) s3_store: Option<Arc<dyn S3Store>>,
/// Optional delivery bus; when absent, Kinesis change delivery is skipped.
delivery: Option<Arc<DeliveryBus>>,
/// Optional snapshot store; when absent, `save_snapshot` writes nothing and
/// `snapshot_hook` returns `None`.
snapshot_store: Option<Arc<dyn SnapshotStore>>,
/// Optional KMS hook; when absent, `record_table_kms_usage` is a no-op.
pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
/// Region passed to the KMS hook; `new` initialises it to "us-east-1".
pub(crate) region: String,
/// Async mutex held for the duration of each snapshot save.
snapshot_lock: Arc<tokio::sync::Mutex<()>>,
/// Idempotency cache keyed by `(account_id, token)`.
transact_idempotency:
Arc<parking_lot::Mutex<HashMap<(String, String), TransactIdempotencyEntry>>>,
}
impl DynamoDbService {
    /// Creates a service over `state` with every optional integration unset,
    /// region `"us-east-1"`, and an empty idempotency cache.
    pub fn new(state: SharedDynamoDbState) -> Self {
        let snapshot_lock = Arc::new(tokio::sync::Mutex::new(()));
        let transact_idempotency = Arc::new(parking_lot::Mutex::new(HashMap::new()));
        Self {
            state,
            s3_state: None,
            s3_store: None,
            delivery: None,
            snapshot_store: None,
            kms_hook: None,
            region: "us-east-1".to_string(),
            snapshot_lock,
            transact_idempotency,
        }
    }

    /// Looks up a cached response for `(account_id, token)`.
    ///
    /// Expired entries (age not below `TRANSACT_IDEMPOTENCY_WINDOW`) are
    /// evicted from the whole cache before the lookup.
    ///
    /// Returns `Ok(Some(response))` when an entry exists with the same
    /// `request_hash`, `Ok(None)` when no entry exists.
    ///
    /// # Errors
    /// Returns `IdempotentParameterMismatchException` (HTTP 400) when an entry
    /// exists for the key but was stored with a different `request_hash`.
    pub(crate) fn transact_idempotency_lookup(
        &self,
        account_id: &str,
        token: &str,
        request_hash: u64,
    ) -> Result<Option<AwsResponse>, AwsServiceError> {
        let mut entries = self.transact_idempotency.lock();
        entries.retain(|_, entry| entry.stored_at.elapsed() < TRANSACT_IDEMPOTENCY_WINDOW);

        let key = (account_id.to_string(), token.to_string());
        let Some(cached) = entries.get(&key) else {
            return Ok(None);
        };
        if cached.request_hash != request_hash {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "IdempotentParameterMismatchException",
                "Request parameters do not match the parameters of a previous \
                 request with the same client request token",
            ));
        }
        Ok(Some(AwsResponse::ok_json(cached.response.clone())))
    }

    /// Stores (or overwrites) the cached `response` for `(account_id, token)`,
    /// stamped with the current instant and the given `request_hash`.
    pub(crate) fn transact_idempotency_store(
        &self,
        account_id: &str,
        token: &str,
        request_hash: u64,
        response: &Value,
    ) {
        let mut entries = self.transact_idempotency.lock();
        let key = (account_id.to_string(), token.to_string());
        entries.insert(
            key,
            TransactIdempotencyEntry {
                stored_at: std::time::Instant::now(),
                request_hash,
                response: response.clone(),
            },
        );
    }

    /// Builder: sets the shared S3 state.
    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
        self.s3_state = Some(s3_state);
        self
    }

    /// Builder: sets the S3 store.
    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
        self.s3_store = Some(store);
        self
    }

    /// Builder: sets the delivery bus used for Kinesis change delivery.
    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
        self.delivery = Some(delivery);
        self
    }

    /// Builder: sets the store that snapshots are written to.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }

    /// Builder: sets the KMS hook used by `record_table_kms_usage`.
    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
        self.kms_hook = Some(hook);
        self
    }

    /// Builder: overrides the region (default `"us-east-1"`).
    pub fn with_region(mut self, region: impl Into<String>) -> Self {
        self.region = region.into();
        self
    }

    /// Reports a table access to the KMS hook, if one is configured.
    ///
    /// Calls the hook's `encrypt` with a fixed `b"ddb-item"` payload and an
    /// encryption context carrying the table name (last `/`-separated segment
    /// of `table_arn`) and the account id. For `TableKmsOp::Read` the returned
    /// envelope is then passed to the hook's `decrypt`. A missing or empty
    /// `kms_key_arn` falls back to `"aws/dynamodb"`.
    ///
    /// Best-effort: an `encrypt` error stops the call silently and the
    /// `decrypt` result is discarded.
    pub(crate) fn record_table_kms_usage(
        &self,
        account_id: &str,
        table_arn: &str,
        kms_key_arn: Option<&str>,
        operation: TableKmsOp,
    ) {
        let Some(hook) = self.kms_hook.as_ref() else {
            return;
        };
        let key = match kms_key_arn {
            Some(arn) if !arn.is_empty() => arn,
            _ => "aws/dynamodb",
        };
        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
        let ctx = HashMap::from([
            ("aws:dynamodb:tableName".to_string(), table_name.to_string()),
            (
                "aws:dynamodb:subscriberId".to_string(),
                account_id.to_string(),
            ),
        ]);
        let Ok(envelope) = hook.encrypt(
            account_id,
            &self.region,
            key,
            b"ddb-item",
            "dynamodb.amazonaws.com",
            ctx.clone(),
        ) else {
            return;
        };
        match operation {
            TableKmsOp::Read => {
                let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
            }
            TableKmsOp::Write => {}
        }
    }

    /// Saves a snapshot of the current state to the given `store`, ignoring
    /// the service's own configured snapshot store.
    ///
    /// # Errors
    /// Propagates any error from `save_dynamodb_snapshot`.
    pub async fn save_snapshot_to_store(&self, store: Arc<dyn SnapshotStore>) -> io::Result<()> {
        save_dynamodb_snapshot(&self.state, Some(store), &self.snapshot_lock).await?;
        Ok(())
    }

    /// Returns a hook that saves a snapshot to the configured store each time
    /// it is invoked, or `None` when no snapshot store is configured.
    ///
    /// Save failures inside the hook are logged, not returned.
    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
        let store = self.snapshot_store.clone()?;
        let state = self.state.clone();
        let lock = self.snapshot_lock.clone();
        Some(Arc::new(move || {
            // Each invocation gets its own handles so the returned future owns them.
            let (state, store, lock) = (state.clone(), store.clone(), lock.clone());
            Box::pin(async move {
                if let Err(err) = save_dynamodb_snapshot(&state, Some(store), &lock).await {
                    tracing::error!(%err, "dynamodb snapshot save failed");
                }
            })
        }))
    }

    /// Saves a snapshot to the configured store.
    ///
    /// Returns `Ok(false)` when no snapshot store is configured, `Ok(true)`
    /// after a successful write.
    ///
    /// # Errors
    /// Propagates any error from `save_dynamodb_snapshot`.
    pub async fn save_snapshot(&self) -> io::Result<bool> {
        let store = self.snapshot_store.clone();
        save_dynamodb_snapshot(&self.state, store, &self.snapshot_lock).await
    }

    /// Associated-function alias for `kinesis_target_for`.
    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
        kinesis_target_for(table)
    }

    /// Publishes a change record for `target` through the configured delivery
    /// bus, without a user identity. Does nothing when no bus is configured.
    pub(super) fn deliver_to_kinesis_destinations(
        &self,
        target: &KinesisDeliveryTarget,
        event_name: &str,
        keys: &HashMap<String, AttributeValue>,
        old_image: Option<&HashMap<String, AttributeValue>>,
        new_image: Option<&HashMap<String, AttributeValue>>,
    ) {
        if let Some(bus) = self.delivery.as_ref() {
            deliver_kinesis_change(bus, target, event_name, keys, old_image, new_image, None);
        }
    }

    /// Parses the request body as JSON.
    ///
    /// # Errors
    /// Returns `SerializationException` (HTTP 400) when the body is not valid JSON.
    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
        match serde_json::from_slice(&req.body) {
            Ok(value) => Ok(value),
            Err(e) => Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "SerializationException",
                format!("Invalid JSON: {e}"),
            )),
        }
    }

    /// Wraps `body` in a successful JSON response.
    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
        Ok(AwsResponse::ok_json(body))
    }
}
/// Publishes one change record to every `"ACTIVE"` Kinesis destination of
/// `target`.
///
/// The record is a JSON object with a fresh UUID `eventID`, the given
/// `event_name`, the region taken from the fourth `:`-separated field of the
/// table ARN (falling back to `"us-east-1"`), and a `dynamodb` section holding
/// `keys`, a sequence number, and the optional old/new images. When
/// `user_identity` is given, a `userIdentity` object is added.
///
/// The serialized record is base64-encoded (standard alphabet) and sent with
/// the JSON serialization of `keys` as the partition key. `SizeBytes` is the
/// byte length of that same key serialization. Returns without building a
/// record when no destination is active.
// The seven parameters map one-to-one onto fields of the emitted record.
#[allow(clippy::too_many_arguments)]
pub(crate) fn deliver_kinesis_change(
    delivery: &DeliveryBus,
    target: &KinesisDeliveryTarget,
    event_name: &str,
    keys: &HashMap<String, AttributeValue>,
    old_image: Option<&HashMap<String, AttributeValue>>,
    new_image: Option<&HashMap<String, AttributeValue>>,
    user_identity: Option<&crate::state::StreamUserIdentity>,
) {
    let active_destinations: Vec<_> = target
        .destinations
        .iter()
        .filter(|d| d.destination_status == "ACTIVE")
        .collect();
    if active_destinations.is_empty() {
        return;
    }

    // Serialize the keys once: the same string is the partition key and the
    // basis for `SizeBytes`. A serialization failure yields "" (length 0).
    let partition_key = serde_json::to_string(keys).unwrap_or_default();
    let region = target.arn.split(':').nth(3).unwrap_or("us-east-1");

    let mut record = json!({
        "eventID": uuid::Uuid::new_v4().to_string(),
        "eventName": event_name,
        "eventVersion": "1.1",
        "eventSource": "aws:dynamodb",
        "awsRegion": region,
        "dynamodb": {
            "Keys": keys,
            "SequenceNumber": crate::streams::next_stream_sequence(),
            "SizeBytes": partition_key.len(),
            "StreamViewType": "NEW_AND_OLD_IMAGES",
        },
        "eventSourceARN": &target.arn,
        "tableName": &target.name,
    });
    if let Some(old) = old_image {
        record["dynamodb"]["OldImage"] = json!(old);
    }
    if let Some(new) = new_image {
        record["dynamodb"]["NewImage"] = json!(new);
    }
    if let Some(ui) = user_identity {
        record["userIdentity"] = json!({
            "principalId": ui.principal_id,
            "type": ui.identity_type,
        });
    }

    let record_str = serde_json::to_string(&record).unwrap_or_default();
    let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
    for dest in active_destinations {
        delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
    }
}
/// Serializes the current DynamoDB state to JSON and writes it to `store`.
///
/// Returns `Ok(false)` without doing anything when `store` is `None`, and
/// `Ok(true)` after a successful write. `lock` is held for the whole
/// operation. Serialization and the store write run on a blocking task.
///
/// # Errors
/// Returns the store/serialization error (same `ErrorKind`, message prefixed
/// with "failed to write dynamodb snapshot"), or an `io::Error::other` when
/// the blocking task could not be joined.
pub async fn save_dynamodb_snapshot(
    state: &SharedDynamoDbState,
    store: Option<Arc<dyn SnapshotStore>>,
    lock: &tokio::sync::Mutex<()>,
) -> io::Result<bool> {
    let store = match store {
        Some(store) => store,
        None => return Ok(false),
    };
    let _guard = lock.lock().await;

    // Clone under the read lock in its own statement so the read guard is
    // released before the await below.
    let accounts = state.read().clone();
    let snapshot = DynamoDbSnapshot {
        schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
        accounts: Some(accounts),
        state: None,
    };

    let write_snapshot = move || -> io::Result<()> {
        let bytes = serde_json::to_vec(&snapshot)
            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
        store.save(&bytes)
    };
    match tokio::task::spawn_blocking(write_snapshot).await {
        Ok(Ok(())) => Ok(true),
        Ok(Err(err)) => Err(io::Error::new(
            err.kind(),
            format!("failed to write dynamodb snapshot: {err}"),
        )),
        Err(err) => Err(io::Error::other(format!(
            "dynamodb snapshot task panicked: {err}"
        ))),
    }
}
#[async_trait]
impl AwsService for DynamoDbService {
    /// Service identifier: `"dynamodb"`.
    fn service_name(&self) -> &str {
        "dynamodb"
    }

    /// Dispatches `req` to the handler for its action.
    ///
    /// After a successful response to a mutating request, a snapshot is saved;
    /// a snapshot failure is logged and does not change the returned result.
    /// The three PartiQL actions count as mutating only when
    /// `is_mutating_request` says so for the request body.
    ///
    /// # Errors
    /// Returns the handler's error, or an action-not-implemented error for an
    /// unknown action.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let action = req.action.as_str();
        let is_partiql = matches!(
            action,
            "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
        );
        // Short-circuits: the body is only inspected for PartiQL actions that
        // are not already classified as mutating by name.
        let mutates = is_mutating_action(action)
            || (is_partiql && is_mutating_request(action, &req.json_body()));

        let result = match action {
            "CreateTable" => self.create_table(&req),
            "DeleteTable" => self.delete_table(&req),
            "DescribeTable" => self.describe_table(&req),
            "ListTables" => self.list_tables(&req),
            "UpdateTable" => self.update_table(&req),
            "PutItem" => self.put_item(&req),
            "GetItem" => self.get_item(&req),
            "DeleteItem" => self.delete_item(&req),
            "UpdateItem" => self.update_item(&req),
            "Query" => self.query(&req),
            "Scan" => self.scan(&req),
            "BatchGetItem" => self.batch_get_item(&req),
            "BatchWriteItem" => self.batch_write_item(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsOfResource" => self.list_tags_of_resource(&req),
            "TransactGetItems" => self.transact_get_items(&req),
            "TransactWriteItems" => self.transact_write_items(&req),
            "ExecuteStatement" => self.execute_statement(&req),
            "BatchExecuteStatement" => self.batch_execute_statement(&req),
            "ExecuteTransaction" => self.execute_transaction(&req),
            "UpdateTimeToLive" => self.update_time_to_live(&req),
            "DescribeTimeToLive" => self.describe_time_to_live(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "DescribeEndpoints" => self.describe_endpoints(&req),
            "DescribeLimits" => self.describe_limits(&req),
            "CreateBackup" => self.create_backup(&req),
            "DeleteBackup" => self.delete_backup(&req),
            "DescribeBackup" => self.describe_backup(&req),
            "ListBackups" => self.list_backups(&req),
            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
            "CreateGlobalTable" => self.create_global_table(&req),
            "DescribeGlobalTable" => self.describe_global_table(&req),
            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
            "ListGlobalTables" => self.list_global_tables(&req),
            "UpdateGlobalTable" => self.update_global_table(&req),
            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
            "DisableKinesisStreamingDestination" => {
                self.disable_kinesis_streaming_destination(&req)
            }
            "DescribeKinesisStreamingDestination" => {
                self.describe_kinesis_streaming_destination(&req)
            }
            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
            "UpdateContributorInsights" => self.update_contributor_insights(&req),
            "ListContributorInsights" => self.list_contributor_insights(&req),
            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
            "DescribeExport" => self.describe_export(&req),
            "ListExports" => self.list_exports(&req),
            "ImportTable" => self.import_table(&req),
            "DescribeImport" => self.describe_import(&req),
            "ListImports" => self.list_imports(&req),
            _ => Err(AwsServiceError::action_not_implemented(
                "dynamodb",
                &req.action,
            )),
        };

        let succeeded = matches!(&result, Ok(resp) if resp.status.is_success());
        if mutates && succeeded {
            if let Err(err) = self.save_snapshot().await {
                tracing::error!(%err, "dynamodb snapshot save failed");
            }
        }
        result
    }

    /// Names of every action `handle` dispatches to a handler.
    fn supported_actions(&self) -> &[&str] {
        const SUPPORTED_ACTIONS: &[&str] = &[
            "CreateTable",
            "DeleteTable",
            "DescribeTable",
            "ListTables",
            "UpdateTable",
            "PutItem",
            "GetItem",
            "DeleteItem",
            "UpdateItem",
            "Query",
            "Scan",
            "BatchGetItem",
            "BatchWriteItem",
            "TagResource",
            "UntagResource",
            "ListTagsOfResource",
            "TransactGetItems",
            "TransactWriteItems",
            "ExecuteStatement",
            "BatchExecuteStatement",
            "ExecuteTransaction",
            "UpdateTimeToLive",
            "DescribeTimeToLive",
            "PutResourcePolicy",
            "GetResourcePolicy",
            "DeleteResourcePolicy",
            "DescribeEndpoints",
            "DescribeLimits",
            "CreateBackup",
            "DeleteBackup",
            "DescribeBackup",
            "ListBackups",
            "RestoreTableFromBackup",
            "RestoreTableToPointInTime",
            "UpdateContinuousBackups",
            "DescribeContinuousBackups",
            "CreateGlobalTable",
            "DescribeGlobalTable",
            "DescribeGlobalTableSettings",
            "ListGlobalTables",
            "UpdateGlobalTable",
            "UpdateGlobalTableSettings",
            "DescribeTableReplicaAutoScaling",
            "UpdateTableReplicaAutoScaling",
            "EnableKinesisStreamingDestination",
            "DisableKinesisStreamingDestination",
            "DescribeKinesisStreamingDestination",
            "UpdateKinesisStreamingDestination",
            "DescribeContributorInsights",
            "UpdateContributorInsights",
            "ListContributorInsights",
            "ExportTableToPointInTime",
            "DescribeExport",
            "ListExports",
            "ImportTable",
            "DescribeImport",
            "ListImports",
        ];
        SUPPORTED_ACTIONS
    }
}
pub(crate) mod helpers;
pub(crate) use helpers::*;
#[cfg(test)]
mod tests;