Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25    AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26    DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
29/// Minimal subset of a ``DynamoTable`` that Kinesis streaming delivery needs.
30///
31/// A table can carry megabytes of items; cloning the whole table just to
32/// release the write lock and deliver one change record is extremely wasteful.
33/// Extracting only the fields the delivery path actually reads (destinations,
34/// arn, name) keeps the clone small.
35#[derive(Clone)]
36pub(crate) struct KinesisDeliveryTarget {
37    pub destinations: Vec<KinesisDestination>,
38    pub arn: String,
39    pub name: String,
40}
41
42/// Build a Kinesis delivery target for a table when it has at least one active
43/// streaming destination. Free-function twin of
44/// [`DynamoDbService::kinesis_target`] so the TTL processor (no `&self`) can
45/// reuse it.
46pub(crate) fn kinesis_target_for(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
47    if table
48        .kinesis_destinations
49        .iter()
50        .any(|d| d.destination_status == "ACTIVE")
51    {
52        Some(KinesisDeliveryTarget {
53            destinations: table.kinesis_destinations.clone(),
54            arn: table.arn.clone(),
55            name: table.name.clone(),
56        })
57    } else {
58        None
59    }
60}
61
62/// A cached `TransactWriteItems` outcome, keyed by (account, client request
63/// token). AWS treats a retried transaction carrying the same
64/// `ClientRequestToken` (within a ~10 minute window) as the *same* request: it
65/// is applied at most once and the original response is replayed. Without this,
66/// a client-side retry re-applies the whole transaction (a non-idempotent
67/// `ADD` advances twice).
68struct TransactIdempotencyEntry {
69    /// When the original transaction committed; entries older than the window
70    /// are purged and treated as fresh.
71    stored_at: std::time::Instant,
72    /// Hash of the original request body, used to detect a token reused with
73    /// different parameters (AWS returns `IdempotentParameterMismatchException`).
74    request_hash: u64,
75    /// The exact JSON result returned for the original transaction.
76    response: Value,
77}
78
79/// The window for which a `ClientRequestToken` short-circuits a replay. AWS
80/// documents idempotency as lasting "a few minutes"; 10 minutes matches the
81/// commonly observed behavior.
82const TRANSACT_IDEMPOTENCY_WINDOW: std::time::Duration = std::time::Duration::from_secs(600);
83
84/// Operation flavor for the per-item KMS audit-trail emitter. Reads
85/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
86/// `GenerateDataKey`, mirroring AWS's audit shape.
87pub(crate) enum TableKmsOp {
88    Read,
89    Write,
90}
91
92pub struct DynamoDbService {
93    state: SharedDynamoDbState,
94    pub(crate) s3_state: Option<SharedS3State>,
95    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
96    delivery: Option<Arc<DeliveryBus>>,
97    snapshot_store: Option<Arc<dyn SnapshotStore>>,
98    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
99    pub(crate) region: String,
100    /// Serializes concurrent snapshot writes so the newest observed
101    /// state always wins on disk. Without it, two tasks could race
102    /// between state.read().clone() and store.save() and leave older
103    /// bytes as the final on-disk state.
104    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
105    /// Recent `TransactWriteItems` outcomes keyed by (account, ClientRequestToken)
106    /// for idempotent retry handling. In-memory only (lost on restart, which
107    /// matches AWS's short idempotency window).
108    transact_idempotency:
109        Arc<parking_lot::Mutex<HashMap<(String, String), TransactIdempotencyEntry>>>,
110}
111
112impl DynamoDbService {
113    pub fn new(state: SharedDynamoDbState) -> Self {
114        Self {
115            state,
116            s3_state: None,
117            s3_store: None,
118            delivery: None,
119            snapshot_store: None,
120            kms_hook: None,
121            region: "us-east-1".to_string(),
122            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
123            transact_idempotency: Arc::new(parking_lot::Mutex::new(HashMap::new())),
124        }
125    }
126
127    /// Look up a cached `TransactWriteItems` result for an idempotent retry.
128    ///
129    /// Returns `Ok(Some(response))` to replay the original result for a matching
130    /// (token, request) within the window, `Err(..)` with
131    /// `IdempotentParameterMismatchException` when the same token is reused with
132    /// a different body, and `Ok(None)` when this is a fresh request (no token,
133    /// expired entry, or first use).
134    pub(crate) fn transact_idempotency_lookup(
135        &self,
136        account_id: &str,
137        token: &str,
138        request_hash: u64,
139    ) -> Result<Option<AwsResponse>, AwsServiceError> {
140        let mut cache = self.transact_idempotency.lock();
141        // Drop entries past the window so the map cannot grow unbounded.
142        cache.retain(|_, e| e.stored_at.elapsed() < TRANSACT_IDEMPOTENCY_WINDOW);
143        match cache.get(&(account_id.to_string(), token.to_string())) {
144            Some(entry) if entry.request_hash == request_hash => {
145                Ok(Some(AwsResponse::ok_json(entry.response.clone())))
146            }
147            Some(_) => Err(AwsServiceError::aws_error(
148                StatusCode::BAD_REQUEST,
149                "IdempotentParameterMismatchException",
150                "Request parameters do not match the parameters of a previous \
151                 request with the same client request token",
152            )),
153            None => Ok(None),
154        }
155    }
156
157    /// Record a successful `TransactWriteItems` outcome for replay.
158    pub(crate) fn transact_idempotency_store(
159        &self,
160        account_id: &str,
161        token: &str,
162        request_hash: u64,
163        response: &Value,
164    ) {
165        self.transact_idempotency.lock().insert(
166            (account_id.to_string(), token.to_string()),
167            TransactIdempotencyEntry {
168                stored_at: std::time::Instant::now(),
169                request_hash,
170                response: response.clone(),
171            },
172        );
173    }
174
175    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
176        self.s3_state = Some(s3_state);
177        self
178    }
179
180    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
181        self.s3_store = Some(store);
182        self
183    }
184
185    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
186        self.delivery = Some(delivery);
187        self
188    }
189
190    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
191        self.snapshot_store = Some(store);
192        self
193    }
194
195    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
196        self.kms_hook = Some(hook);
197        self
198    }
199
200    pub fn with_region(mut self, region: impl Into<String>) -> Self {
201        self.region = region.into();
202        self
203    }
204
205    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
206    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
207    /// item bodies are nested attribute maps — encrypting them in
208    /// fakecloud would balloon scope without adding test coverage that
209    /// users actually want, so we just emit the audit-trail records the
210    /// AWS API produces and let callers assert KMS usage via
211    /// `/_fakecloud/kms/usage`.
212    pub(crate) fn record_table_kms_usage(
213        &self,
214        account_id: &str,
215        table_arn: &str,
216        kms_key_arn: Option<&str>,
217        operation: TableKmsOp,
218    ) {
219        let Some(hook) = &self.kms_hook else { return };
220        let key = kms_key_arn
221            .filter(|k| !k.is_empty())
222            .unwrap_or("aws/dynamodb");
223        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
224        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
225        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
226        // ends with `:table/<name>`, so derive the name from it.
227        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
228        let mut ctx = std::collections::HashMap::new();
229        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
230        ctx.insert(
231            "aws:dynamodb:subscriberId".to_string(),
232            account_id.to_string(),
233        );
234        let envelope = match hook.encrypt(
235            account_id,
236            &self.region,
237            key,
238            b"ddb-item",
239            "dynamodb.amazonaws.com",
240            ctx.clone(),
241        ) {
242            Ok(env) => env,
243            Err(_) => return,
244        };
245        if matches!(operation, TableKmsOp::Read) {
246            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
247        }
248    }
249
250    /// Persist the current in-memory state as a snapshot. Called after
251    /// every state-mutating action. A noop when no snapshot store is
252    /// configured (i.e. `StorageMode::Memory`).
253    ///
254    /// The snapshot lock serializes the full clone + serialize + write
255    /// so concurrent mutators cannot leave older bytes on disk, and
256    /// serialization + the blocking file write are offloaded to the
257    /// blocking pool to keep Tokio workers responsive.
258    async fn save_snapshot(&self) {
259        save_dynamodb_snapshot(
260            &self.state,
261            self.snapshot_store.clone(),
262            &self.snapshot_lock,
263        )
264        .await;
265    }
266
267    /// Build a hook that persists the current DynamoDB state when invoked, or
268    /// `None` in memory mode (no snapshot store). The CloudFormation
269    /// provisioner mutates `state` directly and uses this to write a
270    /// CFN-provisioned table through to disk, the same way a direct mutating
271    /// API call would.
272    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
273        let store = self.snapshot_store.clone()?;
274        let state = self.state.clone();
275        let lock = self.snapshot_lock.clone();
276        Some(Arc::new(move || {
277            let state = state.clone();
278            let store = store.clone();
279            let lock = lock.clone();
280            Box::pin(async move {
281                save_dynamodb_snapshot(&state, Some(store), &lock).await;
282            })
283        }))
284    }
285
286    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
287        kinesis_target_for(table)
288    }
289
290    /// Deliver a change record to all active Kinesis streaming destinations for a table.
291    pub(super) fn deliver_to_kinesis_destinations(
292        &self,
293        target: &KinesisDeliveryTarget,
294        event_name: &str,
295        keys: &HashMap<String, AttributeValue>,
296        old_image: Option<&HashMap<String, AttributeValue>>,
297        new_image: Option<&HashMap<String, AttributeValue>>,
298    ) {
299        let delivery = match &self.delivery {
300            Some(d) => d,
301            None => return,
302        };
303        deliver_kinesis_change(
304            delivery, target, event_name, keys, old_image, new_image, None,
305        );
306    }
307
308    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
309        serde_json::from_slice(&req.body).map_err(|e| {
310            AwsServiceError::aws_error(
311                StatusCode::BAD_REQUEST,
312                "SerializationException",
313                format!("Invalid JSON: {e}"),
314            )
315        })
316    }
317
318    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
319        Ok(AwsResponse::ok_json(body))
320    }
321}
322
323/// Build and dispatch a Kinesis change record to every active streaming
324/// destination of a table. Shared by [`DynamoDbService::deliver_to_kinesis_destinations`]
325/// and the TTL processor (which is a free function with no `&self`), so both
326/// emit the identical record shape. `user_identity` is set only for
327/// system-generated changes such as TTL expirations.
328#[allow(clippy::too_many_arguments)]
329pub(crate) fn deliver_kinesis_change(
330    delivery: &DeliveryBus,
331    target: &KinesisDeliveryTarget,
332    event_name: &str,
333    keys: &HashMap<String, AttributeValue>,
334    old_image: Option<&HashMap<String, AttributeValue>>,
335    new_image: Option<&HashMap<String, AttributeValue>>,
336    user_identity: Option<&crate::state::StreamUserIdentity>,
337) {
338    let active_destinations: Vec<_> = target
339        .destinations
340        .iter()
341        .filter(|d| d.destination_status == "ACTIVE")
342        .collect();
343
344    if active_destinations.is_empty() {
345        return;
346    }
347
348    let mut record = json!({
349        "eventID": uuid::Uuid::new_v4().to_string(),
350        "eventName": event_name,
351        "eventVersion": "1.1",
352        "eventSource": "aws:dynamodb",
353        "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
354        "dynamodb": {
355            "Keys": keys,
356            // Use the shared atomic monotonic counter (not wall-clock
357            // nanoseconds): a single BatchWriteItem fires up to 25
358            // deliveries with no delay, which collide on coarse clocks
359            // and invert on NTP steps. bug-audit 2026-06-15, 4.5.
360            "SequenceNumber": crate::streams::next_stream_sequence(),
361            "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
362            "StreamViewType": "NEW_AND_OLD_IMAGES",
363        },
364        "eventSourceARN": &target.arn,
365        "tableName": &target.name,
366    });
367
368    if let Some(old) = old_image {
369        record["dynamodb"]["OldImage"] = json!(old);
370    }
371    if let Some(new) = new_image {
372        record["dynamodb"]["NewImage"] = json!(new);
373    }
374    if let Some(ui) = user_identity {
375        record["userIdentity"] = json!({
376            "principalId": ui.principal_id,
377            "type": ui.identity_type,
378        });
379    }
380
381    let record_str = serde_json::to_string(&record).unwrap_or_default();
382    let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
383    let partition_key = serde_json::to_string(keys).unwrap_or_default();
384
385    for dest in active_destinations {
386        delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
387    }
388}
389
390/// Persist the current DynamoDB state as a snapshot. Offloads the serde +
391/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
392/// (memory mode). Shared by `DynamoDbService::save_snapshot` and the
393/// CloudFormation provisioner's post-provision persist hook so both route
394/// through the same serialize-and-write path.
395pub async fn save_dynamodb_snapshot(
396    state: &SharedDynamoDbState,
397    store: Option<Arc<dyn SnapshotStore>>,
398    lock: &tokio::sync::Mutex<()>,
399) {
400    let Some(store) = store else {
401        return;
402    };
403    let _guard = lock.lock().await;
404    let snapshot = DynamoDbSnapshot {
405        schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
406        accounts: Some(state.read().clone()),
407        state: None,
408    };
409    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410        let bytes = serde_json::to_vec(&snapshot)
411            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412        store.save(&bytes)
413    })
414    .await;
415    match join {
416        Ok(Ok(())) => {}
417        Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
418        Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
419    }
420}
421
422#[async_trait]
423impl AwsService for DynamoDbService {
424    fn service_name(&self) -> &str {
425        "dynamodb"
426    }
427
428    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
429        // Avoid parsing the body for ops where the action alone tells us
430        // they mutate (or don't). Only PartiQL ops need statement
431        // inspection.
432        let mutates = if is_mutating_action(req.action.as_str()) {
433            true
434        } else if matches!(
435            req.action.as_str(),
436            "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
437        ) {
438            is_mutating_request(req.action.as_str(), &req.json_body())
439        } else {
440            false
441        };
442        let result = match req.action.as_str() {
443            "CreateTable" => self.create_table(&req),
444            "DeleteTable" => self.delete_table(&req),
445            "DescribeTable" => self.describe_table(&req),
446            "ListTables" => self.list_tables(&req),
447            "UpdateTable" => self.update_table(&req),
448            "PutItem" => self.put_item(&req),
449            "GetItem" => self.get_item(&req),
450            "DeleteItem" => self.delete_item(&req),
451            "UpdateItem" => self.update_item(&req),
452            "Query" => self.query(&req),
453            "Scan" => self.scan(&req),
454            "BatchGetItem" => self.batch_get_item(&req),
455            "BatchWriteItem" => self.batch_write_item(&req),
456            "TagResource" => self.tag_resource(&req),
457            "UntagResource" => self.untag_resource(&req),
458            "ListTagsOfResource" => self.list_tags_of_resource(&req),
459            "TransactGetItems" => self.transact_get_items(&req),
460            "TransactWriteItems" => self.transact_write_items(&req),
461            "ExecuteStatement" => self.execute_statement(&req),
462            "BatchExecuteStatement" => self.batch_execute_statement(&req),
463            "ExecuteTransaction" => self.execute_transaction(&req),
464            "UpdateTimeToLive" => self.update_time_to_live(&req),
465            "DescribeTimeToLive" => self.describe_time_to_live(&req),
466            "PutResourcePolicy" => self.put_resource_policy(&req),
467            "GetResourcePolicy" => self.get_resource_policy(&req),
468            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
469            // Synthetic defaults (no DAX endpoint discovery / no real per-account quotas tracked)
470            "DescribeEndpoints" => self.describe_endpoints(&req),
471            "DescribeLimits" => self.describe_limits(&req),
472            // Backups
473            "CreateBackup" => self.create_backup(&req),
474            "DeleteBackup" => self.delete_backup(&req),
475            "DescribeBackup" => self.describe_backup(&req),
476            "ListBackups" => self.list_backups(&req),
477            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
478            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
479            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
480            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
481            // Global tables
482            "CreateGlobalTable" => self.create_global_table(&req),
483            "DescribeGlobalTable" => self.describe_global_table(&req),
484            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
485            "ListGlobalTables" => self.list_global_tables(&req),
486            "UpdateGlobalTable" => self.update_global_table(&req),
487            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
488            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
489            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
490            // Kinesis streaming
491            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
492            "DisableKinesisStreamingDestination" => {
493                self.disable_kinesis_streaming_destination(&req)
494            }
495            "DescribeKinesisStreamingDestination" => {
496                self.describe_kinesis_streaming_destination(&req)
497            }
498            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
499            // Contributor insights
500            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
501            "UpdateContributorInsights" => self.update_contributor_insights(&req),
502            "ListContributorInsights" => self.list_contributor_insights(&req),
503            // Import/Export
504            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
505            "DescribeExport" => self.describe_export(&req),
506            "ListExports" => self.list_exports(&req),
507            "ImportTable" => self.import_table(&req),
508            "DescribeImport" => self.describe_import(&req),
509            "ListImports" => self.list_imports(&req),
510            _ => Err(AwsServiceError::action_not_implemented(
511                "dynamodb",
512                &req.action,
513            )),
514        };
515        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
516            self.save_snapshot().await;
517        }
518        result
519    }
520
521    fn supported_actions(&self) -> &[&str] {
522        &[
523            "CreateTable",
524            "DeleteTable",
525            "DescribeTable",
526            "ListTables",
527            "UpdateTable",
528            "PutItem",
529            "GetItem",
530            "DeleteItem",
531            "UpdateItem",
532            "Query",
533            "Scan",
534            "BatchGetItem",
535            "BatchWriteItem",
536            "TagResource",
537            "UntagResource",
538            "ListTagsOfResource",
539            "TransactGetItems",
540            "TransactWriteItems",
541            "ExecuteStatement",
542            "BatchExecuteStatement",
543            "ExecuteTransaction",
544            "UpdateTimeToLive",
545            "DescribeTimeToLive",
546            "PutResourcePolicy",
547            "GetResourcePolicy",
548            "DeleteResourcePolicy",
549            "DescribeEndpoints",
550            "DescribeLimits",
551            "CreateBackup",
552            "DeleteBackup",
553            "DescribeBackup",
554            "ListBackups",
555            "RestoreTableFromBackup",
556            "RestoreTableToPointInTime",
557            "UpdateContinuousBackups",
558            "DescribeContinuousBackups",
559            "CreateGlobalTable",
560            "DescribeGlobalTable",
561            "DescribeGlobalTableSettings",
562            "ListGlobalTables",
563            "UpdateGlobalTable",
564            "UpdateGlobalTableSettings",
565            "DescribeTableReplicaAutoScaling",
566            "UpdateTableReplicaAutoScaling",
567            "EnableKinesisStreamingDestination",
568            "DisableKinesisStreamingDestination",
569            "DescribeKinesisStreamingDestination",
570            "UpdateKinesisStreamingDestination",
571            "DescribeContributorInsights",
572            "UpdateContributorInsights",
573            "ListContributorInsights",
574            "ExportTableToPointInTime",
575            "DescribeExport",
576            "ListExports",
577            "ImportTable",
578            "DescribeImport",
579            "ListImports",
580        ]
581    }
582}
583
584pub(crate) mod helpers;
585pub(crate) use helpers::*;
586
587#[cfg(test)]
588mod tests;