Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25    AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26    DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
/// Minimal subset of a `DynamoTable` that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
pub(super) struct KinesisDeliveryTarget {
    /// Copy of the table's `kinesis_destinations`. All destinations are
    /// carried over; the delivery path filters down to the ones whose
    /// `destination_status` is `"ACTIVE"`.
    pub destinations: Vec<KinesisDestination>,
    /// ARN of the source table. Emitted as `eventSourceARN` and also used to
    /// derive `awsRegion` (fourth `:`-separated segment) in the change record.
    pub arn: String,
    /// Name of the source table, emitted as `tableName` in the change record.
    pub name: String,
}
40
/// Operation flavor for the per-item KMS audit-trail emitter. Reads
/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
/// `GenerateDataKey`, mirroring AWS's audit shape.
///
/// The enum is a fieldless tag that is passed by value, so it derives
/// `Copy` (no move-related friction for callers that need to reuse it),
/// `Debug` (usable in logs and assertion messages) and `PartialEq`/`Eq`
/// (direct comparison in tests).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum TableKmsOp {
    /// Read path (e.g. GetItem/Query/Scan): `GenerateDataKey` followed by `Decrypt`.
    Read,
    /// Write path (e.g. PutItem/UpdateItem): `GenerateDataKey` only.
    Write,
}
48
/// DynamoDB service handler, registered under the service name `"dynamodb"`.
///
/// Built with [`DynamoDbService::new`] and then customized through the
/// `with_*` builder methods; every optional collaborator defaults to `None`.
pub struct DynamoDbService {
    /// Shared in-memory DynamoDB state. Cloned under a read lock whenever a
    /// snapshot is persisted.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, set via `with_s3`.
    /// NOTE(review): presumably consumed by the import/export handlers — confirm.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 persistence backend, set via `with_s3_store`.
    /// NOTE(review): presumably consumed by the import/export handlers — confirm.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Optional delivery bus; when absent, Kinesis change-record delivery is a no-op.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional snapshot backend; when absent, snapshot persistence is a no-op.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Optional KMS hook used to emit audit-trail records for SSE-KMS tables;
    /// when absent, no KMS usage is recorded.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook. Defaults to `"us-east-1"`.
    pub(crate) region: String,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
}
63
64impl DynamoDbService {
65    pub fn new(state: SharedDynamoDbState) -> Self {
66        Self {
67            state,
68            s3_state: None,
69            s3_store: None,
70            delivery: None,
71            snapshot_store: None,
72            kms_hook: None,
73            region: "us-east-1".to_string(),
74            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
75        }
76    }
77
78    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
79        self.s3_state = Some(s3_state);
80        self
81    }
82
83    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
84        self.s3_store = Some(store);
85        self
86    }
87
88    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
89        self.delivery = Some(delivery);
90        self
91    }
92
93    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94        self.snapshot_store = Some(store);
95        self
96    }
97
98    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
99        self.kms_hook = Some(hook);
100        self
101    }
102
103    pub fn with_region(mut self, region: impl Into<String>) -> Self {
104        self.region = region.into();
105        self
106    }
107
108    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
109    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
110    /// item bodies are nested attribute maps — encrypting them in
111    /// fakecloud would balloon scope without adding test coverage that
112    /// users actually want, so we just emit the audit-trail records the
113    /// AWS API produces and let callers assert KMS usage via
114    /// `/_fakecloud/kms/usage`.
115    pub(crate) fn record_table_kms_usage(
116        &self,
117        account_id: &str,
118        table_arn: &str,
119        kms_key_arn: Option<&str>,
120        operation: TableKmsOp,
121    ) {
122        let Some(hook) = &self.kms_hook else { return };
123        let key = kms_key_arn
124            .filter(|k| !k.is_empty())
125            .unwrap_or("aws/dynamodb");
126        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
127        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
128        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
129        // ends with `:table/<name>`, so derive the name from it.
130        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
131        let mut ctx = std::collections::HashMap::new();
132        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
133        ctx.insert(
134            "aws:dynamodb:subscriberId".to_string(),
135            account_id.to_string(),
136        );
137        let envelope = match hook.encrypt(
138            account_id,
139            &self.region,
140            key,
141            b"ddb-item",
142            "dynamodb.amazonaws.com",
143            ctx.clone(),
144        ) {
145            Ok(env) => env,
146            Err(_) => return,
147        };
148        if matches!(operation, TableKmsOp::Read) {
149            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
150        }
151    }
152
153    /// Persist the current in-memory state as a snapshot. Called after
154    /// every state-mutating action. A noop when no snapshot store is
155    /// configured (i.e. `StorageMode::Memory`).
156    ///
157    /// The snapshot lock serializes the full clone + serialize + write
158    /// so concurrent mutators cannot leave older bytes on disk, and
159    /// serialization + the blocking file write are offloaded to the
160    /// blocking pool to keep Tokio workers responsive.
161    async fn save_snapshot(&self) {
162        let Some(store) = self.snapshot_store.clone() else {
163            return;
164        };
165        let _guard = self.snapshot_lock.lock().await;
166        let snapshot = DynamoDbSnapshot {
167            schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
168            accounts: Some(self.state.read().clone()),
169            state: None,
170        };
171        let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
172            let bytes = serde_json::to_vec(&snapshot)
173                .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
174            store.save(&bytes)
175        })
176        .await;
177        match join {
178            Ok(Ok(())) => {}
179            Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
180            Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
181        }
182    }
183
184    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
185        if table
186            .kinesis_destinations
187            .iter()
188            .any(|d| d.destination_status == "ACTIVE")
189        {
190            Some(KinesisDeliveryTarget {
191                destinations: table.kinesis_destinations.clone(),
192                arn: table.arn.clone(),
193                name: table.name.clone(),
194            })
195        } else {
196            None
197        }
198    }
199
200    /// Deliver a change record to all active Kinesis streaming destinations for a table.
201    pub(super) fn deliver_to_kinesis_destinations(
202        &self,
203        target: &KinesisDeliveryTarget,
204        event_name: &str,
205        keys: &HashMap<String, AttributeValue>,
206        old_image: Option<&HashMap<String, AttributeValue>>,
207        new_image: Option<&HashMap<String, AttributeValue>>,
208    ) {
209        let delivery = match &self.delivery {
210            Some(d) => d,
211            None => return,
212        };
213
214        let active_destinations: Vec<_> = target
215            .destinations
216            .iter()
217            .filter(|d| d.destination_status == "ACTIVE")
218            .collect();
219
220        if active_destinations.is_empty() {
221            return;
222        }
223
224        let mut record = json!({
225            "eventID": uuid::Uuid::new_v4().to_string(),
226            "eventName": event_name,
227            "eventVersion": "1.1",
228            "eventSource": "aws:dynamodb",
229            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
230            "dynamodb": {
231                "Keys": keys,
232                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
233                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
234                "StreamViewType": "NEW_AND_OLD_IMAGES",
235            },
236            "eventSourceARN": &target.arn,
237            "tableName": &target.name,
238        });
239
240        if let Some(old) = old_image {
241            record["dynamodb"]["OldImage"] = json!(old);
242        }
243        if let Some(new) = new_image {
244            record["dynamodb"]["NewImage"] = json!(new);
245        }
246
247        let record_str = serde_json::to_string(&record).unwrap_or_default();
248        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
249        let partition_key = serde_json::to_string(keys).unwrap_or_default();
250
251        for dest in active_destinations {
252            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
253        }
254    }
255
256    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
257        serde_json::from_slice(&req.body).map_err(|e| {
258            AwsServiceError::aws_error(
259                StatusCode::BAD_REQUEST,
260                "SerializationException",
261                format!("Invalid JSON: {e}"),
262            )
263        })
264    }
265
266    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
267        Ok(AwsResponse::ok_json(body))
268    }
269}
270
271#[async_trait]
272impl AwsService for DynamoDbService {
273    fn service_name(&self) -> &str {
274        "dynamodb"
275    }
276
277    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
278        // Avoid parsing the body for ops where the action alone tells us
279        // they mutate (or don't). Only PartiQL ops need statement
280        // inspection.
281        let mutates = if is_mutating_action(req.action.as_str()) {
282            true
283        } else if matches!(
284            req.action.as_str(),
285            "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
286        ) {
287            is_mutating_request(req.action.as_str(), &req.json_body())
288        } else {
289            false
290        };
291        let result = match req.action.as_str() {
292            "CreateTable" => self.create_table(&req),
293            "DeleteTable" => self.delete_table(&req),
294            "DescribeTable" => self.describe_table(&req),
295            "ListTables" => self.list_tables(&req),
296            "UpdateTable" => self.update_table(&req),
297            "PutItem" => self.put_item(&req),
298            "GetItem" => self.get_item(&req),
299            "DeleteItem" => self.delete_item(&req),
300            "UpdateItem" => self.update_item(&req),
301            "Query" => self.query(&req),
302            "Scan" => self.scan(&req),
303            "BatchGetItem" => self.batch_get_item(&req),
304            "BatchWriteItem" => self.batch_write_item(&req),
305            "TagResource" => self.tag_resource(&req),
306            "UntagResource" => self.untag_resource(&req),
307            "ListTagsOfResource" => self.list_tags_of_resource(&req),
308            "TransactGetItems" => self.transact_get_items(&req),
309            "TransactWriteItems" => self.transact_write_items(&req),
310            "ExecuteStatement" => self.execute_statement(&req),
311            "BatchExecuteStatement" => self.batch_execute_statement(&req),
312            "ExecuteTransaction" => self.execute_transaction(&req),
313            "UpdateTimeToLive" => self.update_time_to_live(&req),
314            "DescribeTimeToLive" => self.describe_time_to_live(&req),
315            "PutResourcePolicy" => self.put_resource_policy(&req),
316            "GetResourcePolicy" => self.get_resource_policy(&req),
317            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
318            // Synthetic defaults (no DAX endpoint discovery / no real per-account quotas tracked)
319            "DescribeEndpoints" => self.describe_endpoints(&req),
320            "DescribeLimits" => self.describe_limits(&req),
321            // Backups
322            "CreateBackup" => self.create_backup(&req),
323            "DeleteBackup" => self.delete_backup(&req),
324            "DescribeBackup" => self.describe_backup(&req),
325            "ListBackups" => self.list_backups(&req),
326            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
327            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
328            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
329            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
330            // Global tables
331            "CreateGlobalTable" => self.create_global_table(&req),
332            "DescribeGlobalTable" => self.describe_global_table(&req),
333            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
334            "ListGlobalTables" => self.list_global_tables(&req),
335            "UpdateGlobalTable" => self.update_global_table(&req),
336            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
337            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
338            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
339            // Kinesis streaming
340            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
341            "DisableKinesisStreamingDestination" => {
342                self.disable_kinesis_streaming_destination(&req)
343            }
344            "DescribeKinesisStreamingDestination" => {
345                self.describe_kinesis_streaming_destination(&req)
346            }
347            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
348            // Contributor insights
349            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
350            "UpdateContributorInsights" => self.update_contributor_insights(&req),
351            "ListContributorInsights" => self.list_contributor_insights(&req),
352            // Import/Export
353            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
354            "DescribeExport" => self.describe_export(&req),
355            "ListExports" => self.list_exports(&req),
356            "ImportTable" => self.import_table(&req),
357            "DescribeImport" => self.describe_import(&req),
358            "ListImports" => self.list_imports(&req),
359            _ => Err(AwsServiceError::action_not_implemented(
360                "dynamodb",
361                &req.action,
362            )),
363        };
364        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
365            self.save_snapshot().await;
366        }
367        result
368    }
369
370    fn supported_actions(&self) -> &[&str] {
371        &[
372            "CreateTable",
373            "DeleteTable",
374            "DescribeTable",
375            "ListTables",
376            "UpdateTable",
377            "PutItem",
378            "GetItem",
379            "DeleteItem",
380            "UpdateItem",
381            "Query",
382            "Scan",
383            "BatchGetItem",
384            "BatchWriteItem",
385            "TagResource",
386            "UntagResource",
387            "ListTagsOfResource",
388            "TransactGetItems",
389            "TransactWriteItems",
390            "ExecuteStatement",
391            "BatchExecuteStatement",
392            "ExecuteTransaction",
393            "UpdateTimeToLive",
394            "DescribeTimeToLive",
395            "PutResourcePolicy",
396            "GetResourcePolicy",
397            "DeleteResourcePolicy",
398            "DescribeEndpoints",
399            "DescribeLimits",
400            "CreateBackup",
401            "DeleteBackup",
402            "DescribeBackup",
403            "ListBackups",
404            "RestoreTableFromBackup",
405            "RestoreTableToPointInTime",
406            "UpdateContinuousBackups",
407            "DescribeContinuousBackups",
408            "CreateGlobalTable",
409            "DescribeGlobalTable",
410            "DescribeGlobalTableSettings",
411            "ListGlobalTables",
412            "UpdateGlobalTable",
413            "UpdateGlobalTableSettings",
414            "DescribeTableReplicaAutoScaling",
415            "UpdateTableReplicaAutoScaling",
416            "EnableKinesisStreamingDestination",
417            "DisableKinesisStreamingDestination",
418            "DescribeKinesisStreamingDestination",
419            "UpdateKinesisStreamingDestination",
420            "DescribeContributorInsights",
421            "UpdateContributorInsights",
422            "ListContributorInsights",
423            "ExportTableToPointInTime",
424            "DescribeExport",
425            "ListExports",
426            "ImportTable",
427            "DescribeImport",
428            "ListImports",
429        ]
430    }
431}
432
433mod helpers;
434pub(crate) use helpers::*;
435
436#[cfg(test)]
437mod tests;