Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25    AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26    DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
/// Minimal subset of a [`DynamoTable`] that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
pub(super) struct KinesisDeliveryTarget {
    /// Copy of the table's Kinesis streaming destinations. Destinations whose
    /// status is not `"ACTIVE"` are filtered out at delivery time.
    pub destinations: Vec<KinesisDestination>,
    /// Table ARN; emitted as `eventSourceARN` and parsed for the record's
    /// `awsRegion`.
    pub arn: String,
    /// Table name; emitted as `tableName` in the change record.
    pub name: String,
}
40
/// Operation flavor for the per-item KMS audit-trail emitter. Reads
/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
/// `GenerateDataKey`, mirroring AWS's audit shape.
pub(crate) enum TableKmsOp {
    /// A read-path operation: the KMS hook's `encrypt` is followed by a
    /// `decrypt` of the resulting envelope.
    Read,
    /// A write-path operation: only the KMS hook's `encrypt` is invoked.
    Write,
}
48
/// DynamoDB service front end: dispatches API actions against the shared
/// in-memory state and, when a snapshot store is configured, persists that
/// state after successful mutating actions.
pub struct DynamoDbService {
    /// Shared DynamoDB state that is read and snapshotted by this service.
    state: SharedDynamoDbState,
    /// Optional shared S3 state.
    // NOTE(review): not used in this file; presumably consumed by the
    // import/export handlers in sibling modules — confirm.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 object store.
    // NOTE(review): not used in this file; presumably consumed by the
    // import/export handlers in sibling modules — confirm.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Delivery bus used to push change records to Kinesis; when `None`,
    /// Kinesis delivery is skipped.
    delivery: Option<Arc<DeliveryBus>>,
    /// Destination for state snapshots; when `None`, snapshot saves are no-ops.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Hook used to record KMS audit-trail entries; when `None`, nothing is
    /// recorded.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook; defaults to `us-east-1` in `new`.
    pub(crate) region: String,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
63
64impl DynamoDbService {
65    pub fn new(state: SharedDynamoDbState) -> Self {
66        Self {
67            state,
68            s3_state: None,
69            s3_store: None,
70            delivery: None,
71            snapshot_store: None,
72            kms_hook: None,
73            region: "us-east-1".to_string(),
74            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
75        }
76    }
77
78    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
79        self.s3_state = Some(s3_state);
80        self
81    }
82
83    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
84        self.s3_store = Some(store);
85        self
86    }
87
88    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
89        self.delivery = Some(delivery);
90        self
91    }
92
93    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94        self.snapshot_store = Some(store);
95        self
96    }
97
98    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
99        self.kms_hook = Some(hook);
100        self
101    }
102
103    pub fn with_region(mut self, region: impl Into<String>) -> Self {
104        self.region = region.into();
105        self
106    }
107
108    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
109    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
110    /// item bodies are nested attribute maps — encrypting them in
111    /// fakecloud would balloon scope without adding test coverage that
112    /// users actually want, so we just emit the audit-trail records the
113    /// AWS API produces and let callers assert KMS usage via
114    /// `/_fakecloud/kms/usage`.
115    pub(crate) fn record_table_kms_usage(
116        &self,
117        account_id: &str,
118        table_arn: &str,
119        kms_key_arn: Option<&str>,
120        operation: TableKmsOp,
121    ) {
122        let Some(hook) = &self.kms_hook else { return };
123        let key = kms_key_arn
124            .filter(|k| !k.is_empty())
125            .unwrap_or("aws/dynamodb");
126        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
127        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
128        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
129        // ends with `:table/<name>`, so derive the name from it.
130        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
133        ctx.insert(
134            "aws:dynamodb:subscriberId".to_string(),
135            account_id.to_string(),
136        );
137        let envelope = match hook.encrypt(
138            account_id,
139            &self.region,
140            key,
141            b"ddb-item",
142            "dynamodb.amazonaws.com",
143            ctx.clone(),
144        ) {
145            Ok(env) => env,
146            Err(_) => return,
147        };
148        if matches!(operation, TableKmsOp::Read) {
149            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
150        }
151    }
152
153    /// Persist the current in-memory state as a snapshot. Called after
154    /// every state-mutating action. A noop when no snapshot store is
155    /// configured (i.e. `StorageMode::Memory`).
156    ///
157    /// The snapshot lock serializes the full clone + serialize + write
158    /// so concurrent mutators cannot leave older bytes on disk, and
159    /// serialization + the blocking file write are offloaded to the
160    /// blocking pool to keep Tokio workers responsive.
161    async fn save_snapshot(&self) {
162        save_dynamodb_snapshot(
163            &self.state,
164            self.snapshot_store.clone(),
165            &self.snapshot_lock,
166        )
167        .await;
168    }
169
170    /// Build a hook that persists the current DynamoDB state when invoked, or
171    /// `None` in memory mode (no snapshot store). The CloudFormation
172    /// provisioner mutates `state` directly and uses this to write a
173    /// CFN-provisioned table through to disk, the same way a direct mutating
174    /// API call would.
175    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
176        let store = self.snapshot_store.clone()?;
177        let state = self.state.clone();
178        let lock = self.snapshot_lock.clone();
179        Some(Arc::new(move || {
180            let state = state.clone();
181            let store = store.clone();
182            let lock = lock.clone();
183            Box::pin(async move {
184                save_dynamodb_snapshot(&state, Some(store), &lock).await;
185            })
186        }))
187    }
188
189    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
190        if table
191            .kinesis_destinations
192            .iter()
193            .any(|d| d.destination_status == "ACTIVE")
194        {
195            Some(KinesisDeliveryTarget {
196                destinations: table.kinesis_destinations.clone(),
197                arn: table.arn.clone(),
198                name: table.name.clone(),
199            })
200        } else {
201            None
202        }
203    }
204
205    /// Deliver a change record to all active Kinesis streaming destinations for a table.
206    pub(super) fn deliver_to_kinesis_destinations(
207        &self,
208        target: &KinesisDeliveryTarget,
209        event_name: &str,
210        keys: &HashMap<String, AttributeValue>,
211        old_image: Option<&HashMap<String, AttributeValue>>,
212        new_image: Option<&HashMap<String, AttributeValue>>,
213    ) {
214        let delivery = match &self.delivery {
215            Some(d) => d,
216            None => return,
217        };
218
219        let active_destinations: Vec<_> = target
220            .destinations
221            .iter()
222            .filter(|d| d.destination_status == "ACTIVE")
223            .collect();
224
225        if active_destinations.is_empty() {
226            return;
227        }
228
229        let mut record = json!({
230            "eventID": uuid::Uuid::new_v4().to_string(),
231            "eventName": event_name,
232            "eventVersion": "1.1",
233            "eventSource": "aws:dynamodb",
234            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
235            "dynamodb": {
236                "Keys": keys,
237                // Use the shared atomic monotonic counter (not wall-clock
238                // nanoseconds): a single BatchWriteItem fires up to 25
239                // deliveries with no delay, which collide on coarse clocks
240                // and invert on NTP steps. bug-audit 2026-06-15, 4.5.
241                "SequenceNumber": crate::streams::next_stream_sequence(),
242                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
243                "StreamViewType": "NEW_AND_OLD_IMAGES",
244            },
245            "eventSourceARN": &target.arn,
246            "tableName": &target.name,
247        });
248
249        if let Some(old) = old_image {
250            record["dynamodb"]["OldImage"] = json!(old);
251        }
252        if let Some(new) = new_image {
253            record["dynamodb"]["NewImage"] = json!(new);
254        }
255
256        let record_str = serde_json::to_string(&record).unwrap_or_default();
257        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
258        let partition_key = serde_json::to_string(keys).unwrap_or_default();
259
260        for dest in active_destinations {
261            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
262        }
263    }
264
265    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
266        serde_json::from_slice(&req.body).map_err(|e| {
267            AwsServiceError::aws_error(
268                StatusCode::BAD_REQUEST,
269                "SerializationException",
270                format!("Invalid JSON: {e}"),
271            )
272        })
273    }
274
275    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
276        Ok(AwsResponse::ok_json(body))
277    }
278}
279
280/// Persist the current DynamoDB state as a snapshot. Offloads the serde +
281/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
282/// (memory mode). Shared by `DynamoDbService::save_snapshot` and the
283/// CloudFormation provisioner's post-provision persist hook so both route
284/// through the same serialize-and-write path.
285pub async fn save_dynamodb_snapshot(
286    state: &SharedDynamoDbState,
287    store: Option<Arc<dyn SnapshotStore>>,
288    lock: &tokio::sync::Mutex<()>,
289) {
290    let Some(store) = store else {
291        return;
292    };
293    let _guard = lock.lock().await;
294    let snapshot = DynamoDbSnapshot {
295        schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
296        accounts: Some(state.read().clone()),
297        state: None,
298    };
299    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
300        let bytes = serde_json::to_vec(&snapshot)
301            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
302        store.save(&bytes)
303    })
304    .await;
305    match join {
306        Ok(Ok(())) => {}
307        Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
308        Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
309    }
310}
311
312#[async_trait]
313impl AwsService for DynamoDbService {
314    fn service_name(&self) -> &str {
315        "dynamodb"
316    }
317
318    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
319        // Avoid parsing the body for ops where the action alone tells us
320        // they mutate (or don't). Only PartiQL ops need statement
321        // inspection.
322        let mutates = if is_mutating_action(req.action.as_str()) {
323            true
324        } else if matches!(
325            req.action.as_str(),
326            "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
327        ) {
328            is_mutating_request(req.action.as_str(), &req.json_body())
329        } else {
330            false
331        };
332        let result = match req.action.as_str() {
333            "CreateTable" => self.create_table(&req),
334            "DeleteTable" => self.delete_table(&req),
335            "DescribeTable" => self.describe_table(&req),
336            "ListTables" => self.list_tables(&req),
337            "UpdateTable" => self.update_table(&req),
338            "PutItem" => self.put_item(&req),
339            "GetItem" => self.get_item(&req),
340            "DeleteItem" => self.delete_item(&req),
341            "UpdateItem" => self.update_item(&req),
342            "Query" => self.query(&req),
343            "Scan" => self.scan(&req),
344            "BatchGetItem" => self.batch_get_item(&req),
345            "BatchWriteItem" => self.batch_write_item(&req),
346            "TagResource" => self.tag_resource(&req),
347            "UntagResource" => self.untag_resource(&req),
348            "ListTagsOfResource" => self.list_tags_of_resource(&req),
349            "TransactGetItems" => self.transact_get_items(&req),
350            "TransactWriteItems" => self.transact_write_items(&req),
351            "ExecuteStatement" => self.execute_statement(&req),
352            "BatchExecuteStatement" => self.batch_execute_statement(&req),
353            "ExecuteTransaction" => self.execute_transaction(&req),
354            "UpdateTimeToLive" => self.update_time_to_live(&req),
355            "DescribeTimeToLive" => self.describe_time_to_live(&req),
356            "PutResourcePolicy" => self.put_resource_policy(&req),
357            "GetResourcePolicy" => self.get_resource_policy(&req),
358            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
359            // Synthetic defaults (no DAX endpoint discovery / no real per-account quotas tracked)
360            "DescribeEndpoints" => self.describe_endpoints(&req),
361            "DescribeLimits" => self.describe_limits(&req),
362            // Backups
363            "CreateBackup" => self.create_backup(&req),
364            "DeleteBackup" => self.delete_backup(&req),
365            "DescribeBackup" => self.describe_backup(&req),
366            "ListBackups" => self.list_backups(&req),
367            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
368            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
369            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
370            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
371            // Global tables
372            "CreateGlobalTable" => self.create_global_table(&req),
373            "DescribeGlobalTable" => self.describe_global_table(&req),
374            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
375            "ListGlobalTables" => self.list_global_tables(&req),
376            "UpdateGlobalTable" => self.update_global_table(&req),
377            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
378            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
379            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
380            // Kinesis streaming
381            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
382            "DisableKinesisStreamingDestination" => {
383                self.disable_kinesis_streaming_destination(&req)
384            }
385            "DescribeKinesisStreamingDestination" => {
386                self.describe_kinesis_streaming_destination(&req)
387            }
388            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
389            // Contributor insights
390            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
391            "UpdateContributorInsights" => self.update_contributor_insights(&req),
392            "ListContributorInsights" => self.list_contributor_insights(&req),
393            // Import/Export
394            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
395            "DescribeExport" => self.describe_export(&req),
396            "ListExports" => self.list_exports(&req),
397            "ImportTable" => self.import_table(&req),
398            "DescribeImport" => self.describe_import(&req),
399            "ListImports" => self.list_imports(&req),
400            _ => Err(AwsServiceError::action_not_implemented(
401                "dynamodb",
402                &req.action,
403            )),
404        };
405        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
406            self.save_snapshot().await;
407        }
408        result
409    }
410
411    fn supported_actions(&self) -> &[&str] {
412        &[
413            "CreateTable",
414            "DeleteTable",
415            "DescribeTable",
416            "ListTables",
417            "UpdateTable",
418            "PutItem",
419            "GetItem",
420            "DeleteItem",
421            "UpdateItem",
422            "Query",
423            "Scan",
424            "BatchGetItem",
425            "BatchWriteItem",
426            "TagResource",
427            "UntagResource",
428            "ListTagsOfResource",
429            "TransactGetItems",
430            "TransactWriteItems",
431            "ExecuteStatement",
432            "BatchExecuteStatement",
433            "ExecuteTransaction",
434            "UpdateTimeToLive",
435            "DescribeTimeToLive",
436            "PutResourcePolicy",
437            "GetResourcePolicy",
438            "DeleteResourcePolicy",
439            "DescribeEndpoints",
440            "DescribeLimits",
441            "CreateBackup",
442            "DeleteBackup",
443            "DescribeBackup",
444            "ListBackups",
445            "RestoreTableFromBackup",
446            "RestoreTableToPointInTime",
447            "UpdateContinuousBackups",
448            "DescribeContinuousBackups",
449            "CreateGlobalTable",
450            "DescribeGlobalTable",
451            "DescribeGlobalTableSettings",
452            "ListGlobalTables",
453            "UpdateGlobalTable",
454            "UpdateGlobalTableSettings",
455            "DescribeTableReplicaAutoScaling",
456            "UpdateTableReplicaAutoScaling",
457            "EnableKinesisStreamingDestination",
458            "DisableKinesisStreamingDestination",
459            "DescribeKinesisStreamingDestination",
460            "UpdateKinesisStreamingDestination",
461            "DescribeContributorInsights",
462            "UpdateContributorInsights",
463            "ListContributorInsights",
464            "ExportTableToPointInTime",
465            "DescribeExport",
466            "ListExports",
467            "ImportTable",
468            "DescribeImport",
469            "ListImports",
470        ]
471    }
472}
473
474pub(crate) mod helpers;
475pub(crate) use helpers::*;
476
477#[cfg(test)]
478mod tests;