1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::io;
12use std::sync::Arc;
13
14use async_trait::async_trait;
15use base64::Engine;
16use http::StatusCode;
17use serde_json::{json, Value};
18
19use fakecloud_core::delivery::DeliveryBus;
20use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
21
22use fakecloud_persistence::{S3Store, SnapshotStore};
23use fakecloud_s3::SharedS3State;
24
25use crate::state::{
26 AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
27 DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
28};
29
30#[derive(Clone)]
37pub(crate) struct KinesisDeliveryTarget {
38 pub destinations: Vec<KinesisDestination>,
39 pub arn: String,
40 pub name: String,
41}
42
43pub(crate) fn kinesis_target_for(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
48 if table
49 .kinesis_destinations
50 .iter()
51 .any(|d| d.destination_status == "ACTIVE")
52 {
53 Some(KinesisDeliveryTarget {
54 destinations: table.kinesis_destinations.clone(),
55 arn: table.arn.clone(),
56 name: table.name.clone(),
57 })
58 } else {
59 None
60 }
61}
62
63struct TransactIdempotencyEntry {
70 stored_at: std::time::Instant,
73 request_hash: u64,
76 response: Value,
78}
79
80const TRANSACT_IDEMPOTENCY_WINDOW: std::time::Duration = std::time::Duration::from_secs(600);
84
85pub(crate) enum TableKmsOp {
89 Read,
90 Write,
91}
92
93pub struct DynamoDbService {
94 state: SharedDynamoDbState,
95 pub(crate) s3_state: Option<SharedS3State>,
96 pub(crate) s3_store: Option<Arc<dyn S3Store>>,
97 delivery: Option<Arc<DeliveryBus>>,
98 snapshot_store: Option<Arc<dyn SnapshotStore>>,
99 pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
100 pub(crate) region: String,
101 snapshot_lock: Arc<tokio::sync::Mutex<()>>,
106 transact_idempotency:
110 Arc<parking_lot::Mutex<HashMap<(String, String), TransactIdempotencyEntry>>>,
111}
112
113impl DynamoDbService {
114 pub fn new(state: SharedDynamoDbState) -> Self {
115 Self {
116 state,
117 s3_state: None,
118 s3_store: None,
119 delivery: None,
120 snapshot_store: None,
121 kms_hook: None,
122 region: "us-east-1".to_string(),
123 snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
124 transact_idempotency: Arc::new(parking_lot::Mutex::new(HashMap::new())),
125 }
126 }
127
128 pub(crate) fn transact_idempotency_lookup(
136 &self,
137 account_id: &str,
138 token: &str,
139 request_hash: u64,
140 ) -> Result<Option<AwsResponse>, AwsServiceError> {
141 let mut cache = self.transact_idempotency.lock();
142 cache.retain(|_, e| e.stored_at.elapsed() < TRANSACT_IDEMPOTENCY_WINDOW);
144 match cache.get(&(account_id.to_string(), token.to_string())) {
145 Some(entry) if entry.request_hash == request_hash => {
146 Ok(Some(AwsResponse::ok_json(entry.response.clone())))
147 }
148 Some(_) => Err(AwsServiceError::aws_error(
149 StatusCode::BAD_REQUEST,
150 "IdempotentParameterMismatchException",
151 "Request parameters do not match the parameters of a previous \
152 request with the same client request token",
153 )),
154 None => Ok(None),
155 }
156 }
157
158 pub(crate) fn transact_idempotency_store(
160 &self,
161 account_id: &str,
162 token: &str,
163 request_hash: u64,
164 response: &Value,
165 ) {
166 self.transact_idempotency.lock().insert(
167 (account_id.to_string(), token.to_string()),
168 TransactIdempotencyEntry {
169 stored_at: std::time::Instant::now(),
170 request_hash,
171 response: response.clone(),
172 },
173 );
174 }
175
176 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
177 self.s3_state = Some(s3_state);
178 self
179 }
180
181 pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
182 self.s3_store = Some(store);
183 self
184 }
185
186 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
187 self.delivery = Some(delivery);
188 self
189 }
190
191 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
192 self.snapshot_store = Some(store);
193 self
194 }
195
196 pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
197 self.kms_hook = Some(hook);
198 self
199 }
200
201 pub fn with_region(mut self, region: impl Into<String>) -> Self {
202 self.region = region.into();
203 self
204 }
205
206 pub(crate) fn record_table_kms_usage(
214 &self,
215 account_id: &str,
216 table_arn: &str,
217 kms_key_arn: Option<&str>,
218 operation: TableKmsOp,
219 ) {
220 let Some(hook) = &self.kms_hook else { return };
221 let key = kms_key_arn
222 .filter(|k| !k.is_empty())
223 .unwrap_or("aws/dynamodb");
224 let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
229 let mut ctx = std::collections::HashMap::new();
230 ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
231 ctx.insert(
232 "aws:dynamodb:subscriberId".to_string(),
233 account_id.to_string(),
234 );
235 let envelope = match hook.encrypt(
236 account_id,
237 &self.region,
238 key,
239 b"ddb-item",
240 "dynamodb.amazonaws.com",
241 ctx.clone(),
242 ) {
243 Ok(env) => env,
244 Err(_) => return,
245 };
246 if matches!(operation, TableKmsOp::Read) {
247 let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
248 }
249 }
250
251 pub async fn save_snapshot_to_store(&self, store: Arc<dyn SnapshotStore>) -> io::Result<()> {
258 save_dynamodb_snapshot(&self.state, Some(store), &self.snapshot_lock)
259 .await
260 .map(|_| ())
261 }
262
263 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
269 let store = self.snapshot_store.clone()?;
270 let state = self.state.clone();
271 let lock = self.snapshot_lock.clone();
272 Some(Arc::new(move || {
273 let state = state.clone();
274 let store = store.clone();
275 let lock = lock.clone();
276 Box::pin(async move {
277 if let Err(err) = save_dynamodb_snapshot(&state, Some(store), &lock).await {
278 tracing::error!(%err, "dynamodb snapshot save failed");
279 }
280 })
281 }))
282 }
283
284 pub async fn save_snapshot(&self) -> io::Result<bool> {
288 save_dynamodb_snapshot(
289 &self.state,
290 self.snapshot_store.clone(),
291 &self.snapshot_lock,
292 )
293 .await
294 }
295
296 fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
297 kinesis_target_for(table)
298 }
299
300 pub(super) fn deliver_to_kinesis_destinations(
302 &self,
303 target: &KinesisDeliveryTarget,
304 event_name: &str,
305 keys: &HashMap<String, AttributeValue>,
306 old_image: Option<&HashMap<String, AttributeValue>>,
307 new_image: Option<&HashMap<String, AttributeValue>>,
308 ) {
309 let delivery = match &self.delivery {
310 Some(d) => d,
311 None => return,
312 };
313 deliver_kinesis_change(
314 delivery, target, event_name, keys, old_image, new_image, None,
315 );
316 }
317
318 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
319 serde_json::from_slice(&req.body).map_err(|e| {
320 AwsServiceError::aws_error(
321 StatusCode::BAD_REQUEST,
322 "SerializationException",
323 format!("Invalid JSON: {e}"),
324 )
325 })
326 }
327
328 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
329 Ok(AwsResponse::ok_json(body))
330 }
331}
332
333#[allow(clippy::too_many_arguments)]
339pub(crate) fn deliver_kinesis_change(
340 delivery: &DeliveryBus,
341 target: &KinesisDeliveryTarget,
342 event_name: &str,
343 keys: &HashMap<String, AttributeValue>,
344 old_image: Option<&HashMap<String, AttributeValue>>,
345 new_image: Option<&HashMap<String, AttributeValue>>,
346 user_identity: Option<&crate::state::StreamUserIdentity>,
347) {
348 let active_destinations: Vec<_> = target
349 .destinations
350 .iter()
351 .filter(|d| d.destination_status == "ACTIVE")
352 .collect();
353
354 if active_destinations.is_empty() {
355 return;
356 }
357
358 let mut record = json!({
359 "eventID": uuid::Uuid::new_v4().to_string(),
360 "eventName": event_name,
361 "eventVersion": "1.1",
362 "eventSource": "aws:dynamodb",
363 "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
364 "dynamodb": {
365 "Keys": keys,
366 "SequenceNumber": crate::streams::next_stream_sequence(),
371 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
372 "StreamViewType": "NEW_AND_OLD_IMAGES",
373 },
374 "eventSourceARN": &target.arn,
375 "tableName": &target.name,
376 });
377
378 if let Some(old) = old_image {
379 record["dynamodb"]["OldImage"] = json!(old);
380 }
381 if let Some(new) = new_image {
382 record["dynamodb"]["NewImage"] = json!(new);
383 }
384 if let Some(ui) = user_identity {
385 record["userIdentity"] = json!({
386 "principalId": ui.principal_id,
387 "type": ui.identity_type,
388 });
389 }
390
391 let record_str = serde_json::to_string(&record).unwrap_or_default();
392 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
393 let partition_key = serde_json::to_string(keys).unwrap_or_default();
394
395 for dest in active_destinations {
396 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
397 }
398}
399
400pub async fn save_dynamodb_snapshot(
406 state: &SharedDynamoDbState,
407 store: Option<Arc<dyn SnapshotStore>>,
408 lock: &tokio::sync::Mutex<()>,
409) -> io::Result<bool> {
410 let Some(store) = store else {
411 return Ok(false);
412 };
413 let _guard = lock.lock().await;
414 let snapshot = DynamoDbSnapshot {
415 schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
416 accounts: Some(state.read().clone()),
417 state: None,
418 };
419 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
420 let bytes = serde_json::to_vec(&snapshot)
421 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
422 store.save(&bytes)
423 })
424 .await;
425 match join {
426 Ok(Ok(())) => Ok(true),
427 Ok(Err(err)) => Err(io::Error::new(
428 err.kind(),
429 format!("failed to write dynamodb snapshot: {err}"),
430 )),
431 Err(err) => Err(io::Error::other(format!(
432 "dynamodb snapshot task panicked: {err}"
433 ))),
434 }
435}
436
437#[async_trait]
438impl AwsService for DynamoDbService {
439 fn service_name(&self) -> &str {
440 "dynamodb"
441 }
442
443 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
444 let mutates = if is_mutating_action(req.action.as_str()) {
448 true
449 } else if matches!(
450 req.action.as_str(),
451 "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
452 ) {
453 is_mutating_request(req.action.as_str(), &req.json_body())
454 } else {
455 false
456 };
457 let result = match req.action.as_str() {
458 "CreateTable" => self.create_table(&req),
459 "DeleteTable" => self.delete_table(&req),
460 "DescribeTable" => self.describe_table(&req),
461 "ListTables" => self.list_tables(&req),
462 "UpdateTable" => self.update_table(&req),
463 "PutItem" => self.put_item(&req),
464 "GetItem" => self.get_item(&req),
465 "DeleteItem" => self.delete_item(&req),
466 "UpdateItem" => self.update_item(&req),
467 "Query" => self.query(&req),
468 "Scan" => self.scan(&req),
469 "BatchGetItem" => self.batch_get_item(&req),
470 "BatchWriteItem" => self.batch_write_item(&req),
471 "TagResource" => self.tag_resource(&req),
472 "UntagResource" => self.untag_resource(&req),
473 "ListTagsOfResource" => self.list_tags_of_resource(&req),
474 "TransactGetItems" => self.transact_get_items(&req),
475 "TransactWriteItems" => self.transact_write_items(&req),
476 "ExecuteStatement" => self.execute_statement(&req),
477 "BatchExecuteStatement" => self.batch_execute_statement(&req),
478 "ExecuteTransaction" => self.execute_transaction(&req),
479 "UpdateTimeToLive" => self.update_time_to_live(&req),
480 "DescribeTimeToLive" => self.describe_time_to_live(&req),
481 "PutResourcePolicy" => self.put_resource_policy(&req),
482 "GetResourcePolicy" => self.get_resource_policy(&req),
483 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
484 "DescribeEndpoints" => self.describe_endpoints(&req),
486 "DescribeLimits" => self.describe_limits(&req),
487 "CreateBackup" => self.create_backup(&req),
489 "DeleteBackup" => self.delete_backup(&req),
490 "DescribeBackup" => self.describe_backup(&req),
491 "ListBackups" => self.list_backups(&req),
492 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
493 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
494 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
495 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
496 "CreateGlobalTable" => self.create_global_table(&req),
498 "DescribeGlobalTable" => self.describe_global_table(&req),
499 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
500 "ListGlobalTables" => self.list_global_tables(&req),
501 "UpdateGlobalTable" => self.update_global_table(&req),
502 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
503 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
504 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
505 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
507 "DisableKinesisStreamingDestination" => {
508 self.disable_kinesis_streaming_destination(&req)
509 }
510 "DescribeKinesisStreamingDestination" => {
511 self.describe_kinesis_streaming_destination(&req)
512 }
513 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
514 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
516 "UpdateContributorInsights" => self.update_contributor_insights(&req),
517 "ListContributorInsights" => self.list_contributor_insights(&req),
518 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
520 "DescribeExport" => self.describe_export(&req),
521 "ListExports" => self.list_exports(&req),
522 "ImportTable" => self.import_table(&req),
523 "DescribeImport" => self.describe_import(&req),
524 "ListImports" => self.list_imports(&req),
525 _ => Err(AwsServiceError::action_not_implemented(
526 "dynamodb",
527 &req.action,
528 )),
529 };
530 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
531 if let Err(err) = self.save_snapshot().await {
532 tracing::error!(%err, "dynamodb snapshot save failed");
533 }
534 }
535 result
536 }
537
538 fn supported_actions(&self) -> &[&str] {
539 &[
540 "CreateTable",
541 "DeleteTable",
542 "DescribeTable",
543 "ListTables",
544 "UpdateTable",
545 "PutItem",
546 "GetItem",
547 "DeleteItem",
548 "UpdateItem",
549 "Query",
550 "Scan",
551 "BatchGetItem",
552 "BatchWriteItem",
553 "TagResource",
554 "UntagResource",
555 "ListTagsOfResource",
556 "TransactGetItems",
557 "TransactWriteItems",
558 "ExecuteStatement",
559 "BatchExecuteStatement",
560 "ExecuteTransaction",
561 "UpdateTimeToLive",
562 "DescribeTimeToLive",
563 "PutResourcePolicy",
564 "GetResourcePolicy",
565 "DeleteResourcePolicy",
566 "DescribeEndpoints",
567 "DescribeLimits",
568 "CreateBackup",
569 "DeleteBackup",
570 "DescribeBackup",
571 "ListBackups",
572 "RestoreTableFromBackup",
573 "RestoreTableToPointInTime",
574 "UpdateContinuousBackups",
575 "DescribeContinuousBackups",
576 "CreateGlobalTable",
577 "DescribeGlobalTable",
578 "DescribeGlobalTableSettings",
579 "ListGlobalTables",
580 "UpdateGlobalTable",
581 "UpdateGlobalTableSettings",
582 "DescribeTableReplicaAutoScaling",
583 "UpdateTableReplicaAutoScaling",
584 "EnableKinesisStreamingDestination",
585 "DisableKinesisStreamingDestination",
586 "DescribeKinesisStreamingDestination",
587 "UpdateKinesisStreamingDestination",
588 "DescribeContributorInsights",
589 "UpdateContributorInsights",
590 "ListContributorInsights",
591 "ExportTableToPointInTime",
592 "DescribeExport",
593 "ListExports",
594 "ImportTable",
595 "DescribeImport",
596 "ListImports",
597 ]
598 }
599}
600
601pub(crate) mod helpers;
602pub(crate) use helpers::*;
603
604#[cfg(test)]
605mod tests;