1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use fakecloud_s3::state::SharedS3State;
12
13use crate::state::{
14 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
15 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
16 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
17 ReplicaDescription, SharedDynamoDbState,
18};
19
20pub struct DynamoDbService {
21 state: SharedDynamoDbState,
22 s3_state: Option<SharedS3State>,
23}
24
25impl DynamoDbService {
26 pub fn new(state: SharedDynamoDbState) -> Self {
27 Self {
28 state,
29 s3_state: None,
30 }
31 }
32
33 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
34 self.s3_state = Some(s3_state);
35 self
36 }
37
38 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
39 serde_json::from_slice(&req.body).map_err(|e| {
40 AwsServiceError::aws_error(
41 StatusCode::BAD_REQUEST,
42 "SerializationException",
43 format!("Invalid JSON: {e}"),
44 )
45 })
46 }
47
48 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
49 Ok(AwsResponse::ok_json(body))
50 }
51
52 fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
55 let body = Self::parse_body(req)?;
56
57 let table_name = body["TableName"]
58 .as_str()
59 .ok_or_else(|| {
60 AwsServiceError::aws_error(
61 StatusCode::BAD_REQUEST,
62 "ValidationException",
63 "TableName is required",
64 )
65 })?
66 .to_string();
67
68 let key_schema = parse_key_schema(&body["KeySchema"])?;
69 let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
70
71 for ks in &key_schema {
73 if !attribute_definitions
74 .iter()
75 .any(|ad| ad.attribute_name == ks.attribute_name)
76 {
77 return Err(AwsServiceError::aws_error(
78 StatusCode::BAD_REQUEST,
79 "ValidationException",
80 format!(
81 "One or more parameter values were invalid: \
82 Some index key attributes are not defined in AttributeDefinitions. \
83 Keys: [{}], AttributeDefinitions: [{}]",
84 ks.attribute_name,
85 attribute_definitions
86 .iter()
87 .map(|ad| ad.attribute_name.as_str())
88 .collect::<Vec<_>>()
89 .join(", ")
90 ),
91 ));
92 }
93 }
94
95 let billing_mode = body["BillingMode"]
96 .as_str()
97 .unwrap_or("PROVISIONED")
98 .to_string();
99
100 let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
101 ProvisionedThroughput {
102 read_capacity_units: 0,
103 write_capacity_units: 0,
104 }
105 } else {
106 parse_provisioned_throughput(&body["ProvisionedThroughput"])?
107 };
108
109 let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
110 let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
111 let tags = parse_tags(&body["Tags"]);
112
113 let mut state = self.state.write();
114
115 if state.tables.contains_key(&table_name) {
116 return Err(AwsServiceError::aws_error(
117 StatusCode::BAD_REQUEST,
118 "ResourceInUseException",
119 format!("Table already exists: {table_name}"),
120 ));
121 }
122
123 let arn = format!(
124 "arn:aws:dynamodb:{}:{}:table/{}",
125 state.region, state.account_id, table_name
126 );
127 let now = Utc::now();
128
129 let table = DynamoTable {
130 name: table_name.clone(),
131 arn: arn.clone(),
132 key_schema: key_schema.clone(),
133 attribute_definitions: attribute_definitions.clone(),
134 provisioned_throughput: provisioned_throughput.clone(),
135 items: Vec::new(),
136 gsi: gsi.clone(),
137 lsi: lsi.clone(),
138 tags,
139 created_at: now,
140 status: "ACTIVE".to_string(),
141 item_count: 0,
142 size_bytes: 0,
143 billing_mode: billing_mode.clone(),
144 ttl_attribute: None,
145 ttl_enabled: false,
146 resource_policy: None,
147 pitr_enabled: false,
148 kinesis_destinations: Vec::new(),
149 contributor_insights_status: "DISABLED".to_string(),
150 contributor_insights_counters: HashMap::new(),
151 };
152
153 state.tables.insert(table_name, table);
154
155 let table_desc = build_table_description_json(
156 &arn,
157 &key_schema,
158 &attribute_definitions,
159 &provisioned_throughput,
160 &gsi,
161 &lsi,
162 &billing_mode,
163 now,
164 0,
165 0,
166 "ACTIVE",
167 );
168
169 Self::ok_json(json!({ "TableDescription": table_desc }))
170 }
171
172 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
173 let body = Self::parse_body(req)?;
174 let table_name = require_str(&body, "TableName")?;
175
176 let mut state = self.state.write();
177 let table = state.tables.remove(table_name).ok_or_else(|| {
178 AwsServiceError::aws_error(
179 StatusCode::BAD_REQUEST,
180 "ResourceNotFoundException",
181 format!("Requested resource not found: Table: {table_name} not found"),
182 )
183 })?;
184
185 let table_desc = build_table_description_json(
186 &table.arn,
187 &table.key_schema,
188 &table.attribute_definitions,
189 &table.provisioned_throughput,
190 &table.gsi,
191 &table.lsi,
192 &table.billing_mode,
193 table.created_at,
194 table.item_count,
195 table.size_bytes,
196 "DELETING",
197 );
198
199 Self::ok_json(json!({ "TableDescription": table_desc }))
200 }
201
202 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
203 let body = Self::parse_body(req)?;
204 let table_name = require_str(&body, "TableName")?;
205
206 let state = self.state.read();
207 let table = get_table(&state.tables, table_name)?;
208
209 let table_desc = build_table_description_json(
210 &table.arn,
211 &table.key_schema,
212 &table.attribute_definitions,
213 &table.provisioned_throughput,
214 &table.gsi,
215 &table.lsi,
216 &table.billing_mode,
217 table.created_at,
218 table.item_count,
219 table.size_bytes,
220 &table.status,
221 );
222
223 Self::ok_json(json!({ "Table": table_desc }))
224 }
225
226 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
227 let body = Self::parse_body(req)?;
228
229 validate_optional_string_length(
230 "exclusiveStartTableName",
231 body["ExclusiveStartTableName"].as_str(),
232 3,
233 255,
234 )?;
235 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
236
237 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
238 let exclusive_start = body["ExclusiveStartTableName"]
239 .as_str()
240 .map(|s| s.to_string());
241
242 let state = self.state.read();
243 let mut names: Vec<&String> = state.tables.keys().collect();
244 names.sort();
245
246 let start_idx = match &exclusive_start {
247 Some(start) => names
248 .iter()
249 .position(|n| n.as_str() > start.as_str())
250 .unwrap_or(names.len()),
251 None => 0,
252 };
253
254 let page: Vec<&str> = names
255 .iter()
256 .skip(start_idx)
257 .take(limit)
258 .map(|n| n.as_str())
259 .collect();
260
261 let mut result = json!({ "TableNames": page });
262
263 if start_idx + limit < names.len() {
264 if let Some(last) = page.last() {
265 result["LastEvaluatedTableName"] = json!(last);
266 }
267 }
268
269 Self::ok_json(result)
270 }
271
272 fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
273 let body = Self::parse_body(req)?;
274 let table_name = require_str(&body, "TableName")?;
275
276 let mut state = self.state.write();
277 let table = state.tables.get_mut(table_name).ok_or_else(|| {
278 AwsServiceError::aws_error(
279 StatusCode::BAD_REQUEST,
280 "ResourceNotFoundException",
281 format!("Requested resource not found: Table: {table_name} not found"),
282 )
283 })?;
284
285 if let Some(pt) = body.get("ProvisionedThroughput") {
286 if let Ok(throughput) = parse_provisioned_throughput(pt) {
287 table.provisioned_throughput = throughput;
288 }
289 }
290
291 if let Some(bm) = body["BillingMode"].as_str() {
292 table.billing_mode = bm.to_string();
293 }
294
295 let table_desc = build_table_description_json(
296 &table.arn,
297 &table.key_schema,
298 &table.attribute_definitions,
299 &table.provisioned_throughput,
300 &table.gsi,
301 &table.lsi,
302 &table.billing_mode,
303 table.created_at,
304 table.item_count,
305 table.size_bytes,
306 &table.status,
307 );
308
309 Self::ok_json(json!({ "TableDescription": table_desc }))
310 }
311
312 fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
315 let body = Self::parse_body(req)?;
317 let table_name = require_str(&body, "TableName")?;
318 let item = require_object(&body, "Item")?;
319 let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
320 let expr_attr_names = parse_expression_attribute_names(&body);
321 let expr_attr_values = parse_expression_attribute_values(&body);
322 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
323
324 let old_item = {
326 let mut state = self.state.write();
327 let table = get_table_mut(&mut state.tables, table_name)?;
328
329 validate_key_in_item(table, &item)?;
330
331 let key = extract_key(table, &item);
332 let existing_idx = table.find_item_index(&key);
333
334 if let Some(ref cond) = condition {
335 let existing = existing_idx.map(|i| &table.items[i]);
336 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
337 }
338
339 let old_item = if return_values == "ALL_OLD" {
340 existing_idx.map(|i| table.items[i].clone())
341 } else {
342 None
343 };
344
345 if let Some(idx) = existing_idx {
346 table.items[idx] = item.clone();
347 } else {
348 table.items.push(item.clone());
349 }
350
351 table.record_item_access(&item);
352 table.recalculate_stats();
353
354 old_item
355 };
356 let mut result = json!({});
359 if let Some(old) = old_item {
360 result["Attributes"] = json!(old);
361 }
362
363 Self::ok_json(result)
364 }
365
366 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
367 let body = Self::parse_body(req)?;
369 let table_name = require_str(&body, "TableName")?;
370 let key = require_object(&body, "Key")?;
371
372 let (result, needs_insights) = {
374 let state = self.state.read();
375 let table = get_table(&state.tables, table_name)?;
376 let needs_insights = table.contributor_insights_status == "ENABLED";
377
378 let result = match table.find_item_index(&key) {
379 Some(idx) => {
380 let item = &table.items[idx];
381 let projected = project_item(item, &body);
382 json!({ "Item": projected })
383 }
384 None => json!({}),
385 };
386 (result, needs_insights)
387 };
388 if needs_insights {
392 let mut state = self.state.write();
393 if let Some(table) = state.tables.get_mut(table_name) {
394 table.record_key_access(&key);
395 }
396 }
397
398 Self::ok_json(result)
399 }
400
401 fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
402 let body = Self::parse_body(req)?;
403
404 validate_optional_enum_value(
405 "conditionalOperator",
406 &body["ConditionalOperator"],
407 &["AND", "OR"],
408 )?;
409 validate_optional_enum_value(
410 "returnConsumedCapacity",
411 &body["ReturnConsumedCapacity"],
412 &["INDEXES", "TOTAL", "NONE"],
413 )?;
414 validate_optional_enum_value(
415 "returnValues",
416 &body["ReturnValues"],
417 &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
418 )?;
419 validate_optional_enum_value(
420 "returnItemCollectionMetrics",
421 &body["ReturnItemCollectionMetrics"],
422 &["SIZE", "NONE"],
423 )?;
424 validate_optional_enum_value(
425 "returnValuesOnConditionCheckFailure",
426 &body["ReturnValuesOnConditionCheckFailure"],
427 &["ALL_OLD", "NONE"],
428 )?;
429
430 let table_name = require_str(&body, "TableName")?;
431 let key = require_object(&body, "Key")?;
432
433 let mut state = self.state.write();
434 let table = get_table_mut(&mut state.tables, table_name)?;
435
436 let condition = body["ConditionExpression"].as_str();
437 let expr_attr_names = parse_expression_attribute_names(&body);
438 let expr_attr_values = parse_expression_attribute_values(&body);
439
440 let existing_idx = table.find_item_index(&key);
441
442 if let Some(cond) = condition {
443 let existing = existing_idx.map(|i| &table.items[i]);
444 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
445 }
446
447 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
448
449 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
450 let return_icm = body["ReturnItemCollectionMetrics"]
451 .as_str()
452 .unwrap_or("NONE");
453
454 let mut result = json!({});
455
456 if let Some(idx) = existing_idx {
457 if return_values == "ALL_OLD" {
458 result["Attributes"] = json!(table.items[idx]);
459 }
460 table.items.remove(idx);
461 table.recalculate_stats();
462 }
463
464 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
465 result["ConsumedCapacity"] = json!({
466 "TableName": table_name,
467 "CapacityUnits": 1.0,
468 });
469 }
470
471 if return_icm == "SIZE" {
472 result["ItemCollectionMetrics"] = json!({});
473 }
474
475 Self::ok_json(result)
476 }
477
478 fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
479 let body = Self::parse_body(req)?;
480 let table_name = require_str(&body, "TableName")?;
481 let key = require_object(&body, "Key")?;
482
483 let mut state = self.state.write();
484 let table = get_table_mut(&mut state.tables, table_name)?;
485
486 validate_key_attributes_in_key(table, &key)?;
487
488 let condition = body["ConditionExpression"].as_str();
489 let expr_attr_names = parse_expression_attribute_names(&body);
490 let expr_attr_values = parse_expression_attribute_values(&body);
491 let update_expression = body["UpdateExpression"].as_str();
492
493 let existing_idx = table.find_item_index(&key);
494
495 if let Some(cond) = condition {
496 let existing = existing_idx.map(|i| &table.items[i]);
497 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
498 }
499
500 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
501
502 let idx = match existing_idx {
503 Some(i) => i,
504 None => {
505 let mut new_item = HashMap::new();
506 for (k, v) in &key {
507 new_item.insert(k.clone(), v.clone());
508 }
509 table.items.push(new_item);
510 table.items.len() - 1
511 }
512 };
513
514 let old_item = if return_values == "ALL_OLD" {
515 Some(table.items[idx].clone())
516 } else {
517 None
518 };
519
520 if let Some(expr) = update_expression {
521 apply_update_expression(
522 &mut table.items[idx],
523 expr,
524 &expr_attr_names,
525 &expr_attr_values,
526 )?;
527 }
528
529 let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
530 Some(table.items[idx].clone())
531 } else {
532 None
533 };
534
535 table.recalculate_stats();
536
537 let mut result = json!({});
538 if let Some(old) = old_item {
539 result["Attributes"] = json!(old);
540 } else if let Some(new) = new_item {
541 result["Attributes"] = json!(new);
542 }
543
544 Self::ok_json(result)
545 }
546
547 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
550 let body = Self::parse_body(req)?;
551 let table_name = require_str(&body, "TableName")?;
552
553 let state = self.state.read();
554 let table = get_table(&state.tables, table_name)?;
555
556 let expr_attr_names = parse_expression_attribute_names(&body);
557 let expr_attr_values = parse_expression_attribute_values(&body);
558
559 let key_condition = body["KeyConditionExpression"].as_str();
560 let filter_expression = body["FilterExpression"].as_str();
561 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
562 let limit = body["Limit"].as_i64().map(|l| l as usize);
563 let index_name = body["IndexName"].as_str();
564 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
565 parse_key_map(&body["ExclusiveStartKey"]);
566
567 let (items_to_scan, hash_key_name, range_key_name): (
568 &[HashMap<String, AttributeValue>],
569 String,
570 Option<String>,
571 ) = if let Some(idx_name) = index_name {
572 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
573 let hk = gsi
574 .key_schema
575 .iter()
576 .find(|k| k.key_type == "HASH")
577 .map(|k| k.attribute_name.clone())
578 .unwrap_or_default();
579 let rk = gsi
580 .key_schema
581 .iter()
582 .find(|k| k.key_type == "RANGE")
583 .map(|k| k.attribute_name.clone());
584 (&table.items, hk, rk)
585 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
586 let hk = lsi
587 .key_schema
588 .iter()
589 .find(|k| k.key_type == "HASH")
590 .map(|k| k.attribute_name.clone())
591 .unwrap_or_default();
592 let rk = lsi
593 .key_schema
594 .iter()
595 .find(|k| k.key_type == "RANGE")
596 .map(|k| k.attribute_name.clone());
597 (&table.items, hk, rk)
598 } else {
599 return Err(AwsServiceError::aws_error(
600 StatusCode::BAD_REQUEST,
601 "ValidationException",
602 format!("The table does not have the specified index: {idx_name}"),
603 ));
604 }
605 } else {
606 (
607 &table.items[..],
608 table.hash_key_name().to_string(),
609 table.range_key_name().map(|s| s.to_string()),
610 )
611 };
612
613 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
614 .iter()
615 .filter(|item| {
616 if let Some(kc) = key_condition {
617 evaluate_key_condition(
618 kc,
619 item,
620 &hash_key_name,
621 range_key_name.as_deref(),
622 &expr_attr_names,
623 &expr_attr_values,
624 )
625 } else {
626 true
627 }
628 })
629 .collect();
630
631 if let Some(ref rk) = range_key_name {
632 matched.sort_by(|a, b| {
633 let av = a.get(rk.as_str());
634 let bv = b.get(rk.as_str());
635 compare_attribute_values(av, bv)
636 });
637 if !scan_forward {
638 matched.reverse();
639 }
640 }
641
642 let table_pk_hash = table.hash_key_name().to_string();
645 let table_pk_range = table.range_key_name().map(|s| s.to_string());
646 let is_gsi_query = index_name.is_some()
647 && (hash_key_name != table_pk_hash
648 || range_key_name.as_deref() != table_pk_range.as_deref());
649
650 if let Some(ref start_key) = exclusive_start_key {
654 if let Some(pos) = matched.iter().position(|item| {
655 let index_match =
656 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
657 if is_gsi_query {
658 index_match
659 && item_matches_key(
660 item,
661 start_key,
662 &table_pk_hash,
663 table_pk_range.as_deref(),
664 )
665 } else {
666 index_match
667 }
668 }) {
669 matched = matched.split_off(pos + 1);
670 }
671 }
672
673 if let Some(filter) = filter_expression {
674 matched.retain(|item| {
675 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
676 });
677 }
678
679 let scanned_count = matched.len();
680
681 let has_more = if let Some(lim) = limit {
682 let more = matched.len() > lim;
683 matched.truncate(lim);
684 more
685 } else {
686 false
687 };
688
689 let last_evaluated_key = if has_more {
693 matched.last().map(|item| {
694 let mut key =
695 extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
696 if is_gsi_query {
697 let table_key =
698 extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
699 key.extend(table_key);
700 }
701 key
702 })
703 } else {
704 None
705 };
706
707 let insights_enabled = table.contributor_insights_status == "ENABLED";
709 let pk_name = table.hash_key_name().to_string();
710 let accessed_keys: Vec<String> = if insights_enabled {
711 matched
712 .iter()
713 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
714 .collect()
715 } else {
716 Vec::new()
717 };
718
719 let items: Vec<Value> = matched
720 .iter()
721 .map(|item| {
722 let projected = project_item(item, &body);
723 json!(projected)
724 })
725 .collect();
726
727 let mut result = json!({
728 "Items": items,
729 "Count": items.len(),
730 "ScannedCount": scanned_count,
731 });
732
733 if let Some(lek) = last_evaluated_key {
734 result["LastEvaluatedKey"] = json!(lek);
735 }
736
737 drop(state);
738
739 if !accessed_keys.is_empty() {
740 let mut state = self.state.write();
741 if let Some(table) = state.tables.get_mut(table_name) {
742 if table.contributor_insights_status == "ENABLED" {
745 for key_str in accessed_keys {
746 *table
747 .contributor_insights_counters
748 .entry(key_str)
749 .or_insert(0) += 1;
750 }
751 }
752 }
753 }
754
755 Self::ok_json(result)
756 }
757
758 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
759 let body = Self::parse_body(req)?;
760 let table_name = require_str(&body, "TableName")?;
761
762 let state = self.state.read();
763 let table = get_table(&state.tables, table_name)?;
764
765 let expr_attr_names = parse_expression_attribute_names(&body);
766 let expr_attr_values = parse_expression_attribute_values(&body);
767 let filter_expression = body["FilterExpression"].as_str();
768 let limit = body["Limit"].as_i64().map(|l| l as usize);
769 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
770 parse_key_map(&body["ExclusiveStartKey"]);
771
772 let hash_key_name = table.hash_key_name().to_string();
773 let range_key_name = table.range_key_name().map(|s| s.to_string());
774
775 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
776
777 if let Some(ref start_key) = exclusive_start_key {
779 if let Some(pos) = matched.iter().position(|item| {
780 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
781 }) {
782 matched = matched.split_off(pos + 1);
783 }
784 }
785
786 let scanned_count = matched.len();
787
788 if let Some(filter) = filter_expression {
789 matched.retain(|item| {
790 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
791 });
792 }
793
794 let has_more = if let Some(lim) = limit {
795 let more = matched.len() > lim;
796 matched.truncate(lim);
797 more
798 } else {
799 false
800 };
801
802 let last_evaluated_key = if has_more {
804 matched
805 .last()
806 .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
807 } else {
808 None
809 };
810
811 let insights_enabled = table.contributor_insights_status == "ENABLED";
813 let pk_name = table.hash_key_name().to_string();
814 let accessed_keys: Vec<String> = if insights_enabled {
815 matched
816 .iter()
817 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
818 .collect()
819 } else {
820 Vec::new()
821 };
822
823 let items: Vec<Value> = matched
824 .iter()
825 .map(|item| {
826 let projected = project_item(item, &body);
827 json!(projected)
828 })
829 .collect();
830
831 let mut result = json!({
832 "Items": items,
833 "Count": items.len(),
834 "ScannedCount": scanned_count,
835 });
836
837 if let Some(lek) = last_evaluated_key {
838 result["LastEvaluatedKey"] = json!(lek);
839 }
840
841 drop(state);
842
843 if !accessed_keys.is_empty() {
844 let mut state = self.state.write();
845 if let Some(table) = state.tables.get_mut(table_name) {
846 if table.contributor_insights_status == "ENABLED" {
849 for key_str in accessed_keys {
850 *table
851 .contributor_insights_counters
852 .entry(key_str)
853 .or_insert(0) += 1;
854 }
855 }
856 }
857 }
858
859 Self::ok_json(result)
860 }
861
862 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
865 let body = Self::parse_body(req)?;
866
867 validate_optional_enum_value(
868 "returnConsumedCapacity",
869 &body["ReturnConsumedCapacity"],
870 &["INDEXES", "TOTAL", "NONE"],
871 )?;
872
873 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
874
875 let request_items = body["RequestItems"]
876 .as_object()
877 .ok_or_else(|| {
878 AwsServiceError::aws_error(
879 StatusCode::BAD_REQUEST,
880 "ValidationException",
881 "RequestItems is required",
882 )
883 })?
884 .clone();
885
886 let state = self.state.read();
887 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
888 let mut consumed_capacity: Vec<Value> = Vec::new();
889
890 for (table_name, params) in &request_items {
891 let table = get_table(&state.tables, table_name)?;
892 let keys = params["Keys"].as_array().ok_or_else(|| {
893 AwsServiceError::aws_error(
894 StatusCode::BAD_REQUEST,
895 "ValidationException",
896 "Keys is required",
897 )
898 })?;
899
900 let mut items = Vec::new();
901 for key_val in keys {
902 let key: HashMap<String, AttributeValue> =
903 serde_json::from_value(key_val.clone()).unwrap_or_default();
904 if let Some(idx) = table.find_item_index(&key) {
905 items.push(json!(table.items[idx]));
906 }
907 }
908 responses.insert(table_name.clone(), items);
909
910 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
911 consumed_capacity.push(json!({
912 "TableName": table_name,
913 "CapacityUnits": 1.0,
914 }));
915 }
916 }
917
918 let mut result = json!({
919 "Responses": responses,
920 "UnprocessedKeys": {},
921 });
922
923 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
924 result["ConsumedCapacity"] = json!(consumed_capacity);
925 }
926
927 Self::ok_json(result)
928 }
929
930 fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
931 let body = Self::parse_body(req)?;
932
933 validate_optional_enum_value(
934 "returnConsumedCapacity",
935 &body["ReturnConsumedCapacity"],
936 &["INDEXES", "TOTAL", "NONE"],
937 )?;
938 validate_optional_enum_value(
939 "returnItemCollectionMetrics",
940 &body["ReturnItemCollectionMetrics"],
941 &["SIZE", "NONE"],
942 )?;
943
944 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
945 let return_icm = body["ReturnItemCollectionMetrics"]
946 .as_str()
947 .unwrap_or("NONE");
948
949 let request_items = body["RequestItems"]
950 .as_object()
951 .ok_or_else(|| {
952 AwsServiceError::aws_error(
953 StatusCode::BAD_REQUEST,
954 "ValidationException",
955 "RequestItems is required",
956 )
957 })?
958 .clone();
959
960 let mut state = self.state.write();
961 let mut consumed_capacity: Vec<Value> = Vec::new();
962 let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
963
964 for (table_name, requests) in &request_items {
965 let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
966 AwsServiceError::aws_error(
967 StatusCode::BAD_REQUEST,
968 "ResourceNotFoundException",
969 format!("Requested resource not found: Table: {table_name} not found"),
970 )
971 })?;
972
973 let reqs = requests.as_array().ok_or_else(|| {
974 AwsServiceError::aws_error(
975 StatusCode::BAD_REQUEST,
976 "ValidationException",
977 "Request list must be an array",
978 )
979 })?;
980
981 for request in reqs {
982 if let Some(put_req) = request.get("PutRequest") {
983 let item: HashMap<String, AttributeValue> =
984 serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
985 let key = extract_key(table, &item);
986 if let Some(idx) = table.find_item_index(&key) {
987 table.items[idx] = item;
988 } else {
989 table.items.push(item);
990 }
991 } else if let Some(del_req) = request.get("DeleteRequest") {
992 let key: HashMap<String, AttributeValue> =
993 serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
994 if let Some(idx) = table.find_item_index(&key) {
995 table.items.remove(idx);
996 }
997 }
998 }
999
1000 table.recalculate_stats();
1001
1002 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1003 consumed_capacity.push(json!({
1004 "TableName": table_name,
1005 "CapacityUnits": 1.0,
1006 }));
1007 }
1008
1009 if return_icm == "SIZE" {
1010 item_collection_metrics.insert(table_name.clone(), vec![]);
1011 }
1012 }
1013
1014 let mut result = json!({
1015 "UnprocessedItems": {},
1016 });
1017
1018 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1019 result["ConsumedCapacity"] = json!(consumed_capacity);
1020 }
1021
1022 if return_icm == "SIZE" {
1023 result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1024 }
1025
1026 Self::ok_json(result)
1027 }
1028
1029 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1032 let body = Self::parse_body(req)?;
1033 let resource_arn = require_str(&body, "ResourceArn")?;
1034 validate_required("Tags", &body["Tags"])?;
1035
1036 let mut state = self.state.write();
1037 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1038
1039 fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1040 |f| {
1041 AwsServiceError::aws_error(
1042 StatusCode::BAD_REQUEST,
1043 "ValidationException",
1044 format!("{f} must be a list"),
1045 )
1046 },
1047 )?;
1048
1049 Self::ok_json(json!({}))
1050 }
1051
1052 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1053 let body = Self::parse_body(req)?;
1054 let resource_arn = require_str(&body, "ResourceArn")?;
1055 validate_required("TagKeys", &body["TagKeys"])?;
1056
1057 let mut state = self.state.write();
1058 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1059
1060 fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1061 AwsServiceError::aws_error(
1062 StatusCode::BAD_REQUEST,
1063 "ValidationException",
1064 format!("{f} must be a list"),
1065 )
1066 })?;
1067
1068 Self::ok_json(json!({}))
1069 }
1070
1071 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1072 let body = Self::parse_body(req)?;
1073 let resource_arn = require_str(&body, "ResourceArn")?;
1074
1075 let state = self.state.read();
1076 let table = find_table_by_arn(&state.tables, resource_arn)?;
1077
1078 let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1079
1080 Self::ok_json(json!({ "Tags": tags }))
1081 }
1082
1083 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1086 let body = Self::parse_body(req)?;
1087 validate_optional_enum_value(
1088 "returnConsumedCapacity",
1089 &body["ReturnConsumedCapacity"],
1090 &["INDEXES", "TOTAL", "NONE"],
1091 )?;
1092 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1093 AwsServiceError::aws_error(
1094 StatusCode::BAD_REQUEST,
1095 "ValidationException",
1096 "TransactItems is required",
1097 )
1098 })?;
1099
1100 let state = self.state.read();
1101 let mut responses: Vec<Value> = Vec::new();
1102
1103 for ti in transact_items {
1104 let get = &ti["Get"];
1105 let table_name = get["TableName"].as_str().ok_or_else(|| {
1106 AwsServiceError::aws_error(
1107 StatusCode::BAD_REQUEST,
1108 "ValidationException",
1109 "TableName is required in Get",
1110 )
1111 })?;
1112 let key: HashMap<String, AttributeValue> =
1113 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1114
1115 let table = get_table(&state.tables, table_name)?;
1116 match table.find_item_index(&key) {
1117 Some(idx) => {
1118 responses.push(json!({ "Item": table.items[idx] }));
1119 }
1120 None => {
1121 responses.push(json!({}));
1122 }
1123 }
1124 }
1125
1126 Self::ok_json(json!({ "Responses": responses }))
1127 }
1128
1129 fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1130 let body = Self::parse_body(req)?;
1131 validate_optional_string_length(
1132 "clientRequestToken",
1133 body["ClientRequestToken"].as_str(),
1134 1,
1135 36,
1136 )?;
1137 validate_optional_enum_value(
1138 "returnConsumedCapacity",
1139 &body["ReturnConsumedCapacity"],
1140 &["INDEXES", "TOTAL", "NONE"],
1141 )?;
1142 validate_optional_enum_value(
1143 "returnItemCollectionMetrics",
1144 &body["ReturnItemCollectionMetrics"],
1145 &["SIZE", "NONE"],
1146 )?;
1147 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1148 AwsServiceError::aws_error(
1149 StatusCode::BAD_REQUEST,
1150 "ValidationException",
1151 "TransactItems is required",
1152 )
1153 })?;
1154
1155 let mut state = self.state.write();
1156
1157 let mut cancellation_reasons: Vec<Value> = Vec::new();
1159 let mut any_failed = false;
1160
1161 for ti in transact_items {
1162 if let Some(put) = ti.get("Put") {
1163 let table_name = put["TableName"].as_str().unwrap_or_default();
1164 let item: HashMap<String, AttributeValue> =
1165 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1166 let condition = put["ConditionExpression"].as_str();
1167
1168 if let Some(cond) = condition {
1169 let table = get_table(&state.tables, table_name)?;
1170 let expr_attr_names = parse_expression_attribute_names(put);
1171 let expr_attr_values = parse_expression_attribute_values(put);
1172 let key = extract_key(table, &item);
1173 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1174 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1175 .is_err()
1176 {
1177 cancellation_reasons.push(json!({
1178 "Code": "ConditionalCheckFailed",
1179 "Message": "The conditional request failed"
1180 }));
1181 any_failed = true;
1182 continue;
1183 }
1184 }
1185 cancellation_reasons.push(json!({ "Code": "None" }));
1186 } else if let Some(delete) = ti.get("Delete") {
1187 let table_name = delete["TableName"].as_str().unwrap_or_default();
1188 let key: HashMap<String, AttributeValue> =
1189 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1190 let condition = delete["ConditionExpression"].as_str();
1191
1192 if let Some(cond) = condition {
1193 let table = get_table(&state.tables, table_name)?;
1194 let expr_attr_names = parse_expression_attribute_names(delete);
1195 let expr_attr_values = parse_expression_attribute_values(delete);
1196 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1197 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1198 .is_err()
1199 {
1200 cancellation_reasons.push(json!({
1201 "Code": "ConditionalCheckFailed",
1202 "Message": "The conditional request failed"
1203 }));
1204 any_failed = true;
1205 continue;
1206 }
1207 }
1208 cancellation_reasons.push(json!({ "Code": "None" }));
1209 } else if let Some(update) = ti.get("Update") {
1210 let table_name = update["TableName"].as_str().unwrap_or_default();
1211 let key: HashMap<String, AttributeValue> =
1212 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1213 let condition = update["ConditionExpression"].as_str();
1214
1215 if let Some(cond) = condition {
1216 let table = get_table(&state.tables, table_name)?;
1217 let expr_attr_names = parse_expression_attribute_names(update);
1218 let expr_attr_values = parse_expression_attribute_values(update);
1219 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1220 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1221 .is_err()
1222 {
1223 cancellation_reasons.push(json!({
1224 "Code": "ConditionalCheckFailed",
1225 "Message": "The conditional request failed"
1226 }));
1227 any_failed = true;
1228 continue;
1229 }
1230 }
1231 cancellation_reasons.push(json!({ "Code": "None" }));
1232 } else if let Some(check) = ti.get("ConditionCheck") {
1233 let table_name = check["TableName"].as_str().unwrap_or_default();
1234 let key: HashMap<String, AttributeValue> =
1235 serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1236 let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1237
1238 let table = get_table(&state.tables, table_name)?;
1239 let expr_attr_names = parse_expression_attribute_names(check);
1240 let expr_attr_values = parse_expression_attribute_values(check);
1241 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1242 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1243 {
1244 cancellation_reasons.push(json!({
1245 "Code": "ConditionalCheckFailed",
1246 "Message": "The conditional request failed"
1247 }));
1248 any_failed = true;
1249 continue;
1250 }
1251 cancellation_reasons.push(json!({ "Code": "None" }));
1252 } else {
1253 cancellation_reasons.push(json!({ "Code": "None" }));
1254 }
1255 }
1256
1257 if any_failed {
1258 let error_body = json!({
1259 "__type": "TransactionCanceledException",
1260 "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1261 "CancellationReasons": cancellation_reasons
1262 });
1263 return Ok(AwsResponse::json(
1264 StatusCode::BAD_REQUEST,
1265 serde_json::to_vec(&error_body).unwrap(),
1266 ));
1267 }
1268
1269 for ti in transact_items {
1271 if let Some(put) = ti.get("Put") {
1272 let table_name = put["TableName"].as_str().unwrap_or_default();
1273 let item: HashMap<String, AttributeValue> =
1274 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1275 let table = get_table_mut(&mut state.tables, table_name)?;
1276 let key = extract_key(table, &item);
1277 if let Some(idx) = table.find_item_index(&key) {
1278 table.items[idx] = item;
1279 } else {
1280 table.items.push(item);
1281 }
1282 table.recalculate_stats();
1283 } else if let Some(delete) = ti.get("Delete") {
1284 let table_name = delete["TableName"].as_str().unwrap_or_default();
1285 let key: HashMap<String, AttributeValue> =
1286 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1287 let table = get_table_mut(&mut state.tables, table_name)?;
1288 if let Some(idx) = table.find_item_index(&key) {
1289 table.items.remove(idx);
1290 }
1291 table.recalculate_stats();
1292 } else if let Some(update) = ti.get("Update") {
1293 let table_name = update["TableName"].as_str().unwrap_or_default();
1294 let key: HashMap<String, AttributeValue> =
1295 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1296 let update_expression = update["UpdateExpression"].as_str();
1297 let expr_attr_names = parse_expression_attribute_names(update);
1298 let expr_attr_values = parse_expression_attribute_values(update);
1299
1300 let table = get_table_mut(&mut state.tables, table_name)?;
1301 let idx = match table.find_item_index(&key) {
1302 Some(i) => i,
1303 None => {
1304 let mut new_item = HashMap::new();
1305 for (k, v) in &key {
1306 new_item.insert(k.clone(), v.clone());
1307 }
1308 table.items.push(new_item);
1309 table.items.len() - 1
1310 }
1311 };
1312
1313 if let Some(expr) = update_expression {
1314 apply_update_expression(
1315 &mut table.items[idx],
1316 expr,
1317 &expr_attr_names,
1318 &expr_attr_values,
1319 )?;
1320 }
1321 table.recalculate_stats();
1322 }
1323 }
1325
1326 Self::ok_json(json!({}))
1327 }
1328
1329 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1330 let body = Self::parse_body(req)?;
1331 let statement = require_str(&body, "Statement")?;
1332 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1333
1334 execute_partiql_statement(&self.state, statement, ¶meters)
1335 }
1336
1337 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1338 let body = Self::parse_body(req)?;
1339 validate_optional_enum_value(
1340 "returnConsumedCapacity",
1341 &body["ReturnConsumedCapacity"],
1342 &["INDEXES", "TOTAL", "NONE"],
1343 )?;
1344 let statements = body["Statements"].as_array().ok_or_else(|| {
1345 AwsServiceError::aws_error(
1346 StatusCode::BAD_REQUEST,
1347 "ValidationException",
1348 "Statements is required",
1349 )
1350 })?;
1351
1352 let mut responses: Vec<Value> = Vec::new();
1353 for stmt_obj in statements {
1354 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1355 let parameters = stmt_obj["Parameters"]
1356 .as_array()
1357 .cloned()
1358 .unwrap_or_default();
1359
1360 match execute_partiql_statement(&self.state, statement, ¶meters) {
1361 Ok(resp) => {
1362 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1363 responses.push(resp_body);
1364 }
1365 Err(e) => {
1366 responses.push(json!({
1367 "Error": {
1368 "Code": "ValidationException",
1369 "Message": e.to_string()
1370 }
1371 }));
1372 }
1373 }
1374 }
1375
1376 Self::ok_json(json!({ "Responses": responses }))
1377 }
1378
1379 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1380 let body = Self::parse_body(req)?;
1381 validate_optional_string_length(
1382 "clientRequestToken",
1383 body["ClientRequestToken"].as_str(),
1384 1,
1385 36,
1386 )?;
1387 validate_optional_enum_value(
1388 "returnConsumedCapacity",
1389 &body["ReturnConsumedCapacity"],
1390 &["INDEXES", "TOTAL", "NONE"],
1391 )?;
1392 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1393 AwsServiceError::aws_error(
1394 StatusCode::BAD_REQUEST,
1395 "ValidationException",
1396 "TransactStatements is required",
1397 )
1398 })?;
1399
1400 let mut results: Vec<Result<Value, String>> = Vec::new();
1402 for stmt_obj in transact_statements {
1403 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1404 let parameters = stmt_obj["Parameters"]
1405 .as_array()
1406 .cloned()
1407 .unwrap_or_default();
1408
1409 match execute_partiql_statement(&self.state, statement, ¶meters) {
1410 Ok(resp) => {
1411 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1412 results.push(Ok(resp_body));
1413 }
1414 Err(e) => {
1415 results.push(Err(e.to_string()));
1416 }
1417 }
1418 }
1419
1420 let any_failed = results.iter().any(|r| r.is_err());
1421 if any_failed {
1422 let reasons: Vec<Value> = results
1423 .iter()
1424 .map(|r| match r {
1425 Ok(_) => json!({ "Code": "None" }),
1426 Err(msg) => json!({
1427 "Code": "ValidationException",
1428 "Message": msg
1429 }),
1430 })
1431 .collect();
1432 let error_body = json!({
1433 "__type": "TransactionCanceledException",
1434 "message": "Transaction cancelled due to validation errors",
1435 "CancellationReasons": reasons
1436 });
1437 return Ok(AwsResponse::json(
1438 StatusCode::BAD_REQUEST,
1439 serde_json::to_vec(&error_body).unwrap(),
1440 ));
1441 }
1442
1443 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1444 Self::ok_json(json!({ "Responses": responses }))
1445 }
1446
1447 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1450 let body = Self::parse_body(req)?;
1451 let table_name = require_str(&body, "TableName")?;
1452 let spec = &body["TimeToLiveSpecification"];
1453 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1454 AwsServiceError::aws_error(
1455 StatusCode::BAD_REQUEST,
1456 "ValidationException",
1457 "TimeToLiveSpecification.AttributeName is required",
1458 )
1459 })?;
1460 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1461 AwsServiceError::aws_error(
1462 StatusCode::BAD_REQUEST,
1463 "ValidationException",
1464 "TimeToLiveSpecification.Enabled is required",
1465 )
1466 })?;
1467
1468 let mut state = self.state.write();
1469 let table = get_table_mut(&mut state.tables, table_name)?;
1470
1471 if enabled {
1472 table.ttl_attribute = Some(attr_name.to_string());
1473 table.ttl_enabled = true;
1474 } else {
1475 table.ttl_enabled = false;
1476 }
1477
1478 Self::ok_json(json!({
1479 "TimeToLiveSpecification": {
1480 "AttributeName": attr_name,
1481 "Enabled": enabled
1482 }
1483 }))
1484 }
1485
1486 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1487 let body = Self::parse_body(req)?;
1488 let table_name = require_str(&body, "TableName")?;
1489
1490 let state = self.state.read();
1491 let table = get_table(&state.tables, table_name)?;
1492
1493 let status = if table.ttl_enabled {
1494 "ENABLED"
1495 } else {
1496 "DISABLED"
1497 };
1498
1499 let mut desc = json!({
1500 "TimeToLiveDescription": {
1501 "TimeToLiveStatus": status
1502 }
1503 });
1504
1505 if let Some(ref attr) = table.ttl_attribute {
1506 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1507 }
1508
1509 Self::ok_json(desc)
1510 }
1511
1512 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1515 let body = Self::parse_body(req)?;
1516 let resource_arn = require_str(&body, "ResourceArn")?;
1517 let policy = require_str(&body, "Policy")?;
1518
1519 let mut state = self.state.write();
1520 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1521 table.resource_policy = Some(policy.to_string());
1522
1523 let revision_id = uuid::Uuid::new_v4().to_string();
1524 Self::ok_json(json!({ "RevisionId": revision_id }))
1525 }
1526
1527 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1528 let body = Self::parse_body(req)?;
1529 let resource_arn = require_str(&body, "ResourceArn")?;
1530
1531 let state = self.state.read();
1532 let table = find_table_by_arn(&state.tables, resource_arn)?;
1533
1534 match &table.resource_policy {
1535 Some(policy) => {
1536 let revision_id = uuid::Uuid::new_v4().to_string();
1537 Self::ok_json(json!({
1538 "Policy": policy,
1539 "RevisionId": revision_id
1540 }))
1541 }
1542 None => Self::ok_json(json!({ "Policy": null })),
1543 }
1544 }
1545
1546 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1547 let body = Self::parse_body(req)?;
1548 let resource_arn = require_str(&body, "ResourceArn")?;
1549
1550 let mut state = self.state.write();
1551 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1552 table.resource_policy = None;
1553
1554 Self::ok_json(json!({}))
1555 }
1556
1557 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1560 Self::ok_json(json!({
1561 "Endpoints": [{
1562 "Address": "dynamodb.us-east-1.amazonaws.com",
1563 "CachePeriodInMinutes": 1440
1564 }]
1565 }))
1566 }
1567
1568 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1569 Self::ok_json(json!({
1570 "AccountMaxReadCapacityUnits": 80000,
1571 "AccountMaxWriteCapacityUnits": 80000,
1572 "TableMaxReadCapacityUnits": 40000,
1573 "TableMaxWriteCapacityUnits": 40000
1574 }))
1575 }
1576
1577 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1580 let body = Self::parse_body(req)?;
1581 let table_name = require_str(&body, "TableName")?;
1582 let backup_name = require_str(&body, "BackupName")?;
1583
1584 let mut state = self.state.write();
1585 let table = get_table(&state.tables, table_name)?;
1586
1587 let backup_arn = format!(
1588 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1589 state.region,
1590 state.account_id,
1591 table_name,
1592 Utc::now().format("%Y%m%d%H%M%S")
1593 );
1594 let now = Utc::now();
1595
1596 let backup = BackupDescription {
1597 backup_arn: backup_arn.clone(),
1598 backup_name: backup_name.to_string(),
1599 table_name: table_name.to_string(),
1600 table_arn: table.arn.clone(),
1601 backup_status: "AVAILABLE".to_string(),
1602 backup_type: "USER".to_string(),
1603 backup_creation_date: now,
1604 key_schema: table.key_schema.clone(),
1605 attribute_definitions: table.attribute_definitions.clone(),
1606 provisioned_throughput: table.provisioned_throughput.clone(),
1607 billing_mode: table.billing_mode.clone(),
1608 item_count: table.item_count,
1609 size_bytes: table.size_bytes,
1610 items: table.items.clone(),
1611 };
1612
1613 state.backups.insert(backup_arn.clone(), backup);
1614
1615 Self::ok_json(json!({
1616 "BackupDetails": {
1617 "BackupArn": backup_arn,
1618 "BackupName": backup_name,
1619 "BackupStatus": "AVAILABLE",
1620 "BackupType": "USER",
1621 "BackupCreationDateTime": now.timestamp() as f64,
1622 "BackupSizeBytes": 0
1623 }
1624 }))
1625 }
1626
1627 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1628 let body = Self::parse_body(req)?;
1629 let backup_arn = require_str(&body, "BackupArn")?;
1630
1631 let mut state = self.state.write();
1632 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1633 AwsServiceError::aws_error(
1634 StatusCode::BAD_REQUEST,
1635 "BackupNotFoundException",
1636 format!("Backup not found: {backup_arn}"),
1637 )
1638 })?;
1639
1640 Self::ok_json(json!({
1641 "BackupDescription": {
1642 "BackupDetails": {
1643 "BackupArn": backup.backup_arn,
1644 "BackupName": backup.backup_name,
1645 "BackupStatus": "DELETED",
1646 "BackupType": backup.backup_type,
1647 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1648 "BackupSizeBytes": backup.size_bytes
1649 },
1650 "SourceTableDetails": {
1651 "TableName": backup.table_name,
1652 "TableArn": backup.table_arn,
1653 "TableId": uuid::Uuid::new_v4().to_string(),
1654 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1655 "AttributeName": ks.attribute_name,
1656 "KeyType": ks.key_type
1657 })).collect::<Vec<_>>(),
1658 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1659 "ProvisionedThroughput": {
1660 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1661 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1662 },
1663 "ItemCount": backup.item_count,
1664 "BillingMode": backup.billing_mode,
1665 "TableSizeBytes": backup.size_bytes
1666 },
1667 "SourceTableFeatureDetails": {}
1668 }
1669 }))
1670 }
1671
1672 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1673 let body = Self::parse_body(req)?;
1674 let backup_arn = require_str(&body, "BackupArn")?;
1675
1676 let state = self.state.read();
1677 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1678 AwsServiceError::aws_error(
1679 StatusCode::BAD_REQUEST,
1680 "BackupNotFoundException",
1681 format!("Backup not found: {backup_arn}"),
1682 )
1683 })?;
1684
1685 Self::ok_json(json!({
1686 "BackupDescription": {
1687 "BackupDetails": {
1688 "BackupArn": backup.backup_arn,
1689 "BackupName": backup.backup_name,
1690 "BackupStatus": backup.backup_status,
1691 "BackupType": backup.backup_type,
1692 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1693 "BackupSizeBytes": backup.size_bytes
1694 },
1695 "SourceTableDetails": {
1696 "TableName": backup.table_name,
1697 "TableArn": backup.table_arn,
1698 "TableId": uuid::Uuid::new_v4().to_string(),
1699 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1700 "AttributeName": ks.attribute_name,
1701 "KeyType": ks.key_type
1702 })).collect::<Vec<_>>(),
1703 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1704 "ProvisionedThroughput": {
1705 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1706 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1707 },
1708 "ItemCount": backup.item_count,
1709 "BillingMode": backup.billing_mode,
1710 "TableSizeBytes": backup.size_bytes
1711 },
1712 "SourceTableFeatureDetails": {}
1713 }
1714 }))
1715 }
1716
1717 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1718 let body = Self::parse_body(req)?;
1719 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
1720 validate_optional_string_length(
1721 "exclusiveStartBackupArn",
1722 body["ExclusiveStartBackupArn"].as_str(),
1723 37,
1724 1024,
1725 )?;
1726 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1727 validate_optional_enum_value(
1728 "backupType",
1729 &body["BackupType"],
1730 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
1731 )?;
1732 let table_name = body["TableName"].as_str();
1733
1734 let state = self.state.read();
1735 let summaries: Vec<Value> = state
1736 .backups
1737 .values()
1738 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1739 .map(|b| {
1740 json!({
1741 "TableName": b.table_name,
1742 "TableArn": b.table_arn,
1743 "BackupArn": b.backup_arn,
1744 "BackupName": b.backup_name,
1745 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1746 "BackupStatus": b.backup_status,
1747 "BackupType": b.backup_type,
1748 "BackupSizeBytes": b.size_bytes
1749 })
1750 })
1751 .collect();
1752
1753 Self::ok_json(json!({
1754 "BackupSummaries": summaries
1755 }))
1756 }
1757
1758 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1759 let body = Self::parse_body(req)?;
1760 let backup_arn = require_str(&body, "BackupArn")?;
1761 let target_table_name = require_str(&body, "TargetTableName")?;
1762
1763 let mut state = self.state.write();
1764 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1765 AwsServiceError::aws_error(
1766 StatusCode::BAD_REQUEST,
1767 "BackupNotFoundException",
1768 format!("Backup not found: {backup_arn}"),
1769 )
1770 })?;
1771
1772 if state.tables.contains_key(target_table_name) {
1773 return Err(AwsServiceError::aws_error(
1774 StatusCode::BAD_REQUEST,
1775 "TableAlreadyExistsException",
1776 format!("Table already exists: {target_table_name}"),
1777 ));
1778 }
1779
1780 let now = Utc::now();
1781 let arn = format!(
1782 "arn:aws:dynamodb:{}:{}:table/{}",
1783 state.region, state.account_id, target_table_name
1784 );
1785
1786 let restored_items = backup.items.clone();
1787 let mut table = DynamoTable {
1788 name: target_table_name.to_string(),
1789 arn: arn.clone(),
1790 key_schema: backup.key_schema.clone(),
1791 attribute_definitions: backup.attribute_definitions.clone(),
1792 provisioned_throughput: backup.provisioned_throughput.clone(),
1793 items: restored_items,
1794 gsi: Vec::new(),
1795 lsi: Vec::new(),
1796 tags: HashMap::new(),
1797 created_at: now,
1798 status: "ACTIVE".to_string(),
1799 item_count: 0,
1800 size_bytes: 0,
1801 billing_mode: backup.billing_mode.clone(),
1802 ttl_attribute: None,
1803 ttl_enabled: false,
1804 resource_policy: None,
1805 pitr_enabled: false,
1806 kinesis_destinations: Vec::new(),
1807 contributor_insights_status: "DISABLED".to_string(),
1808 contributor_insights_counters: HashMap::new(),
1809 };
1810 table.recalculate_stats();
1811
1812 let desc = build_table_description(&table);
1813 state.tables.insert(target_table_name.to_string(), table);
1814
1815 Self::ok_json(json!({
1816 "TableDescription": desc
1817 }))
1818 }
1819
1820 fn restore_table_to_point_in_time(
1821 &self,
1822 req: &AwsRequest,
1823 ) -> Result<AwsResponse, AwsServiceError> {
1824 let body = Self::parse_body(req)?;
1825 let target_table_name = require_str(&body, "TargetTableName")?;
1826 let source_table_name = body["SourceTableName"].as_str();
1827 let source_table_arn = body["SourceTableArn"].as_str();
1828
1829 let mut state = self.state.write();
1830
1831 let source = if let Some(name) = source_table_name {
1833 get_table(&state.tables, name)?.clone()
1834 } else if let Some(arn) = source_table_arn {
1835 find_table_by_arn(&state.tables, arn)?.clone()
1836 } else {
1837 return Err(AwsServiceError::aws_error(
1838 StatusCode::BAD_REQUEST,
1839 "ValidationException",
1840 "SourceTableName or SourceTableArn is required",
1841 ));
1842 };
1843
1844 if state.tables.contains_key(target_table_name) {
1845 return Err(AwsServiceError::aws_error(
1846 StatusCode::BAD_REQUEST,
1847 "TableAlreadyExistsException",
1848 format!("Table already exists: {target_table_name}"),
1849 ));
1850 }
1851
1852 let now = Utc::now();
1853 let arn = format!(
1854 "arn:aws:dynamodb:{}:{}:table/{}",
1855 state.region, state.account_id, target_table_name
1856 );
1857
1858 let mut table = DynamoTable {
1859 name: target_table_name.to_string(),
1860 arn: arn.clone(),
1861 key_schema: source.key_schema.clone(),
1862 attribute_definitions: source.attribute_definitions.clone(),
1863 provisioned_throughput: source.provisioned_throughput.clone(),
1864 items: source.items.clone(),
1865 gsi: Vec::new(),
1866 lsi: Vec::new(),
1867 tags: HashMap::new(),
1868 created_at: now,
1869 status: "ACTIVE".to_string(),
1870 item_count: 0,
1871 size_bytes: 0,
1872 billing_mode: source.billing_mode.clone(),
1873 ttl_attribute: None,
1874 ttl_enabled: false,
1875 resource_policy: None,
1876 pitr_enabled: false,
1877 kinesis_destinations: Vec::new(),
1878 contributor_insights_status: "DISABLED".to_string(),
1879 contributor_insights_counters: HashMap::new(),
1880 };
1881 table.recalculate_stats();
1882
1883 let desc = build_table_description(&table);
1884 state.tables.insert(target_table_name.to_string(), table);
1885
1886 Self::ok_json(json!({
1887 "TableDescription": desc
1888 }))
1889 }
1890
1891 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1892 let body = Self::parse_body(req)?;
1893 let table_name = require_str(&body, "TableName")?;
1894
1895 let pitr_spec = body["PointInTimeRecoverySpecification"]
1896 .as_object()
1897 .ok_or_else(|| {
1898 AwsServiceError::aws_error(
1899 StatusCode::BAD_REQUEST,
1900 "ValidationException",
1901 "PointInTimeRecoverySpecification is required",
1902 )
1903 })?;
1904 let enabled = pitr_spec
1905 .get("PointInTimeRecoveryEnabled")
1906 .and_then(|v| v.as_bool())
1907 .unwrap_or(false);
1908
1909 let mut state = self.state.write();
1910 let table = get_table_mut(&mut state.tables, table_name)?;
1911 table.pitr_enabled = enabled;
1912
1913 let status = if enabled { "ENABLED" } else { "DISABLED" };
1914 Self::ok_json(json!({
1915 "ContinuousBackupsDescription": {
1916 "ContinuousBackupsStatus": status,
1917 "PointInTimeRecoveryDescription": {
1918 "PointInTimeRecoveryStatus": status,
1919 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1920 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1921 }
1922 }
1923 }))
1924 }
1925
1926 fn describe_continuous_backups(
1927 &self,
1928 req: &AwsRequest,
1929 ) -> Result<AwsResponse, AwsServiceError> {
1930 let body = Self::parse_body(req)?;
1931 let table_name = require_str(&body, "TableName")?;
1932
1933 let state = self.state.read();
1934 let table = get_table(&state.tables, table_name)?;
1935
1936 let status = if table.pitr_enabled {
1937 "ENABLED"
1938 } else {
1939 "DISABLED"
1940 };
1941 Self::ok_json(json!({
1942 "ContinuousBackupsDescription": {
1943 "ContinuousBackupsStatus": status,
1944 "PointInTimeRecoveryDescription": {
1945 "PointInTimeRecoveryStatus": status,
1946 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1947 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1948 }
1949 }
1950 }))
1951 }
1952
1953 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1956 let body = Self::parse_body(req)?;
1957 let global_table_name = require_str(&body, "GlobalTableName")?;
1958 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1959
1960 let replication_group = body["ReplicationGroup"]
1961 .as_array()
1962 .ok_or_else(|| {
1963 AwsServiceError::aws_error(
1964 StatusCode::BAD_REQUEST,
1965 "ValidationException",
1966 "ReplicationGroup is required",
1967 )
1968 })?
1969 .iter()
1970 .filter_map(|r| {
1971 r["RegionName"].as_str().map(|rn| ReplicaDescription {
1972 region_name: rn.to_string(),
1973 replica_status: "ACTIVE".to_string(),
1974 })
1975 })
1976 .collect::<Vec<_>>();
1977
1978 let mut state = self.state.write();
1979
1980 if state.global_tables.contains_key(global_table_name) {
1981 return Err(AwsServiceError::aws_error(
1982 StatusCode::BAD_REQUEST,
1983 "GlobalTableAlreadyExistsException",
1984 format!("Global table already exists: {global_table_name}"),
1985 ));
1986 }
1987
1988 let arn = format!(
1989 "arn:aws:dynamodb::{}:global-table/{}",
1990 state.account_id, global_table_name
1991 );
1992 let now = Utc::now();
1993
1994 let gt = GlobalTableDescription {
1995 global_table_name: global_table_name.to_string(),
1996 global_table_arn: arn.clone(),
1997 global_table_status: "ACTIVE".to_string(),
1998 creation_date: now,
1999 replication_group: replication_group.clone(),
2000 };
2001
2002 state
2003 .global_tables
2004 .insert(global_table_name.to_string(), gt);
2005
2006 Self::ok_json(json!({
2007 "GlobalTableDescription": {
2008 "GlobalTableName": global_table_name,
2009 "GlobalTableArn": arn,
2010 "GlobalTableStatus": "ACTIVE",
2011 "CreationDateTime": now.timestamp() as f64,
2012 "ReplicationGroup": replication_group.iter().map(|r| json!({
2013 "RegionName": r.region_name,
2014 "ReplicaStatus": r.replica_status
2015 })).collect::<Vec<_>>()
2016 }
2017 }))
2018 }
2019
2020 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2021 let body = Self::parse_body(req)?;
2022 let global_table_name = require_str(&body, "GlobalTableName")?;
2023 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2024
2025 let state = self.state.read();
2026 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2027 AwsServiceError::aws_error(
2028 StatusCode::BAD_REQUEST,
2029 "GlobalTableNotFoundException",
2030 format!("Global table not found: {global_table_name}"),
2031 )
2032 })?;
2033
2034 Self::ok_json(json!({
2035 "GlobalTableDescription": {
2036 "GlobalTableName": gt.global_table_name,
2037 "GlobalTableArn": gt.global_table_arn,
2038 "GlobalTableStatus": gt.global_table_status,
2039 "CreationDateTime": gt.creation_date.timestamp() as f64,
2040 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2041 "RegionName": r.region_name,
2042 "ReplicaStatus": r.replica_status
2043 })).collect::<Vec<_>>()
2044 }
2045 }))
2046 }
2047
2048 fn describe_global_table_settings(
2049 &self,
2050 req: &AwsRequest,
2051 ) -> Result<AwsResponse, AwsServiceError> {
2052 let body = Self::parse_body(req)?;
2053 let global_table_name = require_str(&body, "GlobalTableName")?;
2054 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2055
2056 let state = self.state.read();
2057 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2058 AwsServiceError::aws_error(
2059 StatusCode::BAD_REQUEST,
2060 "GlobalTableNotFoundException",
2061 format!("Global table not found: {global_table_name}"),
2062 )
2063 })?;
2064
2065 let replica_settings: Vec<Value> = gt
2066 .replication_group
2067 .iter()
2068 .map(|r| {
2069 json!({
2070 "RegionName": r.region_name,
2071 "ReplicaStatus": r.replica_status,
2072 "ReplicaProvisionedReadCapacityUnits": 0,
2073 "ReplicaProvisionedWriteCapacityUnits": 0
2074 })
2075 })
2076 .collect();
2077
2078 Self::ok_json(json!({
2079 "GlobalTableName": gt.global_table_name,
2080 "ReplicaSettings": replica_settings
2081 }))
2082 }
2083
2084 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2085 let body = Self::parse_body(req)?;
2086 validate_optional_string_length(
2087 "exclusiveStartGlobalTableName",
2088 body["ExclusiveStartGlobalTableName"].as_str(),
2089 3,
2090 255,
2091 )?;
2092 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2093 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2094
2095 let state = self.state.read();
2096 let tables: Vec<Value> = state
2097 .global_tables
2098 .values()
2099 .take(limit)
2100 .map(|gt| {
2101 json!({
2102 "GlobalTableName": gt.global_table_name,
2103 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2104 "RegionName": r.region_name
2105 })).collect::<Vec<_>>()
2106 })
2107 })
2108 .collect();
2109
2110 Self::ok_json(json!({
2111 "GlobalTables": tables
2112 }))
2113 }
2114
2115 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2116 let body = Self::parse_body(req)?;
2117 let global_table_name = require_str(&body, "GlobalTableName")?;
2118 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2119 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2120
2121 let mut state = self.state.write();
2122 let gt = state
2123 .global_tables
2124 .get_mut(global_table_name)
2125 .ok_or_else(|| {
2126 AwsServiceError::aws_error(
2127 StatusCode::BAD_REQUEST,
2128 "GlobalTableNotFoundException",
2129 format!("Global table not found: {global_table_name}"),
2130 )
2131 })?;
2132
2133 if let Some(updates) = body["ReplicaUpdates"].as_array() {
2134 for update in updates {
2135 if let Some(create) = update["Create"].as_object() {
2136 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2137 gt.replication_group.push(ReplicaDescription {
2138 region_name: region.to_string(),
2139 replica_status: "ACTIVE".to_string(),
2140 });
2141 }
2142 }
2143 if let Some(delete) = update["Delete"].as_object() {
2144 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2145 gt.replication_group.retain(|r| r.region_name != region);
2146 }
2147 }
2148 }
2149 }
2150
2151 Self::ok_json(json!({
2152 "GlobalTableDescription": {
2153 "GlobalTableName": gt.global_table_name,
2154 "GlobalTableArn": gt.global_table_arn,
2155 "GlobalTableStatus": gt.global_table_status,
2156 "CreationDateTime": gt.creation_date.timestamp() as f64,
2157 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2158 "RegionName": r.region_name,
2159 "ReplicaStatus": r.replica_status
2160 })).collect::<Vec<_>>()
2161 }
2162 }))
2163 }
2164
2165 fn update_global_table_settings(
2166 &self,
2167 req: &AwsRequest,
2168 ) -> Result<AwsResponse, AwsServiceError> {
2169 let body = Self::parse_body(req)?;
2170 let global_table_name = require_str(&body, "GlobalTableName")?;
2171 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2172 validate_optional_enum_value(
2173 "globalTableBillingMode",
2174 &body["GlobalTableBillingMode"],
2175 &["PROVISIONED", "PAY_PER_REQUEST"],
2176 )?;
2177 validate_optional_range_i64(
2178 "globalTableProvisionedWriteCapacityUnits",
2179 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2180 1,
2181 i64::MAX,
2182 )?;
2183
2184 let state = self.state.read();
2185 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2186 AwsServiceError::aws_error(
2187 StatusCode::BAD_REQUEST,
2188 "GlobalTableNotFoundException",
2189 format!("Global table not found: {global_table_name}"),
2190 )
2191 })?;
2192
2193 let replica_settings: Vec<Value> = gt
2194 .replication_group
2195 .iter()
2196 .map(|r| {
2197 json!({
2198 "RegionName": r.region_name,
2199 "ReplicaStatus": r.replica_status,
2200 "ReplicaProvisionedReadCapacityUnits": 0,
2201 "ReplicaProvisionedWriteCapacityUnits": 0
2202 })
2203 })
2204 .collect();
2205
2206 Self::ok_json(json!({
2207 "GlobalTableName": gt.global_table_name,
2208 "ReplicaSettings": replica_settings
2209 }))
2210 }
2211
2212 fn describe_table_replica_auto_scaling(
2213 &self,
2214 req: &AwsRequest,
2215 ) -> Result<AwsResponse, AwsServiceError> {
2216 let body = Self::parse_body(req)?;
2217 let table_name = require_str(&body, "TableName")?;
2218
2219 let state = self.state.read();
2220 let table = get_table(&state.tables, table_name)?;
2221
2222 Self::ok_json(json!({
2223 "TableAutoScalingDescription": {
2224 "TableName": table.name,
2225 "TableStatus": table.status,
2226 "Replicas": []
2227 }
2228 }))
2229 }
2230
2231 fn update_table_replica_auto_scaling(
2232 &self,
2233 req: &AwsRequest,
2234 ) -> Result<AwsResponse, AwsServiceError> {
2235 let body = Self::parse_body(req)?;
2236 let table_name = require_str(&body, "TableName")?;
2237
2238 let state = self.state.read();
2239 let table = get_table(&state.tables, table_name)?;
2240
2241 Self::ok_json(json!({
2242 "TableAutoScalingDescription": {
2243 "TableName": table.name,
2244 "TableStatus": table.status,
2245 "Replicas": []
2246 }
2247 }))
2248 }
2249
2250 fn enable_kinesis_streaming_destination(
2253 &self,
2254 req: &AwsRequest,
2255 ) -> Result<AwsResponse, AwsServiceError> {
2256 let body = Self::parse_body(req)?;
2257 let table_name = require_str(&body, "TableName")?;
2258 let stream_arn = require_str(&body, "StreamArn")?;
2259 let precision = body["EnableKinesisStreamingConfiguration"]
2260 ["ApproximateCreationDateTimePrecision"]
2261 .as_str()
2262 .unwrap_or("MILLISECOND");
2263
2264 let mut state = self.state.write();
2265 let table = get_table_mut(&mut state.tables, table_name)?;
2266
2267 table.kinesis_destinations.push(KinesisDestination {
2268 stream_arn: stream_arn.to_string(),
2269 destination_status: "ACTIVE".to_string(),
2270 approximate_creation_date_time_precision: precision.to_string(),
2271 });
2272
2273 Self::ok_json(json!({
2274 "TableName": table_name,
2275 "StreamArn": stream_arn,
2276 "DestinationStatus": "ACTIVE",
2277 "EnableKinesisStreamingConfiguration": {
2278 "ApproximateCreationDateTimePrecision": precision
2279 }
2280 }))
2281 }
2282
2283 fn disable_kinesis_streaming_destination(
2284 &self,
2285 req: &AwsRequest,
2286 ) -> Result<AwsResponse, AwsServiceError> {
2287 let body = Self::parse_body(req)?;
2288 let table_name = require_str(&body, "TableName")?;
2289 let stream_arn = require_str(&body, "StreamArn")?;
2290
2291 let mut state = self.state.write();
2292 let table = get_table_mut(&mut state.tables, table_name)?;
2293
2294 if let Some(dest) = table
2295 .kinesis_destinations
2296 .iter_mut()
2297 .find(|d| d.stream_arn == stream_arn)
2298 {
2299 dest.destination_status = "DISABLED".to_string();
2300 }
2301
2302 Self::ok_json(json!({
2303 "TableName": table_name,
2304 "StreamArn": stream_arn,
2305 "DestinationStatus": "DISABLED"
2306 }))
2307 }
2308
2309 fn describe_kinesis_streaming_destination(
2310 &self,
2311 req: &AwsRequest,
2312 ) -> Result<AwsResponse, AwsServiceError> {
2313 let body = Self::parse_body(req)?;
2314 let table_name = require_str(&body, "TableName")?;
2315
2316 let state = self.state.read();
2317 let table = get_table(&state.tables, table_name)?;
2318
2319 let destinations: Vec<Value> = table
2320 .kinesis_destinations
2321 .iter()
2322 .map(|d| {
2323 json!({
2324 "StreamArn": d.stream_arn,
2325 "DestinationStatus": d.destination_status,
2326 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2327 })
2328 })
2329 .collect();
2330
2331 Self::ok_json(json!({
2332 "TableName": table_name,
2333 "KinesisDataStreamDestinations": destinations
2334 }))
2335 }
2336
2337 fn update_kinesis_streaming_destination(
2338 &self,
2339 req: &AwsRequest,
2340 ) -> Result<AwsResponse, AwsServiceError> {
2341 let body = Self::parse_body(req)?;
2342 let table_name = require_str(&body, "TableName")?;
2343 let stream_arn = require_str(&body, "StreamArn")?;
2344 let precision = body["UpdateKinesisStreamingConfiguration"]
2345 ["ApproximateCreationDateTimePrecision"]
2346 .as_str()
2347 .unwrap_or("MILLISECOND");
2348
2349 let mut state = self.state.write();
2350 let table = get_table_mut(&mut state.tables, table_name)?;
2351
2352 if let Some(dest) = table
2353 .kinesis_destinations
2354 .iter_mut()
2355 .find(|d| d.stream_arn == stream_arn)
2356 {
2357 dest.approximate_creation_date_time_precision = precision.to_string();
2358 }
2359
2360 Self::ok_json(json!({
2361 "TableName": table_name,
2362 "StreamArn": stream_arn,
2363 "DestinationStatus": "ACTIVE",
2364 "UpdateKinesisStreamingConfiguration": {
2365 "ApproximateCreationDateTimePrecision": precision
2366 }
2367 }))
2368 }
2369
2370 fn describe_contributor_insights(
2373 &self,
2374 req: &AwsRequest,
2375 ) -> Result<AwsResponse, AwsServiceError> {
2376 let body = Self::parse_body(req)?;
2377 let table_name = require_str(&body, "TableName")?;
2378 let index_name = body["IndexName"].as_str();
2379
2380 let state = self.state.read();
2381 let table = get_table(&state.tables, table_name)?;
2382
2383 let top = table.top_contributors(10);
2384 let contributors: Vec<Value> = top
2385 .iter()
2386 .map(|(key, count)| {
2387 json!({
2388 "Key": key,
2389 "Count": count
2390 })
2391 })
2392 .collect();
2393
2394 let mut result = json!({
2395 "TableName": table_name,
2396 "ContributorInsightsStatus": table.contributor_insights_status,
2397 "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2398 "TopContributors": contributors
2399 });
2400 if let Some(idx) = index_name {
2401 result["IndexName"] = json!(idx);
2402 }
2403
2404 Self::ok_json(result)
2405 }
2406
2407 fn update_contributor_insights(
2408 &self,
2409 req: &AwsRequest,
2410 ) -> Result<AwsResponse, AwsServiceError> {
2411 let body = Self::parse_body(req)?;
2412 let table_name = require_str(&body, "TableName")?;
2413 let action = require_str(&body, "ContributorInsightsAction")?;
2414 let index_name = body["IndexName"].as_str();
2415
2416 let mut state = self.state.write();
2417 let table = get_table_mut(&mut state.tables, table_name)?;
2418
2419 let status = match action {
2420 "ENABLE" => "ENABLED",
2421 "DISABLE" => "DISABLED",
2422 _ => {
2423 return Err(AwsServiceError::aws_error(
2424 StatusCode::BAD_REQUEST,
2425 "ValidationException",
2426 format!("Invalid ContributorInsightsAction: {action}"),
2427 ))
2428 }
2429 };
2430 table.contributor_insights_status = status.to_string();
2431 if status == "DISABLED" {
2432 table.contributor_insights_counters.clear();
2433 }
2434
2435 let mut result = json!({
2436 "TableName": table_name,
2437 "ContributorInsightsStatus": status
2438 });
2439 if let Some(idx) = index_name {
2440 result["IndexName"] = json!(idx);
2441 }
2442
2443 Self::ok_json(result)
2444 }
2445
2446 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2447 let body = Self::parse_body(req)?;
2448 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2449 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2450 let table_name = body["TableName"].as_str();
2451
2452 let state = self.state.read();
2453 let summaries: Vec<Value> = state
2454 .tables
2455 .values()
2456 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2457 .map(|t| {
2458 json!({
2459 "TableName": t.name,
2460 "ContributorInsightsStatus": t.contributor_insights_status
2461 })
2462 })
2463 .collect();
2464
2465 Self::ok_json(json!({
2466 "ContributorInsightsSummaries": summaries
2467 }))
2468 }
2469
2470 fn export_table_to_point_in_time(
2473 &self,
2474 req: &AwsRequest,
2475 ) -> Result<AwsResponse, AwsServiceError> {
2476 let body = Self::parse_body(req)?;
2477 let table_arn = require_str(&body, "TableArn")?;
2478 let s3_bucket = require_str(&body, "S3Bucket")?;
2479 let s3_prefix = body["S3Prefix"].as_str();
2480 let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2481
2482 let state = self.state.read();
2483 let table = find_table_by_arn(&state.tables, table_arn)?;
2485 let items = table.items.clone();
2486 let item_count = items.len() as i64;
2487
2488 let now = Utc::now();
2489 let export_arn = format!(
2490 "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2491 state.region,
2492 state.account_id,
2493 table_arn.rsplit('/').next().unwrap_or("unknown"),
2494 uuid::Uuid::new_v4()
2495 );
2496
2497 drop(state);
2498
2499 let mut json_lines = String::new();
2501 for item in &items {
2502 let item_json = if export_format == "DYNAMODB_JSON" {
2503 json!({ "Item": item })
2504 } else {
2505 json!(item)
2506 };
2507 json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2508 json_lines.push('\n');
2509 }
2510 let data_size = json_lines.len() as i64;
2511
2512 let s3_key = if let Some(prefix) = s3_prefix {
2514 format!("{prefix}/data/manifest-files.json")
2515 } else {
2516 "data/manifest-files.json".to_string()
2517 };
2518
2519 let mut export_failed = false;
2521 let mut failure_reason = String::new();
2522 if let Some(ref s3_state) = self.s3_state {
2523 let mut s3 = s3_state.write();
2524 if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2525 let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2526 let obj = fakecloud_s3::state::S3Object {
2527 key: s3_key.clone(),
2528 data: bytes::Bytes::from(json_lines),
2529 content_type: "application/json".to_string(),
2530 etag,
2531 size: data_size as u64,
2532 last_modified: now,
2533 metadata: HashMap::new(),
2534 storage_class: "STANDARD".to_string(),
2535 tags: HashMap::new(),
2536 acl_grants: Vec::new(),
2537 acl_owner_id: None,
2538 parts_count: None,
2539 part_sizes: None,
2540 sse_algorithm: None,
2541 sse_kms_key_id: None,
2542 bucket_key_enabled: None,
2543 version_id: None,
2544 is_delete_marker: false,
2545 content_encoding: None,
2546 website_redirect_location: None,
2547 restore_ongoing: None,
2548 restore_expiry: None,
2549 checksum_algorithm: None,
2550 checksum_value: None,
2551 lock_mode: None,
2552 lock_retain_until: None,
2553 lock_legal_hold: None,
2554 };
2555 bucket.objects.insert(s3_key, obj);
2556 } else {
2557 export_failed = true;
2558 failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2559 }
2560 }
2561
2562 let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2563
2564 let export = ExportDescription {
2565 export_arn: export_arn.clone(),
2566 export_status: export_status.to_string(),
2567 table_arn: table_arn.to_string(),
2568 s3_bucket: s3_bucket.to_string(),
2569 s3_prefix: s3_prefix.map(|s| s.to_string()),
2570 export_format: export_format.to_string(),
2571 start_time: now,
2572 end_time: now,
2573 export_time: now,
2574 item_count,
2575 billed_size_bytes: data_size,
2576 };
2577
2578 let mut state = self.state.write();
2579 state.exports.insert(export_arn.clone(), export);
2580
2581 let mut response = json!({
2582 "ExportDescription": {
2583 "ExportArn": export_arn,
2584 "ExportStatus": export_status,
2585 "TableArn": table_arn,
2586 "S3Bucket": s3_bucket,
2587 "S3Prefix": s3_prefix,
2588 "ExportFormat": export_format,
2589 "StartTime": now.timestamp() as f64,
2590 "EndTime": now.timestamp() as f64,
2591 "ExportTime": now.timestamp() as f64,
2592 "ItemCount": item_count,
2593 "BilledSizeBytes": data_size
2594 }
2595 });
2596 if export_failed {
2597 response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2598 response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2599 }
2600 Self::ok_json(response)
2601 }
2602
2603 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2604 let body = Self::parse_body(req)?;
2605 let export_arn = require_str(&body, "ExportArn")?;
2606
2607 let state = self.state.read();
2608 let export = state.exports.get(export_arn).ok_or_else(|| {
2609 AwsServiceError::aws_error(
2610 StatusCode::BAD_REQUEST,
2611 "ExportNotFoundException",
2612 format!("Export not found: {export_arn}"),
2613 )
2614 })?;
2615
2616 Self::ok_json(json!({
2617 "ExportDescription": {
2618 "ExportArn": export.export_arn,
2619 "ExportStatus": export.export_status,
2620 "TableArn": export.table_arn,
2621 "S3Bucket": export.s3_bucket,
2622 "S3Prefix": export.s3_prefix,
2623 "ExportFormat": export.export_format,
2624 "StartTime": export.start_time.timestamp() as f64,
2625 "EndTime": export.end_time.timestamp() as f64,
2626 "ExportTime": export.export_time.timestamp() as f64,
2627 "ItemCount": export.item_count,
2628 "BilledSizeBytes": export.billed_size_bytes
2629 }
2630 }))
2631 }
2632
2633 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2634 let body = Self::parse_body(req)?;
2635 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2636 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2637 let table_arn = body["TableArn"].as_str();
2638
2639 let state = self.state.read();
2640 let summaries: Vec<Value> = state
2641 .exports
2642 .values()
2643 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2644 .map(|e| {
2645 json!({
2646 "ExportArn": e.export_arn,
2647 "ExportStatus": e.export_status,
2648 "TableArn": e.table_arn
2649 })
2650 })
2651 .collect();
2652
2653 Self::ok_json(json!({
2654 "ExportSummaries": summaries
2655 }))
2656 }
2657
2658 fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2659 let body = Self::parse_body(req)?;
2660 let input_format = require_str(&body, "InputFormat")?;
2661 let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2662 AwsServiceError::aws_error(
2663 StatusCode::BAD_REQUEST,
2664 "ValidationException",
2665 "S3BucketSource is required",
2666 )
2667 })?;
2668 let s3_bucket = s3_source
2669 .get("S3Bucket")
2670 .and_then(|v| v.as_str())
2671 .unwrap_or("");
2672 let s3_key_prefix = s3_source
2673 .get("S3KeyPrefix")
2674 .and_then(|v| v.as_str())
2675 .unwrap_or("");
2676
2677 let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2678 AwsServiceError::aws_error(
2679 StatusCode::BAD_REQUEST,
2680 "ValidationException",
2681 "TableCreationParameters is required",
2682 )
2683 })?;
2684 let table_name = table_params
2685 .get("TableName")
2686 .and_then(|v| v.as_str())
2687 .ok_or_else(|| {
2688 AwsServiceError::aws_error(
2689 StatusCode::BAD_REQUEST,
2690 "ValidationException",
2691 "TableCreationParameters.TableName is required",
2692 )
2693 })?;
2694
2695 let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
2696 let attribute_definitions = parse_attribute_definitions(
2697 table_params
2698 .get("AttributeDefinitions")
2699 .unwrap_or(&Value::Null),
2700 )?;
2701
2702 let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
2704 let mut processed_size_bytes: i64 = 0;
2705 if let Some(ref s3_state) = self.s3_state {
2706 let s3 = s3_state.read();
2707 let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
2708 AwsServiceError::aws_error(
2709 StatusCode::BAD_REQUEST,
2710 "ImportConflictException",
2711 format!("S3 bucket does not exist: {s3_bucket}"),
2712 )
2713 })?;
2714 let prefix = if s3_key_prefix.is_empty() {
2716 String::new()
2717 } else {
2718 s3_key_prefix.to_string()
2719 };
2720 for (key, obj) in &bucket.objects {
2721 if !prefix.is_empty() && !key.starts_with(&prefix) {
2722 continue;
2723 }
2724 let data = std::str::from_utf8(&obj.data).unwrap_or("");
2725 processed_size_bytes += obj.size as i64;
2726 for line in data.lines() {
2727 let line = line.trim();
2728 if line.is_empty() {
2729 continue;
2730 }
2731 if let Ok(parsed) = serde_json::from_str::<Value>(line) {
2732 let item = if input_format == "DYNAMODB_JSON" {
2734 if let Some(item_obj) = parsed.get("Item") {
2735 item_obj.as_object().cloned().unwrap_or_default()
2736 } else {
2737 parsed.as_object().cloned().unwrap_or_default()
2738 }
2739 } else {
2740 parsed.as_object().cloned().unwrap_or_default()
2741 };
2742 if !item.is_empty() {
2743 imported_items
2744 .push(item.into_iter().collect::<HashMap<String, Value>>());
2745 }
2746 }
2747 }
2748 }
2749 }
2750
2751 let mut state = self.state.write();
2752
2753 if state.tables.contains_key(table_name) {
2754 return Err(AwsServiceError::aws_error(
2755 StatusCode::BAD_REQUEST,
2756 "ResourceInUseException",
2757 format!("Table already exists: {table_name}"),
2758 ));
2759 }
2760
2761 let now = Utc::now();
2762 let table_arn = format!(
2763 "arn:aws:dynamodb:{}:{}:table/{}",
2764 state.region, state.account_id, table_name
2765 );
2766 let import_arn = format!(
2767 "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
2768 state.region,
2769 state.account_id,
2770 table_name,
2771 uuid::Uuid::new_v4()
2772 );
2773
2774 let processed_item_count = imported_items.len() as i64;
2775
2776 let mut table = DynamoTable {
2777 name: table_name.to_string(),
2778 arn: table_arn.clone(),
2779 key_schema,
2780 attribute_definitions,
2781 provisioned_throughput: ProvisionedThroughput {
2782 read_capacity_units: 0,
2783 write_capacity_units: 0,
2784 },
2785 items: imported_items,
2786 gsi: Vec::new(),
2787 lsi: Vec::new(),
2788 tags: HashMap::new(),
2789 created_at: now,
2790 status: "ACTIVE".to_string(),
2791 item_count: 0,
2792 size_bytes: 0,
2793 billing_mode: "PAY_PER_REQUEST".to_string(),
2794 ttl_attribute: None,
2795 ttl_enabled: false,
2796 resource_policy: None,
2797 pitr_enabled: false,
2798 kinesis_destinations: Vec::new(),
2799 contributor_insights_status: "DISABLED".to_string(),
2800 contributor_insights_counters: HashMap::new(),
2801 };
2802 table.recalculate_stats();
2803 state.tables.insert(table_name.to_string(), table);
2804
2805 let import_desc = ImportDescription {
2806 import_arn: import_arn.clone(),
2807 import_status: "COMPLETED".to_string(),
2808 table_arn: table_arn.clone(),
2809 table_name: table_name.to_string(),
2810 s3_bucket_source: s3_bucket.to_string(),
2811 input_format: input_format.to_string(),
2812 start_time: now,
2813 end_time: now,
2814 processed_item_count,
2815 processed_size_bytes,
2816 };
2817 state.imports.insert(import_arn.clone(), import_desc);
2818
2819 let table_ref = state.tables.get(table_name).unwrap();
2820 let ks: Vec<Value> = table_ref
2821 .key_schema
2822 .iter()
2823 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2824 .collect();
2825 let ad: Vec<Value> = table_ref
2826 .attribute_definitions
2827 .iter()
2828 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2829 .collect();
2830
2831 Self::ok_json(json!({
2832 "ImportTableDescription": {
2833 "ImportArn": import_arn,
2834 "ImportStatus": "COMPLETED",
2835 "TableArn": table_arn,
2836 "TableId": uuid::Uuid::new_v4().to_string(),
2837 "S3BucketSource": {
2838 "S3Bucket": s3_bucket
2839 },
2840 "InputFormat": input_format,
2841 "TableCreationParameters": {
2842 "TableName": table_name,
2843 "KeySchema": ks,
2844 "AttributeDefinitions": ad
2845 },
2846 "StartTime": now.timestamp() as f64,
2847 "EndTime": now.timestamp() as f64,
2848 "ProcessedItemCount": processed_item_count,
2849 "ProcessedSizeBytes": processed_size_bytes
2850 }
2851 }))
2852 }
2853
2854 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2855 let body = Self::parse_body(req)?;
2856 let import_arn = require_str(&body, "ImportArn")?;
2857
2858 let state = self.state.read();
2859 let import = state.imports.get(import_arn).ok_or_else(|| {
2860 AwsServiceError::aws_error(
2861 StatusCode::BAD_REQUEST,
2862 "ImportNotFoundException",
2863 format!("Import not found: {import_arn}"),
2864 )
2865 })?;
2866
2867 Self::ok_json(json!({
2868 "ImportTableDescription": {
2869 "ImportArn": import.import_arn,
2870 "ImportStatus": import.import_status,
2871 "TableArn": import.table_arn,
2872 "S3BucketSource": {
2873 "S3Bucket": import.s3_bucket_source
2874 },
2875 "InputFormat": import.input_format,
2876 "StartTime": import.start_time.timestamp() as f64,
2877 "EndTime": import.end_time.timestamp() as f64,
2878 "ProcessedItemCount": import.processed_item_count,
2879 "ProcessedSizeBytes": import.processed_size_bytes
2880 }
2881 }))
2882 }
2883
2884 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2885 let body = Self::parse_body(req)?;
2886 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2887 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
2888 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
2889 let table_arn = body["TableArn"].as_str();
2890
2891 let state = self.state.read();
2892 let summaries: Vec<Value> = state
2893 .imports
2894 .values()
2895 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2896 .map(|i| {
2897 json!({
2898 "ImportArn": i.import_arn,
2899 "ImportStatus": i.import_status,
2900 "TableArn": i.table_arn
2901 })
2902 })
2903 .collect();
2904
2905 Self::ok_json(json!({
2906 "ImportSummaryList": summaries
2907 }))
2908 }
2909}
2910
2911#[async_trait]
2912impl AwsService for DynamoDbService {
2913 fn service_name(&self) -> &str {
2914 "dynamodb"
2915 }
2916
2917 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2918 match req.action.as_str() {
2919 "CreateTable" => self.create_table(&req),
2920 "DeleteTable" => self.delete_table(&req),
2921 "DescribeTable" => self.describe_table(&req),
2922 "ListTables" => self.list_tables(&req),
2923 "UpdateTable" => self.update_table(&req),
2924 "PutItem" => self.put_item(&req),
2925 "GetItem" => self.get_item(&req),
2926 "DeleteItem" => self.delete_item(&req),
2927 "UpdateItem" => self.update_item(&req),
2928 "Query" => self.query(&req),
2929 "Scan" => self.scan(&req),
2930 "BatchGetItem" => self.batch_get_item(&req),
2931 "BatchWriteItem" => self.batch_write_item(&req),
2932 "TagResource" => self.tag_resource(&req),
2933 "UntagResource" => self.untag_resource(&req),
2934 "ListTagsOfResource" => self.list_tags_of_resource(&req),
2935 "TransactGetItems" => self.transact_get_items(&req),
2936 "TransactWriteItems" => self.transact_write_items(&req),
2937 "ExecuteStatement" => self.execute_statement(&req),
2938 "BatchExecuteStatement" => self.batch_execute_statement(&req),
2939 "ExecuteTransaction" => self.execute_transaction(&req),
2940 "UpdateTimeToLive" => self.update_time_to_live(&req),
2941 "DescribeTimeToLive" => self.describe_time_to_live(&req),
2942 "PutResourcePolicy" => self.put_resource_policy(&req),
2943 "GetResourcePolicy" => self.get_resource_policy(&req),
2944 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
2945 "DescribeEndpoints" => self.describe_endpoints(&req),
2947 "DescribeLimits" => self.describe_limits(&req),
2948 "CreateBackup" => self.create_backup(&req),
2950 "DeleteBackup" => self.delete_backup(&req),
2951 "DescribeBackup" => self.describe_backup(&req),
2952 "ListBackups" => self.list_backups(&req),
2953 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
2954 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
2955 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
2956 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
2957 "CreateGlobalTable" => self.create_global_table(&req),
2959 "DescribeGlobalTable" => self.describe_global_table(&req),
2960 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
2961 "ListGlobalTables" => self.list_global_tables(&req),
2962 "UpdateGlobalTable" => self.update_global_table(&req),
2963 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
2964 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
2965 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
2966 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
2968 "DisableKinesisStreamingDestination" => {
2969 self.disable_kinesis_streaming_destination(&req)
2970 }
2971 "DescribeKinesisStreamingDestination" => {
2972 self.describe_kinesis_streaming_destination(&req)
2973 }
2974 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
2975 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
2977 "UpdateContributorInsights" => self.update_contributor_insights(&req),
2978 "ListContributorInsights" => self.list_contributor_insights(&req),
2979 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
2981 "DescribeExport" => self.describe_export(&req),
2982 "ListExports" => self.list_exports(&req),
2983 "ImportTable" => self.import_table(&req),
2984 "DescribeImport" => self.describe_import(&req),
2985 "ListImports" => self.list_imports(&req),
2986 _ => Err(AwsServiceError::action_not_implemented(
2987 "dynamodb",
2988 &req.action,
2989 )),
2990 }
2991 }
2992
2993 fn supported_actions(&self) -> &[&str] {
2994 &[
2995 "CreateTable",
2996 "DeleteTable",
2997 "DescribeTable",
2998 "ListTables",
2999 "UpdateTable",
3000 "PutItem",
3001 "GetItem",
3002 "DeleteItem",
3003 "UpdateItem",
3004 "Query",
3005 "Scan",
3006 "BatchGetItem",
3007 "BatchWriteItem",
3008 "TagResource",
3009 "UntagResource",
3010 "ListTagsOfResource",
3011 "TransactGetItems",
3012 "TransactWriteItems",
3013 "ExecuteStatement",
3014 "BatchExecuteStatement",
3015 "ExecuteTransaction",
3016 "UpdateTimeToLive",
3017 "DescribeTimeToLive",
3018 "PutResourcePolicy",
3019 "GetResourcePolicy",
3020 "DeleteResourcePolicy",
3021 "DescribeEndpoints",
3022 "DescribeLimits",
3023 "CreateBackup",
3024 "DeleteBackup",
3025 "DescribeBackup",
3026 "ListBackups",
3027 "RestoreTableFromBackup",
3028 "RestoreTableToPointInTime",
3029 "UpdateContinuousBackups",
3030 "DescribeContinuousBackups",
3031 "CreateGlobalTable",
3032 "DescribeGlobalTable",
3033 "DescribeGlobalTableSettings",
3034 "ListGlobalTables",
3035 "UpdateGlobalTable",
3036 "UpdateGlobalTableSettings",
3037 "DescribeTableReplicaAutoScaling",
3038 "UpdateTableReplicaAutoScaling",
3039 "EnableKinesisStreamingDestination",
3040 "DisableKinesisStreamingDestination",
3041 "DescribeKinesisStreamingDestination",
3042 "UpdateKinesisStreamingDestination",
3043 "DescribeContributorInsights",
3044 "UpdateContributorInsights",
3045 "ListContributorInsights",
3046 "ExportTableToPointInTime",
3047 "DescribeExport",
3048 "ListExports",
3049 "ImportTable",
3050 "DescribeImport",
3051 "ListImports",
3052 ]
3053 }
3054}
3055
3056fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3059 body[field].as_str().ok_or_else(|| {
3060 AwsServiceError::aws_error(
3061 StatusCode::BAD_REQUEST,
3062 "ValidationException",
3063 format!("{field} is required"),
3064 )
3065 })
3066}
3067
3068fn require_object(
3069 body: &Value,
3070 field: &str,
3071) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3072 let obj = body[field].as_object().ok_or_else(|| {
3073 AwsServiceError::aws_error(
3074 StatusCode::BAD_REQUEST,
3075 "ValidationException",
3076 format!("{field} is required"),
3077 )
3078 })?;
3079 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3080}
3081
3082fn get_table<'a>(
3083 tables: &'a HashMap<String, DynamoTable>,
3084 name: &str,
3085) -> Result<&'a DynamoTable, AwsServiceError> {
3086 tables.get(name).ok_or_else(|| {
3087 AwsServiceError::aws_error(
3088 StatusCode::BAD_REQUEST,
3089 "ResourceNotFoundException",
3090 format!("Requested resource not found: Table: {name} not found"),
3091 )
3092 })
3093}
3094
3095fn get_table_mut<'a>(
3096 tables: &'a mut HashMap<String, DynamoTable>,
3097 name: &str,
3098) -> Result<&'a mut DynamoTable, AwsServiceError> {
3099 tables.get_mut(name).ok_or_else(|| {
3100 AwsServiceError::aws_error(
3101 StatusCode::BAD_REQUEST,
3102 "ResourceNotFoundException",
3103 format!("Requested resource not found: Table: {name} not found"),
3104 )
3105 })
3106}
3107
3108fn find_table_by_arn<'a>(
3109 tables: &'a HashMap<String, DynamoTable>,
3110 arn: &str,
3111) -> Result<&'a DynamoTable, AwsServiceError> {
3112 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3113 AwsServiceError::aws_error(
3114 StatusCode::BAD_REQUEST,
3115 "ResourceNotFoundException",
3116 format!("Requested resource not found: {arn}"),
3117 )
3118 })
3119}
3120
3121fn find_table_by_arn_mut<'a>(
3122 tables: &'a mut HashMap<String, DynamoTable>,
3123 arn: &str,
3124) -> Result<&'a mut DynamoTable, AwsServiceError> {
3125 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3126 AwsServiceError::aws_error(
3127 StatusCode::BAD_REQUEST,
3128 "ResourceNotFoundException",
3129 format!("Requested resource not found: {arn}"),
3130 )
3131 })
3132}
3133
3134fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3135 let arr = val.as_array().ok_or_else(|| {
3136 AwsServiceError::aws_error(
3137 StatusCode::BAD_REQUEST,
3138 "ValidationException",
3139 "KeySchema is required",
3140 )
3141 })?;
3142 Ok(arr
3143 .iter()
3144 .map(|elem| KeySchemaElement {
3145 attribute_name: elem["AttributeName"]
3146 .as_str()
3147 .unwrap_or_default()
3148 .to_string(),
3149 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3150 })
3151 .collect())
3152}
3153
3154fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3155 let arr = val.as_array().ok_or_else(|| {
3156 AwsServiceError::aws_error(
3157 StatusCode::BAD_REQUEST,
3158 "ValidationException",
3159 "AttributeDefinitions is required",
3160 )
3161 })?;
3162 Ok(arr
3163 .iter()
3164 .map(|elem| AttributeDefinition {
3165 attribute_name: elem["AttributeName"]
3166 .as_str()
3167 .unwrap_or_default()
3168 .to_string(),
3169 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3170 })
3171 .collect())
3172}
3173
3174fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3175 Ok(ProvisionedThroughput {
3176 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3177 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3178 })
3179}
3180
3181fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3182 let Some(arr) = val.as_array() else {
3183 return Vec::new();
3184 };
3185 arr.iter()
3186 .filter_map(|g| {
3187 Some(GlobalSecondaryIndex {
3188 index_name: g["IndexName"].as_str()?.to_string(),
3189 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3190 projection: parse_projection(&g["Projection"]),
3191 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3192 .ok(),
3193 })
3194 })
3195 .collect()
3196}
3197
3198fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3199 let Some(arr) = val.as_array() else {
3200 return Vec::new();
3201 };
3202 arr.iter()
3203 .filter_map(|l| {
3204 Some(LocalSecondaryIndex {
3205 index_name: l["IndexName"].as_str()?.to_string(),
3206 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3207 projection: parse_projection(&l["Projection"]),
3208 })
3209 })
3210 .collect()
3211}
3212
3213fn parse_projection(val: &Value) -> Projection {
3214 Projection {
3215 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3216 non_key_attributes: val["NonKeyAttributes"]
3217 .as_array()
3218 .map(|arr| {
3219 arr.iter()
3220 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3221 .collect()
3222 })
3223 .unwrap_or_default(),
3224 }
3225}
3226
3227fn parse_tags(val: &Value) -> HashMap<String, String> {
3228 let mut tags = HashMap::new();
3229 if let Some(arr) = val.as_array() {
3230 for tag in arr {
3231 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3232 tags.insert(k.to_string(), v.to_string());
3233 }
3234 }
3235 }
3236 tags
3237}
3238
3239fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3240 let mut names = HashMap::new();
3241 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3242 for (k, v) in obj {
3243 if let Some(s) = v.as_str() {
3244 names.insert(k.clone(), s.to_string());
3245 }
3246 }
3247 }
3248 names
3249}
3250
3251fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3252 let mut values = HashMap::new();
3253 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3254 for (k, v) in obj {
3255 values.insert(k.clone(), v.clone());
3256 }
3257 }
3258 values
3259}
3260
3261fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
3262 if name.starts_with('#') {
3263 expr_attr_names
3264 .get(name)
3265 .cloned()
3266 .unwrap_or_else(|| name.to_string())
3267 } else {
3268 name.to_string()
3269 }
3270}
3271
3272fn extract_key(
3273 table: &DynamoTable,
3274 item: &HashMap<String, AttributeValue>,
3275) -> HashMap<String, AttributeValue> {
3276 let mut key = HashMap::new();
3277 let hash_key = table.hash_key_name();
3278 if let Some(v) = item.get(hash_key) {
3279 key.insert(hash_key.to_string(), v.clone());
3280 }
3281 if let Some(range_key) = table.range_key_name() {
3282 if let Some(v) = item.get(range_key) {
3283 key.insert(range_key.to_string(), v.clone());
3284 }
3285 }
3286 key
3287}
3288
3289fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3291 let obj = value.as_object()?;
3292 if obj.is_empty() {
3293 return None;
3294 }
3295 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3296}
3297
3298fn item_matches_key(
3300 item: &HashMap<String, AttributeValue>,
3301 key: &HashMap<String, AttributeValue>,
3302 hash_key_name: &str,
3303 range_key_name: Option<&str>,
3304) -> bool {
3305 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3306 (Some(a), Some(b)) => a == b,
3307 _ => false,
3308 };
3309 if !hash_match {
3310 return false;
3311 }
3312 match range_key_name {
3313 Some(rk) => match (item.get(rk), key.get(rk)) {
3314 (Some(a), Some(b)) => a == b,
3315 (None, None) => true,
3316 _ => false,
3317 },
3318 None => true,
3319 }
3320}
3321
3322fn extract_key_for_schema(
3324 item: &HashMap<String, AttributeValue>,
3325 hash_key_name: &str,
3326 range_key_name: Option<&str>,
3327) -> HashMap<String, AttributeValue> {
3328 let mut key = HashMap::new();
3329 if let Some(v) = item.get(hash_key_name) {
3330 key.insert(hash_key_name.to_string(), v.clone());
3331 }
3332 if let Some(rk) = range_key_name {
3333 if let Some(v) = item.get(rk) {
3334 key.insert(rk.to_string(), v.clone());
3335 }
3336 }
3337 key
3338}
3339
3340fn validate_key_in_item(
3341 table: &DynamoTable,
3342 item: &HashMap<String, AttributeValue>,
3343) -> Result<(), AwsServiceError> {
3344 let hash_key = table.hash_key_name();
3345 if !item.contains_key(hash_key) {
3346 return Err(AwsServiceError::aws_error(
3347 StatusCode::BAD_REQUEST,
3348 "ValidationException",
3349 format!("Missing the key {hash_key} in the item"),
3350 ));
3351 }
3352 if let Some(range_key) = table.range_key_name() {
3353 if !item.contains_key(range_key) {
3354 return Err(AwsServiceError::aws_error(
3355 StatusCode::BAD_REQUEST,
3356 "ValidationException",
3357 format!("Missing the key {range_key} in the item"),
3358 ));
3359 }
3360 }
3361 Ok(())
3362}
3363
3364fn validate_key_attributes_in_key(
3365 table: &DynamoTable,
3366 key: &HashMap<String, AttributeValue>,
3367) -> Result<(), AwsServiceError> {
3368 let hash_key = table.hash_key_name();
3369 if !key.contains_key(hash_key) {
3370 return Err(AwsServiceError::aws_error(
3371 StatusCode::BAD_REQUEST,
3372 "ValidationException",
3373 format!("Missing the key {hash_key} in the item"),
3374 ));
3375 }
3376 Ok(())
3377}
3378
3379fn project_item(
3380 item: &HashMap<String, AttributeValue>,
3381 body: &Value,
3382) -> HashMap<String, AttributeValue> {
3383 let projection = body["ProjectionExpression"].as_str();
3384 match projection {
3385 Some(proj) if !proj.is_empty() => {
3386 let expr_attr_names = parse_expression_attribute_names(body);
3387 let attrs: Vec<String> = proj
3388 .split(',')
3389 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3390 .collect();
3391 let mut result = HashMap::new();
3392 for attr in &attrs {
3393 if let Some(v) = resolve_nested_path(item, attr) {
3394 insert_nested_value(&mut result, attr, v);
3395 }
3396 }
3397 result
3398 }
3399 _ => item.clone(),
3400 }
3401}
3402
3403fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3406 let mut result = String::new();
3408 for (i, segment) in path.split('.').enumerate() {
3409 if i > 0 {
3410 result.push('.');
3411 }
3412 if let Some(bracket_pos) = segment.find('[') {
3414 let key_part = &segment[..bracket_pos];
3415 let index_part = &segment[bracket_pos..];
3416 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3417 result.push_str(index_part);
3418 } else {
3419 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3420 }
3421 }
3422 result
3423}
3424
3425fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3427 let segments = parse_path_segments(path);
3428 if segments.is_empty() {
3429 return None;
3430 }
3431
3432 let first = &segments[0];
3433 let top_key = match first {
3434 PathSegment::Key(k) => k.as_str(),
3435 _ => return None,
3436 };
3437
3438 let mut current = item.get(top_key)?.clone();
3439
3440 for segment in &segments[1..] {
3441 match segment {
3442 PathSegment::Key(k) => {
3443 current = current.get("M")?.get(k)?.clone();
3445 }
3446 PathSegment::Index(idx) => {
3447 current = current.get("L")?.get(*idx)?.clone();
3449 }
3450 }
3451 }
3452
3453 Some(current)
3454}
3455
3456#[derive(Debug)]
3457enum PathSegment {
3458 Key(String),
3459 Index(usize),
3460}
3461
3462fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3464 let mut segments = Vec::new();
3465 let mut current = String::new();
3466
3467 let chars: Vec<char> = path.chars().collect();
3468 let mut i = 0;
3469 while i < chars.len() {
3470 match chars[i] {
3471 '.' => {
3472 if !current.is_empty() {
3473 segments.push(PathSegment::Key(current.clone()));
3474 current.clear();
3475 }
3476 }
3477 '[' => {
3478 if !current.is_empty() {
3479 segments.push(PathSegment::Key(current.clone()));
3480 current.clear();
3481 }
3482 i += 1;
3483 let mut num = String::new();
3484 while i < chars.len() && chars[i] != ']' {
3485 num.push(chars[i]);
3486 i += 1;
3487 }
3488 if let Ok(idx) = num.parse::<usize>() {
3489 segments.push(PathSegment::Index(idx));
3490 }
3491 }
3493 c => {
3494 current.push(c);
3495 }
3496 }
3497 i += 1;
3498 }
3499 if !current.is_empty() {
3500 segments.push(PathSegment::Key(current));
3501 }
3502 segments
3503}
3504
3505fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3508 if !path.contains('.') && !path.contains('[') {
3510 result.insert(path.to_string(), value);
3511 return;
3512 }
3513
3514 let segments = parse_path_segments(path);
3515 if segments.is_empty() {
3516 return;
3517 }
3518
3519 let top_key = match &segments[0] {
3520 PathSegment::Key(k) => k.clone(),
3521 _ => return,
3522 };
3523
3524 if segments.len() == 1 {
3525 result.insert(top_key, value);
3526 return;
3527 }
3528
3529 let wrapped = wrap_value_in_path(&segments[1..], value);
3531 let existing = result.remove(&top_key);
3533 let merged = match existing {
3534 Some(existing) => merge_attribute_values(existing, wrapped),
3535 None => wrapped,
3536 };
3537 result.insert(top_key, merged);
3538}
3539
3540fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3542 if segments.is_empty() {
3543 return value;
3544 }
3545 let inner = wrap_value_in_path(&segments[1..], value);
3546 match &segments[0] {
3547 PathSegment::Key(k) => {
3548 json!({"M": {k.clone(): inner}})
3549 }
3550 PathSegment::Index(idx) => {
3551 let mut arr = vec![Value::Null; idx + 1];
3552 arr[*idx] = inner;
3553 json!({"L": arr})
3554 }
3555 }
3556}
3557
3558fn merge_attribute_values(a: Value, b: Value) -> Value {
3560 if let (Some(a_map), Some(b_map)) = (
3561 a.get("M").and_then(|v| v.as_object()),
3562 b.get("M").and_then(|v| v.as_object()),
3563 ) {
3564 let mut merged = a_map.clone();
3565 for (k, v) in b_map {
3566 if let Some(existing) = merged.get(k) {
3567 merged.insert(
3568 k.clone(),
3569 merge_attribute_values(existing.clone(), v.clone()),
3570 );
3571 } else {
3572 merged.insert(k.clone(), v.clone());
3573 }
3574 }
3575 json!({"M": merged})
3576 } else {
3577 b
3578 }
3579}
3580
3581fn evaluate_condition(
3582 condition: &str,
3583 existing: Option<&HashMap<String, AttributeValue>>,
3584 expr_attr_names: &HashMap<String, String>,
3585 expr_attr_values: &HashMap<String, Value>,
3586) -> Result<(), AwsServiceError> {
3587 let cond = condition.trim();
3588
3589 if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3590 let attr = resolve_attr_name(inner, expr_attr_names);
3591 match existing {
3592 Some(item) if item.contains_key(&attr) => {
3593 return Err(AwsServiceError::aws_error(
3594 StatusCode::BAD_REQUEST,
3595 "ConditionalCheckFailedException",
3596 "The conditional request failed",
3597 ));
3598 }
3599 _ => return Ok(()),
3600 }
3601 }
3602
3603 if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3604 let attr = resolve_attr_name(inner, expr_attr_names);
3605 match existing {
3606 Some(item) if item.contains_key(&attr) => return Ok(()),
3607 _ => {
3608 return Err(AwsServiceError::aws_error(
3609 StatusCode::BAD_REQUEST,
3610 "ConditionalCheckFailedException",
3611 "The conditional request failed",
3612 ));
3613 }
3614 }
3615 }
3616
3617 if let Some((left, op, right)) = parse_simple_comparison(cond) {
3618 let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3619 let expected = expr_attr_values.get(right.trim());
3620 let actual = existing.and_then(|item| item.get(&attr_name));
3621
3622 let result = match op {
3623 "=" => actual == expected,
3624 "<>" => actual != expected,
3625 _ => true,
3626 };
3627
3628 if !result {
3629 return Err(AwsServiceError::aws_error(
3630 StatusCode::BAD_REQUEST,
3631 "ConditionalCheckFailedException",
3632 "The conditional request failed",
3633 ));
3634 }
3635 }
3636
3637 Ok(())
3638}
3639
3640fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
3641 let prefix = format!("{func_name}(");
3642 if let Some(rest) = expr.strip_prefix(&prefix) {
3643 if let Some(inner) = rest.strip_suffix(')') {
3644 return Some(inner.trim());
3645 }
3646 }
3647 None
3648}
3649
3650fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
3651 for op in &["<>", "=", "<", ">", "<=", ">="] {
3652 if let Some(pos) = expr.find(op) {
3653 let left = &expr[..pos];
3654 let right = &expr[pos + op.len()..];
3655 return Some((left, op, right));
3656 }
3657 }
3658 None
3659}
3660
3661fn evaluate_key_condition(
3662 expr: &str,
3663 item: &HashMap<String, AttributeValue>,
3664 hash_key_name: &str,
3665 _range_key_name: Option<&str>,
3666 expr_attr_names: &HashMap<String, String>,
3667 expr_attr_values: &HashMap<String, Value>,
3668) -> bool {
3669 let parts: Vec<&str> = split_on_and(expr);
3670 for part in &parts {
3671 let part = part.trim();
3672 if !evaluate_single_key_condition(
3673 part,
3674 item,
3675 hash_key_name,
3676 expr_attr_names,
3677 expr_attr_values,
3678 ) {
3679 return false;
3680 }
3681 }
3682 true
3683}
3684
3685fn split_on_and(expr: &str) -> Vec<&str> {
3686 let mut parts = Vec::new();
3687 let mut start = 0;
3688 let len = expr.len();
3689 let mut i = 0;
3690 let mut depth = 0;
3691 while i < len {
3692 let ch = expr.as_bytes()[i];
3693 if ch == b'(' {
3694 depth += 1;
3695 } else if ch == b')' {
3696 if depth > 0 {
3697 depth -= 1;
3698 }
3699 } else if depth == 0 && i + 5 <= len && expr[i..i + 5].eq_ignore_ascii_case(" AND ") {
3700 parts.push(&expr[start..i]);
3701 start = i + 5;
3702 i = start;
3703 continue;
3704 }
3705 i += 1;
3706 }
3707 parts.push(&expr[start..]);
3708 parts
3709}
3710
3711fn split_on_or(expr: &str) -> Vec<&str> {
3712 let mut parts = Vec::new();
3713 let mut start = 0;
3714 let len = expr.len();
3715 let mut i = 0;
3716 let mut depth = 0;
3717 while i < len {
3718 let ch = expr.as_bytes()[i];
3719 if ch == b'(' {
3720 depth += 1;
3721 } else if ch == b')' {
3722 if depth > 0 {
3723 depth -= 1;
3724 }
3725 } else if depth == 0 && i + 4 <= len && expr[i..i + 4].eq_ignore_ascii_case(" OR ") {
3726 parts.push(&expr[start..i]);
3727 start = i + 4;
3728 i = start;
3729 continue;
3730 }
3731 i += 1;
3732 }
3733 parts.push(&expr[start..]);
3734 parts
3735}
3736
3737fn evaluate_single_key_condition(
3738 part: &str,
3739 item: &HashMap<String, AttributeValue>,
3740 _hash_key_name: &str,
3741 expr_attr_names: &HashMap<String, String>,
3742 expr_attr_values: &HashMap<String, Value>,
3743) -> bool {
3744 let part = part.trim();
3745
3746 if let Some(rest) = part
3748 .strip_prefix("begins_with(")
3749 .or_else(|| part.strip_prefix("begins_with ("))
3750 {
3751 if let Some(inner) = rest.strip_suffix(')') {
3752 let mut split = inner.splitn(2, ',');
3753 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3754 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3755 let val_ref = val_ref.trim();
3756 let expected = expr_attr_values.get(val_ref);
3757 let actual = item.get(&attr_name);
3758 return match (actual, expected) {
3759 (Some(a), Some(e)) => {
3760 let a_str = extract_string_value(a);
3761 let e_str = extract_string_value(e);
3762 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3763 }
3764 _ => false,
3765 };
3766 }
3767 }
3768 return false;
3769 }
3770
3771 if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
3773 let attr_part = part[..between_pos].trim();
3774 let attr_name = resolve_attr_name(attr_part, expr_attr_names);
3775 let range_part = &part[between_pos + 7..];
3776 if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
3777 let lo_ref = range_part[..and_pos].trim();
3778 let hi_ref = range_part[and_pos + 5..].trim();
3779 let lo = expr_attr_values.get(lo_ref);
3780 let hi = expr_attr_values.get(hi_ref);
3781 let actual = item.get(&attr_name);
3782 return match (actual, lo, hi) {
3783 (Some(a), Some(l), Some(h)) => {
3784 compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
3785 && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
3786 }
3787 _ => false,
3788 };
3789 }
3790 }
3791
3792 for op in &["<=", ">=", "<>", "=", "<", ">"] {
3794 if let Some(pos) = part.find(op) {
3795 let left = part[..pos].trim();
3796 let right = part[pos + op.len()..].trim();
3797 let attr_name = resolve_attr_name(left, expr_attr_names);
3798 let expected = expr_attr_values.get(right);
3799 let actual = item.get(&attr_name);
3800
3801 return match *op {
3802 "=" => actual == expected,
3803 "<>" => actual != expected,
3804 "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
3805 ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
3806 "<=" => {
3807 let cmp = compare_attribute_values(actual, expected);
3808 cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
3809 }
3810 ">=" => {
3811 let cmp = compare_attribute_values(actual, expected);
3812 cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
3813 }
3814 _ => true,
3815 };
3816 }
3817 }
3818
3819 true
3820}
3821
3822fn extract_string_value(val: &Value) -> Option<String> {
3823 val.get("S")
3824 .and_then(|v| v.as_str())
3825 .map(|s| s.to_string())
3826 .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3827}
3828
3829fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
3830 match (a, b) {
3831 (None, None) => std::cmp::Ordering::Equal,
3832 (None, Some(_)) => std::cmp::Ordering::Less,
3833 (Some(_), None) => std::cmp::Ordering::Greater,
3834 (Some(a), Some(b)) => {
3835 let a_type = attribute_type_and_value(a);
3836 let b_type = attribute_type_and_value(b);
3837 match (a_type, b_type) {
3838 (Some(("S", a_val)), Some(("S", b_val))) => {
3839 let a_str = a_val.as_str().unwrap_or("");
3840 let b_str = b_val.as_str().unwrap_or("");
3841 a_str.cmp(b_str)
3842 }
3843 (Some(("N", a_val)), Some(("N", b_val))) => {
3844 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3845 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
3846 a_num
3847 .partial_cmp(&b_num)
3848 .unwrap_or(std::cmp::Ordering::Equal)
3849 }
3850 (Some(("B", a_val)), Some(("B", b_val))) => {
3851 let a_str = a_val.as_str().unwrap_or("");
3852 let b_str = b_val.as_str().unwrap_or("");
3853 a_str.cmp(b_str)
3854 }
3855 _ => std::cmp::Ordering::Equal,
3856 }
3857 }
3858 }
3859}
3860
3861fn evaluate_filter_expression(
3862 expr: &str,
3863 item: &HashMap<String, AttributeValue>,
3864 expr_attr_names: &HashMap<String, String>,
3865 expr_attr_values: &HashMap<String, Value>,
3866) -> bool {
3867 let trimmed = expr.trim();
3868
3869 let or_parts = split_on_or(trimmed);
3871 if or_parts.len() > 1 {
3872 return or_parts.iter().any(|part| {
3873 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3874 });
3875 }
3876
3877 let and_parts = split_on_and(trimmed);
3879 if and_parts.len() > 1 {
3880 return and_parts.iter().all(|part| {
3881 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3882 });
3883 }
3884
3885 let stripped = strip_outer_parens(trimmed);
3887 if stripped != trimmed {
3888 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3889 }
3890
3891 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3892}
3893
3894fn strip_outer_parens(expr: &str) -> &str {
3896 let trimmed = expr.trim();
3897 if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
3898 return trimmed;
3899 }
3900 let inner = &trimmed[1..trimmed.len() - 1];
3902 let mut depth = 0;
3903 for ch in inner.bytes() {
3904 match ch {
3905 b'(' => depth += 1,
3906 b')' => {
3907 if depth == 0 {
3908 return trimmed; }
3910 depth -= 1;
3911 }
3912 _ => {}
3913 }
3914 }
3915 if depth == 0 {
3916 inner
3917 } else {
3918 trimmed
3919 }
3920}
3921
3922fn evaluate_single_filter_condition(
3923 part: &str,
3924 item: &HashMap<String, AttributeValue>,
3925 expr_attr_names: &HashMap<String, String>,
3926 expr_attr_values: &HashMap<String, Value>,
3927) -> bool {
3928 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
3929 let attr = resolve_attr_name(inner, expr_attr_names);
3930 return item.contains_key(&attr);
3931 }
3932
3933 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
3934 let attr = resolve_attr_name(inner, expr_attr_names);
3935 return !item.contains_key(&attr);
3936 }
3937
3938 if let Some(rest) = part
3939 .strip_prefix("begins_with(")
3940 .or_else(|| part.strip_prefix("begins_with ("))
3941 {
3942 if let Some(inner) = rest.strip_suffix(')') {
3943 let mut split = inner.splitn(2, ',');
3944 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3945 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3946 let expected = expr_attr_values.get(val_ref.trim());
3947 let actual = item.get(&attr_name);
3948 return match (actual, expected) {
3949 (Some(a), Some(e)) => {
3950 let a_str = extract_string_value(a);
3951 let e_str = extract_string_value(e);
3952 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3953 }
3954 _ => false,
3955 };
3956 }
3957 }
3958 }
3959
3960 if let Some(rest) = part
3961 .strip_prefix("contains(")
3962 .or_else(|| part.strip_prefix("contains ("))
3963 {
3964 if let Some(inner) = rest.strip_suffix(')') {
3965 let mut split = inner.splitn(2, ',');
3966 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3967 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3968 let expected = expr_attr_values.get(val_ref.trim());
3969 let actual = item.get(&attr_name);
3970 return match (actual, expected) {
3971 (Some(a), Some(e)) => {
3972 let a_str = extract_string_value(a);
3973 let e_str = extract_string_value(e);
3974 matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
3975 }
3976 _ => false,
3977 };
3978 }
3979 }
3980 }
3981
3982 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
3983}
3984
3985fn apply_update_expression(
3986 item: &mut HashMap<String, AttributeValue>,
3987 expr: &str,
3988 expr_attr_names: &HashMap<String, String>,
3989 expr_attr_values: &HashMap<String, Value>,
3990) -> Result<(), AwsServiceError> {
3991 let clauses = parse_update_clauses(expr);
3992 for (action, assignments) in &clauses {
3993 match action.to_ascii_uppercase().as_str() {
3994 "SET" => {
3995 for assignment in assignments {
3996 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3997 }
3998 }
3999 "REMOVE" => {
4000 for attr_ref in assignments {
4001 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4002 item.remove(&attr);
4003 }
4004 }
4005 "ADD" => {
4006 for assignment in assignments {
4007 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4008 }
4009 }
4010 "DELETE" => {
4011 for assignment in assignments {
4012 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4013 }
4014 }
4015 _ => {}
4016 }
4017 }
4018 Ok(())
4019}
4020
4021fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
4022 let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
4023 let upper = expr.to_ascii_uppercase();
4024 let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
4025 let mut positions: Vec<(usize, &str)> = Vec::new();
4026
4027 for kw in &keywords {
4028 let mut search_from = 0;
4029 while let Some(pos) = upper[search_from..].find(kw) {
4030 let abs_pos = search_from + pos;
4031 let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
4032 let after_pos = abs_pos + kw.len();
4033 let after_ok =
4034 after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
4035 if before_ok && after_ok {
4036 positions.push((abs_pos, kw));
4037 }
4038 search_from = abs_pos + kw.len();
4039 }
4040 }
4041
4042 positions.sort_by_key(|(pos, _)| *pos);
4043
4044 for (i, &(pos, kw)) in positions.iter().enumerate() {
4045 let start = pos + kw.len();
4046 let end = if i + 1 < positions.len() {
4047 positions[i + 1].0
4048 } else {
4049 expr.len()
4050 };
4051 let content = expr[start..end].trim();
4052 let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
4053 clauses.push((kw.to_string(), assignments));
4054 }
4055
4056 clauses
4057}
4058
4059fn apply_set_assignment(
4060 item: &mut HashMap<String, AttributeValue>,
4061 assignment: &str,
4062 expr_attr_names: &HashMap<String, String>,
4063 expr_attr_values: &HashMap<String, Value>,
4064) -> Result<(), AwsServiceError> {
4065 let Some((left, right)) = assignment.split_once('=') else {
4066 return Ok(());
4067 };
4068
4069 let attr = resolve_attr_name(left.trim(), expr_attr_names);
4070 let right = right.trim();
4071
4072 if let Some(rest) = right
4074 .strip_prefix("if_not_exists(")
4075 .or_else(|| right.strip_prefix("if_not_exists ("))
4076 {
4077 if let Some(inner) = rest.strip_suffix(')') {
4078 let mut split = inner.splitn(2, ',');
4079 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4080 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4081 if !item.contains_key(&check_name) {
4082 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4083 item.insert(attr, val.clone());
4084 }
4085 }
4086 return Ok(());
4087 }
4088 }
4089 }
4090
4091 if let Some(rest) = right
4093 .strip_prefix("list_append(")
4094 .or_else(|| right.strip_prefix("list_append ("))
4095 {
4096 if let Some(inner) = rest.strip_suffix(')') {
4097 let mut split = inner.splitn(2, ',');
4098 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4099 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4100 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4101
4102 let mut merged = Vec::new();
4103 if let Some(Value::Object(obj)) = &a_val {
4104 if let Some(Value::Array(arr)) = obj.get("L") {
4105 merged.extend(arr.clone());
4106 }
4107 }
4108 if let Some(Value::Object(obj)) = &b_val {
4109 if let Some(Value::Array(arr)) = obj.get("L") {
4110 merged.extend(arr.clone());
4111 }
4112 }
4113
4114 item.insert(attr, json!({"L": merged}));
4115 return Ok(());
4116 }
4117 }
4118 }
4119
4120 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4122 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4123 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4124
4125 let left_num = extract_number(&left_val).unwrap_or(0.0);
4126 let right_num = extract_number(&right_val).unwrap_or(0.0);
4127
4128 let result = if is_add {
4129 left_num + right_num
4130 } else {
4131 left_num - right_num
4132 };
4133
4134 let num_str = if result == result.trunc() {
4135 format!("{}", result as i64)
4136 } else {
4137 format!("{result}")
4138 };
4139
4140 item.insert(attr, json!({"N": num_str}));
4141 return Ok(());
4142 }
4143
4144 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4146 if let Some(v) = val {
4147 item.insert(attr, v);
4148 }
4149
4150 Ok(())
4151}
4152
4153fn resolve_value(
4154 reference: &str,
4155 item: &HashMap<String, AttributeValue>,
4156 expr_attr_names: &HashMap<String, String>,
4157 expr_attr_values: &HashMap<String, Value>,
4158) -> Option<Value> {
4159 let reference = reference.trim();
4160 if reference.starts_with(':') {
4161 expr_attr_values.get(reference).cloned()
4162 } else {
4163 let attr_name = resolve_attr_name(reference, expr_attr_names);
4164 item.get(&attr_name).cloned()
4165 }
4166}
4167
4168fn extract_number(val: &Option<Value>) -> Option<f64> {
4169 val.as_ref()
4170 .and_then(|v| v.get("N"))
4171 .and_then(|n| n.as_str())
4172 .and_then(|s| s.parse().ok())
4173}
4174
4175fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
4176 let mut depth = 0;
4177 for (i, c) in expr.char_indices() {
4178 match c {
4179 '(' => depth += 1,
4180 ')' => depth -= 1,
4181 '+' if depth == 0 && i > 0 => {
4182 return Some((&expr[..i], &expr[i + 1..], true));
4183 }
4184 '-' if depth == 0 && i > 0 => {
4185 return Some((&expr[..i], &expr[i + 1..], false));
4186 }
4187 _ => {}
4188 }
4189 }
4190 None
4191}
4192
4193fn apply_add_assignment(
4194 item: &mut HashMap<String, AttributeValue>,
4195 assignment: &str,
4196 expr_attr_names: &HashMap<String, String>,
4197 expr_attr_values: &HashMap<String, Value>,
4198) -> Result<(), AwsServiceError> {
4199 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4200 if parts.len() != 2 {
4201 return Ok(());
4202 }
4203
4204 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4205 let val_ref = parts[1].trim();
4206 let add_val = expr_attr_values.get(val_ref);
4207
4208 if let Some(add_val) = add_val {
4209 if let Some(existing) = item.get(&attr) {
4210 if let (Some(existing_num), Some(add_num)) = (
4211 extract_number(&Some(existing.clone())),
4212 extract_number(&Some(add_val.clone())),
4213 ) {
4214 let result = existing_num + add_num;
4215 let num_str = if result == result.trunc() {
4216 format!("{}", result as i64)
4217 } else {
4218 format!("{result}")
4219 };
4220 item.insert(attr, json!({"N": num_str}));
4221 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4222 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4223 let mut merged: Vec<Value> = existing_set.clone();
4224 for v in add_set {
4225 if !merged.contains(v) {
4226 merged.push(v.clone());
4227 }
4228 }
4229 item.insert(attr, json!({"SS": merged}));
4230 }
4231 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4232 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4233 let mut merged: Vec<Value> = existing_set.clone();
4234 for v in add_set {
4235 if !merged.contains(v) {
4236 merged.push(v.clone());
4237 }
4238 }
4239 item.insert(attr, json!({"NS": merged}));
4240 }
4241 }
4242 } else {
4243 item.insert(attr, add_val.clone());
4244 }
4245 }
4246
4247 Ok(())
4248}
4249
4250fn apply_delete_assignment(
4251 item: &mut HashMap<String, AttributeValue>,
4252 assignment: &str,
4253 expr_attr_names: &HashMap<String, String>,
4254 expr_attr_values: &HashMap<String, Value>,
4255) -> Result<(), AwsServiceError> {
4256 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4257 if parts.len() != 2 {
4258 return Ok(());
4259 }
4260
4261 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4262 let val_ref = parts[1].trim();
4263 let del_val = expr_attr_values.get(val_ref);
4264
4265 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4266 if let (Some(existing_set), Some(del_set)) = (
4267 existing.get("SS").and_then(|v| v.as_array()),
4268 del_val.get("SS").and_then(|v| v.as_array()),
4269 ) {
4270 let filtered: Vec<Value> = existing_set
4271 .iter()
4272 .filter(|v| !del_set.contains(v))
4273 .cloned()
4274 .collect();
4275 if filtered.is_empty() {
4276 item.remove(&attr);
4277 } else {
4278 item.insert(attr, json!({"SS": filtered}));
4279 }
4280 } else if let (Some(existing_set), Some(del_set)) = (
4281 existing.get("NS").and_then(|v| v.as_array()),
4282 del_val.get("NS").and_then(|v| v.as_array()),
4283 ) {
4284 let filtered: Vec<Value> = existing_set
4285 .iter()
4286 .filter(|v| !del_set.contains(v))
4287 .cloned()
4288 .collect();
4289 if filtered.is_empty() {
4290 item.remove(&attr);
4291 } else {
4292 item.insert(attr, json!({"NS": filtered}));
4293 }
4294 }
4295 }
4296
4297 Ok(())
4298}
4299
4300#[allow(clippy::too_many_arguments)]
4301fn build_table_description_json(
4302 arn: &str,
4303 key_schema: &[KeySchemaElement],
4304 attribute_definitions: &[AttributeDefinition],
4305 provisioned_throughput: &ProvisionedThroughput,
4306 gsi: &[GlobalSecondaryIndex],
4307 lsi: &[LocalSecondaryIndex],
4308 billing_mode: &str,
4309 created_at: chrono::DateTime<chrono::Utc>,
4310 item_count: i64,
4311 size_bytes: i64,
4312 status: &str,
4313) -> Value {
4314 let table_name = arn.rsplit('/').next().unwrap_or("");
4315 let creation_timestamp =
4316 created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4317
4318 let ks: Vec<Value> = key_schema
4319 .iter()
4320 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4321 .collect();
4322
4323 let ad: Vec<Value> = attribute_definitions
4324 .iter()
4325 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4326 .collect();
4327
4328 let mut desc = json!({
4329 "TableName": table_name,
4330 "TableArn": arn,
4331 "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4332 "TableStatus": status,
4333 "KeySchema": ks,
4334 "AttributeDefinitions": ad,
4335 "CreationDateTime": creation_timestamp,
4336 "ItemCount": item_count,
4337 "TableSizeBytes": size_bytes,
4338 "BillingModeSummary": { "BillingMode": billing_mode },
4339 });
4340
4341 if billing_mode != "PAY_PER_REQUEST" {
4342 desc["ProvisionedThroughput"] = json!({
4343 "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4344 "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4345 "NumberOfDecreasesToday": 0,
4346 });
4347 } else {
4348 desc["ProvisionedThroughput"] = json!({
4349 "ReadCapacityUnits": 0,
4350 "WriteCapacityUnits": 0,
4351 "NumberOfDecreasesToday": 0,
4352 });
4353 }
4354
4355 if !gsi.is_empty() {
4356 let gsi_json: Vec<Value> = gsi
4357 .iter()
4358 .map(|g| {
4359 let gks: Vec<Value> = g
4360 .key_schema
4361 .iter()
4362 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4363 .collect();
4364 let mut idx = json!({
4365 "IndexName": g.index_name,
4366 "KeySchema": gks,
4367 "Projection": { "ProjectionType": g.projection.projection_type },
4368 "IndexStatus": "ACTIVE",
4369 "IndexArn": format!("{arn}/index/{}", g.index_name),
4370 "ItemCount": 0,
4371 "IndexSizeBytes": 0,
4372 });
4373 if !g.projection.non_key_attributes.is_empty() {
4374 idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4375 }
4376 if let Some(ref pt) = g.provisioned_throughput {
4377 idx["ProvisionedThroughput"] = json!({
4378 "ReadCapacityUnits": pt.read_capacity_units,
4379 "WriteCapacityUnits": pt.write_capacity_units,
4380 "NumberOfDecreasesToday": 0,
4381 });
4382 }
4383 idx
4384 })
4385 .collect();
4386 desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4387 }
4388
4389 if !lsi.is_empty() {
4390 let lsi_json: Vec<Value> = lsi
4391 .iter()
4392 .map(|l| {
4393 let lks: Vec<Value> = l
4394 .key_schema
4395 .iter()
4396 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4397 .collect();
4398 let mut idx = json!({
4399 "IndexName": l.index_name,
4400 "KeySchema": lks,
4401 "Projection": { "ProjectionType": l.projection.projection_type },
4402 "IndexArn": format!("{arn}/index/{}", l.index_name),
4403 "ItemCount": 0,
4404 "IndexSizeBytes": 0,
4405 });
4406 if !l.projection.non_key_attributes.is_empty() {
4407 idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4408 }
4409 idx
4410 })
4411 .collect();
4412 desc["LocalSecondaryIndexes"] = json!(lsi_json);
4413 }
4414
4415 desc
4416}
4417
4418fn build_table_description(table: &DynamoTable) -> Value {
4419 build_table_description_json(
4420 &table.arn,
4421 &table.key_schema,
4422 &table.attribute_definitions,
4423 &table.provisioned_throughput,
4424 &table.gsi,
4425 &table.lsi,
4426 &table.billing_mode,
4427 table.created_at,
4428 table.item_count,
4429 table.size_bytes,
4430 &table.status,
4431 )
4432}
4433
4434fn execute_partiql_statement(
4435 state: &SharedDynamoDbState,
4436 statement: &str,
4437 parameters: &[Value],
4438) -> Result<AwsResponse, AwsServiceError> {
4439 let trimmed = statement.trim();
4440 let upper = trimmed.to_ascii_uppercase();
4441
4442 if upper.starts_with("SELECT") {
4443 execute_partiql_select(state, trimmed, parameters)
4444 } else if upper.starts_with("INSERT") {
4445 execute_partiql_insert(state, trimmed, parameters)
4446 } else if upper.starts_with("UPDATE") {
4447 execute_partiql_update(state, trimmed, parameters)
4448 } else if upper.starts_with("DELETE") {
4449 execute_partiql_delete(state, trimmed, parameters)
4450 } else {
4451 Err(AwsServiceError::aws_error(
4452 StatusCode::BAD_REQUEST,
4453 "ValidationException",
4454 format!("Unsupported PartiQL statement: {trimmed}"),
4455 ))
4456 }
4457}
4458
4459fn execute_partiql_select(
4461 state: &SharedDynamoDbState,
4462 statement: &str,
4463 parameters: &[Value],
4464) -> Result<AwsResponse, AwsServiceError> {
4465 let upper = statement.to_ascii_uppercase();
4467 let from_pos = upper.find("FROM").ok_or_else(|| {
4468 AwsServiceError::aws_error(
4469 StatusCode::BAD_REQUEST,
4470 "ValidationException",
4471 "Invalid SELECT statement: missing FROM",
4472 )
4473 })?;
4474
4475 let after_from = statement[from_pos + 4..].trim();
4476 let (table_name, rest) = parse_partiql_table_name(after_from);
4477
4478 let state = state.read();
4479 let table = get_table(&state.tables, &table_name)?;
4480
4481 let rest_upper = rest.trim().to_ascii_uppercase();
4482 if rest_upper.starts_with("WHERE") {
4483 let where_clause = rest.trim()[5..].trim();
4484 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4485 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4486 DynamoDbService::ok_json(json!({ "Items": items }))
4487 } else {
4488 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4490 DynamoDbService::ok_json(json!({ "Items": items }))
4491 }
4492}
4493
4494fn execute_partiql_insert(
4495 state: &SharedDynamoDbState,
4496 statement: &str,
4497 parameters: &[Value],
4498) -> Result<AwsResponse, AwsServiceError> {
4499 let upper = statement.to_ascii_uppercase();
4502 let into_pos = upper.find("INTO").ok_or_else(|| {
4503 AwsServiceError::aws_error(
4504 StatusCode::BAD_REQUEST,
4505 "ValidationException",
4506 "Invalid INSERT statement: missing INTO",
4507 )
4508 })?;
4509
4510 let after_into = statement[into_pos + 4..].trim();
4511 let (table_name, rest) = parse_partiql_table_name(after_into);
4512
4513 let rest_upper = rest.trim().to_ascii_uppercase();
4514 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4515 AwsServiceError::aws_error(
4516 StatusCode::BAD_REQUEST,
4517 "ValidationException",
4518 "Invalid INSERT statement: missing VALUE",
4519 )
4520 })?;
4521
4522 let value_str = rest.trim()[value_pos + 5..].trim();
4523 let item = parse_partiql_value_object(value_str, parameters)?;
4524
4525 let mut state = state.write();
4526 let table = get_table_mut(&mut state.tables, &table_name)?;
4527 let key = extract_key(table, &item);
4528 if table.find_item_index(&key).is_some() {
4529 return Err(AwsServiceError::aws_error(
4531 StatusCode::BAD_REQUEST,
4532 "DuplicateItemException",
4533 "Duplicate primary key exists in table",
4534 ));
4535 } else {
4536 table.items.push(item);
4537 }
4538 table.recalculate_stats();
4539
4540 DynamoDbService::ok_json(json!({}))
4541}
4542
4543fn execute_partiql_update(
4544 state: &SharedDynamoDbState,
4545 statement: &str,
4546 parameters: &[Value],
4547) -> Result<AwsResponse, AwsServiceError> {
4548 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
4552
4553 let rest_upper = rest.trim().to_ascii_uppercase();
4554 let set_pos = rest_upper.find("SET").ok_or_else(|| {
4555 AwsServiceError::aws_error(
4556 StatusCode::BAD_REQUEST,
4557 "ValidationException",
4558 "Invalid UPDATE statement: missing SET",
4559 )
4560 })?;
4561
4562 let after_set = rest.trim()[set_pos + 3..].trim();
4563
4564 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4566 let (set_clause, where_clause) = if let Some(wp) = where_pos {
4567 (&after_set[..wp], after_set[wp + 5..].trim())
4568 } else {
4569 (after_set, "")
4570 };
4571
4572 let mut state = state.write();
4573 let table = get_table_mut(&mut state.tables, &table_name)?;
4574
4575 let matched_indices = if !where_clause.is_empty() {
4576 find_partiql_where_indices(table, where_clause, parameters)?
4577 } else {
4578 (0..table.items.len()).collect()
4579 };
4580
4581 let param_offset = count_params_in_str(where_clause);
4583 let assignments: Vec<&str> = set_clause.split(',').collect();
4584 for idx in &matched_indices {
4585 let mut local_offset = param_offset;
4586 for assignment in &assignments {
4587 let assignment = assignment.trim();
4588 if let Some((attr, val_str)) = assignment.split_once('=') {
4589 let attr = attr.trim().trim_matches('"');
4590 let val_str = val_str.trim();
4591 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4592 if let Some(v) = value {
4593 table.items[*idx].insert(attr.to_string(), v);
4594 }
4595 }
4596 }
4597 }
4598 table.recalculate_stats();
4599
4600 DynamoDbService::ok_json(json!({}))
4601}
4602
4603fn execute_partiql_delete(
4604 state: &SharedDynamoDbState,
4605 statement: &str,
4606 parameters: &[Value],
4607) -> Result<AwsResponse, AwsServiceError> {
4608 let upper = statement.to_ascii_uppercase();
4610 let from_pos = upper.find("FROM").ok_or_else(|| {
4611 AwsServiceError::aws_error(
4612 StatusCode::BAD_REQUEST,
4613 "ValidationException",
4614 "Invalid DELETE statement: missing FROM",
4615 )
4616 })?;
4617
4618 let after_from = statement[from_pos + 4..].trim();
4619 let (table_name, rest) = parse_partiql_table_name(after_from);
4620
4621 let rest_upper = rest.trim().to_ascii_uppercase();
4622 if !rest_upper.starts_with("WHERE") {
4623 return Err(AwsServiceError::aws_error(
4624 StatusCode::BAD_REQUEST,
4625 "ValidationException",
4626 "DELETE requires a WHERE clause",
4627 ));
4628 }
4629 let where_clause = rest.trim()[5..].trim();
4630
4631 let mut state = state.write();
4632 let table = get_table_mut(&mut state.tables, &table_name)?;
4633
4634 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4635 indices.sort_unstable();
4637 indices.reverse();
4638 for idx in indices {
4639 table.items.remove(idx);
4640 }
4641 table.recalculate_stats();
4642
4643 DynamoDbService::ok_json(json!({}))
4644}
4645
4646fn parse_partiql_table_name(s: &str) -> (String, &str) {
4649 let s = s.trim();
4650 if let Some(stripped) = s.strip_prefix('"') {
4651 if let Some(end) = stripped.find('"') {
4653 let name = &stripped[..end];
4654 let rest = &stripped[end + 1..];
4655 (name.to_string(), rest)
4656 } else {
4657 let end = s.find(' ').unwrap_or(s.len());
4658 (s[..end].trim_matches('"').to_string(), &s[end..])
4659 }
4660 } else {
4661 let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
4662 (s[..end].to_string(), &s[end..])
4663 }
4664}
4665
4666fn evaluate_partiql_where<'a>(
4669 table: &'a DynamoTable,
4670 where_clause: &str,
4671 parameters: &[Value],
4672) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4673 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4674 Ok(indices.iter().map(|i| &table.items[*i]).collect())
4675}
4676
4677fn find_partiql_where_indices(
4678 table: &DynamoTable,
4679 where_clause: &str,
4680 parameters: &[Value],
4681) -> Result<Vec<usize>, AwsServiceError> {
4682 let upper = where_clause.to_uppercase();
4685 let conditions = if upper.contains(" AND ") {
4686 let mut parts = Vec::new();
4688 let mut last = 0;
4689 for (i, _) in upper.match_indices(" AND ") {
4690 parts.push(where_clause[last..i].trim());
4691 last = i + 5;
4692 }
4693 parts.push(where_clause[last..].trim());
4694 parts
4695 } else {
4696 vec![where_clause.trim()]
4697 };
4698
4699 let mut param_idx = 0usize;
4700 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4701
4702 for cond in &conditions {
4703 let cond = cond.trim();
4704 if let Some((left, right)) = cond.split_once('=') {
4705 let attr = left.trim().trim_matches('"').to_string();
4706 let val_str = right.trim();
4707 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4708 if let Some(v) = value {
4709 parsed_conditions.push((attr, v));
4710 }
4711 }
4712 }
4713
4714 let mut indices = Vec::new();
4715 for (i, item) in table.items.iter().enumerate() {
4716 let all_match = parsed_conditions
4717 .iter()
4718 .all(|(attr, expected)| item.get(attr) == Some(expected));
4719 if all_match {
4720 indices.push(i);
4721 }
4722 }
4723
4724 Ok(indices)
4725}
4726
4727fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4732 let s = s.trim();
4733 if s == "?" {
4734 let idx = *param_idx;
4735 *param_idx += 1;
4736 parameters.get(idx).cloned()
4737 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4738 let inner = &s[1..s.len() - 1];
4739 Some(json!({"S": inner}))
4740 } else if let Ok(n) = s.parse::<f64>() {
4741 let num_str = if n == n.trunc() {
4742 format!("{}", n as i64)
4743 } else {
4744 format!("{n}")
4745 };
4746 Some(json!({"N": num_str}))
4747 } else {
4748 None
4749 }
4750}
4751
4752fn parse_partiql_value_object(
4754 s: &str,
4755 parameters: &[Value],
4756) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4757 let s = s.trim();
4758 let inner = s
4759 .strip_prefix('{')
4760 .and_then(|s| s.strip_suffix('}'))
4761 .ok_or_else(|| {
4762 AwsServiceError::aws_error(
4763 StatusCode::BAD_REQUEST,
4764 "ValidationException",
4765 "Invalid VALUE: expected object literal",
4766 )
4767 })?;
4768
4769 let mut item = HashMap::new();
4770 let mut param_idx = 0usize;
4771
4772 for pair in split_partiql_pairs(inner) {
4774 let pair = pair.trim();
4775 if pair.is_empty() {
4776 continue;
4777 }
4778 if let Some((key_part, val_part)) = pair.split_once(':') {
4779 let key = key_part
4780 .trim()
4781 .trim_matches('\'')
4782 .trim_matches('"')
4783 .to_string();
4784 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4785 item.insert(key, val);
4786 }
4787 }
4788 }
4789
4790 Ok(item)
4791}
4792
4793fn split_partiql_pairs(s: &str) -> Vec<&str> {
4795 let mut parts = Vec::new();
4796 let mut start = 0;
4797 let mut depth = 0;
4798 let mut in_quote = false;
4799
4800 for (i, c) in s.char_indices() {
4801 match c {
4802 '\'' if !in_quote => in_quote = true,
4803 '\'' if in_quote => in_quote = false,
4804 '{' if !in_quote => depth += 1,
4805 '}' if !in_quote => depth -= 1,
4806 ',' if !in_quote && depth == 0 => {
4807 parts.push(&s[start..i]);
4808 start = i + 1;
4809 }
4810 _ => {}
4811 }
4812 }
4813 parts.push(&s[start..]);
4814 parts
4815}
4816
4817fn count_params_in_str(s: &str) -> usize {
4819 s.chars().filter(|c| *c == '?').count()
4820}
4821
4822#[cfg(test)]
4823mod tests {
4824 use super::*;
4825 use serde_json::json;
4826
4827 #[test]
4828 fn test_parse_update_clauses_set() {
4829 let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
4830 assert_eq!(clauses.len(), 1);
4831 assert_eq!(clauses[0].0, "SET");
4832 assert_eq!(clauses[0].1.len(), 2);
4833 }
4834
4835 #[test]
4836 fn test_parse_update_clauses_set_and_remove() {
4837 let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
4838 assert_eq!(clauses.len(), 2);
4839 assert_eq!(clauses[0].0, "SET");
4840 assert_eq!(clauses[1].0, "REMOVE");
4841 }
4842
4843 #[test]
4844 fn test_evaluate_key_condition_simple() {
4845 let mut item = HashMap::new();
4846 item.insert("pk".to_string(), json!({"S": "user1"}));
4847 item.insert("sk".to_string(), json!({"S": "order1"}));
4848
4849 let mut expr_values = HashMap::new();
4850 expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
4851
4852 assert!(evaluate_key_condition(
4853 "pk = :pk",
4854 &item,
4855 "pk",
4856 Some("sk"),
4857 &HashMap::new(),
4858 &expr_values,
4859 ));
4860 }
4861
4862 #[test]
4863 fn test_compare_attribute_values_numbers() {
4864 let a = json!({"N": "10"});
4865 let b = json!({"N": "20"});
4866 assert_eq!(
4867 compare_attribute_values(Some(&a), Some(&b)),
4868 std::cmp::Ordering::Less
4869 );
4870 }
4871
4872 #[test]
4873 fn test_compare_attribute_values_strings() {
4874 let a = json!({"S": "apple"});
4875 let b = json!({"S": "banana"});
4876 assert_eq!(
4877 compare_attribute_values(Some(&a), Some(&b)),
4878 std::cmp::Ordering::Less
4879 );
4880 }
4881
4882 #[test]
4883 fn test_split_on_and() {
4884 let parts = split_on_and("pk = :pk AND sk > :sk");
4885 assert_eq!(parts.len(), 2);
4886 assert_eq!(parts[0].trim(), "pk = :pk");
4887 assert_eq!(parts[1].trim(), "sk > :sk");
4888 }
4889
4890 #[test]
4891 fn test_split_on_and_respects_parentheses() {
4892 let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
4894 assert_eq!(parts.len(), 1);
4896 assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
4897 }
4898
4899 #[test]
4900 fn test_evaluate_filter_expression_parenthesized_and_with_or() {
4901 let mut item = HashMap::new();
4903 item.insert("x".to_string(), json!({"S": "no"}));
4904 item.insert("y".to_string(), json!({"S": "no"}));
4905 item.insert("z".to_string(), json!({"S": "yes"}));
4906
4907 let mut expr_values = HashMap::new();
4908 expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
4909
4910 let result = evaluate_filter_expression(
4912 "(x = :yes AND y = :yes) OR z = :yes",
4913 &item,
4914 &HashMap::new(),
4915 &expr_values,
4916 );
4917 assert!(result, "should match because z = :yes is true");
4918
4919 let mut item2 = HashMap::new();
4921 item2.insert("x".to_string(), json!({"S": "no"}));
4922 item2.insert("y".to_string(), json!({"S": "no"}));
4923 item2.insert("z".to_string(), json!({"S": "no"}));
4924
4925 let result2 = evaluate_filter_expression(
4926 "(x = :yes AND y = :yes) OR z = :yes",
4927 &item2,
4928 &HashMap::new(),
4929 &expr_values,
4930 );
4931 assert!(!result2, "should not match because nothing is true");
4932 }
4933
4934 #[test]
4935 fn test_project_item_nested_path() {
4936 let mut item = HashMap::new();
4938 item.insert("pk".to_string(), json!({"S": "key1"}));
4939 item.insert(
4940 "data".to_string(),
4941 json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
4942 );
4943
4944 let body = json!({
4945 "ProjectionExpression": "data[0].name"
4946 });
4947
4948 let projected = project_item(&item, &body);
4949 let name = projected
4951 .get("data")
4952 .and_then(|v| v.get("L"))
4953 .and_then(|v| v.get(0))
4954 .and_then(|v| v.get("M"))
4955 .and_then(|v| v.get("name"))
4956 .and_then(|v| v.get("S"))
4957 .and_then(|v| v.as_str());
4958 assert_eq!(name, Some("Alice"));
4959
4960 let age = projected
4962 .get("data")
4963 .and_then(|v| v.get("L"))
4964 .and_then(|v| v.get(0))
4965 .and_then(|v| v.get("M"))
4966 .and_then(|v| v.get("age"));
4967 assert!(age.is_none(), "age should not be present in projection");
4968 }
4969
4970 #[test]
4971 fn test_resolve_nested_path_map() {
4972 let mut item = HashMap::new();
4973 item.insert(
4974 "info".to_string(),
4975 json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
4976 );
4977
4978 let result = resolve_nested_path(&item, "info.address.city");
4979 assert_eq!(result, Some(json!({"S": "NYC"})));
4980 }
4981
4982 #[test]
4983 fn test_resolve_nested_path_list_then_map() {
4984 let mut item = HashMap::new();
4985 item.insert(
4986 "items".to_string(),
4987 json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
4988 );
4989
4990 let result = resolve_nested_path(&item, "items[0].sku");
4991 assert_eq!(result, Some(json!({"S": "ABC"})));
4992 }
4993
4994 use crate::state::SharedDynamoDbState;
4997 use parking_lot::RwLock;
4998 use std::sync::Arc;
4999
5000 fn make_service() -> DynamoDbService {
5001 let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5002 "123456789012",
5003 "us-east-1",
5004 )));
5005 DynamoDbService::new(state)
5006 }
5007
5008 fn make_request(action: &str, body: Value) -> AwsRequest {
5009 AwsRequest {
5010 service: "dynamodb".to_string(),
5011 action: action.to_string(),
5012 region: "us-east-1".to_string(),
5013 account_id: "123456789012".to_string(),
5014 request_id: "test-id".to_string(),
5015 headers: http::HeaderMap::new(),
5016 query_params: HashMap::new(),
5017 body: serde_json::to_vec(&body).unwrap().into(),
5018 path_segments: vec![],
5019 raw_path: "/".to_string(),
5020 raw_query: String::new(),
5021 method: http::Method::POST,
5022 is_query_protocol: false,
5023 access_key_id: None,
5024 }
5025 }
5026
5027 fn create_test_table(svc: &DynamoDbService) {
5028 let req = make_request(
5029 "CreateTable",
5030 json!({
5031 "TableName": "test-table",
5032 "KeySchema": [
5033 { "AttributeName": "pk", "KeyType": "HASH" }
5034 ],
5035 "AttributeDefinitions": [
5036 { "AttributeName": "pk", "AttributeType": "S" }
5037 ],
5038 "BillingMode": "PAY_PER_REQUEST"
5039 }),
5040 );
5041 svc.create_table(&req).unwrap();
5042 }
5043
5044 #[test]
5045 fn delete_item_return_values_all_old() {
5046 let svc = make_service();
5047 create_test_table(&svc);
5048
5049 let req = make_request(
5051 "PutItem",
5052 json!({
5053 "TableName": "test-table",
5054 "Item": {
5055 "pk": { "S": "key1" },
5056 "name": { "S": "Alice" },
5057 "age": { "N": "30" }
5058 }
5059 }),
5060 );
5061 svc.put_item(&req).unwrap();
5062
5063 let req = make_request(
5065 "DeleteItem",
5066 json!({
5067 "TableName": "test-table",
5068 "Key": { "pk": { "S": "key1" } },
5069 "ReturnValues": "ALL_OLD"
5070 }),
5071 );
5072 let resp = svc.delete_item(&req).unwrap();
5073 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5074
5075 let attrs = &body["Attributes"];
5077 assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5078 assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5079 assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5080
5081 let req = make_request(
5083 "GetItem",
5084 json!({
5085 "TableName": "test-table",
5086 "Key": { "pk": { "S": "key1" } }
5087 }),
5088 );
5089 let resp = svc.get_item(&req).unwrap();
5090 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5091 assert!(body.get("Item").is_none(), "item should be deleted");
5092 }
5093
5094 #[test]
5095 fn transact_get_items_returns_existing_and_missing() {
5096 let svc = make_service();
5097 create_test_table(&svc);
5098
5099 let req = make_request(
5101 "PutItem",
5102 json!({
5103 "TableName": "test-table",
5104 "Item": {
5105 "pk": { "S": "exists" },
5106 "val": { "S": "hello" }
5107 }
5108 }),
5109 );
5110 svc.put_item(&req).unwrap();
5111
5112 let req = make_request(
5113 "TransactGetItems",
5114 json!({
5115 "TransactItems": [
5116 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5117 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5118 ]
5119 }),
5120 );
5121 let resp = svc.transact_get_items(&req).unwrap();
5122 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5123 let responses = body["Responses"].as_array().unwrap();
5124 assert_eq!(responses.len(), 2);
5125 assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5126 assert!(responses[1].get("Item").is_none());
5127 }
5128
5129 #[test]
5130 fn transact_write_items_put_and_delete() {
5131 let svc = make_service();
5132 create_test_table(&svc);
5133
5134 let req = make_request(
5136 "PutItem",
5137 json!({
5138 "TableName": "test-table",
5139 "Item": {
5140 "pk": { "S": "to-delete" },
5141 "val": { "S": "bye" }
5142 }
5143 }),
5144 );
5145 svc.put_item(&req).unwrap();
5146
5147 let req = make_request(
5149 "TransactWriteItems",
5150 json!({
5151 "TransactItems": [
5152 {
5153 "Put": {
5154 "TableName": "test-table",
5155 "Item": {
5156 "pk": { "S": "new-item" },
5157 "val": { "S": "hi" }
5158 }
5159 }
5160 },
5161 {
5162 "Delete": {
5163 "TableName": "test-table",
5164 "Key": { "pk": { "S": "to-delete" } }
5165 }
5166 }
5167 ]
5168 }),
5169 );
5170 let resp = svc.transact_write_items(&req).unwrap();
5171 assert_eq!(resp.status, StatusCode::OK);
5172
5173 let req = make_request(
5175 "GetItem",
5176 json!({
5177 "TableName": "test-table",
5178 "Key": { "pk": { "S": "new-item" } }
5179 }),
5180 );
5181 let resp = svc.get_item(&req).unwrap();
5182 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5183 assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5184
5185 let req = make_request(
5187 "GetItem",
5188 json!({
5189 "TableName": "test-table",
5190 "Key": { "pk": { "S": "to-delete" } }
5191 }),
5192 );
5193 let resp = svc.get_item(&req).unwrap();
5194 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5195 assert!(body.get("Item").is_none());
5196 }
5197
5198 #[test]
5199 fn transact_write_items_condition_check_failure() {
5200 let svc = make_service();
5201 create_test_table(&svc);
5202
5203 let req = make_request(
5205 "TransactWriteItems",
5206 json!({
5207 "TransactItems": [
5208 {
5209 "ConditionCheck": {
5210 "TableName": "test-table",
5211 "Key": { "pk": { "S": "nonexistent" } },
5212 "ConditionExpression": "attribute_exists(pk)"
5213 }
5214 }
5215 ]
5216 }),
5217 );
5218 let resp = svc.transact_write_items(&req).unwrap();
5219 assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5221 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5222 assert_eq!(
5223 body["__type"].as_str().unwrap(),
5224 "TransactionCanceledException"
5225 );
5226 assert!(body["CancellationReasons"].as_array().is_some());
5227 }
5228
5229 #[test]
5230 fn update_and_describe_time_to_live() {
5231 let svc = make_service();
5232 create_test_table(&svc);
5233
5234 let req = make_request(
5236 "UpdateTimeToLive",
5237 json!({
5238 "TableName": "test-table",
5239 "TimeToLiveSpecification": {
5240 "AttributeName": "ttl",
5241 "Enabled": true
5242 }
5243 }),
5244 );
5245 let resp = svc.update_time_to_live(&req).unwrap();
5246 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5247 assert_eq!(
5248 body["TimeToLiveSpecification"]["AttributeName"]
5249 .as_str()
5250 .unwrap(),
5251 "ttl"
5252 );
5253 assert!(body["TimeToLiveSpecification"]["Enabled"]
5254 .as_bool()
5255 .unwrap());
5256
5257 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5259 let resp = svc.describe_time_to_live(&req).unwrap();
5260 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5261 assert_eq!(
5262 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5263 .as_str()
5264 .unwrap(),
5265 "ENABLED"
5266 );
5267 assert_eq!(
5268 body["TimeToLiveDescription"]["AttributeName"]
5269 .as_str()
5270 .unwrap(),
5271 "ttl"
5272 );
5273
5274 let req = make_request(
5276 "UpdateTimeToLive",
5277 json!({
5278 "TableName": "test-table",
5279 "TimeToLiveSpecification": {
5280 "AttributeName": "ttl",
5281 "Enabled": false
5282 }
5283 }),
5284 );
5285 svc.update_time_to_live(&req).unwrap();
5286
5287 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5288 let resp = svc.describe_time_to_live(&req).unwrap();
5289 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5290 assert_eq!(
5291 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5292 .as_str()
5293 .unwrap(),
5294 "DISABLED"
5295 );
5296 }
5297
5298 #[test]
5299 fn resource_policy_lifecycle() {
5300 let svc = make_service();
5301 create_test_table(&svc);
5302
5303 let table_arn = {
5304 let state = svc.state.read();
5305 state.tables.get("test-table").unwrap().arn.clone()
5306 };
5307
5308 let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5310 let req = make_request(
5311 "PutResourcePolicy",
5312 json!({
5313 "ResourceArn": table_arn,
5314 "Policy": policy_doc
5315 }),
5316 );
5317 let resp = svc.put_resource_policy(&req).unwrap();
5318 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5319 assert!(body["RevisionId"].as_str().is_some());
5320
5321 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5323 let resp = svc.get_resource_policy(&req).unwrap();
5324 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5325 assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5326
5327 let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5329 svc.delete_resource_policy(&req).unwrap();
5330
5331 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5333 let resp = svc.get_resource_policy(&req).unwrap();
5334 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5335 assert!(body["Policy"].is_null());
5336 }
5337
5338 #[test]
5339 fn describe_endpoints() {
5340 let svc = make_service();
5341 let req = make_request("DescribeEndpoints", json!({}));
5342 let resp = svc.describe_endpoints(&req).unwrap();
5343 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5344 assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5345 }
5346
5347 #[test]
5348 fn describe_limits() {
5349 let svc = make_service();
5350 let req = make_request("DescribeLimits", json!({}));
5351 let resp = svc.describe_limits(&req).unwrap();
5352 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5353 assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5354 }
5355
5356 #[test]
5357 fn backup_lifecycle() {
5358 let svc = make_service();
5359 create_test_table(&svc);
5360
5361 let req = make_request(
5363 "CreateBackup",
5364 json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5365 );
5366 let resp = svc.create_backup(&req).unwrap();
5367 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5368 let backup_arn = body["BackupDetails"]["BackupArn"]
5369 .as_str()
5370 .unwrap()
5371 .to_string();
5372 assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5373
5374 let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5376 let resp = svc.describe_backup(&req).unwrap();
5377 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5378 assert_eq!(
5379 body["BackupDescription"]["BackupDetails"]["BackupName"],
5380 "my-backup"
5381 );
5382
5383 let req = make_request("ListBackups", json!({}));
5385 let resp = svc.list_backups(&req).unwrap();
5386 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5387 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5388
5389 let req = make_request(
5391 "RestoreTableFromBackup",
5392 json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5393 );
5394 svc.restore_table_from_backup(&req).unwrap();
5395
5396 let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5398 let resp = svc.describe_table(&req).unwrap();
5399 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5400 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5401
5402 let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5404 svc.delete_backup(&req).unwrap();
5405
5406 let req = make_request("ListBackups", json!({}));
5408 let resp = svc.list_backups(&req).unwrap();
5409 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5410 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5411 }
5412
5413 #[test]
5414 fn continuous_backups() {
5415 let svc = make_service();
5416 create_test_table(&svc);
5417
5418 let req = make_request(
5420 "DescribeContinuousBackups",
5421 json!({ "TableName": "test-table" }),
5422 );
5423 let resp = svc.describe_continuous_backups(&req).unwrap();
5424 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5425 assert_eq!(
5426 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5427 ["PointInTimeRecoveryStatus"],
5428 "DISABLED"
5429 );
5430
5431 let req = make_request(
5433 "UpdateContinuousBackups",
5434 json!({
5435 "TableName": "test-table",
5436 "PointInTimeRecoverySpecification": {
5437 "PointInTimeRecoveryEnabled": true
5438 }
5439 }),
5440 );
5441 svc.update_continuous_backups(&req).unwrap();
5442
5443 let req = make_request(
5445 "DescribeContinuousBackups",
5446 json!({ "TableName": "test-table" }),
5447 );
5448 let resp = svc.describe_continuous_backups(&req).unwrap();
5449 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5450 assert_eq!(
5451 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5452 ["PointInTimeRecoveryStatus"],
5453 "ENABLED"
5454 );
5455 }
5456
5457 #[test]
5458 fn restore_table_to_point_in_time() {
5459 let svc = make_service();
5460 create_test_table(&svc);
5461
5462 let req = make_request(
5463 "RestoreTableToPointInTime",
5464 json!({
5465 "SourceTableName": "test-table",
5466 "TargetTableName": "pitr-restored"
5467 }),
5468 );
5469 svc.restore_table_to_point_in_time(&req).unwrap();
5470
5471 let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5472 let resp = svc.describe_table(&req).unwrap();
5473 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5474 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5475 }
5476
5477 #[test]
5478 fn global_table_lifecycle() {
5479 let svc = make_service();
5480
5481 let req = make_request(
5483 "CreateGlobalTable",
5484 json!({
5485 "GlobalTableName": "my-global",
5486 "ReplicationGroup": [
5487 { "RegionName": "us-east-1" },
5488 { "RegionName": "eu-west-1" }
5489 ]
5490 }),
5491 );
5492 let resp = svc.create_global_table(&req).unwrap();
5493 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5494 assert_eq!(
5495 body["GlobalTableDescription"]["GlobalTableStatus"],
5496 "ACTIVE"
5497 );
5498
5499 let req = make_request(
5501 "DescribeGlobalTable",
5502 json!({ "GlobalTableName": "my-global" }),
5503 );
5504 let resp = svc.describe_global_table(&req).unwrap();
5505 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5506 assert_eq!(
5507 body["GlobalTableDescription"]["ReplicationGroup"]
5508 .as_array()
5509 .unwrap()
5510 .len(),
5511 2
5512 );
5513
5514 let req = make_request("ListGlobalTables", json!({}));
5516 let resp = svc.list_global_tables(&req).unwrap();
5517 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5518 assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5519
5520 let req = make_request(
5522 "UpdateGlobalTable",
5523 json!({
5524 "GlobalTableName": "my-global",
5525 "ReplicaUpdates": [
5526 { "Create": { "RegionName": "ap-southeast-1" } }
5527 ]
5528 }),
5529 );
5530 let resp = svc.update_global_table(&req).unwrap();
5531 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5532 assert_eq!(
5533 body["GlobalTableDescription"]["ReplicationGroup"]
5534 .as_array()
5535 .unwrap()
5536 .len(),
5537 3
5538 );
5539
5540 let req = make_request(
5542 "DescribeGlobalTableSettings",
5543 json!({ "GlobalTableName": "my-global" }),
5544 );
5545 let resp = svc.describe_global_table_settings(&req).unwrap();
5546 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5547 assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5548
5549 let req = make_request(
5551 "UpdateGlobalTableSettings",
5552 json!({ "GlobalTableName": "my-global" }),
5553 );
5554 svc.update_global_table_settings(&req).unwrap();
5555 }
5556
5557 #[test]
5558 fn table_replica_auto_scaling() {
5559 let svc = make_service();
5560 create_test_table(&svc);
5561
5562 let req = make_request(
5563 "DescribeTableReplicaAutoScaling",
5564 json!({ "TableName": "test-table" }),
5565 );
5566 let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5567 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5568 assert_eq!(
5569 body["TableAutoScalingDescription"]["TableName"],
5570 "test-table"
5571 );
5572
5573 let req = make_request(
5574 "UpdateTableReplicaAutoScaling",
5575 json!({ "TableName": "test-table" }),
5576 );
5577 svc.update_table_replica_auto_scaling(&req).unwrap();
5578 }
5579
5580 #[test]
5581 fn kinesis_streaming_lifecycle() {
5582 let svc = make_service();
5583 create_test_table(&svc);
5584
5585 let req = make_request(
5587 "EnableKinesisStreamingDestination",
5588 json!({
5589 "TableName": "test-table",
5590 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5591 }),
5592 );
5593 let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5594 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5595 assert_eq!(body["DestinationStatus"], "ACTIVE");
5596
5597 let req = make_request(
5599 "DescribeKinesisStreamingDestination",
5600 json!({ "TableName": "test-table" }),
5601 );
5602 let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5603 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5604 assert_eq!(
5605 body["KinesisDataStreamDestinations"]
5606 .as_array()
5607 .unwrap()
5608 .len(),
5609 1
5610 );
5611
5612 let req = make_request(
5614 "UpdateKinesisStreamingDestination",
5615 json!({
5616 "TableName": "test-table",
5617 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5618 "UpdateKinesisStreamingConfiguration": {
5619 "ApproximateCreationDateTimePrecision": "MICROSECOND"
5620 }
5621 }),
5622 );
5623 svc.update_kinesis_streaming_destination(&req).unwrap();
5624
5625 let req = make_request(
5627 "DisableKinesisStreamingDestination",
5628 json!({
5629 "TableName": "test-table",
5630 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5631 }),
5632 );
5633 let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5634 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5635 assert_eq!(body["DestinationStatus"], "DISABLED");
5636 }
5637
5638 #[test]
5639 fn contributor_insights_lifecycle() {
5640 let svc = make_service();
5641 create_test_table(&svc);
5642
5643 let req = make_request(
5645 "DescribeContributorInsights",
5646 json!({ "TableName": "test-table" }),
5647 );
5648 let resp = svc.describe_contributor_insights(&req).unwrap();
5649 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5650 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5651
5652 let req = make_request(
5654 "UpdateContributorInsights",
5655 json!({
5656 "TableName": "test-table",
5657 "ContributorInsightsAction": "ENABLE"
5658 }),
5659 );
5660 let resp = svc.update_contributor_insights(&req).unwrap();
5661 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5662 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5663
5664 let req = make_request("ListContributorInsights", json!({}));
5666 let resp = svc.list_contributor_insights(&req).unwrap();
5667 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5668 assert_eq!(
5669 body["ContributorInsightsSummaries"]
5670 .as_array()
5671 .unwrap()
5672 .len(),
5673 1
5674 );
5675 }
5676
5677 #[test]
5678 fn export_lifecycle() {
5679 let svc = make_service();
5680 create_test_table(&svc);
5681
5682 let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5683
5684 let req = make_request(
5686 "ExportTableToPointInTime",
5687 json!({
5688 "TableArn": table_arn,
5689 "S3Bucket": "my-bucket"
5690 }),
5691 );
5692 let resp = svc.export_table_to_point_in_time(&req).unwrap();
5693 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5694 let export_arn = body["ExportDescription"]["ExportArn"]
5695 .as_str()
5696 .unwrap()
5697 .to_string();
5698 assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
5699
5700 let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
5702 let resp = svc.describe_export(&req).unwrap();
5703 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5704 assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
5705
5706 let req = make_request("ListExports", json!({}));
5708 let resp = svc.list_exports(&req).unwrap();
5709 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5710 assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
5711 }
5712
5713 #[test]
5714 fn import_lifecycle() {
5715 let svc = make_service();
5716
5717 let req = make_request(
5718 "ImportTable",
5719 json!({
5720 "InputFormat": "DYNAMODB_JSON",
5721 "S3BucketSource": { "S3Bucket": "import-bucket" },
5722 "TableCreationParameters": {
5723 "TableName": "imported-table",
5724 "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
5725 "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
5726 }
5727 }),
5728 );
5729 let resp = svc.import_table(&req).unwrap();
5730 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5731 let import_arn = body["ImportTableDescription"]["ImportArn"]
5732 .as_str()
5733 .unwrap()
5734 .to_string();
5735 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5736
5737 let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
5739 let resp = svc.describe_import(&req).unwrap();
5740 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5741 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
5742
5743 let req = make_request("ListImports", json!({}));
5745 let resp = svc.list_imports(&req).unwrap();
5746 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5747 assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
5748
5749 let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
5751 let resp = svc.describe_table(&req).unwrap();
5752 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5753 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5754 }
5755
5756 #[test]
5757 fn backup_restore_preserves_items() {
5758 let svc = make_service();
5759 create_test_table(&svc);
5760
5761 for i in 1..=3 {
5763 let req = make_request(
5764 "PutItem",
5765 json!({
5766 "TableName": "test-table",
5767 "Item": {
5768 "pk": { "S": format!("key{i}") },
5769 "data": { "S": format!("value{i}") }
5770 }
5771 }),
5772 );
5773 svc.put_item(&req).unwrap();
5774 }
5775
5776 let req = make_request(
5778 "CreateBackup",
5779 json!({
5780 "TableName": "test-table",
5781 "BackupName": "my-backup"
5782 }),
5783 );
5784 let resp = svc.create_backup(&req).unwrap();
5785 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5786 let backup_arn = body["BackupDetails"]["BackupArn"]
5787 .as_str()
5788 .unwrap()
5789 .to_string();
5790
5791 for i in 1..=3 {
5793 let req = make_request(
5794 "DeleteItem",
5795 json!({
5796 "TableName": "test-table",
5797 "Key": { "pk": { "S": format!("key{i}") } }
5798 }),
5799 );
5800 svc.delete_item(&req).unwrap();
5801 }
5802
5803 let req = make_request("Scan", json!({ "TableName": "test-table" }));
5805 let resp = svc.scan(&req).unwrap();
5806 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5807 assert_eq!(body["Count"], 0);
5808
5809 let req = make_request(
5811 "RestoreTableFromBackup",
5812 json!({
5813 "BackupArn": backup_arn,
5814 "TargetTableName": "restored-table"
5815 }),
5816 );
5817 svc.restore_table_from_backup(&req).unwrap();
5818
5819 let req = make_request("Scan", json!({ "TableName": "restored-table" }));
5821 let resp = svc.scan(&req).unwrap();
5822 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5823 assert_eq!(body["Count"], 3);
5824 assert_eq!(body["Items"].as_array().unwrap().len(), 3);
5825 }
5826
5827 #[test]
5828 fn global_table_replicates_writes() {
5829 let svc = make_service();
5830 create_test_table(&svc);
5831
5832 let req = make_request(
5834 "CreateGlobalTable",
5835 json!({
5836 "GlobalTableName": "test-table",
5837 "ReplicationGroup": [
5838 { "RegionName": "us-east-1" },
5839 { "RegionName": "eu-west-1" }
5840 ]
5841 }),
5842 );
5843 let resp = svc.create_global_table(&req).unwrap();
5844 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5845 assert_eq!(
5846 body["GlobalTableDescription"]["GlobalTableStatus"],
5847 "ACTIVE"
5848 );
5849
5850 let req = make_request(
5852 "PutItem",
5853 json!({
5854 "TableName": "test-table",
5855 "Item": {
5856 "pk": { "S": "replicated-key" },
5857 "data": { "S": "replicated-value" }
5858 }
5859 }),
5860 );
5861 svc.put_item(&req).unwrap();
5862
5863 let req = make_request(
5865 "GetItem",
5866 json!({
5867 "TableName": "test-table",
5868 "Key": { "pk": { "S": "replicated-key" } }
5869 }),
5870 );
5871 let resp = svc.get_item(&req).unwrap();
5872 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5873 assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
5874 assert_eq!(body["Item"]["data"]["S"], "replicated-value");
5875 }
5876
5877 #[test]
5878 fn contributor_insights_tracks_access() {
5879 let svc = make_service();
5880 create_test_table(&svc);
5881
5882 let req = make_request(
5884 "UpdateContributorInsights",
5885 json!({
5886 "TableName": "test-table",
5887 "ContributorInsightsAction": "ENABLE"
5888 }),
5889 );
5890 svc.update_contributor_insights(&req).unwrap();
5891
5892 for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
5894 let req = make_request(
5895 "PutItem",
5896 json!({
5897 "TableName": "test-table",
5898 "Item": {
5899 "pk": { "S": key },
5900 "data": { "S": "value" }
5901 }
5902 }),
5903 );
5904 svc.put_item(&req).unwrap();
5905 }
5906
5907 for _ in 0..3 {
5909 let req = make_request(
5910 "GetItem",
5911 json!({
5912 "TableName": "test-table",
5913 "Key": { "pk": { "S": "alpha" } }
5914 }),
5915 );
5916 svc.get_item(&req).unwrap();
5917 }
5918
5919 let req = make_request(
5921 "DescribeContributorInsights",
5922 json!({ "TableName": "test-table" }),
5923 );
5924 let resp = svc.describe_contributor_insights(&req).unwrap();
5925 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5926 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5927
5928 let contributors = body["TopContributors"].as_array().unwrap();
5929 assert!(
5930 !contributors.is_empty(),
5931 "TopContributors should not be empty"
5932 );
5933
5934 let top = &contributors[0];
5937 assert!(top["Count"].as_u64().unwrap() > 0);
5938
5939 let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
5941 assert!(!rules.is_empty());
5942 }
5943
5944 #[test]
5945 fn contributor_insights_not_tracked_when_disabled() {
5946 let svc = make_service();
5947 create_test_table(&svc);
5948
5949 let req = make_request(
5951 "PutItem",
5952 json!({
5953 "TableName": "test-table",
5954 "Item": {
5955 "pk": { "S": "key1" },
5956 "data": { "S": "value" }
5957 }
5958 }),
5959 );
5960 svc.put_item(&req).unwrap();
5961
5962 let req = make_request(
5964 "DescribeContributorInsights",
5965 json!({ "TableName": "test-table" }),
5966 );
5967 let resp = svc.describe_contributor_insights(&req).unwrap();
5968 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5969 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5970
5971 let contributors = body["TopContributors"].as_array().unwrap();
5972 assert!(contributors.is_empty());
5973 }
5974
5975 #[test]
5976 fn contributor_insights_disabled_table_no_counters_after_scan() {
5977 let svc = make_service();
5978 create_test_table(&svc);
5979
5980 for key in &["alpha", "beta"] {
5982 let req = make_request(
5983 "PutItem",
5984 json!({
5985 "TableName": "test-table",
5986 "Item": { "pk": { "S": key } }
5987 }),
5988 );
5989 svc.put_item(&req).unwrap();
5990 }
5991
5992 let req = make_request(
5994 "UpdateContributorInsights",
5995 json!({
5996 "TableName": "test-table",
5997 "ContributorInsightsAction": "ENABLE"
5998 }),
5999 );
6000 svc.update_contributor_insights(&req).unwrap();
6001
6002 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6004 svc.scan(&req).unwrap();
6005
6006 let req = make_request(
6008 "DescribeContributorInsights",
6009 json!({ "TableName": "test-table" }),
6010 );
6011 let resp = svc.describe_contributor_insights(&req).unwrap();
6012 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013 let contributors = body["TopContributors"].as_array().unwrap();
6014 assert!(
6015 !contributors.is_empty(),
6016 "counters should be non-empty while enabled"
6017 );
6018
6019 let req = make_request(
6021 "UpdateContributorInsights",
6022 json!({
6023 "TableName": "test-table",
6024 "ContributorInsightsAction": "DISABLE"
6025 }),
6026 );
6027 svc.update_contributor_insights(&req).unwrap();
6028
6029 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6031 svc.scan(&req).unwrap();
6032
6033 let req = make_request(
6035 "DescribeContributorInsights",
6036 json!({ "TableName": "test-table" }),
6037 );
6038 let resp = svc.describe_contributor_insights(&req).unwrap();
6039 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6040 let contributors = body["TopContributors"].as_array().unwrap();
6041 assert!(
6042 contributors.is_empty(),
6043 "counters should be empty after disabling insights"
6044 );
6045 }
6046
6047 #[test]
6048 fn scan_pagination_with_limit() {
6049 let svc = make_service();
6050 create_test_table(&svc);
6051
6052 for i in 0..5 {
6054 let req = make_request(
6055 "PutItem",
6056 json!({
6057 "TableName": "test-table",
6058 "Item": {
6059 "pk": { "S": format!("item{i}") },
6060 "data": { "S": format!("value{i}") }
6061 }
6062 }),
6063 );
6064 svc.put_item(&req).unwrap();
6065 }
6066
6067 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6069 let resp = svc.scan(&req).unwrap();
6070 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6071 assert_eq!(body["Count"], 2);
6072 assert!(
6073 body["LastEvaluatedKey"].is_object(),
6074 "should have LastEvaluatedKey when limit < total items"
6075 );
6076 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6077
6078 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6080 let mut lek = body["LastEvaluatedKey"].clone();
6081
6082 while lek.is_object() {
6083 let req = make_request(
6084 "Scan",
6085 json!({
6086 "TableName": "test-table",
6087 "Limit": 2,
6088 "ExclusiveStartKey": lek
6089 }),
6090 );
6091 let resp = svc.scan(&req).unwrap();
6092 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6093 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6094 lek = body["LastEvaluatedKey"].clone();
6095 }
6096
6097 assert_eq!(
6098 all_items.len(),
6099 5,
6100 "should retrieve all 5 items via pagination"
6101 );
6102 }
6103
6104 #[test]
6105 fn scan_no_pagination_when_all_fit() {
6106 let svc = make_service();
6107 create_test_table(&svc);
6108
6109 for i in 0..3 {
6110 let req = make_request(
6111 "PutItem",
6112 json!({
6113 "TableName": "test-table",
6114 "Item": {
6115 "pk": { "S": format!("item{i}") }
6116 }
6117 }),
6118 );
6119 svc.put_item(&req).unwrap();
6120 }
6121
6122 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6124 let resp = svc.scan(&req).unwrap();
6125 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6126 assert_eq!(body["Count"], 3);
6127 assert!(
6128 body["LastEvaluatedKey"].is_null(),
6129 "should not have LastEvaluatedKey when all items fit"
6130 );
6131
6132 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6134 let resp = svc.scan(&req).unwrap();
6135 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6136 assert_eq!(body["Count"], 3);
6137 assert!(body["LastEvaluatedKey"].is_null());
6138 }
6139
6140 fn create_composite_table(svc: &DynamoDbService) {
6141 let req = make_request(
6142 "CreateTable",
6143 json!({
6144 "TableName": "composite-table",
6145 "KeySchema": [
6146 { "AttributeName": "pk", "KeyType": "HASH" },
6147 { "AttributeName": "sk", "KeyType": "RANGE" }
6148 ],
6149 "AttributeDefinitions": [
6150 { "AttributeName": "pk", "AttributeType": "S" },
6151 { "AttributeName": "sk", "AttributeType": "S" }
6152 ],
6153 "BillingMode": "PAY_PER_REQUEST"
6154 }),
6155 );
6156 svc.create_table(&req).unwrap();
6157 }
6158
6159 #[test]
6160 fn query_pagination_with_composite_key() {
6161 let svc = make_service();
6162 create_composite_table(&svc);
6163
6164 for i in 0..5 {
6166 let req = make_request(
6167 "PutItem",
6168 json!({
6169 "TableName": "composite-table",
6170 "Item": {
6171 "pk": { "S": "user1" },
6172 "sk": { "S": format!("item{i:03}") },
6173 "data": { "S": format!("value{i}") }
6174 }
6175 }),
6176 );
6177 svc.put_item(&req).unwrap();
6178 }
6179
6180 let req = make_request(
6182 "Query",
6183 json!({
6184 "TableName": "composite-table",
6185 "KeyConditionExpression": "pk = :pk",
6186 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6187 "Limit": 2
6188 }),
6189 );
6190 let resp = svc.query(&req).unwrap();
6191 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6192 assert_eq!(body["Count"], 2);
6193 assert!(body["LastEvaluatedKey"].is_object());
6194 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6195 assert!(body["LastEvaluatedKey"]["sk"].is_object());
6196
6197 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6199 let mut lek = body["LastEvaluatedKey"].clone();
6200
6201 while lek.is_object() {
6202 let req = make_request(
6203 "Query",
6204 json!({
6205 "TableName": "composite-table",
6206 "KeyConditionExpression": "pk = :pk",
6207 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6208 "Limit": 2,
6209 "ExclusiveStartKey": lek
6210 }),
6211 );
6212 let resp = svc.query(&req).unwrap();
6213 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6214 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6215 lek = body["LastEvaluatedKey"].clone();
6216 }
6217
6218 assert_eq!(
6219 all_items.len(),
6220 5,
6221 "should retrieve all 5 items via pagination"
6222 );
6223
6224 let sks: Vec<String> = all_items
6226 .iter()
6227 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6228 .collect();
6229 let mut sorted = sks.clone();
6230 sorted.sort();
6231 assert_eq!(sks, sorted, "items should be sorted by sort key");
6232 }
6233
6234 #[test]
6235 fn query_no_pagination_when_all_fit() {
6236 let svc = make_service();
6237 create_composite_table(&svc);
6238
6239 for i in 0..2 {
6240 let req = make_request(
6241 "PutItem",
6242 json!({
6243 "TableName": "composite-table",
6244 "Item": {
6245 "pk": { "S": "user1" },
6246 "sk": { "S": format!("item{i}") }
6247 }
6248 }),
6249 );
6250 svc.put_item(&req).unwrap();
6251 }
6252
6253 let req = make_request(
6254 "Query",
6255 json!({
6256 "TableName": "composite-table",
6257 "KeyConditionExpression": "pk = :pk",
6258 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6259 "Limit": 10
6260 }),
6261 );
6262 let resp = svc.query(&req).unwrap();
6263 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6264 assert_eq!(body["Count"], 2);
6265 assert!(
6266 body["LastEvaluatedKey"].is_null(),
6267 "should not have LastEvaluatedKey when all items fit"
6268 );
6269 }
6270
6271 fn create_gsi_table(svc: &DynamoDbService) {
6272 let req = make_request(
6273 "CreateTable",
6274 json!({
6275 "TableName": "gsi-table",
6276 "KeySchema": [
6277 { "AttributeName": "pk", "KeyType": "HASH" }
6278 ],
6279 "AttributeDefinitions": [
6280 { "AttributeName": "pk", "AttributeType": "S" },
6281 { "AttributeName": "gsi_pk", "AttributeType": "S" },
6282 { "AttributeName": "gsi_sk", "AttributeType": "S" }
6283 ],
6284 "BillingMode": "PAY_PER_REQUEST",
6285 "GlobalSecondaryIndexes": [
6286 {
6287 "IndexName": "gsi-index",
6288 "KeySchema": [
6289 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6290 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6291 ],
6292 "Projection": { "ProjectionType": "ALL" }
6293 }
6294 ]
6295 }),
6296 );
6297 svc.create_table(&req).unwrap();
6298 }
6299
6300 #[test]
6301 fn gsi_query_last_evaluated_key_includes_table_pk() {
6302 let svc = make_service();
6303 create_gsi_table(&svc);
6304
6305 for i in 0..3 {
6307 let req = make_request(
6308 "PutItem",
6309 json!({
6310 "TableName": "gsi-table",
6311 "Item": {
6312 "pk": { "S": format!("item{i}") },
6313 "gsi_pk": { "S": "shared" },
6314 "gsi_sk": { "S": "sort" }
6315 }
6316 }),
6317 );
6318 svc.put_item(&req).unwrap();
6319 }
6320
6321 let req = make_request(
6323 "Query",
6324 json!({
6325 "TableName": "gsi-table",
6326 "IndexName": "gsi-index",
6327 "KeyConditionExpression": "gsi_pk = :v",
6328 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6329 "Limit": 1
6330 }),
6331 );
6332 let resp = svc.query(&req).unwrap();
6333 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6334 assert_eq!(body["Count"], 1);
6335 let lek = &body["LastEvaluatedKey"];
6336 assert!(lek.is_object(), "should have LastEvaluatedKey");
6337 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6339 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6340 assert!(
6342 lek["pk"].is_object(),
6343 "LEK must contain table PK for GSI queries"
6344 );
6345 }
6346
6347 #[test]
6348 fn gsi_query_pagination_returns_all_items() {
6349 let svc = make_service();
6350 create_gsi_table(&svc);
6351
6352 for i in 0..4 {
6354 let req = make_request(
6355 "PutItem",
6356 json!({
6357 "TableName": "gsi-table",
6358 "Item": {
6359 "pk": { "S": format!("item{i:03}") },
6360 "gsi_pk": { "S": "shared" },
6361 "gsi_sk": { "S": "sort" }
6362 }
6363 }),
6364 );
6365 svc.put_item(&req).unwrap();
6366 }
6367
6368 let mut all_pks = Vec::new();
6370 let mut lek: Option<Value> = None;
6371
6372 loop {
6373 let mut query = json!({
6374 "TableName": "gsi-table",
6375 "IndexName": "gsi-index",
6376 "KeyConditionExpression": "gsi_pk = :v",
6377 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6378 "Limit": 2
6379 });
6380 if let Some(ref start_key) = lek {
6381 query["ExclusiveStartKey"] = start_key.clone();
6382 }
6383
6384 let req = make_request("Query", query);
6385 let resp = svc.query(&req).unwrap();
6386 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6387
6388 for item in body["Items"].as_array().unwrap() {
6389 let pk = item["pk"]["S"].as_str().unwrap().to_string();
6390 all_pks.push(pk);
6391 }
6392
6393 if body["LastEvaluatedKey"].is_object() {
6394 lek = Some(body["LastEvaluatedKey"].clone());
6395 } else {
6396 break;
6397 }
6398 }
6399
6400 all_pks.sort();
6401 assert_eq!(
6402 all_pks,
6403 vec!["item000", "item001", "item002", "item003"],
6404 "pagination should return all items without duplicates"
6405 );
6406 }
6407}