Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::io;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use base64::Engine;
16use http::StatusCode;
17use serde_json::{json, Value};
18
19use fakecloud_core::delivery::DeliveryBus;
20use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
21
22use fakecloud_persistence::{S3Store, SnapshotStore};
23use fakecloud_s3::SharedS3State;
24
25use crate::state::{
26    AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
27    DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
/// Minimal subset of a [`DynamoTable`] that Kinesis streaming delivery needs.
///
/// A table can carry megabytes of items; cloning the whole table just to
/// release the write lock and deliver one change record is extremely wasteful.
/// Extracting only the fields the delivery path actually reads (destinations,
/// arn, name) keeps the clone small.
#[derive(Clone)]
pub(crate) struct KinesisDeliveryTarget {
    /// Copy of the table's `kinesis_destinations`. [`kinesis_target_for`]
    /// copies every destination regardless of status; the delivery path
    /// filters down to the `"ACTIVE"` ones itself.
    pub destinations: Vec<KinesisDestination>,
    /// The table's ARN. Emitted as `eventSourceARN`, and its fourth
    /// `:`-separated segment is used as the record's `awsRegion`.
    pub arn: String,
    /// The table's name, emitted as `tableName` in the change record.
    pub name: String,
}
42
43/// Build a Kinesis delivery target for a table when it has at least one active
44/// streaming destination. Free-function twin of
45/// [`DynamoDbService::kinesis_target`] so the TTL processor (no `&self`) can
46/// reuse it.
47pub(crate) fn kinesis_target_for(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
48    if table
49        .kinesis_destinations
50        .iter()
51        .any(|d| d.destination_status == "ACTIVE")
52    {
53        Some(KinesisDeliveryTarget {
54            destinations: table.kinesis_destinations.clone(),
55            arn: table.arn.clone(),
56            name: table.name.clone(),
57        })
58    } else {
59        None
60    }
61}
62
/// A cached `TransactWriteItems` outcome, keyed by (account, client request
/// token). AWS treats a retried transaction carrying the same
/// `ClientRequestToken` (within a ~10 minute window) as the *same* request: it
/// is applied at most once and the original response is replayed. Without this,
/// a client-side retry re-applies the whole transaction (a non-idempotent
/// `ADD` advances twice).
///
/// The key itself lives in the `transact_idempotency` map on
/// `DynamoDbService`; this struct is only the value side of that map.
struct TransactIdempotencyEntry {
    /// When the entry was recorded (`Instant::now()` at store time); entries
    /// older than `TRANSACT_IDEMPOTENCY_WINDOW` are purged on the next lookup
    /// and treated as fresh.
    stored_at: std::time::Instant,
    /// Hash of the original request body, used to detect a token reused with
    /// different parameters (AWS returns `IdempotentParameterMismatchException`).
    /// The hash is computed by the caller and only compared for equality here.
    request_hash: u64,
    /// The exact JSON result returned for the original transaction; cloned
    /// into a fresh `AwsResponse` on every replay.
    response: Value,
}
79
/// The window for which a `ClientRequestToken` short-circuits a replay. The
/// AWS `TransactWriteItems` API reference states that a client token is valid
/// for 10 minutes after the first request that uses it completes; after that,
/// a request carrying the same token is treated as a new request.
const TRANSACT_IDEMPOTENCY_WINDOW: std::time::Duration = std::time::Duration::from_secs(600);
84
/// Operation flavor for the per-item KMS audit-trail emitter. Reads
/// emit a paired `Decrypt` after `GenerateDataKey`; writes only emit
/// `GenerateDataKey`, mirroring AWS's audit shape.
pub(crate) enum TableKmsOp {
    /// A read path: the emitter calls the KMS hook's `encrypt` and then its
    /// `decrypt` on the resulting envelope.
    Read,
    /// A write path: the emitter calls only the KMS hook's `encrypt`.
    Write,
}
92
/// The DynamoDB service: owns the shared table state plus the optional
/// integrations (S3, delivery bus, snapshot persistence, KMS audit hook) that
/// are attached through the `with_*` builder methods.
pub struct DynamoDbService {
    /// Shared in-memory DynamoDB state; snapshotting clones it under a read
    /// lock.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, set by `with_s3`.
    /// NOTE(review): consumers are outside this file's visible part —
    /// presumably the import/export paths; confirm.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 object store, set by `with_s3_store`.
    /// NOTE(review): consumers are not visible here — confirm usage.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Delivery bus used to push change records to Kinesis streaming
    /// destinations; when `None`, Kinesis delivery is silently skipped.
    delivery: Option<Arc<DeliveryBus>>,
    /// Snapshot store for persistence; `None` means memory mode, in which
    /// `save_snapshot` returns `Ok(false)` without writing anything.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Optional KMS hook used to emit audit-trail records for table
    /// reads/writes; when `None`, no KMS usage is recorded.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook; defaults to `"us-east-1"` and can be
    /// overridden with `with_region`.
    pub(crate) region: String,
    /// Serializes concurrent snapshot writes so the newest observed
    /// state always wins on disk. Without it, two tasks could race
    /// between state.read().clone() and store.save() and leave older
    /// bytes as the final on-disk state.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
    /// Recent `TransactWriteItems` outcomes keyed by (account, ClientRequestToken)
    /// for idempotent retry handling. In-memory only (lost on restart, which
    /// matches AWS's short idempotency window).
    transact_idempotency:
        Arc<parking_lot::Mutex<HashMap<(String, String), TransactIdempotencyEntry>>>,
}
112
113impl DynamoDbService {
114    pub fn new(state: SharedDynamoDbState) -> Self {
115        Self {
116            state,
117            s3_state: None,
118            s3_store: None,
119            delivery: None,
120            snapshot_store: None,
121            kms_hook: None,
122            region: "us-east-1".to_string(),
123            snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
124            transact_idempotency: Arc::new(parking_lot::Mutex::new(HashMap::new())),
125        }
126    }
127
128    /// Look up a cached `TransactWriteItems` result for an idempotent retry.
129    ///
130    /// Returns `Ok(Some(response))` to replay the original result for a matching
131    /// (token, request) within the window, `Err(..)` with
132    /// `IdempotentParameterMismatchException` when the same token is reused with
133    /// a different body, and `Ok(None)` when this is a fresh request (no token,
134    /// expired entry, or first use).
135    pub(crate) fn transact_idempotency_lookup(
136        &self,
137        account_id: &str,
138        token: &str,
139        request_hash: u64,
140    ) -> Result<Option<AwsResponse>, AwsServiceError> {
141        let mut cache = self.transact_idempotency.lock();
142        // Drop entries past the window so the map cannot grow unbounded.
143        cache.retain(|_, e| e.stored_at.elapsed() < TRANSACT_IDEMPOTENCY_WINDOW);
144        match cache.get(&(account_id.to_string(), token.to_string())) {
145            Some(entry) if entry.request_hash == request_hash => {
146                Ok(Some(AwsResponse::ok_json(entry.response.clone())))
147            }
148            Some(_) => Err(AwsServiceError::aws_error(
149                StatusCode::BAD_REQUEST,
150                "IdempotentParameterMismatchException",
151                "Request parameters do not match the parameters of a previous \
152                 request with the same client request token",
153            )),
154            None => Ok(None),
155        }
156    }
157
158    /// Record a successful `TransactWriteItems` outcome for replay.
159    pub(crate) fn transact_idempotency_store(
160        &self,
161        account_id: &str,
162        token: &str,
163        request_hash: u64,
164        response: &Value,
165    ) {
166        self.transact_idempotency.lock().insert(
167            (account_id.to_string(), token.to_string()),
168            TransactIdempotencyEntry {
169                stored_at: std::time::Instant::now(),
170                request_hash,
171                response: response.clone(),
172            },
173        );
174    }
175
    /// Builder: attach the shared S3 state and return the updated service.
    /// NOTE(review): presumably consumed by the S3-backed import/export
    /// paths — not visible here; confirm.
    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
        self.s3_state = Some(s3_state);
        self
    }
180
    /// Builder: attach the S3 object store and return the updated service.
    /// NOTE(review): consumers of `s3_store` are not visible here — confirm.
    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
        self.s3_store = Some(store);
        self
    }
185
    /// Builder: attach the delivery bus used to send change records to
    /// Kinesis streaming destinations. Without it, Kinesis delivery is a no-op.
    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
        self.delivery = Some(delivery);
        self
    }
190
    /// Builder: attach the snapshot store used by `save_snapshot` and
    /// `snapshot_hook`. Without it the service runs in memory mode and
    /// snapshot saves are skipped.
    pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
        self.snapshot_store = Some(store);
        self
    }
195
    /// Builder: attach the KMS hook that `record_table_kms_usage` calls to
    /// emit audit-trail records. Without it no KMS usage is recorded.
    pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
        self.kms_hook = Some(hook);
        self
    }
200
    /// Builder: override the region (default `"us-east-1"`) that is passed to
    /// the KMS hook when recording table KMS usage.
    pub fn with_region(mut self, region: impl Into<String>) -> Self {
        self.region = region.into();
        self
    }
205
206    /// Record `GenerateDataKey` + `Decrypt` for an SSE-KMS table on a
207    /// PutItem/UpdateItem (write) and GetItem/Query/Scan (read). DDB
208    /// item bodies are nested attribute maps — encrypting them in
209    /// fakecloud would balloon scope without adding test coverage that
210    /// users actually want, so we just emit the audit-trail records the
211    /// AWS API produces and let callers assert KMS usage via
212    /// `/_fakecloud/kms/usage`.
213    pub(crate) fn record_table_kms_usage(
214        &self,
215        account_id: &str,
216        table_arn: &str,
217        kms_key_arn: Option<&str>,
218        operation: TableKmsOp,
219    ) {
220        let Some(hook) = &self.kms_hook else { return };
221        let key = kms_key_arn
222            .filter(|k| !k.is_empty())
223            .unwrap_or("aws/dynamodb");
224        // DynamoDB SSE-KMS uses the AWS-documented encryption context:
225        // {aws:dynamodb:tableName: <name>, aws:dynamodb:subscriberId: <account>}
226        // — see the AWS DynamoDB encryption-at-rest docs. The table arn
227        // ends with `:table/<name>`, so derive the name from it.
228        let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
229        let mut ctx = std::collections::HashMap::new();
230        ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
231        ctx.insert(
232            "aws:dynamodb:subscriberId".to_string(),
233            account_id.to_string(),
234        );
235        let envelope = match hook.encrypt(
236            account_id,
237            &self.region,
238            key,
239            b"ddb-item",
240            "dynamodb.amazonaws.com",
241            ctx.clone(),
242        ) {
243            Ok(env) => env,
244            Err(_) => return,
245        };
246        if matches!(operation, TableKmsOp::Read) {
247            let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
248        }
249    }
250
251    /// Persist the current in-memory state to the provided snapshot store.
252    ///
253    /// The snapshot lock serializes the full clone + serialize + write
254    /// so concurrent mutators cannot leave older bytes on disk, and
255    /// serialization + the blocking file write are offloaded to the
256    /// blocking pool to keep Tokio workers responsive.
257    pub async fn save_snapshot_to_store(&self, store: Arc<dyn SnapshotStore>) -> io::Result<()> {
258        save_dynamodb_snapshot(&self.state, Some(store), &self.snapshot_lock)
259            .await
260            .map(|_| ())
261    }
262
263    /// Build a hook that persists the current DynamoDB state when invoked, or
264    /// `None` in memory mode (no snapshot store). The CloudFormation
265    /// provisioner mutates `state` directly and uses this to write a
266    /// CFN-provisioned table through to disk, the same way a direct mutating
267    /// API call would.
268    pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
269        let store = self.snapshot_store.clone()?;
270        let state = self.state.clone();
271        let lock = self.snapshot_lock.clone();
272        Some(Arc::new(move || {
273            let state = state.clone();
274            let store = store.clone();
275            let lock = lock.clone();
276            Box::pin(async move {
277                if let Err(err) = save_dynamodb_snapshot(&state, Some(store), &lock).await {
278                    tracing::error!(%err, "dynamodb snapshot save failed");
279                }
280            })
281        }))
282    }
283
284    /// Persist the current in-memory state to the configured snapshot store.
285    /// Returns `Ok(false)` when no snapshot store is configured (i.e.
286    /// `StorageMode::Memory`).
287    pub async fn save_snapshot(&self) -> io::Result<bool> {
288        save_dynamodb_snapshot(
289            &self.state,
290            self.snapshot_store.clone(),
291            &self.snapshot_lock,
292        )
293        .await
294    }
295
    /// Associated-function wrapper around [`kinesis_target_for`]: returns the
    /// Kinesis delivery fields of `table`, or `None` when it has no
    /// `"ACTIVE"` streaming destination.
    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
        kinesis_target_for(table)
    }
299
300    /// Deliver a change record to all active Kinesis streaming destinations for a table.
301    pub(super) fn deliver_to_kinesis_destinations(
302        &self,
303        target: &KinesisDeliveryTarget,
304        event_name: &str,
305        keys: &HashMap<String, AttributeValue>,
306        old_image: Option<&HashMap<String, AttributeValue>>,
307        new_image: Option<&HashMap<String, AttributeValue>>,
308    ) {
309        let delivery = match &self.delivery {
310            Some(d) => d,
311            None => return,
312        };
313        deliver_kinesis_change(
314            delivery, target, event_name, keys, old_image, new_image, None,
315        );
316    }
317
318    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
319        serde_json::from_slice(&req.body).map_err(|e| {
320            AwsServiceError::aws_error(
321                StatusCode::BAD_REQUEST,
322                "SerializationException",
323                format!("Invalid JSON: {e}"),
324            )
325        })
326    }
327
    /// Convenience wrapper: build a JSON response from `body` via
    /// `AwsResponse::ok_json` and wrap it in `Ok`, so handlers can end with
    /// `Self::ok_json(..)`.
    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
        Ok(AwsResponse::ok_json(body))
    }
331}
332
333/// Build and dispatch a Kinesis change record to every active streaming
334/// destination of a table. Shared by [`DynamoDbService::deliver_to_kinesis_destinations`]
335/// and the TTL processor (which is a free function with no `&self`), so both
336/// emit the identical record shape. `user_identity` is set only for
337/// system-generated changes such as TTL expirations.
338#[allow(clippy::too_many_arguments)]
339pub(crate) fn deliver_kinesis_change(
340    delivery: &DeliveryBus,
341    target: &KinesisDeliveryTarget,
342    event_name: &str,
343    keys: &HashMap<String, AttributeValue>,
344    old_image: Option<&HashMap<String, AttributeValue>>,
345    new_image: Option<&HashMap<String, AttributeValue>>,
346    user_identity: Option<&crate::state::StreamUserIdentity>,
347) {
348    let active_destinations: Vec<_> = target
349        .destinations
350        .iter()
351        .filter(|d| d.destination_status == "ACTIVE")
352        .collect();
353
354    if active_destinations.is_empty() {
355        return;
356    }
357
358    let mut record = json!({
359        "eventID": uuid::Uuid::new_v4().to_string(),
360        "eventName": event_name,
361        "eventVersion": "1.1",
362        "eventSource": "aws:dynamodb",
363        "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
364        "dynamodb": {
365            "Keys": keys,
366            // Use the shared atomic monotonic counter (not wall-clock
367            // nanoseconds): a single BatchWriteItem fires up to 25
368            // deliveries with no delay, which collide on coarse clocks
369            // and invert on NTP steps. bug-audit 2026-06-15, 4.5.
370            "SequenceNumber": crate::streams::next_stream_sequence(),
371            "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
372            "StreamViewType": "NEW_AND_OLD_IMAGES",
373        },
374        "eventSourceARN": &target.arn,
375        "tableName": &target.name,
376    });
377
378    if let Some(old) = old_image {
379        record["dynamodb"]["OldImage"] = json!(old);
380    }
381    if let Some(new) = new_image {
382        record["dynamodb"]["NewImage"] = json!(new);
383    }
384    if let Some(ui) = user_identity {
385        record["userIdentity"] = json!({
386            "principalId": ui.principal_id,
387            "type": ui.identity_type,
388        });
389    }
390
391    let record_str = serde_json::to_string(&record).unwrap_or_default();
392    let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
393    let partition_key = serde_json::to_string(keys).unwrap_or_default();
394
395    for dest in active_destinations {
396        delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
397    }
398}
399
400/// Persist the current DynamoDB state as a snapshot. Offloads the serde +
401/// blocking file write to the Tokio blocking pool. Noop when `store` is `None`
402/// (memory mode). Shared by `DynamoDbService::save_snapshot` and the
403/// CloudFormation provisioner's post-provision persist hook so both route
404/// through the same serialize-and-write path.
405pub async fn save_dynamodb_snapshot(
406    state: &SharedDynamoDbState,
407    store: Option<Arc<dyn SnapshotStore>>,
408    lock: &tokio::sync::Mutex<()>,
409) -> io::Result<bool> {
410    let Some(store) = store else {
411        return Ok(false);
412    };
413    let _guard = lock.lock().await;
414    let snapshot = DynamoDbSnapshot {
415        schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
416        accounts: Some(state.read().clone()),
417        state: None,
418    };
419    let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
420        let bytes = serde_json::to_vec(&snapshot)
421            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
422        store.save(&bytes)
423    })
424    .await;
425    match join {
426        Ok(Ok(())) => Ok(true),
427        Ok(Err(err)) => Err(io::Error::new(
428            err.kind(),
429            format!("failed to write dynamodb snapshot: {err}"),
430        )),
431        Err(err) => Err(io::Error::other(format!(
432            "dynamodb snapshot task panicked: {err}"
433        ))),
434    }
435}
436
437#[async_trait]
438impl AwsService for DynamoDbService {
    /// The service identifier reported by this implementation: `"dynamodb"`.
    fn service_name(&self) -> &str {
        "dynamodb"
    }
442
443    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
444        // Avoid parsing the body for ops where the action alone tells us
445        // they mutate (or don't). Only PartiQL ops need statement
446        // inspection.
447        let mutates = if is_mutating_action(req.action.as_str()) {
448            true
449        } else if matches!(
450            req.action.as_str(),
451            "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
452        ) {
453            is_mutating_request(req.action.as_str(), &req.json_body())
454        } else {
455            false
456        };
457        let result = match req.action.as_str() {
458            "CreateTable" => self.create_table(&req),
459            "DeleteTable" => self.delete_table(&req),
460            "DescribeTable" => self.describe_table(&req),
461            "ListTables" => self.list_tables(&req),
462            "UpdateTable" => self.update_table(&req),
463            "PutItem" => self.put_item(&req),
464            "GetItem" => self.get_item(&req),
465            "DeleteItem" => self.delete_item(&req),
466            "UpdateItem" => self.update_item(&req),
467            "Query" => self.query(&req),
468            "Scan" => self.scan(&req),
469            "BatchGetItem" => self.batch_get_item(&req),
470            "BatchWriteItem" => self.batch_write_item(&req),
471            "TagResource" => self.tag_resource(&req),
472            "UntagResource" => self.untag_resource(&req),
473            "ListTagsOfResource" => self.list_tags_of_resource(&req),
474            "TransactGetItems" => self.transact_get_items(&req),
475            "TransactWriteItems" => self.transact_write_items(&req),
476            "ExecuteStatement" => self.execute_statement(&req),
477            "BatchExecuteStatement" => self.batch_execute_statement(&req),
478            "ExecuteTransaction" => self.execute_transaction(&req),
479            "UpdateTimeToLive" => self.update_time_to_live(&req),
480            "DescribeTimeToLive" => self.describe_time_to_live(&req),
481            "PutResourcePolicy" => self.put_resource_policy(&req),
482            "GetResourcePolicy" => self.get_resource_policy(&req),
483            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
484            // Synthetic defaults (no DAX endpoint discovery / no real per-account quotas tracked)
485            "DescribeEndpoints" => self.describe_endpoints(&req),
486            "DescribeLimits" => self.describe_limits(&req),
487            // Backups
488            "CreateBackup" => self.create_backup(&req),
489            "DeleteBackup" => self.delete_backup(&req),
490            "DescribeBackup" => self.describe_backup(&req),
491            "ListBackups" => self.list_backups(&req),
492            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
493            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
494            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
495            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
496            // Global tables
497            "CreateGlobalTable" => self.create_global_table(&req),
498            "DescribeGlobalTable" => self.describe_global_table(&req),
499            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
500            "ListGlobalTables" => self.list_global_tables(&req),
501            "UpdateGlobalTable" => self.update_global_table(&req),
502            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
503            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
504            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
505            // Kinesis streaming
506            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
507            "DisableKinesisStreamingDestination" => {
508                self.disable_kinesis_streaming_destination(&req)
509            }
510            "DescribeKinesisStreamingDestination" => {
511                self.describe_kinesis_streaming_destination(&req)
512            }
513            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
514            // Contributor insights
515            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
516            "UpdateContributorInsights" => self.update_contributor_insights(&req),
517            "ListContributorInsights" => self.list_contributor_insights(&req),
518            // Import/Export
519            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
520            "DescribeExport" => self.describe_export(&req),
521            "ListExports" => self.list_exports(&req),
522            "ImportTable" => self.import_table(&req),
523            "DescribeImport" => self.describe_import(&req),
524            "ListImports" => self.list_imports(&req),
525            _ => Err(AwsServiceError::action_not_implemented(
526                "dynamodb",
527                &req.action,
528            )),
529        };
530        if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
531            if let Err(err) = self.save_snapshot().await {
532                tracing::error!(%err, "dynamodb snapshot save failed");
533            }
534        }
535        result
536    }
537
538    fn supported_actions(&self) -> &[&str] {
539        &[
540            "CreateTable",
541            "DeleteTable",
542            "DescribeTable",
543            "ListTables",
544            "UpdateTable",
545            "PutItem",
546            "GetItem",
547            "DeleteItem",
548            "UpdateItem",
549            "Query",
550            "Scan",
551            "BatchGetItem",
552            "BatchWriteItem",
553            "TagResource",
554            "UntagResource",
555            "ListTagsOfResource",
556            "TransactGetItems",
557            "TransactWriteItems",
558            "ExecuteStatement",
559            "BatchExecuteStatement",
560            "ExecuteTransaction",
561            "UpdateTimeToLive",
562            "DescribeTimeToLive",
563            "PutResourcePolicy",
564            "GetResourcePolicy",
565            "DeleteResourcePolicy",
566            "DescribeEndpoints",
567            "DescribeLimits",
568            "CreateBackup",
569            "DeleteBackup",
570            "DescribeBackup",
571            "ListBackups",
572            "RestoreTableFromBackup",
573            "RestoreTableToPointInTime",
574            "UpdateContinuousBackups",
575            "DescribeContinuousBackups",
576            "CreateGlobalTable",
577            "DescribeGlobalTable",
578            "DescribeGlobalTableSettings",
579            "ListGlobalTables",
580            "UpdateGlobalTable",
581            "UpdateGlobalTableSettings",
582            "DescribeTableReplicaAutoScaling",
583            "UpdateTableReplicaAutoScaling",
584            "EnableKinesisStreamingDestination",
585            "DisableKinesisStreamingDestination",
586            "DescribeKinesisStreamingDestination",
587            "UpdateKinesisStreamingDestination",
588            "DescribeContributorInsights",
589            "UpdateContributorInsights",
590            "ListContributorInsights",
591            "ExportTableToPointInTime",
592            "DescribeExport",
593            "ListExports",
594            "ImportTable",
595            "DescribeImport",
596            "ListImports",
597        ]
598    }
599}
600
601pub(crate) mod helpers;
602pub(crate) use helpers::*;
603
604#[cfg(test)]
605mod tests;