1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25 AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26 DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
29pub(super) struct KinesisDeliveryTarget {
36 pub destinations: Vec<KinesisDestination>,
37 pub arn: String,
38 pub name: String,
39}
40
41pub(crate) enum TableKmsOp {
45 Read,
46 Write,
47}
48
49pub struct DynamoDbService {
50 state: SharedDynamoDbState,
51 pub(crate) s3_state: Option<SharedS3State>,
52 pub(crate) s3_store: Option<Arc<dyn S3Store>>,
53 delivery: Option<Arc<DeliveryBus>>,
54 snapshot_store: Option<Arc<dyn SnapshotStore>>,
55 pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
56 pub(crate) region: String,
57 snapshot_lock: Arc<tokio::sync::Mutex<()>>,
62}
63
64impl DynamoDbService {
65 pub fn new(state: SharedDynamoDbState) -> Self {
66 Self {
67 state,
68 s3_state: None,
69 s3_store: None,
70 delivery: None,
71 snapshot_store: None,
72 kms_hook: None,
73 region: "us-east-1".to_string(),
74 snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
75 }
76 }
77
78 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
79 self.s3_state = Some(s3_state);
80 self
81 }
82
83 pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
84 self.s3_store = Some(store);
85 self
86 }
87
88 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
89 self.delivery = Some(delivery);
90 self
91 }
92
93 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94 self.snapshot_store = Some(store);
95 self
96 }
97
98 pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
99 self.kms_hook = Some(hook);
100 self
101 }
102
103 pub fn with_region(mut self, region: impl Into<String>) -> Self {
104 self.region = region.into();
105 self
106 }
107
108 pub(crate) fn record_table_kms_usage(
116 &self,
117 account_id: &str,
118 table_arn: &str,
119 kms_key_arn: Option<&str>,
120 operation: TableKmsOp,
121 ) {
122 let Some(hook) = &self.kms_hook else { return };
123 let key = kms_key_arn
124 .filter(|k| !k.is_empty())
125 .unwrap_or("aws/dynamodb");
126 let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
131 let mut ctx = std::collections::HashMap::new();
132 ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
133 ctx.insert(
134 "aws:dynamodb:subscriberId".to_string(),
135 account_id.to_string(),
136 );
137 let envelope = match hook.encrypt(
138 account_id,
139 &self.region,
140 key,
141 b"ddb-item",
142 "dynamodb.amazonaws.com",
143 ctx.clone(),
144 ) {
145 Ok(env) => env,
146 Err(_) => return,
147 };
148 if matches!(operation, TableKmsOp::Read) {
149 let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
150 }
151 }
152
153 async fn save_snapshot(&self) {
162 save_dynamodb_snapshot(
163 &self.state,
164 self.snapshot_store.clone(),
165 &self.snapshot_lock,
166 )
167 .await;
168 }
169
170 pub fn snapshot_hook(&self) -> Option<fakecloud_persistence::SnapshotHook> {
176 let store = self.snapshot_store.clone()?;
177 let state = self.state.clone();
178 let lock = self.snapshot_lock.clone();
179 Some(Arc::new(move || {
180 let state = state.clone();
181 let store = store.clone();
182 let lock = lock.clone();
183 Box::pin(async move {
184 save_dynamodb_snapshot(&state, Some(store), &lock).await;
185 })
186 }))
187 }
188
189 fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
190 if table
191 .kinesis_destinations
192 .iter()
193 .any(|d| d.destination_status == "ACTIVE")
194 {
195 Some(KinesisDeliveryTarget {
196 destinations: table.kinesis_destinations.clone(),
197 arn: table.arn.clone(),
198 name: table.name.clone(),
199 })
200 } else {
201 None
202 }
203 }
204
205 pub(super) fn deliver_to_kinesis_destinations(
207 &self,
208 target: &KinesisDeliveryTarget,
209 event_name: &str,
210 keys: &HashMap<String, AttributeValue>,
211 old_image: Option<&HashMap<String, AttributeValue>>,
212 new_image: Option<&HashMap<String, AttributeValue>>,
213 ) {
214 let delivery = match &self.delivery {
215 Some(d) => d,
216 None => return,
217 };
218
219 let active_destinations: Vec<_> = target
220 .destinations
221 .iter()
222 .filter(|d| d.destination_status == "ACTIVE")
223 .collect();
224
225 if active_destinations.is_empty() {
226 return;
227 }
228
229 let mut record = json!({
230 "eventID": uuid::Uuid::new_v4().to_string(),
231 "eventName": event_name,
232 "eventVersion": "1.1",
233 "eventSource": "aws:dynamodb",
234 "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
235 "dynamodb": {
236 "Keys": keys,
237 "SequenceNumber": crate::streams::next_stream_sequence(),
242 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
243 "StreamViewType": "NEW_AND_OLD_IMAGES",
244 },
245 "eventSourceARN": &target.arn,
246 "tableName": &target.name,
247 });
248
249 if let Some(old) = old_image {
250 record["dynamodb"]["OldImage"] = json!(old);
251 }
252 if let Some(new) = new_image {
253 record["dynamodb"]["NewImage"] = json!(new);
254 }
255
256 let record_str = serde_json::to_string(&record).unwrap_or_default();
257 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
258 let partition_key = serde_json::to_string(keys).unwrap_or_default();
259
260 for dest in active_destinations {
261 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
262 }
263 }
264
265 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
266 serde_json::from_slice(&req.body).map_err(|e| {
267 AwsServiceError::aws_error(
268 StatusCode::BAD_REQUEST,
269 "SerializationException",
270 format!("Invalid JSON: {e}"),
271 )
272 })
273 }
274
275 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
276 Ok(AwsResponse::ok_json(body))
277 }
278}
279
280pub async fn save_dynamodb_snapshot(
286 state: &SharedDynamoDbState,
287 store: Option<Arc<dyn SnapshotStore>>,
288 lock: &tokio::sync::Mutex<()>,
289) {
290 let Some(store) = store else {
291 return;
292 };
293 let _guard = lock.lock().await;
294 let snapshot = DynamoDbSnapshot {
295 schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
296 accounts: Some(state.read().clone()),
297 state: None,
298 };
299 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
300 let bytes = serde_json::to_vec(&snapshot)
301 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
302 store.save(&bytes)
303 })
304 .await;
305 match join {
306 Ok(Ok(())) => {}
307 Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
308 Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
309 }
310}
311
312#[async_trait]
313impl AwsService for DynamoDbService {
314 fn service_name(&self) -> &str {
315 "dynamodb"
316 }
317
318 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
319 let mutates = if is_mutating_action(req.action.as_str()) {
323 true
324 } else if matches!(
325 req.action.as_str(),
326 "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
327 ) {
328 is_mutating_request(req.action.as_str(), &req.json_body())
329 } else {
330 false
331 };
332 let result = match req.action.as_str() {
333 "CreateTable" => self.create_table(&req),
334 "DeleteTable" => self.delete_table(&req),
335 "DescribeTable" => self.describe_table(&req),
336 "ListTables" => self.list_tables(&req),
337 "UpdateTable" => self.update_table(&req),
338 "PutItem" => self.put_item(&req),
339 "GetItem" => self.get_item(&req),
340 "DeleteItem" => self.delete_item(&req),
341 "UpdateItem" => self.update_item(&req),
342 "Query" => self.query(&req),
343 "Scan" => self.scan(&req),
344 "BatchGetItem" => self.batch_get_item(&req),
345 "BatchWriteItem" => self.batch_write_item(&req),
346 "TagResource" => self.tag_resource(&req),
347 "UntagResource" => self.untag_resource(&req),
348 "ListTagsOfResource" => self.list_tags_of_resource(&req),
349 "TransactGetItems" => self.transact_get_items(&req),
350 "TransactWriteItems" => self.transact_write_items(&req),
351 "ExecuteStatement" => self.execute_statement(&req),
352 "BatchExecuteStatement" => self.batch_execute_statement(&req),
353 "ExecuteTransaction" => self.execute_transaction(&req),
354 "UpdateTimeToLive" => self.update_time_to_live(&req),
355 "DescribeTimeToLive" => self.describe_time_to_live(&req),
356 "PutResourcePolicy" => self.put_resource_policy(&req),
357 "GetResourcePolicy" => self.get_resource_policy(&req),
358 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
359 "DescribeEndpoints" => self.describe_endpoints(&req),
361 "DescribeLimits" => self.describe_limits(&req),
362 "CreateBackup" => self.create_backup(&req),
364 "DeleteBackup" => self.delete_backup(&req),
365 "DescribeBackup" => self.describe_backup(&req),
366 "ListBackups" => self.list_backups(&req),
367 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
368 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
369 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
370 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
371 "CreateGlobalTable" => self.create_global_table(&req),
373 "DescribeGlobalTable" => self.describe_global_table(&req),
374 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
375 "ListGlobalTables" => self.list_global_tables(&req),
376 "UpdateGlobalTable" => self.update_global_table(&req),
377 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
378 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
379 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
380 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
382 "DisableKinesisStreamingDestination" => {
383 self.disable_kinesis_streaming_destination(&req)
384 }
385 "DescribeKinesisStreamingDestination" => {
386 self.describe_kinesis_streaming_destination(&req)
387 }
388 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
389 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
391 "UpdateContributorInsights" => self.update_contributor_insights(&req),
392 "ListContributorInsights" => self.list_contributor_insights(&req),
393 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
395 "DescribeExport" => self.describe_export(&req),
396 "ListExports" => self.list_exports(&req),
397 "ImportTable" => self.import_table(&req),
398 "DescribeImport" => self.describe_import(&req),
399 "ListImports" => self.list_imports(&req),
400 _ => Err(AwsServiceError::action_not_implemented(
401 "dynamodb",
402 &req.action,
403 )),
404 };
405 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
406 self.save_snapshot().await;
407 }
408 result
409 }
410
411 fn supported_actions(&self) -> &[&str] {
412 &[
413 "CreateTable",
414 "DeleteTable",
415 "DescribeTable",
416 "ListTables",
417 "UpdateTable",
418 "PutItem",
419 "GetItem",
420 "DeleteItem",
421 "UpdateItem",
422 "Query",
423 "Scan",
424 "BatchGetItem",
425 "BatchWriteItem",
426 "TagResource",
427 "UntagResource",
428 "ListTagsOfResource",
429 "TransactGetItems",
430 "TransactWriteItems",
431 "ExecuteStatement",
432 "BatchExecuteStatement",
433 "ExecuteTransaction",
434 "UpdateTimeToLive",
435 "DescribeTimeToLive",
436 "PutResourcePolicy",
437 "GetResourcePolicy",
438 "DeleteResourcePolicy",
439 "DescribeEndpoints",
440 "DescribeLimits",
441 "CreateBackup",
442 "DeleteBackup",
443 "DescribeBackup",
444 "ListBackups",
445 "RestoreTableFromBackup",
446 "RestoreTableToPointInTime",
447 "UpdateContinuousBackups",
448 "DescribeContinuousBackups",
449 "CreateGlobalTable",
450 "DescribeGlobalTable",
451 "DescribeGlobalTableSettings",
452 "ListGlobalTables",
453 "UpdateGlobalTable",
454 "UpdateGlobalTableSettings",
455 "DescribeTableReplicaAutoScaling",
456 "UpdateTableReplicaAutoScaling",
457 "EnableKinesisStreamingDestination",
458 "DisableKinesisStreamingDestination",
459 "DescribeKinesisStreamingDestination",
460 "UpdateKinesisStreamingDestination",
461 "DescribeContributorInsights",
462 "UpdateContributorInsights",
463 "ListContributorInsights",
464 "ExportTableToPointInTime",
465 "DescribeExport",
466 "ListExports",
467 "ImportTable",
468 "DescribeImport",
469 "ListImports",
470 ]
471 }
472}
473
474mod helpers;
475pub(crate) use helpers::*;
476
477#[cfg(test)]
478mod tests;