1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25 AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26 DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
/// The table fields needed to publish a change record to Kinesis.
///
/// Built from a [`DynamoTable`] by [`kinesis_target_for`] and consumed by
/// [`deliver_kinesis_change`].
#[derive(Clone)]
pub(crate) struct KinesisDeliveryTarget {
    /// Every Kinesis destination copied from the table, whatever its status;
    /// `deliver_kinesis_change` only sends to those whose status is `ACTIVE`.
    pub destinations: Vec<KinesisDestination>,
    /// Table ARN; emitted as `eventSourceARN` and used to derive `awsRegion`.
    pub arn: String,
    /// Table name; emitted as `tableName` in the change record.
    pub name: String,
}
41
42pub(crate) fn kinesis_target_for(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
47 if table
48 .kinesis_destinations
49 .iter()
50 .any(|d| d.destination_status == "ACTIVE")
51 {
52 Some(KinesisDeliveryTarget {
53 destinations: table.kinesis_destinations.clone(),
54 arn: table.arn.clone(),
55 name: table.name.clone(),
56 })
57 } else {
58 None
59 }
60}
61
/// One cached transactional response, stored in
/// `DynamoDbService::transact_idempotency` under the key
/// `(account id, client request token)`.
struct TransactIdempotencyEntry {
    /// When the entry was stored; entries at least
    /// `TRANSACT_IDEMPOTENCY_WINDOW` old are dropped on the next lookup.
    stored_at: std::time::Instant,
    /// Caller-supplied hash of the request; a lookup with the same token but a
    /// different hash is rejected as a parameter mismatch.
    request_hash: u64,
    /// JSON body returned again when the same request is replayed.
    response: Value,
}
78
/// How long a cached transactional response stays eligible for replay:
/// 600 seconds (10 minutes).
// NOTE(review): presumably mirrors DynamoDB's 10-minute validity of a
// `ClientRequestToken` — confirm against the API reference.
const TRANSACT_IDEMPOTENCY_WINDOW: std::time::Duration = std::time::Duration::from_secs(600);
83
/// Kind of table access reported to `DynamoDbService::record_table_kms_usage`.
pub(crate) enum TableKmsOp {
    /// Recorded as a KMS encrypt followed by a decrypt of the result.
    Read,
    /// Recorded as a KMS encrypt only.
    Write,
}
91
/// DynamoDB service implementation: dispatches actions through its
/// `AwsService` impl and owns the optional integrations configured with the
/// `with_*` builder methods.
pub struct DynamoDbService {
    /// Shared DynamoDB state; cloned into each persisted snapshot.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, set by `with_s3`.
    // NOTE(review): not read in this file — presumably used by sibling
    // modules (export/import?); confirm.
    pub(crate) s3_state: Option<SharedS3State>,
    /// Optional S3 store, set by `with_s3_store`.
    // NOTE(review): not read in this file — confirm the consumer.
    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
    /// Optional delivery bus; without it no change records are sent to Kinesis.
    delivery: Option<Arc<DeliveryBus>>,
    /// Optional snapshot store; without it snapshots are skipped.
    snapshot_store: Option<Arc<dyn SnapshotStore>>,
    /// Optional KMS hook; without it `record_table_kms_usage` does nothing.
    pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
    /// Region passed to the KMS hook; `new` sets it to `us-east-1`.
    pub(crate) region: String,
    /// Async mutex held by `save_dynamodb_snapshot` while a snapshot is built
    /// and written, so snapshot writes do not overlap.
    snapshot_lock: Arc<tokio::sync::Mutex<()>>,
    /// Cache of transactional responses keyed by
    /// `(account id, client request token)`.
    transact_idempotency:
        Arc<parking_lot::Mutex<HashMap<(String, String), TransactIdempotencyEntry>>>,
}
111
112impl DynamoDbService {
113 pub fn new(state: SharedDynamoDbState) -> Self {
114 Self {
115 state,
116 s3_state: None,
117 s3_store: None,
118 delivery: None,
119 snapshot_store: None,
120 kms_hook: None,
121 region: "us-east-1".to_string(),
122 snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
123 transact_idempotency: Arc::new(parking_lot::Mutex::new(HashMap::new())),
124 }
125 }
126
127 pub(crate) fn transact_idempotency_lookup(
135 &self,
136 account_id: &str,
137 token: &str,
138 request_hash: u64,
139 ) -> Result<Option<AwsResponse>, AwsServiceError> {
140 let mut cache = self.transact_idempotency.lock();
141 cache.retain(|_, e| e.stored_at.elapsed() < TRANSACT_IDEMPOTENCY_WINDOW);
143 match cache.get(&(account_id.to_string(), token.to_string())) {
144 Some(entry) if entry.request_hash == request_hash => {
145 Ok(Some(AwsResponse::ok_json(entry.response.clone())))
146 }
147 Some(_) => Err(AwsServiceError::aws_error(
148 StatusCode::BAD_REQUEST,
149 "IdempotentParameterMismatchException",
150 "Request parameters do not match the parameters of a previous \
151 request with the same client request token",
152 )),
153 None => Ok(None),
154 }
155 }
156
157 pub(crate) fn transact_idempotency_store(
159 &self,
160 account_id: &str,
161 token: &str,
162 request_hash: u64,
163 response: &Value,
164 ) {
165 self.transact_idempotency.lock().insert(
166 (account_id.to_string(), token.to_string()),
167 TransactIdempotencyEntry {
168 stored_at: std::time::Instant::now(),
169 request_hash,
170 response: response.clone(),
171 },
172 );
173 }
174
175 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
176 self.s3_state = Some(s3_state);
177 self
178 }
179
180 pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
181 self.s3_store = Some(store);
182 self
183 }
184
185 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
186 self.delivery = Some(delivery);
187 self
188 }
189
190 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
191 self.snapshot_store = Some(store);
192 self
193 }
194
195 pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
196 self.kms_hook = Some(hook);
197 self
198 }
199
200 pub fn with_region(mut self, region: impl Into<String>) -> Self {
201 self.region = region.into();
202 self
203 }
204
205 pub(crate) fn record_table_kms_usage(
213 &self,
214 account_id: &str,
215 table_arn: &str,
216 kms_key_arn: Option<&str>,
217 operation: TableKmsOp,
218 ) {
219 let Some(hook) = &self.kms_hook else { return };
220 let key = kms_key_arn
221 .filter(|k| !k.is_empty())
222 .unwrap_or("aws/dynamodb");
223 let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
228 let mut ctx = std::collections::HashMap::new();
229 ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
230 ctx.insert(
231 "aws:dynamodb:subscriberId".to_string(),
232 account_id.to_string(),
233 );
234 let envelope = match hook.encrypt(
235 account_id,
236 &self.region,
237 key,
238 b"ddb-item",
239 "dynamodb.amazonaws.com",
240 ctx.clone(),
241 ) {
242 Ok(env) => env,
243 Err(_) => return,
244 };
245 if matches!(operation, TableKmsOp::Read) {
246 let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
247 }
248 }
249
250 async fn save_snapshot(&self) {
259 save_dynamodb_snapshot(
260 &self.state,
261 self.snapshot_store.clone(),
262 &self.snapshot_lock,
263 )
264 .await;
265 }
266
267 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
273 let store = self.snapshot_store.clone()?;
274 let state = self.state.clone();
275 let lock = self.snapshot_lock.clone();
276 Some(Arc::new(move || {
277 let state = state.clone();
278 let store = store.clone();
279 let lock = lock.clone();
280 Box::pin(async move {
281 save_dynamodb_snapshot(&state, Some(store), &lock).await;
282 })
283 }))
284 }
285
286 fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
287 kinesis_target_for(table)
288 }
289
290 pub(super) fn deliver_to_kinesis_destinations(
292 &self,
293 target: &KinesisDeliveryTarget,
294 event_name: &str,
295 keys: &HashMap<String, AttributeValue>,
296 old_image: Option<&HashMap<String, AttributeValue>>,
297 new_image: Option<&HashMap<String, AttributeValue>>,
298 ) {
299 let delivery = match &self.delivery {
300 Some(d) => d,
301 None => return,
302 };
303 deliver_kinesis_change(
304 delivery, target, event_name, keys, old_image, new_image, None,
305 );
306 }
307
308 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
309 serde_json::from_slice(&req.body).map_err(|e| {
310 AwsServiceError::aws_error(
311 StatusCode::BAD_REQUEST,
312 "SerializationException",
313 format!("Invalid JSON: {e}"),
314 )
315 })
316 }
317
318 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
319 Ok(AwsResponse::ok_json(body))
320 }
321}
322
323#[allow(clippy::too_many_arguments)]
329pub(crate) fn deliver_kinesis_change(
330 delivery: &DeliveryBus,
331 target: &KinesisDeliveryTarget,
332 event_name: &str,
333 keys: &HashMap<String, AttributeValue>,
334 old_image: Option<&HashMap<String, AttributeValue>>,
335 new_image: Option<&HashMap<String, AttributeValue>>,
336 user_identity: Option<&crate::state::StreamUserIdentity>,
337) {
338 let active_destinations: Vec<_> = target
339 .destinations
340 .iter()
341 .filter(|d| d.destination_status == "ACTIVE")
342 .collect();
343
344 if active_destinations.is_empty() {
345 return;
346 }
347
348 let mut record = json!({
349 "eventID": uuid::Uuid::new_v4().to_string(),
350 "eventName": event_name,
351 "eventVersion": "1.1",
352 "eventSource": "aws:dynamodb",
353 "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
354 "dynamodb": {
355 "Keys": keys,
356 "SequenceNumber": crate::streams::next_stream_sequence(),
361 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
362 "StreamViewType": "NEW_AND_OLD_IMAGES",
363 },
364 "eventSourceARN": &target.arn,
365 "tableName": &target.name,
366 });
367
368 if let Some(old) = old_image {
369 record["dynamodb"]["OldImage"] = json!(old);
370 }
371 if let Some(new) = new_image {
372 record["dynamodb"]["NewImage"] = json!(new);
373 }
374 if let Some(ui) = user_identity {
375 record["userIdentity"] = json!({
376 "principalId": ui.principal_id,
377 "type": ui.identity_type,
378 });
379 }
380
381 let record_str = serde_json::to_string(&record).unwrap_or_default();
382 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
383 let partition_key = serde_json::to_string(keys).unwrap_or_default();
384
385 for dest in active_destinations {
386 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
387 }
388}
389
390pub async fn save_dynamodb_snapshot(
396 state: &SharedDynamoDbState,
397 store: Option<Arc<dyn SnapshotStore>>,
398 lock: &tokio::sync::Mutex<()>,
399) {
400 let Some(store) = store else {
401 return;
402 };
403 let _guard = lock.lock().await;
404 let snapshot = DynamoDbSnapshot {
405 schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
406 accounts: Some(state.read().clone()),
407 state: None,
408 };
409 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
410 let bytes = serde_json::to_vec(&snapshot)
411 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
412 store.save(&bytes)
413 })
414 .await;
415 match join {
416 Ok(Ok(())) => {}
417 Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
418 Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
419 }
420}
421
422#[async_trait]
423impl AwsService for DynamoDbService {
424 fn service_name(&self) -> &str {
425 "dynamodb"
426 }
427
428 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
429 let mutates = if is_mutating_action(req.action.as_str()) {
433 true
434 } else if matches!(
435 req.action.as_str(),
436 "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
437 ) {
438 is_mutating_request(req.action.as_str(), &req.json_body())
439 } else {
440 false
441 };
442 let result = match req.action.as_str() {
443 "CreateTable" => self.create_table(&req),
444 "DeleteTable" => self.delete_table(&req),
445 "DescribeTable" => self.describe_table(&req),
446 "ListTables" => self.list_tables(&req),
447 "UpdateTable" => self.update_table(&req),
448 "PutItem" => self.put_item(&req),
449 "GetItem" => self.get_item(&req),
450 "DeleteItem" => self.delete_item(&req),
451 "UpdateItem" => self.update_item(&req),
452 "Query" => self.query(&req),
453 "Scan" => self.scan(&req),
454 "BatchGetItem" => self.batch_get_item(&req),
455 "BatchWriteItem" => self.batch_write_item(&req),
456 "TagResource" => self.tag_resource(&req),
457 "UntagResource" => self.untag_resource(&req),
458 "ListTagsOfResource" => self.list_tags_of_resource(&req),
459 "TransactGetItems" => self.transact_get_items(&req),
460 "TransactWriteItems" => self.transact_write_items(&req),
461 "ExecuteStatement" => self.execute_statement(&req),
462 "BatchExecuteStatement" => self.batch_execute_statement(&req),
463 "ExecuteTransaction" => self.execute_transaction(&req),
464 "UpdateTimeToLive" => self.update_time_to_live(&req),
465 "DescribeTimeToLive" => self.describe_time_to_live(&req),
466 "PutResourcePolicy" => self.put_resource_policy(&req),
467 "GetResourcePolicy" => self.get_resource_policy(&req),
468 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
469 "DescribeEndpoints" => self.describe_endpoints(&req),
471 "DescribeLimits" => self.describe_limits(&req),
472 "CreateBackup" => self.create_backup(&req),
474 "DeleteBackup" => self.delete_backup(&req),
475 "DescribeBackup" => self.describe_backup(&req),
476 "ListBackups" => self.list_backups(&req),
477 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
478 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
479 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
480 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
481 "CreateGlobalTable" => self.create_global_table(&req),
483 "DescribeGlobalTable" => self.describe_global_table(&req),
484 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
485 "ListGlobalTables" => self.list_global_tables(&req),
486 "UpdateGlobalTable" => self.update_global_table(&req),
487 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
488 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
489 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
490 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
492 "DisableKinesisStreamingDestination" => {
493 self.disable_kinesis_streaming_destination(&req)
494 }
495 "DescribeKinesisStreamingDestination" => {
496 self.describe_kinesis_streaming_destination(&req)
497 }
498 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
499 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
501 "UpdateContributorInsights" => self.update_contributor_insights(&req),
502 "ListContributorInsights" => self.list_contributor_insights(&req),
503 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
505 "DescribeExport" => self.describe_export(&req),
506 "ListExports" => self.list_exports(&req),
507 "ImportTable" => self.import_table(&req),
508 "DescribeImport" => self.describe_import(&req),
509 "ListImports" => self.list_imports(&req),
510 _ => Err(AwsServiceError::action_not_implemented(
511 "dynamodb",
512 &req.action,
513 )),
514 };
515 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
516 self.save_snapshot().await;
517 }
518 result
519 }
520
521 fn supported_actions(&self) -> &[&str] {
522 &[
523 "CreateTable",
524 "DeleteTable",
525 "DescribeTable",
526 "ListTables",
527 "UpdateTable",
528 "PutItem",
529 "GetItem",
530 "DeleteItem",
531 "UpdateItem",
532 "Query",
533 "Scan",
534 "BatchGetItem",
535 "BatchWriteItem",
536 "TagResource",
537 "UntagResource",
538 "ListTagsOfResource",
539 "TransactGetItems",
540 "TransactWriteItems",
541 "ExecuteStatement",
542 "BatchExecuteStatement",
543 "ExecuteTransaction",
544 "UpdateTimeToLive",
545 "DescribeTimeToLive",
546 "PutResourcePolicy",
547 "GetResourcePolicy",
548 "DeleteResourcePolicy",
549 "DescribeEndpoints",
550 "DescribeLimits",
551 "CreateBackup",
552 "DeleteBackup",
553 "DescribeBackup",
554 "ListBackups",
555 "RestoreTableFromBackup",
556 "RestoreTableToPointInTime",
557 "UpdateContinuousBackups",
558 "DescribeContinuousBackups",
559 "CreateGlobalTable",
560 "DescribeGlobalTable",
561 "DescribeGlobalTableSettings",
562 "ListGlobalTables",
563 "UpdateGlobalTable",
564 "UpdateGlobalTableSettings",
565 "DescribeTableReplicaAutoScaling",
566 "UpdateTableReplicaAutoScaling",
567 "EnableKinesisStreamingDestination",
568 "DisableKinesisStreamingDestination",
569 "DescribeKinesisStreamingDestination",
570 "UpdateKinesisStreamingDestination",
571 "DescribeContributorInsights",
572 "UpdateContributorInsights",
573 "ListContributorInsights",
574 "ExportTableToPointInTime",
575 "DescribeExport",
576 "ListExports",
577 "ImportTable",
578 "DescribeImport",
579 "ListImports",
580 ]
581 }
582}
583
584pub(crate) mod helpers;
585pub(crate) use helpers::*;
586
587#[cfg(test)]
588mod tests;