1mod batch;
2#[cfg(test)]
3mod expression_corpus_tests;
4mod global_tables;
5mod items;
6mod queries;
7mod streams;
8mod tables;
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use base64::Engine;
15use http::StatusCode;
16use serde_json::{json, Value};
17
18use fakecloud_core::delivery::DeliveryBus;
19use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
20
21use fakecloud_persistence::{S3Store, SnapshotStore};
22use fakecloud_s3::SharedS3State;
23
24use crate::state::{
25 AttributeValue, DynamoDbSnapshot, DynamoTable, KinesisDestination, SharedDynamoDbState,
26 DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
27};
28
29pub(super) struct KinesisDeliveryTarget {
36 pub destinations: Vec<KinesisDestination>,
37 pub arn: String,
38 pub name: String,
39}
40
/// Kind of table access reported to `DynamoDbService::record_table_kms_usage`.
pub(crate) enum TableKmsOp {
    /// A read access: the KMS hook is asked to encrypt and then decrypt.
    Read,
    /// A write access: the KMS hook is only asked to encrypt.
    Write,
}
48
49pub struct DynamoDbService {
50 state: SharedDynamoDbState,
51 pub(crate) s3_state: Option<SharedS3State>,
52 pub(crate) s3_store: Option<Arc<dyn S3Store>>,
53 delivery: Option<Arc<DeliveryBus>>,
54 snapshot_store: Option<Arc<dyn SnapshotStore>>,
55 pub(crate) kms_hook: Option<Arc<dyn fakecloud_core::delivery::KmsHook>>,
56 pub(crate) region: String,
57 snapshot_lock: Arc<tokio::sync::Mutex<()>>,
62}
63
64impl DynamoDbService {
65 pub fn new(state: SharedDynamoDbState) -> Self {
66 Self {
67 state,
68 s3_state: None,
69 s3_store: None,
70 delivery: None,
71 snapshot_store: None,
72 kms_hook: None,
73 region: "us-east-1".to_string(),
74 snapshot_lock: Arc::new(tokio::sync::Mutex::new(())),
75 }
76 }
77
78 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
79 self.s3_state = Some(s3_state);
80 self
81 }
82
83 pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
84 self.s3_store = Some(store);
85 self
86 }
87
88 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
89 self.delivery = Some(delivery);
90 self
91 }
92
93 pub fn with_snapshot_store(mut self, store: Arc<dyn SnapshotStore>) -> Self {
94 self.snapshot_store = Some(store);
95 self
96 }
97
98 pub fn with_kms_hook(mut self, hook: Arc<dyn fakecloud_core::delivery::KmsHook>) -> Self {
99 self.kms_hook = Some(hook);
100 self
101 }
102
103 pub fn with_region(mut self, region: impl Into<String>) -> Self {
104 self.region = region.into();
105 self
106 }
107
108 pub(crate) fn record_table_kms_usage(
116 &self,
117 account_id: &str,
118 table_arn: &str,
119 kms_key_arn: Option<&str>,
120 operation: TableKmsOp,
121 ) {
122 let Some(hook) = &self.kms_hook else { return };
123 let key = kms_key_arn
124 .filter(|k| !k.is_empty())
125 .unwrap_or("aws/dynamodb");
126 let table_name = table_arn.rsplit('/').next().unwrap_or(table_arn);
131 let mut ctx = std::collections::HashMap::new();
132 ctx.insert("aws:dynamodb:tableName".to_string(), table_name.to_string());
133 ctx.insert(
134 "aws:dynamodb:subscriberId".to_string(),
135 account_id.to_string(),
136 );
137 let envelope = match hook.encrypt(
138 account_id,
139 &self.region,
140 key,
141 b"ddb-item",
142 "dynamodb.amazonaws.com",
143 ctx.clone(),
144 ) {
145 Ok(env) => env,
146 Err(_) => return,
147 };
148 if matches!(operation, TableKmsOp::Read) {
149 let _ = hook.decrypt(account_id, &envelope, "dynamodb.amazonaws.com", ctx);
150 }
151 }
152
153 async fn save_snapshot(&self) {
162 let Some(store) = self.snapshot_store.clone() else {
163 return;
164 };
165 let _guard = self.snapshot_lock.lock().await;
166 let snapshot = DynamoDbSnapshot {
167 schema_version: DYNAMODB_SNAPSHOT_SCHEMA_VERSION,
168 accounts: Some(self.state.read().clone()),
169 state: None,
170 };
171 let join = tokio::task::spawn_blocking(move || -> std::io::Result<()> {
172 let bytes = serde_json::to_vec(&snapshot)
173 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
174 store.save(&bytes)
175 })
176 .await;
177 match join {
178 Ok(Ok(())) => {}
179 Ok(Err(err)) => tracing::error!(%err, "failed to write dynamodb snapshot"),
180 Err(err) => tracing::error!(%err, "dynamodb snapshot task panicked"),
181 }
182 }
183
184 fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
185 if table
186 .kinesis_destinations
187 .iter()
188 .any(|d| d.destination_status == "ACTIVE")
189 {
190 Some(KinesisDeliveryTarget {
191 destinations: table.kinesis_destinations.clone(),
192 arn: table.arn.clone(),
193 name: table.name.clone(),
194 })
195 } else {
196 None
197 }
198 }
199
200 pub(super) fn deliver_to_kinesis_destinations(
202 &self,
203 target: &KinesisDeliveryTarget,
204 event_name: &str,
205 keys: &HashMap<String, AttributeValue>,
206 old_image: Option<&HashMap<String, AttributeValue>>,
207 new_image: Option<&HashMap<String, AttributeValue>>,
208 ) {
209 let delivery = match &self.delivery {
210 Some(d) => d,
211 None => return,
212 };
213
214 let active_destinations: Vec<_> = target
215 .destinations
216 .iter()
217 .filter(|d| d.destination_status == "ACTIVE")
218 .collect();
219
220 if active_destinations.is_empty() {
221 return;
222 }
223
224 let mut record = json!({
225 "eventID": uuid::Uuid::new_v4().to_string(),
226 "eventName": event_name,
227 "eventVersion": "1.1",
228 "eventSource": "aws:dynamodb",
229 "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
230 "dynamodb": {
231 "Keys": keys,
232 "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
233 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
234 "StreamViewType": "NEW_AND_OLD_IMAGES",
235 },
236 "eventSourceARN": &target.arn,
237 "tableName": &target.name,
238 });
239
240 if let Some(old) = old_image {
241 record["dynamodb"]["OldImage"] = json!(old);
242 }
243 if let Some(new) = new_image {
244 record["dynamodb"]["NewImage"] = json!(new);
245 }
246
247 let record_str = serde_json::to_string(&record).unwrap_or_default();
248 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
249 let partition_key = serde_json::to_string(keys).unwrap_or_default();
250
251 for dest in active_destinations {
252 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
253 }
254 }
255
256 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
257 serde_json::from_slice(&req.body).map_err(|e| {
258 AwsServiceError::aws_error(
259 StatusCode::BAD_REQUEST,
260 "SerializationException",
261 format!("Invalid JSON: {e}"),
262 )
263 })
264 }
265
266 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
267 Ok(AwsResponse::ok_json(body))
268 }
269}
270
271#[async_trait]
272impl AwsService for DynamoDbService {
273 fn service_name(&self) -> &str {
274 "dynamodb"
275 }
276
277 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
278 let mutates = if is_mutating_action(req.action.as_str()) {
282 true
283 } else if matches!(
284 req.action.as_str(),
285 "ExecuteStatement" | "BatchExecuteStatement" | "ExecuteTransaction"
286 ) {
287 is_mutating_request(req.action.as_str(), &req.json_body())
288 } else {
289 false
290 };
291 let result = match req.action.as_str() {
292 "CreateTable" => self.create_table(&req),
293 "DeleteTable" => self.delete_table(&req),
294 "DescribeTable" => self.describe_table(&req),
295 "ListTables" => self.list_tables(&req),
296 "UpdateTable" => self.update_table(&req),
297 "PutItem" => self.put_item(&req),
298 "GetItem" => self.get_item(&req),
299 "DeleteItem" => self.delete_item(&req),
300 "UpdateItem" => self.update_item(&req),
301 "Query" => self.query(&req),
302 "Scan" => self.scan(&req),
303 "BatchGetItem" => self.batch_get_item(&req),
304 "BatchWriteItem" => self.batch_write_item(&req),
305 "TagResource" => self.tag_resource(&req),
306 "UntagResource" => self.untag_resource(&req),
307 "ListTagsOfResource" => self.list_tags_of_resource(&req),
308 "TransactGetItems" => self.transact_get_items(&req),
309 "TransactWriteItems" => self.transact_write_items(&req),
310 "ExecuteStatement" => self.execute_statement(&req),
311 "BatchExecuteStatement" => self.batch_execute_statement(&req),
312 "ExecuteTransaction" => self.execute_transaction(&req),
313 "UpdateTimeToLive" => self.update_time_to_live(&req),
314 "DescribeTimeToLive" => self.describe_time_to_live(&req),
315 "PutResourcePolicy" => self.put_resource_policy(&req),
316 "GetResourcePolicy" => self.get_resource_policy(&req),
317 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
318 "DescribeEndpoints" => self.describe_endpoints(&req),
320 "DescribeLimits" => self.describe_limits(&req),
321 "CreateBackup" => self.create_backup(&req),
323 "DeleteBackup" => self.delete_backup(&req),
324 "DescribeBackup" => self.describe_backup(&req),
325 "ListBackups" => self.list_backups(&req),
326 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
327 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
328 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
329 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
330 "CreateGlobalTable" => self.create_global_table(&req),
332 "DescribeGlobalTable" => self.describe_global_table(&req),
333 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
334 "ListGlobalTables" => self.list_global_tables(&req),
335 "UpdateGlobalTable" => self.update_global_table(&req),
336 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
337 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
338 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
339 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
341 "DisableKinesisStreamingDestination" => {
342 self.disable_kinesis_streaming_destination(&req)
343 }
344 "DescribeKinesisStreamingDestination" => {
345 self.describe_kinesis_streaming_destination(&req)
346 }
347 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
348 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
350 "UpdateContributorInsights" => self.update_contributor_insights(&req),
351 "ListContributorInsights" => self.list_contributor_insights(&req),
352 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
354 "DescribeExport" => self.describe_export(&req),
355 "ListExports" => self.list_exports(&req),
356 "ImportTable" => self.import_table(&req),
357 "DescribeImport" => self.describe_import(&req),
358 "ListImports" => self.list_imports(&req),
359 _ => Err(AwsServiceError::action_not_implemented(
360 "dynamodb",
361 &req.action,
362 )),
363 };
364 if mutates && matches!(result.as_ref(), Ok(resp) if resp.status.is_success()) {
365 self.save_snapshot().await;
366 }
367 result
368 }
369
370 fn supported_actions(&self) -> &[&str] {
371 &[
372 "CreateTable",
373 "DeleteTable",
374 "DescribeTable",
375 "ListTables",
376 "UpdateTable",
377 "PutItem",
378 "GetItem",
379 "DeleteItem",
380 "UpdateItem",
381 "Query",
382 "Scan",
383 "BatchGetItem",
384 "BatchWriteItem",
385 "TagResource",
386 "UntagResource",
387 "ListTagsOfResource",
388 "TransactGetItems",
389 "TransactWriteItems",
390 "ExecuteStatement",
391 "BatchExecuteStatement",
392 "ExecuteTransaction",
393 "UpdateTimeToLive",
394 "DescribeTimeToLive",
395 "PutResourcePolicy",
396 "GetResourcePolicy",
397 "DeleteResourcePolicy",
398 "DescribeEndpoints",
399 "DescribeLimits",
400 "CreateBackup",
401 "DeleteBackup",
402 "DescribeBackup",
403 "ListBackups",
404 "RestoreTableFromBackup",
405 "RestoreTableToPointInTime",
406 "UpdateContinuousBackups",
407 "DescribeContinuousBackups",
408 "CreateGlobalTable",
409 "DescribeGlobalTable",
410 "DescribeGlobalTableSettings",
411 "ListGlobalTables",
412 "UpdateGlobalTable",
413 "UpdateGlobalTableSettings",
414 "DescribeTableReplicaAutoScaling",
415 "UpdateTableReplicaAutoScaling",
416 "EnableKinesisStreamingDestination",
417 "DisableKinesisStreamingDestination",
418 "DescribeKinesisStreamingDestination",
419 "UpdateKinesisStreamingDestination",
420 "DescribeContributorInsights",
421 "UpdateContributorInsights",
422 "ListContributorInsights",
423 "ExportTableToPointInTime",
424 "DescribeExport",
425 "ListExports",
426 "ImportTable",
427 "DescribeImport",
428 "ListImports",
429 ]
430 }
431}
432
433mod helpers;
434pub(crate) use helpers::*;
435
436#[cfg(test)]
437mod tests;