1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use crate::state::{
12 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
13 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
14 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
15 ReplicaDescription, SharedDynamoDbState,
16};
17
/// DynamoDB-compatible service handler backed by shared in-memory state.
pub struct DynamoDbService {
    // Shared handle to the backend state (tables, region, account id);
    // accessed via `.read()` / `.write()` throughout the handlers.
    state: SharedDynamoDbState,
}
21
22impl DynamoDbService {
23 pub fn new(state: SharedDynamoDbState) -> Self {
24 Self { state }
25 }
26
27 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
28 serde_json::from_slice(&req.body).map_err(|e| {
29 AwsServiceError::aws_error(
30 StatusCode::BAD_REQUEST,
31 "SerializationException",
32 format!("Invalid JSON: {e}"),
33 )
34 })
35 }
36
37 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
38 Ok(AwsResponse::json(
39 StatusCode::OK,
40 serde_json::to_vec(&body).unwrap(),
41 ))
42 }
43
    /// Handles `CreateTable`: validates the schema, registers a new in-memory
    /// table, and returns its description.
    ///
    /// Unlike real DynamoDB the table is immediately `ACTIVE` (no `CREATING`
    /// phase). Fails with `ResourceInUseException` when the name is taken and
    /// `ValidationException` for schema problems.
    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        let table_name = body["TableName"]
            .as_str()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableName is required",
                )
            })?
            .to_string();

        let key_schema = parse_key_schema(&body["KeySchema"])?;
        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;

        // Every key-schema attribute must also appear in AttributeDefinitions,
        // mirroring DynamoDB's cross-validation of the two lists.
        for ks in &key_schema {
            if !attribute_definitions
                .iter()
                .any(|ad| ad.attribute_name == ks.attribute_name)
            {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    format!(
                        "One or more parameter values were invalid: \
                        Some index key attributes are not defined in AttributeDefinitions. \
                        Keys: [{}], AttributeDefinitions: [{}]",
                        ks.attribute_name,
                        attribute_definitions
                            .iter()
                            .map(|ad| ad.attribute_name.as_str())
                            .collect::<Vec<_>>()
                            .join(", ")
                    ),
                ));
            }
        }

        // BillingMode defaults to PROVISIONED when not supplied.
        let billing_mode = body["BillingMode"]
            .as_str()
            .unwrap_or("PROVISIONED")
            .to_string();

        // PAY_PER_REQUEST tables carry zeroed capacity; otherwise the request
        // must include a valid ProvisionedThroughput object.
        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
            ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            }
        } else {
            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
        };

        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
        let tags = parse_tags(&body["Tags"]);

        let mut state = self.state.write();

        // Name-collision check runs under the write lock so two concurrent
        // CreateTable calls cannot both succeed.
        if state.tables.contains_key(&table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        let now = Utc::now();

        // New table starts empty, ACTIVE, with all optional features disabled.
        let table = DynamoTable {
            name: table_name.clone(),
            arn: arn.clone(),
            key_schema: key_schema.clone(),
            attribute_definitions: attribute_definitions.clone(),
            provisioned_throughput: provisioned_throughput.clone(),
            items: Vec::new(),
            gsi: gsi.clone(),
            lsi: lsi.clone(),
            tags,
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: billing_mode.clone(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
        };

        state.tables.insert(table_name, table);

        // Build the response from the same values we just stored.
        let table_desc = build_table_description_json(
            &arn,
            &key_schema,
            &attribute_definitions,
            &provisioned_throughput,
            &gsi,
            &lsi,
            &billing_mode,
            now,
            0,
            0,
            "ACTIVE",
        );

        Self::ok_json(json!({ "TableDescription": table_desc }))
    }
162
163 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
164 let body = Self::parse_body(req)?;
165 let table_name = require_str(&body, "TableName")?;
166
167 let mut state = self.state.write();
168 let table = state.tables.remove(table_name).ok_or_else(|| {
169 AwsServiceError::aws_error(
170 StatusCode::BAD_REQUEST,
171 "ResourceNotFoundException",
172 format!("Requested resource not found: Table: {table_name} not found"),
173 )
174 })?;
175
176 let table_desc = build_table_description_json(
177 &table.arn,
178 &table.key_schema,
179 &table.attribute_definitions,
180 &table.provisioned_throughput,
181 &table.gsi,
182 &table.lsi,
183 &table.billing_mode,
184 table.created_at,
185 table.item_count,
186 table.size_bytes,
187 "DELETING",
188 );
189
190 Self::ok_json(json!({ "TableDescription": table_desc }))
191 }
192
193 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194 let body = Self::parse_body(req)?;
195 let table_name = require_str(&body, "TableName")?;
196
197 let state = self.state.read();
198 let table = get_table(&state.tables, table_name)?;
199
200 let table_desc = build_table_description_json(
201 &table.arn,
202 &table.key_schema,
203 &table.attribute_definitions,
204 &table.provisioned_throughput,
205 &table.gsi,
206 &table.lsi,
207 &table.billing_mode,
208 table.created_at,
209 table.item_count,
210 table.size_bytes,
211 &table.status,
212 );
213
214 Self::ok_json(json!({ "Table": table_desc }))
215 }
216
217 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
218 let body = Self::parse_body(req)?;
219
220 validate_optional_string_length(
221 "exclusiveStartTableName",
222 body["ExclusiveStartTableName"].as_str(),
223 3,
224 255,
225 )?;
226 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
227
228 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
229 let exclusive_start = body["ExclusiveStartTableName"]
230 .as_str()
231 .map(|s| s.to_string());
232
233 let state = self.state.read();
234 let mut names: Vec<&String> = state.tables.keys().collect();
235 names.sort();
236
237 let start_idx = match &exclusive_start {
238 Some(start) => names
239 .iter()
240 .position(|n| n.as_str() > start.as_str())
241 .unwrap_or(names.len()),
242 None => 0,
243 };
244
245 let page: Vec<&str> = names
246 .iter()
247 .skip(start_idx)
248 .take(limit)
249 .map(|n| n.as_str())
250 .collect();
251
252 let mut result = json!({ "TableNames": page });
253
254 if start_idx + limit < names.len() {
255 if let Some(last) = page.last() {
256 result["LastEvaluatedTableName"] = json!(last);
257 }
258 }
259
260 Self::ok_json(result)
261 }
262
263 fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
264 let body = Self::parse_body(req)?;
265 let table_name = require_str(&body, "TableName")?;
266
267 let mut state = self.state.write();
268 let table = state.tables.get_mut(table_name).ok_or_else(|| {
269 AwsServiceError::aws_error(
270 StatusCode::BAD_REQUEST,
271 "ResourceNotFoundException",
272 format!("Requested resource not found: Table: {table_name} not found"),
273 )
274 })?;
275
276 if let Some(pt) = body.get("ProvisionedThroughput") {
277 if let Ok(throughput) = parse_provisioned_throughput(pt) {
278 table.provisioned_throughput = throughput;
279 }
280 }
281
282 if let Some(bm) = body["BillingMode"].as_str() {
283 table.billing_mode = bm.to_string();
284 }
285
286 let table_desc = build_table_description_json(
287 &table.arn,
288 &table.key_schema,
289 &table.attribute_definitions,
290 &table.provisioned_throughput,
291 &table.gsi,
292 &table.lsi,
293 &table.billing_mode,
294 table.created_at,
295 table.item_count,
296 table.size_bytes,
297 &table.status,
298 );
299
300 Self::ok_json(json!({ "TableDescription": table_desc }))
301 }
302
    /// Handles `PutItem`: inserts or fully replaces an item.
    ///
    /// Evaluates an optional `ConditionExpression` against the current item
    /// before mutating, and honors `ReturnValues: ALL_OLD` by returning the
    /// replaced item's attributes.
    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;
        let item = require_object(&body, "Item")?;

        let mut state = self.state.write();
        let table = get_table_mut(&mut state.tables, table_name)?;

        // The item must carry every key-schema attribute.
        validate_key_in_item(table, &item)?;

        let condition = body["ConditionExpression"].as_str();
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);

        let key = extract_key(table, &item);
        let existing_idx = table.find_item_index(&key);

        // Conditional put: the condition sees the existing item (or None) and
        // must pass before any mutation happens.
        if let Some(cond) = condition {
            let existing = existing_idx.map(|i| &table.items[i]);
            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
        }

        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
        // Snapshot the old item before it is overwritten, if requested.
        let old_item = if return_values == "ALL_OLD" {
            existing_idx.map(|i| table.items[i].clone())
        } else {
            None
        };

        // Replace in place when the key already exists; otherwise append.
        if let Some(idx) = existing_idx {
            table.items[idx] = item;
        } else {
            table.items.push(item);
        }

        table.recalculate_stats();

        let mut result = json!({});
        if let Some(old) = old_item {
            result["Attributes"] = json!(old);
        }

        Self::ok_json(result)
    }
349
350 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
351 let body = Self::parse_body(req)?;
352 let table_name = require_str(&body, "TableName")?;
353 let key = require_object(&body, "Key")?;
354
355 let state = self.state.read();
356 let table = get_table(&state.tables, table_name)?;
357
358 let result = match table.find_item_index(&key) {
359 Some(idx) => {
360 let item = &table.items[idx];
361 let projected = project_item(item, &body);
362 json!({ "Item": projected })
363 }
364 None => json!({}),
365 };
366
367 Self::ok_json(result)
368 }
369
    /// Handles `DeleteItem`: removes an item by key if present.
    ///
    /// Validates enum-valued parameters up front, evaluates an optional
    /// `ConditionExpression` before deleting, and supports
    /// `ReturnValues: ALL_OLD`, `ReturnConsumedCapacity`, and
    /// `ReturnItemCollectionMetrics` in the response.
    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        // Reject invalid enum values early, before touching table state.
        validate_optional_enum_value(
            "conditionalOperator",
            &body["ConditionalOperator"],
            &["AND", "OR"],
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValues",
            &body["ReturnValues"],
            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValuesOnConditionCheckFailure",
            &body["ReturnValuesOnConditionCheckFailure"],
            &["ALL_OLD", "NONE"],
        )?;

        let table_name = require_str(&body, "TableName")?;
        let key = require_object(&body, "Key")?;

        let mut state = self.state.write();
        let table = get_table_mut(&mut state.tables, table_name)?;

        let condition = body["ConditionExpression"].as_str();
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);

        let existing_idx = table.find_item_index(&key);

        // Conditional delete: evaluated against the current item (or None);
        // a failed condition aborts before any mutation.
        if let Some(cond) = condition {
            let existing = existing_idx.map(|i| &table.items[i]);
            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
        }

        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");

        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
        let return_icm = body["ReturnItemCollectionMetrics"]
            .as_str()
            .unwrap_or("NONE");

        let mut result = json!({});

        // Deleting a missing item is not an error; the response is simply empty.
        if let Some(idx) = existing_idx {
            if return_values == "ALL_OLD" {
                result["Attributes"] = json!(table.items[idx]);
            }
            table.items.remove(idx);
            table.recalculate_stats();
        }

        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            // Flat 1.0 capacity unit: this fake does not model real capacity.
            result["ConsumedCapacity"] = json!({
                "TableName": table_name,
                "CapacityUnits": 1.0,
            });
        }

        if return_icm == "SIZE" {
            result["ItemCollectionMetrics"] = json!({});
        }

        Self::ok_json(result)
    }
446
447 fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
448 let body = Self::parse_body(req)?;
449 let table_name = require_str(&body, "TableName")?;
450 let key = require_object(&body, "Key")?;
451
452 let mut state = self.state.write();
453 let table = get_table_mut(&mut state.tables, table_name)?;
454
455 validate_key_attributes_in_key(table, &key)?;
456
457 let condition = body["ConditionExpression"].as_str();
458 let expr_attr_names = parse_expression_attribute_names(&body);
459 let expr_attr_values = parse_expression_attribute_values(&body);
460 let update_expression = body["UpdateExpression"].as_str();
461
462 let existing_idx = table.find_item_index(&key);
463
464 if let Some(cond) = condition {
465 let existing = existing_idx.map(|i| &table.items[i]);
466 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
467 }
468
469 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
470
471 let idx = match existing_idx {
472 Some(i) => i,
473 None => {
474 let mut new_item = HashMap::new();
475 for (k, v) in &key {
476 new_item.insert(k.clone(), v.clone());
477 }
478 table.items.push(new_item);
479 table.items.len() - 1
480 }
481 };
482
483 let old_item = if return_values == "ALL_OLD" {
484 Some(table.items[idx].clone())
485 } else {
486 None
487 };
488
489 if let Some(expr) = update_expression {
490 apply_update_expression(
491 &mut table.items[idx],
492 expr,
493 &expr_attr_names,
494 &expr_attr_values,
495 )?;
496 }
497
498 let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
499 Some(table.items[idx].clone())
500 } else {
501 None
502 };
503
504 table.recalculate_stats();
505
506 let mut result = json!({});
507 if let Some(old) = old_item {
508 result["Attributes"] = json!(old);
509 } else if let Some(new) = new_item {
510 result["Attributes"] = json!(new);
511 }
512
513 Self::ok_json(result)
514 }
515
516 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
519 let body = Self::parse_body(req)?;
520 let table_name = require_str(&body, "TableName")?;
521
522 let state = self.state.read();
523 let table = get_table(&state.tables, table_name)?;
524
525 let expr_attr_names = parse_expression_attribute_names(&body);
526 let expr_attr_values = parse_expression_attribute_values(&body);
527
528 let key_condition = body["KeyConditionExpression"].as_str();
529 let filter_expression = body["FilterExpression"].as_str();
530 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
531 let limit = body["Limit"].as_i64().map(|l| l as usize);
532 let index_name = body["IndexName"].as_str();
533
534 let (items_to_scan, hash_key_name, range_key_name): (
535 &[HashMap<String, AttributeValue>],
536 String,
537 Option<String>,
538 ) = if let Some(idx_name) = index_name {
539 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
540 let hk = gsi
541 .key_schema
542 .iter()
543 .find(|k| k.key_type == "HASH")
544 .map(|k| k.attribute_name.clone())
545 .unwrap_or_default();
546 let rk = gsi
547 .key_schema
548 .iter()
549 .find(|k| k.key_type == "RANGE")
550 .map(|k| k.attribute_name.clone());
551 (&table.items, hk, rk)
552 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
553 let hk = lsi
554 .key_schema
555 .iter()
556 .find(|k| k.key_type == "HASH")
557 .map(|k| k.attribute_name.clone())
558 .unwrap_or_default();
559 let rk = lsi
560 .key_schema
561 .iter()
562 .find(|k| k.key_type == "RANGE")
563 .map(|k| k.attribute_name.clone());
564 (&table.items, hk, rk)
565 } else {
566 return Err(AwsServiceError::aws_error(
567 StatusCode::BAD_REQUEST,
568 "ValidationException",
569 format!("The table does not have the specified index: {idx_name}"),
570 ));
571 }
572 } else {
573 (
574 &table.items[..],
575 table.hash_key_name().to_string(),
576 table.range_key_name().map(|s| s.to_string()),
577 )
578 };
579
580 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
581 .iter()
582 .filter(|item| {
583 if let Some(kc) = key_condition {
584 evaluate_key_condition(
585 kc,
586 item,
587 &hash_key_name,
588 range_key_name.as_deref(),
589 &expr_attr_names,
590 &expr_attr_values,
591 )
592 } else {
593 true
594 }
595 })
596 .collect();
597
598 if let Some(ref rk) = range_key_name {
599 matched.sort_by(|a, b| {
600 let av = a.get(rk.as_str());
601 let bv = b.get(rk.as_str());
602 compare_attribute_values(av, bv)
603 });
604 if !scan_forward {
605 matched.reverse();
606 }
607 }
608
609 if let Some(filter) = filter_expression {
610 matched.retain(|item| {
611 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
612 });
613 }
614
615 let scanned_count = matched.len();
616
617 if let Some(lim) = limit {
618 matched.truncate(lim);
619 }
620
621 let items: Vec<Value> = matched
622 .iter()
623 .map(|item| {
624 let projected = project_item(item, &body);
625 json!(projected)
626 })
627 .collect();
628
629 Self::ok_json(json!({
630 "Items": items,
631 "Count": items.len(),
632 "ScannedCount": scanned_count,
633 }))
634 }
635
636 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637 let body = Self::parse_body(req)?;
638 let table_name = require_str(&body, "TableName")?;
639
640 let state = self.state.read();
641 let table = get_table(&state.tables, table_name)?;
642
643 let expr_attr_names = parse_expression_attribute_names(&body);
644 let expr_attr_values = parse_expression_attribute_values(&body);
645 let filter_expression = body["FilterExpression"].as_str();
646 let limit = body["Limit"].as_i64().map(|l| l as usize);
647
648 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
649 let scanned_count = matched.len();
650
651 if let Some(filter) = filter_expression {
652 matched.retain(|item| {
653 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
654 });
655 }
656
657 if let Some(lim) = limit {
658 matched.truncate(lim);
659 }
660
661 let items: Vec<Value> = matched
662 .iter()
663 .map(|item| {
664 let projected = project_item(item, &body);
665 json!(projected)
666 })
667 .collect();
668
669 Self::ok_json(json!({
670 "Items": items,
671 "Count": items.len(),
672 "ScannedCount": scanned_count,
673 }))
674 }
675
676 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679 let body = Self::parse_body(req)?;
680
681 validate_optional_enum_value(
682 "returnConsumedCapacity",
683 &body["ReturnConsumedCapacity"],
684 &["INDEXES", "TOTAL", "NONE"],
685 )?;
686
687 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
688
689 let request_items = body["RequestItems"]
690 .as_object()
691 .ok_or_else(|| {
692 AwsServiceError::aws_error(
693 StatusCode::BAD_REQUEST,
694 "ValidationException",
695 "RequestItems is required",
696 )
697 })?
698 .clone();
699
700 let state = self.state.read();
701 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
702 let mut consumed_capacity: Vec<Value> = Vec::new();
703
704 for (table_name, params) in &request_items {
705 let table = get_table(&state.tables, table_name)?;
706 let keys = params["Keys"].as_array().ok_or_else(|| {
707 AwsServiceError::aws_error(
708 StatusCode::BAD_REQUEST,
709 "ValidationException",
710 "Keys is required",
711 )
712 })?;
713
714 let mut items = Vec::new();
715 for key_val in keys {
716 let key: HashMap<String, AttributeValue> =
717 serde_json::from_value(key_val.clone()).unwrap_or_default();
718 if let Some(idx) = table.find_item_index(&key) {
719 items.push(json!(table.items[idx]));
720 }
721 }
722 responses.insert(table_name.clone(), items);
723
724 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
725 consumed_capacity.push(json!({
726 "TableName": table_name,
727 "CapacityUnits": 1.0,
728 }));
729 }
730 }
731
732 let mut result = json!({
733 "Responses": responses,
734 "UnprocessedKeys": {},
735 });
736
737 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
738 result["ConsumedCapacity"] = json!(consumed_capacity);
739 }
740
741 Self::ok_json(result)
742 }
743
    /// Handles `BatchWriteItem`: applies a mix of `PutRequest` and
    /// `DeleteRequest` entries per table, best-effort and non-transactional.
    ///
    /// Unparseable items/keys deserialize to empty maps rather than erroring,
    /// and `UnprocessedItems` is always empty since this fake never throttles.
    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        // Validate enum-valued parameters before touching any table.
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;

        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
        let return_icm = body["ReturnItemCollectionMetrics"]
            .as_str()
            .unwrap_or("NONE");

        // NOTE(review): this clone of the whole map looks avoidable — `body`
        // outlives the loop below.
        let request_items = body["RequestItems"]
            .as_object()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "RequestItems is required",
                )
            })?
            .clone();

        let mut state = self.state.write();
        let mut consumed_capacity: Vec<Value> = Vec::new();
        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();

        for (table_name, requests) in &request_items {
            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ResourceNotFoundException",
                    format!("Requested resource not found: Table: {table_name} not found"),
                )
            })?;

            let reqs = requests.as_array().ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "Request list must be an array",
                )
            })?;

            for request in reqs {
                if let Some(put_req) = request.get("PutRequest") {
                    // Put: replace by key when present, otherwise append.
                    let item: HashMap<String, AttributeValue> =
                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
                    let key = extract_key(table, &item);
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items[idx] = item;
                    } else {
                        table.items.push(item);
                    }
                } else if let Some(del_req) = request.get("DeleteRequest") {
                    // Delete: silently a no-op when the key is absent.
                    let key: HashMap<String, AttributeValue> =
                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items.remove(idx);
                    }
                }
            }

            // Refresh item count / size once per table, after all its writes.
            table.recalculate_stats();

            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
                // Flat 1.0 capacity unit per table: capacity is not modeled.
                consumed_capacity.push(json!({
                    "TableName": table_name,
                    "CapacityUnits": 1.0,
                }));
            }

            if return_icm == "SIZE" {
                item_collection_metrics.insert(table_name.clone(), vec![]);
            }
        }

        let mut result = json!({
            "UnprocessedItems": {},
        });

        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            result["ConsumedCapacity"] = json!(consumed_capacity);
        }

        if return_icm == "SIZE" {
            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
        }

        Self::ok_json(result)
    }
842
843 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
846 let body = Self::parse_body(req)?;
847 let resource_arn = require_str(&body, "ResourceArn")?;
848 let tags_arr = body["Tags"].as_array().ok_or_else(|| {
849 AwsServiceError::aws_error(
850 StatusCode::BAD_REQUEST,
851 "ValidationException",
852 "Tags is required",
853 )
854 })?;
855
856 let mut state = self.state.write();
857 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
858
859 for tag in tags_arr {
860 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
861 table.tags.insert(k.to_string(), v.to_string());
862 }
863 }
864
865 Self::ok_json(json!({}))
866 }
867
868 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
869 let body = Self::parse_body(req)?;
870 let resource_arn = require_str(&body, "ResourceArn")?;
871 let tag_keys = body["TagKeys"].as_array().ok_or_else(|| {
872 AwsServiceError::aws_error(
873 StatusCode::BAD_REQUEST,
874 "ValidationException",
875 "TagKeys is required",
876 )
877 })?;
878
879 let mut state = self.state.write();
880 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
881
882 for key in tag_keys {
883 if let Some(k) = key.as_str() {
884 table.tags.remove(k);
885 }
886 }
887
888 Self::ok_json(json!({}))
889 }
890
891 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892 let body = Self::parse_body(req)?;
893 let resource_arn = require_str(&body, "ResourceArn")?;
894
895 let state = self.state.read();
896 let table = find_table_by_arn(&state.tables, resource_arn)?;
897
898 let tags: Vec<Value> = table
899 .tags
900 .iter()
901 .map(|(k, v)| json!({"Key": k, "Value": v}))
902 .collect();
903
904 Self::ok_json(json!({ "Tags": tags }))
905 }
906
907 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
910 let body = Self::parse_body(req)?;
911 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
912 AwsServiceError::aws_error(
913 StatusCode::BAD_REQUEST,
914 "ValidationException",
915 "TransactItems is required",
916 )
917 })?;
918
919 let state = self.state.read();
920 let mut responses: Vec<Value> = Vec::new();
921
922 for ti in transact_items {
923 let get = &ti["Get"];
924 let table_name = get["TableName"].as_str().ok_or_else(|| {
925 AwsServiceError::aws_error(
926 StatusCode::BAD_REQUEST,
927 "ValidationException",
928 "TableName is required in Get",
929 )
930 })?;
931 let key: HashMap<String, AttributeValue> =
932 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
933
934 let table = get_table(&state.tables, table_name)?;
935 match table.find_item_index(&key) {
936 Some(idx) => {
937 responses.push(json!({ "Item": table.items[idx] }));
938 }
939 None => {
940 responses.push(json!({}));
941 }
942 }
943 }
944
945 Self::ok_json(json!({ "Responses": responses }))
946 }
947
    /// Handles `TransactWriteItems` with two-phase, all-or-nothing semantics:
    ///
    /// 1. Check phase — evaluate every `ConditionExpression` (on Put, Delete,
    ///    Update, and standalone ConditionCheck entries) without mutating.
    /// 2. Apply phase — only when *all* conditions passed, perform the writes.
    ///
    /// If any condition fails, the whole request is rejected with a
    /// `TransactionCanceledException` carrying one cancellation reason per
    /// item, and no writes are applied.
    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TransactItems is required",
            )
        })?;

        // Hold the write lock for the whole transaction so the check phase
        // and apply phase observe a consistent view of the tables.
        let mut state = self.state.write();

        // One entry per transact item: {"Code": "None"} on success, or
        // "ConditionalCheckFailed" for entries whose condition failed.
        let mut cancellation_reasons: Vec<Value> = Vec::new();
        let mut any_failed = false;

        // ---- Phase 1: evaluate every condition; no mutation here. ----
        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let condition = put["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(put);
                    let expr_attr_values = parse_expression_attribute_values(put);
                    let key = extract_key(table, &item);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let condition = delete["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(delete);
                    let expr_attr_values = parse_expression_attribute_values(delete);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let condition = update["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(update);
                    let expr_attr_values = parse_expression_attribute_values(update);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(check) = ti.get("ConditionCheck") {
                // ConditionCheck is check-only: it never writes anything.
                let table_name = check["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
                let cond = check["ConditionExpression"].as_str().unwrap_or_default();

                let table = get_table(&state.tables, table_name)?;
                let expr_attr_names = parse_expression_attribute_names(check);
                let expr_attr_values = parse_expression_attribute_values(check);
                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
                {
                    cancellation_reasons.push(json!({
                        "Code": "ConditionalCheckFailed",
                        "Message": "The conditional request failed"
                    }));
                    any_failed = true;
                    continue;
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else {
                // Unknown entry kind: treated as trivially successful.
                cancellation_reasons.push(json!({ "Code": "None" }));
            }
        }

        // Any failed condition cancels the whole transaction before writes.
        if any_failed {
            let error_body = json!({
                "__type": "TransactionCanceledException",
                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
                "CancellationReasons": cancellation_reasons
            });
            return Ok(AwsResponse::json(
                StatusCode::BAD_REQUEST,
                serde_json::to_vec(&error_body).unwrap(),
            ));
        }

        // ---- Phase 2: all conditions passed; apply every write. ----
        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                let key = extract_key(table, &item);
                if let Some(idx) = table.find_item_index(&key) {
                    table.items[idx] = item;
                } else {
                    table.items.push(item);
                }
                table.recalculate_stats();
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                if let Some(idx) = table.find_item_index(&key) {
                    table.items.remove(idx);
                }
                table.recalculate_stats();
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let update_expression = update["UpdateExpression"].as_str();
                let expr_attr_names = parse_expression_attribute_names(update);
                let expr_attr_values = parse_expression_attribute_values(update);

                // Upsert: create a key-only item when none matches the key.
                let table = get_table_mut(&mut state.tables, table_name)?;
                let idx = match table.find_item_index(&key) {
                    Some(i) => i,
                    None => {
                        let mut new_item = HashMap::new();
                        for (k, v) in &key {
                            new_item.insert(k.clone(), v.clone());
                        }
                        table.items.push(new_item);
                        table.items.len() - 1
                    }
                };

                if let Some(expr) = update_expression {
                    apply_update_expression(
                        &mut table.items[idx],
                        expr,
                        &expr_attr_names,
                        &expr_attr_values,
                    )?;
                }
                table.recalculate_stats();
            }
        }

        Self::ok_json(json!({}))
    }
1131
1132 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1133 let body = Self::parse_body(req)?;
1134 let statement = require_str(&body, "Statement")?;
1135 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1136
1137 execute_partiql_statement(&self.state, statement, ¶meters)
1138 }
1139
1140 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1141 let body = Self::parse_body(req)?;
1142 let statements = body["Statements"].as_array().ok_or_else(|| {
1143 AwsServiceError::aws_error(
1144 StatusCode::BAD_REQUEST,
1145 "ValidationException",
1146 "Statements is required",
1147 )
1148 })?;
1149
1150 let mut responses: Vec<Value> = Vec::new();
1151 for stmt_obj in statements {
1152 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1153 let parameters = stmt_obj["Parameters"]
1154 .as_array()
1155 .cloned()
1156 .unwrap_or_default();
1157
1158 match execute_partiql_statement(&self.state, statement, ¶meters) {
1159 Ok(resp) => {
1160 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1161 responses.push(resp_body);
1162 }
1163 Err(e) => {
1164 responses.push(json!({
1165 "Error": {
1166 "Code": "ValidationException",
1167 "Message": e.to_string()
1168 }
1169 }));
1170 }
1171 }
1172 }
1173
1174 Self::ok_json(json!({ "Responses": responses }))
1175 }
1176
1177 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1178 let body = Self::parse_body(req)?;
1179 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1180 AwsServiceError::aws_error(
1181 StatusCode::BAD_REQUEST,
1182 "ValidationException",
1183 "TransactStatements is required",
1184 )
1185 })?;
1186
1187 let mut results: Vec<Result<Value, String>> = Vec::new();
1189 for stmt_obj in transact_statements {
1190 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1191 let parameters = stmt_obj["Parameters"]
1192 .as_array()
1193 .cloned()
1194 .unwrap_or_default();
1195
1196 match execute_partiql_statement(&self.state, statement, ¶meters) {
1197 Ok(resp) => {
1198 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1199 results.push(Ok(resp_body));
1200 }
1201 Err(e) => {
1202 results.push(Err(e.to_string()));
1203 }
1204 }
1205 }
1206
1207 let any_failed = results.iter().any(|r| r.is_err());
1208 if any_failed {
1209 let reasons: Vec<Value> = results
1210 .iter()
1211 .map(|r| match r {
1212 Ok(_) => json!({ "Code": "None" }),
1213 Err(msg) => json!({
1214 "Code": "ValidationException",
1215 "Message": msg
1216 }),
1217 })
1218 .collect();
1219 let error_body = json!({
1220 "__type": "TransactionCanceledException",
1221 "message": "Transaction cancelled due to validation errors",
1222 "CancellationReasons": reasons
1223 });
1224 return Ok(AwsResponse::json(
1225 StatusCode::BAD_REQUEST,
1226 serde_json::to_vec(&error_body).unwrap(),
1227 ));
1228 }
1229
1230 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1231 Self::ok_json(json!({ "Responses": responses }))
1232 }
1233
1234 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1237 let body = Self::parse_body(req)?;
1238 let table_name = require_str(&body, "TableName")?;
1239 let spec = &body["TimeToLiveSpecification"];
1240 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1241 AwsServiceError::aws_error(
1242 StatusCode::BAD_REQUEST,
1243 "ValidationException",
1244 "TimeToLiveSpecification.AttributeName is required",
1245 )
1246 })?;
1247 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1248 AwsServiceError::aws_error(
1249 StatusCode::BAD_REQUEST,
1250 "ValidationException",
1251 "TimeToLiveSpecification.Enabled is required",
1252 )
1253 })?;
1254
1255 let mut state = self.state.write();
1256 let table = get_table_mut(&mut state.tables, table_name)?;
1257
1258 if enabled {
1259 table.ttl_attribute = Some(attr_name.to_string());
1260 table.ttl_enabled = true;
1261 } else {
1262 table.ttl_enabled = false;
1263 }
1264
1265 Self::ok_json(json!({
1266 "TimeToLiveSpecification": {
1267 "AttributeName": attr_name,
1268 "Enabled": enabled
1269 }
1270 }))
1271 }
1272
1273 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274 let body = Self::parse_body(req)?;
1275 let table_name = require_str(&body, "TableName")?;
1276
1277 let state = self.state.read();
1278 let table = get_table(&state.tables, table_name)?;
1279
1280 let status = if table.ttl_enabled {
1281 "ENABLED"
1282 } else {
1283 "DISABLED"
1284 };
1285
1286 let mut desc = json!({
1287 "TimeToLiveDescription": {
1288 "TimeToLiveStatus": status
1289 }
1290 });
1291
1292 if let Some(ref attr) = table.ttl_attribute {
1293 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1294 }
1295
1296 Self::ok_json(desc)
1297 }
1298
1299 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1302 let body = Self::parse_body(req)?;
1303 let resource_arn = require_str(&body, "ResourceArn")?;
1304 let policy = require_str(&body, "Policy")?;
1305
1306 let mut state = self.state.write();
1307 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1308 table.resource_policy = Some(policy.to_string());
1309
1310 let revision_id = uuid::Uuid::new_v4().to_string();
1311 Self::ok_json(json!({ "RevisionId": revision_id }))
1312 }
1313
1314 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1315 let body = Self::parse_body(req)?;
1316 let resource_arn = require_str(&body, "ResourceArn")?;
1317
1318 let state = self.state.read();
1319 let table = find_table_by_arn(&state.tables, resource_arn)?;
1320
1321 match &table.resource_policy {
1322 Some(policy) => {
1323 let revision_id = uuid::Uuid::new_v4().to_string();
1324 Self::ok_json(json!({
1325 "Policy": policy,
1326 "RevisionId": revision_id
1327 }))
1328 }
1329 None => Self::ok_json(json!({ "Policy": null })),
1330 }
1331 }
1332
1333 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1334 let body = Self::parse_body(req)?;
1335 let resource_arn = require_str(&body, "ResourceArn")?;
1336
1337 let mut state = self.state.write();
1338 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1339 table.resource_policy = None;
1340
1341 Self::ok_json(json!({}))
1342 }
1343
1344 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1347 Self::ok_json(json!({
1348 "Endpoints": [{
1349 "Address": "dynamodb.us-east-1.amazonaws.com",
1350 "CachePeriodInMinutes": 1440
1351 }]
1352 }))
1353 }
1354
1355 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1356 Self::ok_json(json!({
1357 "AccountMaxReadCapacityUnits": 80000,
1358 "AccountMaxWriteCapacityUnits": 80000,
1359 "TableMaxReadCapacityUnits": 40000,
1360 "TableMaxWriteCapacityUnits": 40000
1361 }))
1362 }
1363
1364 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1367 let body = Self::parse_body(req)?;
1368 let table_name = require_str(&body, "TableName")?;
1369 let backup_name = require_str(&body, "BackupName")?;
1370
1371 let mut state = self.state.write();
1372 let table = get_table(&state.tables, table_name)?;
1373
1374 let backup_arn = format!(
1375 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1376 state.region,
1377 state.account_id,
1378 table_name,
1379 Utc::now().format("%Y%m%d%H%M%S")
1380 );
1381 let now = Utc::now();
1382
1383 let backup = BackupDescription {
1384 backup_arn: backup_arn.clone(),
1385 backup_name: backup_name.to_string(),
1386 table_name: table_name.to_string(),
1387 table_arn: table.arn.clone(),
1388 backup_status: "AVAILABLE".to_string(),
1389 backup_type: "USER".to_string(),
1390 backup_creation_date: now,
1391 key_schema: table.key_schema.clone(),
1392 attribute_definitions: table.attribute_definitions.clone(),
1393 provisioned_throughput: table.provisioned_throughput.clone(),
1394 billing_mode: table.billing_mode.clone(),
1395 item_count: table.item_count,
1396 size_bytes: table.size_bytes,
1397 };
1398
1399 state.backups.insert(backup_arn.clone(), backup);
1400
1401 Self::ok_json(json!({
1402 "BackupDetails": {
1403 "BackupArn": backup_arn,
1404 "BackupName": backup_name,
1405 "BackupStatus": "AVAILABLE",
1406 "BackupType": "USER",
1407 "BackupCreationDateTime": now.timestamp() as f64,
1408 "BackupSizeBytes": 0
1409 }
1410 }))
1411 }
1412
1413 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1414 let body = Self::parse_body(req)?;
1415 let backup_arn = require_str(&body, "BackupArn")?;
1416
1417 let mut state = self.state.write();
1418 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1419 AwsServiceError::aws_error(
1420 StatusCode::BAD_REQUEST,
1421 "BackupNotFoundException",
1422 format!("Backup not found: {backup_arn}"),
1423 )
1424 })?;
1425
1426 Self::ok_json(json!({
1427 "BackupDescription": {
1428 "BackupDetails": {
1429 "BackupArn": backup.backup_arn,
1430 "BackupName": backup.backup_name,
1431 "BackupStatus": "DELETED",
1432 "BackupType": backup.backup_type,
1433 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1434 "BackupSizeBytes": backup.size_bytes
1435 },
1436 "SourceTableDetails": {
1437 "TableName": backup.table_name,
1438 "TableArn": backup.table_arn,
1439 "TableId": uuid::Uuid::new_v4().to_string(),
1440 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1441 "AttributeName": ks.attribute_name,
1442 "KeyType": ks.key_type
1443 })).collect::<Vec<_>>(),
1444 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1445 "ProvisionedThroughput": {
1446 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1447 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1448 },
1449 "ItemCount": backup.item_count,
1450 "BillingMode": backup.billing_mode,
1451 "TableSizeBytes": backup.size_bytes
1452 },
1453 "SourceTableFeatureDetails": {}
1454 }
1455 }))
1456 }
1457
1458 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1459 let body = Self::parse_body(req)?;
1460 let backup_arn = require_str(&body, "BackupArn")?;
1461
1462 let state = self.state.read();
1463 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1464 AwsServiceError::aws_error(
1465 StatusCode::BAD_REQUEST,
1466 "BackupNotFoundException",
1467 format!("Backup not found: {backup_arn}"),
1468 )
1469 })?;
1470
1471 Self::ok_json(json!({
1472 "BackupDescription": {
1473 "BackupDetails": {
1474 "BackupArn": backup.backup_arn,
1475 "BackupName": backup.backup_name,
1476 "BackupStatus": backup.backup_status,
1477 "BackupType": backup.backup_type,
1478 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1479 "BackupSizeBytes": backup.size_bytes
1480 },
1481 "SourceTableDetails": {
1482 "TableName": backup.table_name,
1483 "TableArn": backup.table_arn,
1484 "TableId": uuid::Uuid::new_v4().to_string(),
1485 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1486 "AttributeName": ks.attribute_name,
1487 "KeyType": ks.key_type
1488 })).collect::<Vec<_>>(),
1489 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1490 "ProvisionedThroughput": {
1491 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1492 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1493 },
1494 "ItemCount": backup.item_count,
1495 "BillingMode": backup.billing_mode,
1496 "TableSizeBytes": backup.size_bytes
1497 },
1498 "SourceTableFeatureDetails": {}
1499 }
1500 }))
1501 }
1502
1503 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1504 let body = Self::parse_body(req)?;
1505 let table_name = body["TableName"].as_str();
1506
1507 let state = self.state.read();
1508 let summaries: Vec<Value> = state
1509 .backups
1510 .values()
1511 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1512 .map(|b| {
1513 json!({
1514 "TableName": b.table_name,
1515 "TableArn": b.table_arn,
1516 "BackupArn": b.backup_arn,
1517 "BackupName": b.backup_name,
1518 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1519 "BackupStatus": b.backup_status,
1520 "BackupType": b.backup_type,
1521 "BackupSizeBytes": b.size_bytes
1522 })
1523 })
1524 .collect();
1525
1526 Self::ok_json(json!({
1527 "BackupSummaries": summaries
1528 }))
1529 }
1530
1531 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1532 let body = Self::parse_body(req)?;
1533 let backup_arn = require_str(&body, "BackupArn")?;
1534 let target_table_name = require_str(&body, "TargetTableName")?;
1535
1536 let mut state = self.state.write();
1537 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1538 AwsServiceError::aws_error(
1539 StatusCode::BAD_REQUEST,
1540 "BackupNotFoundException",
1541 format!("Backup not found: {backup_arn}"),
1542 )
1543 })?;
1544
1545 if state.tables.contains_key(target_table_name) {
1546 return Err(AwsServiceError::aws_error(
1547 StatusCode::BAD_REQUEST,
1548 "TableAlreadyExistsException",
1549 format!("Table already exists: {target_table_name}"),
1550 ));
1551 }
1552
1553 let now = Utc::now();
1554 let arn = format!(
1555 "arn:aws:dynamodb:{}:{}:table/{}",
1556 state.region, state.account_id, target_table_name
1557 );
1558
1559 let table = DynamoTable {
1560 name: target_table_name.to_string(),
1561 arn: arn.clone(),
1562 key_schema: backup.key_schema.clone(),
1563 attribute_definitions: backup.attribute_definitions.clone(),
1564 provisioned_throughput: backup.provisioned_throughput.clone(),
1565 items: Vec::new(),
1566 gsi: Vec::new(),
1567 lsi: Vec::new(),
1568 tags: HashMap::new(),
1569 created_at: now,
1570 status: "ACTIVE".to_string(),
1571 item_count: 0,
1572 size_bytes: 0,
1573 billing_mode: backup.billing_mode.clone(),
1574 ttl_attribute: None,
1575 ttl_enabled: false,
1576 resource_policy: None,
1577 pitr_enabled: false,
1578 kinesis_destinations: Vec::new(),
1579 contributor_insights_status: "DISABLED".to_string(),
1580 };
1581
1582 let desc = build_table_description(&table);
1583 state.tables.insert(target_table_name.to_string(), table);
1584
1585 Self::ok_json(json!({
1586 "TableDescription": desc
1587 }))
1588 }
1589
1590 fn restore_table_to_point_in_time(
1591 &self,
1592 req: &AwsRequest,
1593 ) -> Result<AwsResponse, AwsServiceError> {
1594 let body = Self::parse_body(req)?;
1595 let target_table_name = require_str(&body, "TargetTableName")?;
1596 let source_table_name = body["SourceTableName"].as_str();
1597 let source_table_arn = body["SourceTableArn"].as_str();
1598
1599 let mut state = self.state.write();
1600
1601 let source = if let Some(name) = source_table_name {
1603 get_table(&state.tables, name)?.clone()
1604 } else if let Some(arn) = source_table_arn {
1605 find_table_by_arn(&state.tables, arn)?.clone()
1606 } else {
1607 return Err(AwsServiceError::aws_error(
1608 StatusCode::BAD_REQUEST,
1609 "ValidationException",
1610 "SourceTableName or SourceTableArn is required",
1611 ));
1612 };
1613
1614 if state.tables.contains_key(target_table_name) {
1615 return Err(AwsServiceError::aws_error(
1616 StatusCode::BAD_REQUEST,
1617 "TableAlreadyExistsException",
1618 format!("Table already exists: {target_table_name}"),
1619 ));
1620 }
1621
1622 let now = Utc::now();
1623 let arn = format!(
1624 "arn:aws:dynamodb:{}:{}:table/{}",
1625 state.region, state.account_id, target_table_name
1626 );
1627
1628 let table = DynamoTable {
1629 name: target_table_name.to_string(),
1630 arn: arn.clone(),
1631 key_schema: source.key_schema.clone(),
1632 attribute_definitions: source.attribute_definitions.clone(),
1633 provisioned_throughput: source.provisioned_throughput.clone(),
1634 items: Vec::new(),
1635 gsi: Vec::new(),
1636 lsi: Vec::new(),
1637 tags: HashMap::new(),
1638 created_at: now,
1639 status: "ACTIVE".to_string(),
1640 item_count: 0,
1641 size_bytes: 0,
1642 billing_mode: source.billing_mode.clone(),
1643 ttl_attribute: None,
1644 ttl_enabled: false,
1645 resource_policy: None,
1646 pitr_enabled: false,
1647 kinesis_destinations: Vec::new(),
1648 contributor_insights_status: "DISABLED".to_string(),
1649 };
1650
1651 let desc = build_table_description(&table);
1652 state.tables.insert(target_table_name.to_string(), table);
1653
1654 Self::ok_json(json!({
1655 "TableDescription": desc
1656 }))
1657 }
1658
1659 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1660 let body = Self::parse_body(req)?;
1661 let table_name = require_str(&body, "TableName")?;
1662
1663 let pitr_spec = body["PointInTimeRecoverySpecification"]
1664 .as_object()
1665 .ok_or_else(|| {
1666 AwsServiceError::aws_error(
1667 StatusCode::BAD_REQUEST,
1668 "ValidationException",
1669 "PointInTimeRecoverySpecification is required",
1670 )
1671 })?;
1672 let enabled = pitr_spec
1673 .get("PointInTimeRecoveryEnabled")
1674 .and_then(|v| v.as_bool())
1675 .unwrap_or(false);
1676
1677 let mut state = self.state.write();
1678 let table = get_table_mut(&mut state.tables, table_name)?;
1679 table.pitr_enabled = enabled;
1680
1681 let status = if enabled { "ENABLED" } else { "DISABLED" };
1682 Self::ok_json(json!({
1683 "ContinuousBackupsDescription": {
1684 "ContinuousBackupsStatus": status,
1685 "PointInTimeRecoveryDescription": {
1686 "PointInTimeRecoveryStatus": status,
1687 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1688 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1689 }
1690 }
1691 }))
1692 }
1693
1694 fn describe_continuous_backups(
1695 &self,
1696 req: &AwsRequest,
1697 ) -> Result<AwsResponse, AwsServiceError> {
1698 let body = Self::parse_body(req)?;
1699 let table_name = require_str(&body, "TableName")?;
1700
1701 let state = self.state.read();
1702 let table = get_table(&state.tables, table_name)?;
1703
1704 let status = if table.pitr_enabled {
1705 "ENABLED"
1706 } else {
1707 "DISABLED"
1708 };
1709 Self::ok_json(json!({
1710 "ContinuousBackupsDescription": {
1711 "ContinuousBackupsStatus": status,
1712 "PointInTimeRecoveryDescription": {
1713 "PointInTimeRecoveryStatus": status,
1714 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1715 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1716 }
1717 }
1718 }))
1719 }
1720
1721 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1724 let body = Self::parse_body(req)?;
1725 let global_table_name = require_str(&body, "GlobalTableName")?;
1726
1727 let replication_group = body["ReplicationGroup"]
1728 .as_array()
1729 .ok_or_else(|| {
1730 AwsServiceError::aws_error(
1731 StatusCode::BAD_REQUEST,
1732 "ValidationException",
1733 "ReplicationGroup is required",
1734 )
1735 })?
1736 .iter()
1737 .filter_map(|r| {
1738 r["RegionName"].as_str().map(|rn| ReplicaDescription {
1739 region_name: rn.to_string(),
1740 replica_status: "ACTIVE".to_string(),
1741 })
1742 })
1743 .collect::<Vec<_>>();
1744
1745 let mut state = self.state.write();
1746
1747 if state.global_tables.contains_key(global_table_name) {
1748 return Err(AwsServiceError::aws_error(
1749 StatusCode::BAD_REQUEST,
1750 "GlobalTableAlreadyExistsException",
1751 format!("Global table already exists: {global_table_name}"),
1752 ));
1753 }
1754
1755 let arn = format!(
1756 "arn:aws:dynamodb::{}:global-table/{}",
1757 state.account_id, global_table_name
1758 );
1759 let now = Utc::now();
1760
1761 let gt = GlobalTableDescription {
1762 global_table_name: global_table_name.to_string(),
1763 global_table_arn: arn.clone(),
1764 global_table_status: "ACTIVE".to_string(),
1765 creation_date: now,
1766 replication_group: replication_group.clone(),
1767 };
1768
1769 state
1770 .global_tables
1771 .insert(global_table_name.to_string(), gt);
1772
1773 Self::ok_json(json!({
1774 "GlobalTableDescription": {
1775 "GlobalTableName": global_table_name,
1776 "GlobalTableArn": arn,
1777 "GlobalTableStatus": "ACTIVE",
1778 "CreationDateTime": now.timestamp() as f64,
1779 "ReplicationGroup": replication_group.iter().map(|r| json!({
1780 "RegionName": r.region_name,
1781 "ReplicaStatus": r.replica_status
1782 })).collect::<Vec<_>>()
1783 }
1784 }))
1785 }
1786
1787 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1788 let body = Self::parse_body(req)?;
1789 let global_table_name = require_str(&body, "GlobalTableName")?;
1790
1791 let state = self.state.read();
1792 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1793 AwsServiceError::aws_error(
1794 StatusCode::BAD_REQUEST,
1795 "GlobalTableNotFoundException",
1796 format!("Global table not found: {global_table_name}"),
1797 )
1798 })?;
1799
1800 Self::ok_json(json!({
1801 "GlobalTableDescription": {
1802 "GlobalTableName": gt.global_table_name,
1803 "GlobalTableArn": gt.global_table_arn,
1804 "GlobalTableStatus": gt.global_table_status,
1805 "CreationDateTime": gt.creation_date.timestamp() as f64,
1806 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1807 "RegionName": r.region_name,
1808 "ReplicaStatus": r.replica_status
1809 })).collect::<Vec<_>>()
1810 }
1811 }))
1812 }
1813
1814 fn describe_global_table_settings(
1815 &self,
1816 req: &AwsRequest,
1817 ) -> Result<AwsResponse, AwsServiceError> {
1818 let body = Self::parse_body(req)?;
1819 let global_table_name = require_str(&body, "GlobalTableName")?;
1820
1821 let state = self.state.read();
1822 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1823 AwsServiceError::aws_error(
1824 StatusCode::BAD_REQUEST,
1825 "GlobalTableNotFoundException",
1826 format!("Global table not found: {global_table_name}"),
1827 )
1828 })?;
1829
1830 let replica_settings: Vec<Value> = gt
1831 .replication_group
1832 .iter()
1833 .map(|r| {
1834 json!({
1835 "RegionName": r.region_name,
1836 "ReplicaStatus": r.replica_status,
1837 "ReplicaProvisionedReadCapacityUnits": 0,
1838 "ReplicaProvisionedWriteCapacityUnits": 0
1839 })
1840 })
1841 .collect();
1842
1843 Self::ok_json(json!({
1844 "GlobalTableName": gt.global_table_name,
1845 "ReplicaSettings": replica_settings
1846 }))
1847 }
1848
1849 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1850 let body = Self::parse_body(req)?;
1851 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1852
1853 let state = self.state.read();
1854 let tables: Vec<Value> = state
1855 .global_tables
1856 .values()
1857 .take(limit)
1858 .map(|gt| {
1859 json!({
1860 "GlobalTableName": gt.global_table_name,
1861 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1862 "RegionName": r.region_name
1863 })).collect::<Vec<_>>()
1864 })
1865 })
1866 .collect();
1867
1868 Self::ok_json(json!({
1869 "GlobalTables": tables
1870 }))
1871 }
1872
1873 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1874 let body = Self::parse_body(req)?;
1875 let global_table_name = require_str(&body, "GlobalTableName")?;
1876
1877 let mut state = self.state.write();
1878 let gt = state
1879 .global_tables
1880 .get_mut(global_table_name)
1881 .ok_or_else(|| {
1882 AwsServiceError::aws_error(
1883 StatusCode::BAD_REQUEST,
1884 "GlobalTableNotFoundException",
1885 format!("Global table not found: {global_table_name}"),
1886 )
1887 })?;
1888
1889 if let Some(updates) = body["ReplicaUpdates"].as_array() {
1890 for update in updates {
1891 if let Some(create) = update["Create"].as_object() {
1892 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
1893 gt.replication_group.push(ReplicaDescription {
1894 region_name: region.to_string(),
1895 replica_status: "ACTIVE".to_string(),
1896 });
1897 }
1898 }
1899 if let Some(delete) = update["Delete"].as_object() {
1900 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
1901 gt.replication_group.retain(|r| r.region_name != region);
1902 }
1903 }
1904 }
1905 }
1906
1907 Self::ok_json(json!({
1908 "GlobalTableDescription": {
1909 "GlobalTableName": gt.global_table_name,
1910 "GlobalTableArn": gt.global_table_arn,
1911 "GlobalTableStatus": gt.global_table_status,
1912 "CreationDateTime": gt.creation_date.timestamp() as f64,
1913 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1914 "RegionName": r.region_name,
1915 "ReplicaStatus": r.replica_status
1916 })).collect::<Vec<_>>()
1917 }
1918 }))
1919 }
1920
1921 fn update_global_table_settings(
1922 &self,
1923 req: &AwsRequest,
1924 ) -> Result<AwsResponse, AwsServiceError> {
1925 let body = Self::parse_body(req)?;
1926 let global_table_name = require_str(&body, "GlobalTableName")?;
1927
1928 let state = self.state.read();
1929 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1930 AwsServiceError::aws_error(
1931 StatusCode::BAD_REQUEST,
1932 "GlobalTableNotFoundException",
1933 format!("Global table not found: {global_table_name}"),
1934 )
1935 })?;
1936
1937 let replica_settings: Vec<Value> = gt
1938 .replication_group
1939 .iter()
1940 .map(|r| {
1941 json!({
1942 "RegionName": r.region_name,
1943 "ReplicaStatus": r.replica_status,
1944 "ReplicaProvisionedReadCapacityUnits": 0,
1945 "ReplicaProvisionedWriteCapacityUnits": 0
1946 })
1947 })
1948 .collect();
1949
1950 Self::ok_json(json!({
1951 "GlobalTableName": gt.global_table_name,
1952 "ReplicaSettings": replica_settings
1953 }))
1954 }
1955
1956 fn describe_table_replica_auto_scaling(
1957 &self,
1958 req: &AwsRequest,
1959 ) -> Result<AwsResponse, AwsServiceError> {
1960 let body = Self::parse_body(req)?;
1961 let table_name = require_str(&body, "TableName")?;
1962
1963 let state = self.state.read();
1964 let table = get_table(&state.tables, table_name)?;
1965
1966 Self::ok_json(json!({
1967 "TableAutoScalingDescription": {
1968 "TableName": table.name,
1969 "TableStatus": table.status,
1970 "Replicas": []
1971 }
1972 }))
1973 }
1974
1975 fn update_table_replica_auto_scaling(
1976 &self,
1977 req: &AwsRequest,
1978 ) -> Result<AwsResponse, AwsServiceError> {
1979 let body = Self::parse_body(req)?;
1980 let table_name = require_str(&body, "TableName")?;
1981
1982 let state = self.state.read();
1983 let table = get_table(&state.tables, table_name)?;
1984
1985 Self::ok_json(json!({
1986 "TableAutoScalingDescription": {
1987 "TableName": table.name,
1988 "TableStatus": table.status,
1989 "Replicas": []
1990 }
1991 }))
1992 }
1993
1994 fn enable_kinesis_streaming_destination(
1997 &self,
1998 req: &AwsRequest,
1999 ) -> Result<AwsResponse, AwsServiceError> {
2000 let body = Self::parse_body(req)?;
2001 let table_name = require_str(&body, "TableName")?;
2002 let stream_arn = require_str(&body, "StreamArn")?;
2003 let precision = body["EnableKinesisStreamingConfiguration"]
2004 ["ApproximateCreationDateTimePrecision"]
2005 .as_str()
2006 .unwrap_or("MILLISECOND");
2007
2008 let mut state = self.state.write();
2009 let table = get_table_mut(&mut state.tables, table_name)?;
2010
2011 table.kinesis_destinations.push(KinesisDestination {
2012 stream_arn: stream_arn.to_string(),
2013 destination_status: "ACTIVE".to_string(),
2014 approximate_creation_date_time_precision: precision.to_string(),
2015 });
2016
2017 Self::ok_json(json!({
2018 "TableName": table_name,
2019 "StreamArn": stream_arn,
2020 "DestinationStatus": "ACTIVE",
2021 "EnableKinesisStreamingConfiguration": {
2022 "ApproximateCreationDateTimePrecision": precision
2023 }
2024 }))
2025 }
2026
2027 fn disable_kinesis_streaming_destination(
2028 &self,
2029 req: &AwsRequest,
2030 ) -> Result<AwsResponse, AwsServiceError> {
2031 let body = Self::parse_body(req)?;
2032 let table_name = require_str(&body, "TableName")?;
2033 let stream_arn = require_str(&body, "StreamArn")?;
2034
2035 let mut state = self.state.write();
2036 let table = get_table_mut(&mut state.tables, table_name)?;
2037
2038 if let Some(dest) = table
2039 .kinesis_destinations
2040 .iter_mut()
2041 .find(|d| d.stream_arn == stream_arn)
2042 {
2043 dest.destination_status = "DISABLED".to_string();
2044 }
2045
2046 Self::ok_json(json!({
2047 "TableName": table_name,
2048 "StreamArn": stream_arn,
2049 "DestinationStatus": "DISABLED"
2050 }))
2051 }
2052
2053 fn describe_kinesis_streaming_destination(
2054 &self,
2055 req: &AwsRequest,
2056 ) -> Result<AwsResponse, AwsServiceError> {
2057 let body = Self::parse_body(req)?;
2058 let table_name = require_str(&body, "TableName")?;
2059
2060 let state = self.state.read();
2061 let table = get_table(&state.tables, table_name)?;
2062
2063 let destinations: Vec<Value> = table
2064 .kinesis_destinations
2065 .iter()
2066 .map(|d| {
2067 json!({
2068 "StreamArn": d.stream_arn,
2069 "DestinationStatus": d.destination_status,
2070 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2071 })
2072 })
2073 .collect();
2074
2075 Self::ok_json(json!({
2076 "TableName": table_name,
2077 "KinesisDataStreamDestinations": destinations
2078 }))
2079 }
2080
2081 fn update_kinesis_streaming_destination(
2082 &self,
2083 req: &AwsRequest,
2084 ) -> Result<AwsResponse, AwsServiceError> {
2085 let body = Self::parse_body(req)?;
2086 let table_name = require_str(&body, "TableName")?;
2087 let stream_arn = require_str(&body, "StreamArn")?;
2088 let precision = body["UpdateKinesisStreamingConfiguration"]
2089 ["ApproximateCreationDateTimePrecision"]
2090 .as_str()
2091 .unwrap_or("MILLISECOND");
2092
2093 let mut state = self.state.write();
2094 let table = get_table_mut(&mut state.tables, table_name)?;
2095
2096 if let Some(dest) = table
2097 .kinesis_destinations
2098 .iter_mut()
2099 .find(|d| d.stream_arn == stream_arn)
2100 {
2101 dest.approximate_creation_date_time_precision = precision.to_string();
2102 }
2103
2104 Self::ok_json(json!({
2105 "TableName": table_name,
2106 "StreamArn": stream_arn,
2107 "DestinationStatus": "ACTIVE",
2108 "UpdateKinesisStreamingConfiguration": {
2109 "ApproximateCreationDateTimePrecision": precision
2110 }
2111 }))
2112 }
2113
2114 fn describe_contributor_insights(
2117 &self,
2118 req: &AwsRequest,
2119 ) -> Result<AwsResponse, AwsServiceError> {
2120 let body = Self::parse_body(req)?;
2121 let table_name = require_str(&body, "TableName")?;
2122 let index_name = body["IndexName"].as_str();
2123
2124 let state = self.state.read();
2125 let table = get_table(&state.tables, table_name)?;
2126
2127 let mut result = json!({
2128 "TableName": table_name,
2129 "ContributorInsightsStatus": table.contributor_insights_status,
2130 "ContributorInsightsRuleList": []
2131 });
2132 if let Some(idx) = index_name {
2133 result["IndexName"] = json!(idx);
2134 }
2135
2136 Self::ok_json(result)
2137 }
2138
2139 fn update_contributor_insights(
2140 &self,
2141 req: &AwsRequest,
2142 ) -> Result<AwsResponse, AwsServiceError> {
2143 let body = Self::parse_body(req)?;
2144 let table_name = require_str(&body, "TableName")?;
2145 let action = require_str(&body, "ContributorInsightsAction")?;
2146 let index_name = body["IndexName"].as_str();
2147
2148 let mut state = self.state.write();
2149 let table = get_table_mut(&mut state.tables, table_name)?;
2150
2151 let status = match action {
2152 "ENABLE" => "ENABLED",
2153 "DISABLE" => "DISABLED",
2154 _ => {
2155 return Err(AwsServiceError::aws_error(
2156 StatusCode::BAD_REQUEST,
2157 "ValidationException",
2158 format!("Invalid ContributorInsightsAction: {action}"),
2159 ))
2160 }
2161 };
2162 table.contributor_insights_status = status.to_string();
2163
2164 let mut result = json!({
2165 "TableName": table_name,
2166 "ContributorInsightsStatus": status
2167 });
2168 if let Some(idx) = index_name {
2169 result["IndexName"] = json!(idx);
2170 }
2171
2172 Self::ok_json(result)
2173 }
2174
2175 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2176 let body = Self::parse_body(req)?;
2177 let table_name = body["TableName"].as_str();
2178
2179 let state = self.state.read();
2180 let summaries: Vec<Value> = state
2181 .tables
2182 .values()
2183 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2184 .map(|t| {
2185 json!({
2186 "TableName": t.name,
2187 "ContributorInsightsStatus": t.contributor_insights_status
2188 })
2189 })
2190 .collect();
2191
2192 Self::ok_json(json!({
2193 "ContributorInsightsSummaries": summaries
2194 }))
2195 }
2196
2197 fn export_table_to_point_in_time(
2200 &self,
2201 req: &AwsRequest,
2202 ) -> Result<AwsResponse, AwsServiceError> {
2203 let body = Self::parse_body(req)?;
2204 let table_arn = require_str(&body, "TableArn")?;
2205 let s3_bucket = require_str(&body, "S3Bucket")?;
2206 let s3_prefix = body["S3Prefix"].as_str();
2207 let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2208
2209 let state = self.state.read();
2210 find_table_by_arn(&state.tables, table_arn)?;
2212
2213 let now = Utc::now();
2214 let export_arn = format!(
2215 "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2216 state.region,
2217 state.account_id,
2218 table_arn.rsplit('/').next().unwrap_or("unknown"),
2219 uuid::Uuid::new_v4()
2220 );
2221
2222 let export = ExportDescription {
2223 export_arn: export_arn.clone(),
2224 export_status: "COMPLETED".to_string(),
2225 table_arn: table_arn.to_string(),
2226 s3_bucket: s3_bucket.to_string(),
2227 s3_prefix: s3_prefix.map(|s| s.to_string()),
2228 export_format: export_format.to_string(),
2229 start_time: now,
2230 end_time: now,
2231 export_time: now,
2232 item_count: 0,
2233 billed_size_bytes: 0,
2234 };
2235
2236 drop(state);
2237 let mut state = self.state.write();
2238 state.exports.insert(export_arn.clone(), export);
2239
2240 Self::ok_json(json!({
2241 "ExportDescription": {
2242 "ExportArn": export_arn,
2243 "ExportStatus": "COMPLETED",
2244 "TableArn": table_arn,
2245 "S3Bucket": s3_bucket,
2246 "S3Prefix": s3_prefix,
2247 "ExportFormat": export_format,
2248 "StartTime": now.timestamp() as f64,
2249 "EndTime": now.timestamp() as f64,
2250 "ExportTime": now.timestamp() as f64,
2251 "ItemCount": 0,
2252 "BilledSizeBytes": 0
2253 }
2254 }))
2255 }
2256
2257 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2258 let body = Self::parse_body(req)?;
2259 let export_arn = require_str(&body, "ExportArn")?;
2260
2261 let state = self.state.read();
2262 let export = state.exports.get(export_arn).ok_or_else(|| {
2263 AwsServiceError::aws_error(
2264 StatusCode::BAD_REQUEST,
2265 "ExportNotFoundException",
2266 format!("Export not found: {export_arn}"),
2267 )
2268 })?;
2269
2270 Self::ok_json(json!({
2271 "ExportDescription": {
2272 "ExportArn": export.export_arn,
2273 "ExportStatus": export.export_status,
2274 "TableArn": export.table_arn,
2275 "S3Bucket": export.s3_bucket,
2276 "S3Prefix": export.s3_prefix,
2277 "ExportFormat": export.export_format,
2278 "StartTime": export.start_time.timestamp() as f64,
2279 "EndTime": export.end_time.timestamp() as f64,
2280 "ExportTime": export.export_time.timestamp() as f64,
2281 "ItemCount": export.item_count,
2282 "BilledSizeBytes": export.billed_size_bytes
2283 }
2284 }))
2285 }
2286
2287 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2288 let body = Self::parse_body(req)?;
2289 let table_arn = body["TableArn"].as_str();
2290
2291 let state = self.state.read();
2292 let summaries: Vec<Value> = state
2293 .exports
2294 .values()
2295 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2296 .map(|e| {
2297 json!({
2298 "ExportArn": e.export_arn,
2299 "ExportStatus": e.export_status,
2300 "TableArn": e.table_arn
2301 })
2302 })
2303 .collect();
2304
2305 Self::ok_json(json!({
2306 "ExportSummaries": summaries
2307 }))
2308 }
2309
    /// Simulates ImportTable: creates the target table immediately (empty,
    /// PAY_PER_REQUEST, ACTIVE) and records an already-COMPLETED import.
    ///
    /// No S3 data is actually read — the mock only validates the request
    /// shape and registers the table plus an `ImportDescription`.
    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let input_format = require_str(&body, "InputFormat")?;
        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "S3BucketSource is required",
            )
        })?;
        // The bucket name itself is optional here; an absent value becomes "".
        let s3_bucket = s3_source
            .get("S3Bucket")
            .and_then(|v| v.as_str())
            .unwrap_or("");

        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TableCreationParameters is required",
            )
        })?;
        let table_name = table_params
            .get("TableName")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableCreationParameters.TableName is required",
                )
            })?;

        // Missing KeySchema/AttributeDefinitions fall through as Null so the
        // parsers below surface their own ValidationException.
        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
        let attribute_definitions = parse_attribute_definitions(
            table_params
                .get("AttributeDefinitions")
                .unwrap_or(&Value::Null),
        )?;

        let mut state = self.state.write();

        // Importing onto an existing table is rejected, matching DynamoDB.
        if state.tables.contains_key(table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let now = Utc::now();
        let table_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        let import_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
            state.region,
            state.account_id,
            table_name,
            uuid::Uuid::new_v4()
        );

        // The imported table starts empty and on-demand (0/0 throughput).
        let table = DynamoTable {
            name: table_name.to_string(),
            arn: table_arn.clone(),
            key_schema,
            attribute_definitions,
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            },
            items: Vec::new(),
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: HashMap::new(),
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PAY_PER_REQUEST".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
        };
        state.tables.insert(table_name.to_string(), table);

        // Record the import as instantly finished with zero processed data.
        let import_desc = ImportDescription {
            import_arn: import_arn.clone(),
            import_status: "COMPLETED".to_string(),
            table_arn: table_arn.clone(),
            table_name: table_name.to_string(),
            s3_bucket_source: s3_bucket.to_string(),
            input_format: input_format.to_string(),
            start_time: now,
            end_time: now,
            processed_item_count: 0,
            processed_size_bytes: 0,
        };
        state.imports.insert(import_arn.clone(), import_desc);

        Self::ok_json(json!({
            "ImportTableDescription": {
                "ImportArn": import_arn,
                "ImportStatus": "COMPLETED",
                "TableArn": table_arn,
                "TableId": uuid::Uuid::new_v4().to_string(),
                "S3BucketSource": {
                    "S3Bucket": s3_bucket
                },
                "InputFormat": input_format,
                "TableCreationParameters": {
                    "TableName": table_name
                },
                "StartTime": now.timestamp() as f64,
                "EndTime": now.timestamp() as f64,
                "ProcessedItemCount": 0,
                "ProcessedSizeBytes": 0
            }
        }))
    }
2434
2435 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2436 let body = Self::parse_body(req)?;
2437 let import_arn = require_str(&body, "ImportArn")?;
2438
2439 let state = self.state.read();
2440 let import = state.imports.get(import_arn).ok_or_else(|| {
2441 AwsServiceError::aws_error(
2442 StatusCode::BAD_REQUEST,
2443 "ImportNotFoundException",
2444 format!("Import not found: {import_arn}"),
2445 )
2446 })?;
2447
2448 Self::ok_json(json!({
2449 "ImportTableDescription": {
2450 "ImportArn": import.import_arn,
2451 "ImportStatus": import.import_status,
2452 "TableArn": import.table_arn,
2453 "S3BucketSource": {
2454 "S3Bucket": import.s3_bucket_source
2455 },
2456 "InputFormat": import.input_format,
2457 "StartTime": import.start_time.timestamp() as f64,
2458 "EndTime": import.end_time.timestamp() as f64,
2459 "ProcessedItemCount": import.processed_item_count,
2460 "ProcessedSizeBytes": import.processed_size_bytes
2461 }
2462 }))
2463 }
2464
2465 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2466 let body = Self::parse_body(req)?;
2467 let table_arn = body["TableArn"].as_str();
2468
2469 let state = self.state.read();
2470 let summaries: Vec<Value> = state
2471 .imports
2472 .values()
2473 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2474 .map(|i| {
2475 json!({
2476 "ImportArn": i.import_arn,
2477 "ImportStatus": i.import_status,
2478 "TableArn": i.table_arn
2479 })
2480 })
2481 .collect();
2482
2483 Self::ok_json(json!({
2484 "ImportSummaryList": summaries
2485 }))
2486 }
2487}
2488
#[async_trait]
impl AwsService for DynamoDbService {
    /// Service identifier used for routing requests to this fake backend.
    fn service_name(&self) -> &str {
        "dynamodb"
    }

    /// Dispatches a DynamoDB API action (taken from the `X-Amz-Target`-style
    /// action string) to the matching handler method. Unknown actions return
    /// an action-not-implemented error.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateTable" => self.create_table(&req),
            "DeleteTable" => self.delete_table(&req),
            "DescribeTable" => self.describe_table(&req),
            "ListTables" => self.list_tables(&req),
            "UpdateTable" => self.update_table(&req),
            "PutItem" => self.put_item(&req),
            "GetItem" => self.get_item(&req),
            "DeleteItem" => self.delete_item(&req),
            "UpdateItem" => self.update_item(&req),
            "Query" => self.query(&req),
            "Scan" => self.scan(&req),
            "BatchGetItem" => self.batch_get_item(&req),
            "BatchWriteItem" => self.batch_write_item(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsOfResource" => self.list_tags_of_resource(&req),
            "TransactGetItems" => self.transact_get_items(&req),
            "TransactWriteItems" => self.transact_write_items(&req),
            "ExecuteStatement" => self.execute_statement(&req),
            "BatchExecuteStatement" => self.batch_execute_statement(&req),
            "ExecuteTransaction" => self.execute_transaction(&req),
            "UpdateTimeToLive" => self.update_time_to_live(&req),
            "DescribeTimeToLive" => self.describe_time_to_live(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "DescribeEndpoints" => self.describe_endpoints(&req),
            "DescribeLimits" => self.describe_limits(&req),
            "CreateBackup" => self.create_backup(&req),
            "DeleteBackup" => self.delete_backup(&req),
            "DescribeBackup" => self.describe_backup(&req),
            "ListBackups" => self.list_backups(&req),
            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
            "CreateGlobalTable" => self.create_global_table(&req),
            "DescribeGlobalTable" => self.describe_global_table(&req),
            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
            "ListGlobalTables" => self.list_global_tables(&req),
            "UpdateGlobalTable" => self.update_global_table(&req),
            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
            "DisableKinesisStreamingDestination" => {
                self.disable_kinesis_streaming_destination(&req)
            }
            "DescribeKinesisStreamingDestination" => {
                self.describe_kinesis_streaming_destination(&req)
            }
            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
            "UpdateContributorInsights" => self.update_contributor_insights(&req),
            "ListContributorInsights" => self.list_contributor_insights(&req),
            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
            "DescribeExport" => self.describe_export(&req),
            "ListExports" => self.list_exports(&req),
            "ImportTable" => self.import_table(&req),
            "DescribeImport" => self.describe_import(&req),
            "ListImports" => self.list_imports(&req),
            _ => Err(AwsServiceError::action_not_implemented(
                "dynamodb",
                &req.action,
            )),
        }
    }

    /// Advertised action list. NOTE: must be kept in sync by hand with the
    /// `handle` dispatch table above — adding an arm there requires adding
    /// the action string here too.
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateTable",
            "DeleteTable",
            "DescribeTable",
            "ListTables",
            "UpdateTable",
            "PutItem",
            "GetItem",
            "DeleteItem",
            "UpdateItem",
            "Query",
            "Scan",
            "BatchGetItem",
            "BatchWriteItem",
            "TagResource",
            "UntagResource",
            "ListTagsOfResource",
            "TransactGetItems",
            "TransactWriteItems",
            "ExecuteStatement",
            "BatchExecuteStatement",
            "ExecuteTransaction",
            "UpdateTimeToLive",
            "DescribeTimeToLive",
            "PutResourcePolicy",
            "GetResourcePolicy",
            "DeleteResourcePolicy",
            "DescribeEndpoints",
            "DescribeLimits",
            "CreateBackup",
            "DeleteBackup",
            "DescribeBackup",
            "ListBackups",
            "RestoreTableFromBackup",
            "RestoreTableToPointInTime",
            "UpdateContinuousBackups",
            "DescribeContinuousBackups",
            "CreateGlobalTable",
            "DescribeGlobalTable",
            "DescribeGlobalTableSettings",
            "ListGlobalTables",
            "UpdateGlobalTable",
            "UpdateGlobalTableSettings",
            "DescribeTableReplicaAutoScaling",
            "UpdateTableReplicaAutoScaling",
            "EnableKinesisStreamingDestination",
            "DisableKinesisStreamingDestination",
            "DescribeKinesisStreamingDestination",
            "UpdateKinesisStreamingDestination",
            "DescribeContributorInsights",
            "UpdateContributorInsights",
            "ListContributorInsights",
            "ExportTableToPointInTime",
            "DescribeExport",
            "ListExports",
            "ImportTable",
            "DescribeImport",
            "ListImports",
        ]
    }
}
2633
2634fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
2637 body[field].as_str().ok_or_else(|| {
2638 AwsServiceError::aws_error(
2639 StatusCode::BAD_REQUEST,
2640 "ValidationException",
2641 format!("{field} is required"),
2642 )
2643 })
2644}
2645
2646fn require_object(
2647 body: &Value,
2648 field: &str,
2649) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2650 let obj = body[field].as_object().ok_or_else(|| {
2651 AwsServiceError::aws_error(
2652 StatusCode::BAD_REQUEST,
2653 "ValidationException",
2654 format!("{field} is required"),
2655 )
2656 })?;
2657 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2658}
2659
2660fn get_table<'a>(
2661 tables: &'a HashMap<String, DynamoTable>,
2662 name: &str,
2663) -> Result<&'a DynamoTable, AwsServiceError> {
2664 tables.get(name).ok_or_else(|| {
2665 AwsServiceError::aws_error(
2666 StatusCode::BAD_REQUEST,
2667 "ResourceNotFoundException",
2668 format!("Requested resource not found: Table: {name} not found"),
2669 )
2670 })
2671}
2672
2673fn get_table_mut<'a>(
2674 tables: &'a mut HashMap<String, DynamoTable>,
2675 name: &str,
2676) -> Result<&'a mut DynamoTable, AwsServiceError> {
2677 tables.get_mut(name).ok_or_else(|| {
2678 AwsServiceError::aws_error(
2679 StatusCode::BAD_REQUEST,
2680 "ResourceNotFoundException",
2681 format!("Requested resource not found: Table: {name} not found"),
2682 )
2683 })
2684}
2685
2686fn find_table_by_arn<'a>(
2687 tables: &'a HashMap<String, DynamoTable>,
2688 arn: &str,
2689) -> Result<&'a DynamoTable, AwsServiceError> {
2690 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
2691 AwsServiceError::aws_error(
2692 StatusCode::BAD_REQUEST,
2693 "ResourceNotFoundException",
2694 format!("Requested resource not found: {arn}"),
2695 )
2696 })
2697}
2698
2699fn find_table_by_arn_mut<'a>(
2700 tables: &'a mut HashMap<String, DynamoTable>,
2701 arn: &str,
2702) -> Result<&'a mut DynamoTable, AwsServiceError> {
2703 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
2704 AwsServiceError::aws_error(
2705 StatusCode::BAD_REQUEST,
2706 "ResourceNotFoundException",
2707 format!("Requested resource not found: {arn}"),
2708 )
2709 })
2710}
2711
2712fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
2713 let arr = val.as_array().ok_or_else(|| {
2714 AwsServiceError::aws_error(
2715 StatusCode::BAD_REQUEST,
2716 "ValidationException",
2717 "KeySchema is required",
2718 )
2719 })?;
2720 Ok(arr
2721 .iter()
2722 .map(|elem| KeySchemaElement {
2723 attribute_name: elem["AttributeName"]
2724 .as_str()
2725 .unwrap_or_default()
2726 .to_string(),
2727 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
2728 })
2729 .collect())
2730}
2731
2732fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
2733 let arr = val.as_array().ok_or_else(|| {
2734 AwsServiceError::aws_error(
2735 StatusCode::BAD_REQUEST,
2736 "ValidationException",
2737 "AttributeDefinitions is required",
2738 )
2739 })?;
2740 Ok(arr
2741 .iter()
2742 .map(|elem| AttributeDefinition {
2743 attribute_name: elem["AttributeName"]
2744 .as_str()
2745 .unwrap_or_default()
2746 .to_string(),
2747 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
2748 })
2749 .collect())
2750}
2751
2752fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
2753 Ok(ProvisionedThroughput {
2754 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
2755 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
2756 })
2757}
2758
2759fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
2760 let Some(arr) = val.as_array() else {
2761 return Vec::new();
2762 };
2763 arr.iter()
2764 .filter_map(|g| {
2765 Some(GlobalSecondaryIndex {
2766 index_name: g["IndexName"].as_str()?.to_string(),
2767 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
2768 projection: parse_projection(&g["Projection"]),
2769 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
2770 .ok(),
2771 })
2772 })
2773 .collect()
2774}
2775
2776fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
2777 let Some(arr) = val.as_array() else {
2778 return Vec::new();
2779 };
2780 arr.iter()
2781 .filter_map(|l| {
2782 Some(LocalSecondaryIndex {
2783 index_name: l["IndexName"].as_str()?.to_string(),
2784 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
2785 projection: parse_projection(&l["Projection"]),
2786 })
2787 })
2788 .collect()
2789}
2790
2791fn parse_projection(val: &Value) -> Projection {
2792 Projection {
2793 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
2794 non_key_attributes: val["NonKeyAttributes"]
2795 .as_array()
2796 .map(|arr| {
2797 arr.iter()
2798 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2799 .collect()
2800 })
2801 .unwrap_or_default(),
2802 }
2803}
2804
2805fn parse_tags(val: &Value) -> HashMap<String, String> {
2806 let mut tags = HashMap::new();
2807 if let Some(arr) = val.as_array() {
2808 for tag in arr {
2809 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2810 tags.insert(k.to_string(), v.to_string());
2811 }
2812 }
2813 }
2814 tags
2815}
2816
2817fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
2818 let mut names = HashMap::new();
2819 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
2820 for (k, v) in obj {
2821 if let Some(s) = v.as_str() {
2822 names.insert(k.clone(), s.to_string());
2823 }
2824 }
2825 }
2826 names
2827}
2828
2829fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
2830 let mut values = HashMap::new();
2831 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
2832 for (k, v) in obj {
2833 values.insert(k.clone(), v.clone());
2834 }
2835 }
2836 values
2837}
2838
/// Resolves a `#placeholder` through ExpressionAttributeNames. Plain names
/// pass through unchanged, and an unknown placeholder falls back to itself.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    match name.strip_prefix('#') {
        // Not a placeholder: use the literal attribute name.
        None => name.to_string(),
        // Placeholder: look up the full token (including '#').
        Some(_) => match expr_attr_names.get(name) {
            Some(resolved) => resolved.clone(),
            None => name.to_string(),
        },
    }
}
2849
2850fn extract_key(
2851 table: &DynamoTable,
2852 item: &HashMap<String, AttributeValue>,
2853) -> HashMap<String, AttributeValue> {
2854 let mut key = HashMap::new();
2855 let hash_key = table.hash_key_name();
2856 if let Some(v) = item.get(hash_key) {
2857 key.insert(hash_key.to_string(), v.clone());
2858 }
2859 if let Some(range_key) = table.range_key_name() {
2860 if let Some(v) = item.get(range_key) {
2861 key.insert(range_key.to_string(), v.clone());
2862 }
2863 }
2864 key
2865}
2866
2867fn validate_key_in_item(
2868 table: &DynamoTable,
2869 item: &HashMap<String, AttributeValue>,
2870) -> Result<(), AwsServiceError> {
2871 let hash_key = table.hash_key_name();
2872 if !item.contains_key(hash_key) {
2873 return Err(AwsServiceError::aws_error(
2874 StatusCode::BAD_REQUEST,
2875 "ValidationException",
2876 format!("Missing the key {hash_key} in the item"),
2877 ));
2878 }
2879 if let Some(range_key) = table.range_key_name() {
2880 if !item.contains_key(range_key) {
2881 return Err(AwsServiceError::aws_error(
2882 StatusCode::BAD_REQUEST,
2883 "ValidationException",
2884 format!("Missing the key {range_key} in the item"),
2885 ));
2886 }
2887 }
2888 Ok(())
2889}
2890
/// Validates that a request `Key` map contains the table's hash key.
///
/// NOTE(review): unlike `validate_key_in_item`, this does NOT require the
/// range key even when the table defines one — real DynamoDB rejects a
/// GetItem/DeleteItem key that omits the sort key. Confirm whether callers
/// rely on this laxness before tightening it.
fn validate_key_attributes_in_key(
    table: &DynamoTable,
    key: &HashMap<String, AttributeValue>,
) -> Result<(), AwsServiceError> {
    let hash_key = table.hash_key_name();
    if !key.contains_key(hash_key) {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            format!("Missing the key {hash_key} in the item"),
        ));
    }
    Ok(())
}
2905
2906fn project_item(
2907 item: &HashMap<String, AttributeValue>,
2908 body: &Value,
2909) -> HashMap<String, AttributeValue> {
2910 let projection = body["ProjectionExpression"].as_str();
2911 match projection {
2912 Some(proj) if !proj.is_empty() => {
2913 let expr_attr_names = parse_expression_attribute_names(body);
2914 let attrs: Vec<String> = proj
2915 .split(',')
2916 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
2917 .collect();
2918 let mut result = HashMap::new();
2919 for attr in &attrs {
2920 if let Some(v) = resolve_nested_path(item, attr) {
2921 insert_nested_value(&mut result, attr, v);
2922 }
2923 }
2924 result
2925 }
2926 _ => item.clone(),
2927 }
2928}
2929
2930fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
2933 let mut result = String::new();
2935 for (i, segment) in path.split('.').enumerate() {
2936 if i > 0 {
2937 result.push('.');
2938 }
2939 if let Some(bracket_pos) = segment.find('[') {
2941 let key_part = &segment[..bracket_pos];
2942 let index_part = &segment[bracket_pos..];
2943 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
2944 result.push_str(index_part);
2945 } else {
2946 result.push_str(&resolve_attr_name(segment, expr_attr_names));
2947 }
2948 }
2949 result
2950}
2951
2952fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
2954 let segments = parse_path_segments(path);
2955 if segments.is_empty() {
2956 return None;
2957 }
2958
2959 let first = &segments[0];
2960 let top_key = match first {
2961 PathSegment::Key(k) => k.as_str(),
2962 _ => return None,
2963 };
2964
2965 let mut current = item.get(top_key)?.clone();
2966
2967 for segment in &segments[1..] {
2968 match segment {
2969 PathSegment::Key(k) => {
2970 current = current.get("M")?.get(k)?.clone();
2972 }
2973 PathSegment::Index(idx) => {
2974 current = current.get("L")?.get(*idx)?.clone();
2976 }
2977 }
2978 }
2979
2980 Some(current)
2981}
2982
/// One step of a DynamoDB document path: either a map key or a zero-based
/// list index (the `[3]` part of `a.b[3]`).
#[derive(Debug)]
enum PathSegment {
    // Named attribute lookup inside a map.
    Key(String),
    // Positional lookup inside a list.
    Index(usize),
}
2988
/// Splits a document path like `a.b[2].c` into `Key`/`Index` segments.
///
/// Parsing is lenient: a non-numeric index (e.g. `[x]`) is silently dropped,
/// and an unterminated `[` consumes the rest of the string.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut current = String::new();

    let chars: Vec<char> = path.chars().collect();
    let mut i = 0;
    while i < chars.len() {
        match chars[i] {
            '.' => {
                // Segment separator: flush the accumulated key, if any.
                if !current.is_empty() {
                    segments.push(PathSegment::Key(current.clone()));
                    current.clear();
                }
            }
            '[' => {
                // Flush the key preceding the bracket, then collect the
                // characters up to the matching ']'.
                if !current.is_empty() {
                    segments.push(PathSegment::Key(current.clone()));
                    current.clear();
                }
                i += 1;
                let mut num = String::new();
                while i < chars.len() && chars[i] != ']' {
                    num.push(chars[i]);
                    i += 1;
                }
                if let Ok(idx) = num.parse::<usize>() {
                    segments.push(PathSegment::Index(idx));
                }
                // `i` now rests on the closing ']' (or at end of input); the
                // shared `i += 1` below intentionally steps past it.
            }
            c => {
                current.push(c);
            }
        }
        i += 1;
    }
    // Flush a trailing key with no terminating '.' or '['.
    if !current.is_empty() {
        segments.push(PathSegment::Key(current));
    }
    segments
}
3031
3032fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3035 if !path.contains('.') && !path.contains('[') {
3037 result.insert(path.to_string(), value);
3038 return;
3039 }
3040
3041 let segments = parse_path_segments(path);
3042 if segments.is_empty() {
3043 return;
3044 }
3045
3046 let top_key = match &segments[0] {
3047 PathSegment::Key(k) => k.clone(),
3048 _ => return,
3049 };
3050
3051 if segments.len() == 1 {
3052 result.insert(top_key, value);
3053 return;
3054 }
3055
3056 let wrapped = wrap_value_in_path(&segments[1..], value);
3058 let existing = result.remove(&top_key);
3060 let merged = match existing {
3061 Some(existing) => merge_attribute_values(existing, wrapped),
3062 None => wrapped,
3063 };
3064 result.insert(top_key, merged);
3065}
3066
3067fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3069 if segments.is_empty() {
3070 return value;
3071 }
3072 let inner = wrap_value_in_path(&segments[1..], value);
3073 match &segments[0] {
3074 PathSegment::Key(k) => {
3075 json!({"M": {k.clone(): inner}})
3076 }
3077 PathSegment::Index(idx) => {
3078 let mut arr = vec![Value::Null; idx + 1];
3079 arr[*idx] = inner;
3080 json!({"L": arr})
3081 }
3082 }
3083}
3084
3085fn merge_attribute_values(a: Value, b: Value) -> Value {
3087 if let (Some(a_map), Some(b_map)) = (
3088 a.get("M").and_then(|v| v.as_object()),
3089 b.get("M").and_then(|v| v.as_object()),
3090 ) {
3091 let mut merged = a_map.clone();
3092 for (k, v) in b_map {
3093 if let Some(existing) = merged.get(k) {
3094 merged.insert(
3095 k.clone(),
3096 merge_attribute_values(existing.clone(), v.clone()),
3097 );
3098 } else {
3099 merged.insert(k.clone(), v.clone());
3100 }
3101 }
3102 json!({"M": merged})
3103 } else {
3104 b
3105 }
3106}
3107
/// Evaluates a single-clause ConditionExpression against the (possibly
/// absent) existing item, returning ConditionalCheckFailedException when it
/// does not hold.
///
/// Supported forms: `attribute_not_exists(path)`, `attribute_exists(path)`,
/// and a simple binary comparison. Only `=` and `<>` comparisons are
/// enforced; any other parsed operator is treated as satisfied.
fn evaluate_condition(
    condition: &str,
    existing: Option<&HashMap<String, AttributeValue>>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    let cond = condition.trim();

    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        match existing {
            // Attribute present -> the "not exists" condition fails.
            Some(item) if item.contains_key(&attr) => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ConditionalCheckFailedException",
                    "The conditional request failed",
                ));
            }
            // No item, or item without the attribute -> condition holds.
            _ => return Ok(()),
        }
    }

    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        match existing {
            Some(item) if item.contains_key(&attr) => return Ok(()),
            _ => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ConditionalCheckFailedException",
                    "The conditional request failed",
                ));
            }
        }
    }

    if let Some((left, op, right)) = parse_simple_comparison(cond) {
        // Left side is an attribute name (or #placeholder); right side is
        // expected to be a :value placeholder looked up in the values map.
        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
        let expected = expr_attr_values.get(right.trim());
        let actual = existing.and_then(|item| item.get(&attr_name));

        let result = match op {
            "=" => actual == expected,
            "<>" => actual != expected,
            // Ordering operators are not enforced by this mock.
            _ => true,
        };

        if !result {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ConditionalCheckFailedException",
                "The conditional request failed",
            ));
        }
    }

    // Unrecognized expressions pass vacuously.
    Ok(())
}
3166
/// If `expr` is exactly `func_name( … )`, returns the trimmed inner
/// argument; otherwise `None`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let inner = expr
        .strip_prefix(func_name)?
        .strip_prefix('(')?
        .strip_suffix(')')?;
    Some(inner.trim())
}
3176
/// Splits a simple binary comparison (`attr <op> :val`) into
/// `(left, operator, right)`; whitespace around the operator is kept.
///
/// Bug fix: operators are now matched longest-first. Previously `"="` was
/// tried before `"<="`/`">="`, so `a <= :v` was mis-parsed as
/// `("a <", "=", " :v")` and the two-character ordering operators were
/// unreachable. `"<>"` stays first so it is never split as `<` then `>`.
fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
    for op in &["<>", "<=", ">=", "=", "<", ">"] {
        if let Some(pos) = expr.find(op) {
            return Some((&expr[..pos], op, &expr[pos + op.len()..]));
        }
    }
    None
}
3187
3188fn evaluate_key_condition(
3189 expr: &str,
3190 item: &HashMap<String, AttributeValue>,
3191 hash_key_name: &str,
3192 _range_key_name: Option<&str>,
3193 expr_attr_names: &HashMap<String, String>,
3194 expr_attr_values: &HashMap<String, Value>,
3195) -> bool {
3196 let parts: Vec<&str> = split_on_and(expr);
3197 for part in &parts {
3198 let part = part.trim();
3199 if !evaluate_single_key_condition(
3200 part,
3201 item,
3202 hash_key_name,
3203 expr_attr_names,
3204 expr_attr_values,
3205 ) {
3206 return false;
3207 }
3208 }
3209 true
3210}
3211
/// Splits a condition expression on top-level ` AND ` (case-insensitive),
/// leaving parenthesized sub-expressions intact.
///
/// Bug fix: the separator check now compares raw bytes
/// (`bytes[i..i + 5]`) instead of slicing the `&str` (`&expr[i..i + 5]`),
/// which panicked whenever `i` fell inside a multi-byte UTF-8 character
/// (e.g. non-ASCII attribute names or values). The separator itself is
/// pure ASCII, so byte-wise case-insensitive matching is equivalent, and
/// the resulting split points always land on char boundaries.
fn split_on_and(expr: &str) -> Vec<&str> {
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ => {
                if depth == 0
                    && i + 5 <= bytes.len()
                    && bytes[i..i + 5].eq_ignore_ascii_case(b" AND ")
                {
                    parts.push(&expr[start..i]);
                    start = i + 5;
                    i = start;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3237
/// Splits a condition expression on top-level ` OR ` (case-insensitive),
/// leaving parenthesized sub-expressions intact.
///
/// Bug fix: like `split_on_and`, the separator check now compares raw bytes
/// instead of slicing the `&str` at an arbitrary byte offset, which
/// panicked when the offset fell inside a multi-byte UTF-8 character.
fn split_on_or(expr: &str) -> Vec<&str> {
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ => {
                if depth == 0
                    && i + 4 <= bytes.len()
                    && bytes[i..i + 4].eq_ignore_ascii_case(b" OR ")
                {
                    parts.push(&expr[start..i]);
                    start = i + 4;
                    i = start;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3263
/// Evaluates one clause of a key-condition expression against `item`.
///
/// Supported forms, tried in order:
/// 1. `begins_with(#attr, :val)` — string prefix test on the `S`/`N` payload;
///    a malformed argument list evaluates to `false`;
/// 2. `#attr BETWEEN :lo AND :hi` — inclusive range test;
/// 3. `#attr <op> :val` with `<op>` one of `<=`, `>=`, `<>`, `=`, `<`, `>`
///    (two-character operators are tried first so they are never mis-split).
///
/// A clause matching none of these forms evaluates to `true`, i.e. it is
/// silently ignored rather than rejected.
fn evaluate_single_key_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    _hash_key_name: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let part = part.trim();

    // begins_with(attr, :val) — an optional space before '(' is accepted.
    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let val_ref = val_ref.trim();
                let expected = expr_attr_values.get(val_ref);
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        // Compare the textual payloads (S, or raw N string).
                        let a_str = extract_string_value(a);
                        let e_str = extract_string_value(e);
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
                    }
                    _ => false,
                };
            }
        }
        // Malformed begins_with (missing ')' or second argument): no match.
        return false;
    }

    // attr BETWEEN :lo AND :hi — inclusive on both ends.
    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
        let attr_part = part[..between_pos].trim();
        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
        let range_part = &part[between_pos + 7..];
        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
            let lo_ref = range_part[..and_pos].trim();
            let hi_ref = range_part[and_pos + 5..].trim();
            let lo = expr_attr_values.get(lo_ref);
            let hi = expr_attr_values.get(hi_ref);
            let actual = item.get(&attr_name);
            return match (actual, lo, hi) {
                (Some(a), Some(l), Some(h)) => {
                    // lo <= a <= hi, expressed via the shared ordering helper.
                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
                }
                _ => false,
            };
        }
    }

    // Plain comparison: two-character operators first to avoid mis-splitting.
    for op in &["<=", ">=", "<>", "=", "<", ">"] {
        if let Some(pos) = part.find(op) {
            let left = part[..pos].trim();
            let right = part[pos + op.len()..].trim();
            let attr_name = resolve_attr_name(left, expr_attr_names);
            let expected = expr_attr_values.get(right);
            let actual = item.get(&attr_name);

            return match *op {
                // Equality is structural on the raw JSON attribute value.
                "=" => actual == expected,
                "<>" => actual != expected,
                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
                "<=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
                }
                ">=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
                }
                _ => true,
            };
        }
    }

    // Unrecognized clause: treat as satisfied.
    true
}
3348
3349fn extract_string_value(val: &Value) -> Option<String> {
3350 val.get("S")
3351 .and_then(|v| v.as_str())
3352 .map(|s| s.to_string())
3353 .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3354}
3355
/// Orders two optional attribute values for key-condition comparisons.
///
/// `None` sorts before `Some`. For present values the comparison depends on
/// the DynamoDB type tag reported by `attribute_type_and_value`:
/// * `S` — lexicographic string order;
/// * `N` — numeric order after parsing as `f64` (unparseable input counts
///   as `0.0`);
/// * `B` — lexicographic order of the stored string as-is.
///   NOTE(review): this compares the encoded text, presumably base64, not the
///   decoded bytes; real DynamoDB orders binary values by raw bytes — confirm
///   whether that distinction matters for this mock.
/// * mismatched or unknown type tags compare as `Equal`.
fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
    match (a, b) {
        (None, None) => std::cmp::Ordering::Equal,
        (None, Some(_)) => std::cmp::Ordering::Less,
        (Some(_), None) => std::cmp::Ordering::Greater,
        (Some(a), Some(b)) => {
            let a_type = attribute_type_and_value(a);
            let b_type = attribute_type_and_value(b);
            match (a_type, b_type) {
                (Some(("S", a_val)), Some(("S", b_val))) => {
                    let a_str = a_val.as_str().unwrap_or("");
                    let b_str = b_val.as_str().unwrap_or("");
                    a_str.cmp(b_str)
                }
                (Some(("N", a_val)), Some(("N", b_val))) => {
                    // Numbers are stored as strings; unparseable ones act as 0.
                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
                    a_num
                        .partial_cmp(&b_num)
                        .unwrap_or(std::cmp::Ordering::Equal)
                }
                (Some(("B", a_val)), Some(("B", b_val))) => {
                    let a_str = a_val.as_str().unwrap_or("");
                    let b_str = b_val.as_str().unwrap_or("");
                    a_str.cmp(b_str)
                }
                _ => std::cmp::Ordering::Equal,
            }
        }
    }
}
3387
3388fn evaluate_filter_expression(
3389 expr: &str,
3390 item: &HashMap<String, AttributeValue>,
3391 expr_attr_names: &HashMap<String, String>,
3392 expr_attr_values: &HashMap<String, Value>,
3393) -> bool {
3394 let trimmed = expr.trim();
3395
3396 let or_parts = split_on_or(trimmed);
3398 if or_parts.len() > 1 {
3399 return or_parts.iter().any(|part| {
3400 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3401 });
3402 }
3403
3404 let and_parts = split_on_and(trimmed);
3406 if and_parts.len() > 1 {
3407 return and_parts.iter().all(|part| {
3408 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3409 });
3410 }
3411
3412 let stripped = strip_outer_parens(trimmed);
3414 if stripped != trimmed {
3415 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3416 }
3417
3418 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3419}
3420
/// Removes one pair of enclosing parentheses when they wrap the ENTIRE
/// expression, e.g. `(a = :v)` -> `a = :v`. Inputs like `(a) AND (b)` are
/// returned unchanged because the leading `(` closes before the end.
fn strip_outer_parens(expr: &str) -> &str {
    let expr = expr.trim();
    let Some(candidate) = expr
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return expr;
    };

    // The outer pair only wraps everything if the interior never dips below
    // depth zero and ends balanced.
    let mut depth: i32 = 0;
    for b in candidate.bytes() {
        match b {
            b'(' => depth += 1,
            b')' => {
                depth -= 1;
                if depth < 0 {
                    return expr;
                }
            }
            _ => {}
        }
    }
    if depth == 0 {
        candidate
    } else {
        expr
    }
}
3448
3449fn evaluate_single_filter_condition(
3450 part: &str,
3451 item: &HashMap<String, AttributeValue>,
3452 expr_attr_names: &HashMap<String, String>,
3453 expr_attr_values: &HashMap<String, Value>,
3454) -> bool {
3455 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
3456 let attr = resolve_attr_name(inner, expr_attr_names);
3457 return item.contains_key(&attr);
3458 }
3459
3460 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
3461 let attr = resolve_attr_name(inner, expr_attr_names);
3462 return !item.contains_key(&attr);
3463 }
3464
3465 if let Some(rest) = part
3466 .strip_prefix("begins_with(")
3467 .or_else(|| part.strip_prefix("begins_with ("))
3468 {
3469 if let Some(inner) = rest.strip_suffix(')') {
3470 let mut split = inner.splitn(2, ',');
3471 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3472 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3473 let expected = expr_attr_values.get(val_ref.trim());
3474 let actual = item.get(&attr_name);
3475 return match (actual, expected) {
3476 (Some(a), Some(e)) => {
3477 let a_str = extract_string_value(a);
3478 let e_str = extract_string_value(e);
3479 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
3480 }
3481 _ => false,
3482 };
3483 }
3484 }
3485 }
3486
3487 if let Some(rest) = part
3488 .strip_prefix("contains(")
3489 .or_else(|| part.strip_prefix("contains ("))
3490 {
3491 if let Some(inner) = rest.strip_suffix(')') {
3492 let mut split = inner.splitn(2, ',');
3493 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
3494 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3495 let expected = expr_attr_values.get(val_ref.trim());
3496 let actual = item.get(&attr_name);
3497 return match (actual, expected) {
3498 (Some(a), Some(e)) => {
3499 let a_str = extract_string_value(a);
3500 let e_str = extract_string_value(e);
3501 matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
3502 }
3503 _ => false,
3504 };
3505 }
3506 }
3507 }
3508
3509 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
3510}
3511
3512fn apply_update_expression(
3513 item: &mut HashMap<String, AttributeValue>,
3514 expr: &str,
3515 expr_attr_names: &HashMap<String, String>,
3516 expr_attr_values: &HashMap<String, Value>,
3517) -> Result<(), AwsServiceError> {
3518 let clauses = parse_update_clauses(expr);
3519 for (action, assignments) in &clauses {
3520 match action.to_ascii_uppercase().as_str() {
3521 "SET" => {
3522 for assignment in assignments {
3523 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3524 }
3525 }
3526 "REMOVE" => {
3527 for attr_ref in assignments {
3528 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3529 item.remove(&attr);
3530 }
3531 }
3532 "ADD" => {
3533 for assignment in assignments {
3534 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3535 }
3536 }
3537 "DELETE" => {
3538 for assignment in assignments {
3539 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3540 }
3541 }
3542 _ => {}
3543 }
3544 }
3545 Ok(())
3546}
3547
/// Splits an UpdateExpression into `(keyword, comma-separated assignments)`
/// clauses for the SET / REMOVE / ADD / DELETE actions.
///
/// Keyword detection requires a real word boundary. Besides ASCII
/// alphanumerics, identifier punctuation (`_`, `#`, `:`, `.`) also counts as
/// part of a word, so names such as `my_add`, `#add`, `:addr` or `a.add` are
/// never mistaken for an `ADD` clause keyword (the previous check only
/// excluded alphanumerics, which misparsed e.g. `SET my_add = :v`).
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    // True for bytes that can be part of an identifier/placeholder token.
    fn is_word_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || matches!(b, b'_' | b'#' | b':' | b'.')
    }

    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
    let upper = expr.to_ascii_uppercase();
    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
    let mut positions: Vec<(usize, &str)> = Vec::new();

    // Collect the byte positions of every standalone keyword occurrence.
    for kw in &keywords {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let before_ok = abs_pos == 0 || !is_word_byte(expr.as_bytes()[abs_pos - 1]);
            let after_pos = abs_pos + kw.len();
            let after_ok = after_pos >= expr.len() || !is_word_byte(expr.as_bytes()[after_pos]);
            if before_ok && after_ok {
                positions.push((abs_pos, kw));
            }
            search_from = abs_pos + kw.len();
        }
    }

    positions.sort_by_key(|(pos, _)| *pos);

    // Each clause's content runs from its keyword to the next keyword (or EOF).
    for (i, &(pos, kw)) in positions.iter().enumerate() {
        let start = pos + kw.len();
        let end = positions.get(i + 1).map_or(expr.len(), |&(p, _)| p);
        let content = expr[start..end].trim();
        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
        clauses.push((kw.to_string(), assignments));
    }

    clauses
}
3585
3586fn apply_set_assignment(
3587 item: &mut HashMap<String, AttributeValue>,
3588 assignment: &str,
3589 expr_attr_names: &HashMap<String, String>,
3590 expr_attr_values: &HashMap<String, Value>,
3591) -> Result<(), AwsServiceError> {
3592 let Some((left, right)) = assignment.split_once('=') else {
3593 return Ok(());
3594 };
3595
3596 let attr = resolve_attr_name(left.trim(), expr_attr_names);
3597 let right = right.trim();
3598
3599 if let Some(rest) = right
3601 .strip_prefix("if_not_exists(")
3602 .or_else(|| right.strip_prefix("if_not_exists ("))
3603 {
3604 if let Some(inner) = rest.strip_suffix(')') {
3605 let mut split = inner.splitn(2, ',');
3606 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
3607 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
3608 if !item.contains_key(&check_name) {
3609 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
3610 item.insert(attr, val.clone());
3611 }
3612 }
3613 return Ok(());
3614 }
3615 }
3616 }
3617
3618 if let Some(rest) = right
3620 .strip_prefix("list_append(")
3621 .or_else(|| right.strip_prefix("list_append ("))
3622 {
3623 if let Some(inner) = rest.strip_suffix(')') {
3624 let mut split = inner.splitn(2, ',');
3625 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
3626 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
3627 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
3628
3629 let mut merged = Vec::new();
3630 if let Some(Value::Object(obj)) = &a_val {
3631 if let Some(Value::Array(arr)) = obj.get("L") {
3632 merged.extend(arr.clone());
3633 }
3634 }
3635 if let Some(Value::Object(obj)) = &b_val {
3636 if let Some(Value::Array(arr)) = obj.get("L") {
3637 merged.extend(arr.clone());
3638 }
3639 }
3640
3641 item.insert(attr, json!({"L": merged}));
3642 return Ok(());
3643 }
3644 }
3645 }
3646
3647 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
3649 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
3650 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
3651
3652 let left_num = extract_number(&left_val).unwrap_or(0.0);
3653 let right_num = extract_number(&right_val).unwrap_or(0.0);
3654
3655 let result = if is_add {
3656 left_num + right_num
3657 } else {
3658 left_num - right_num
3659 };
3660
3661 let num_str = if result == result.trunc() {
3662 format!("{}", result as i64)
3663 } else {
3664 format!("{result}")
3665 };
3666
3667 item.insert(attr, json!({"N": num_str}));
3668 return Ok(());
3669 }
3670
3671 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
3673 if let Some(v) = val {
3674 item.insert(attr, v);
3675 }
3676
3677 Ok(())
3678}
3679
3680fn resolve_value(
3681 reference: &str,
3682 item: &HashMap<String, AttributeValue>,
3683 expr_attr_names: &HashMap<String, String>,
3684 expr_attr_values: &HashMap<String, Value>,
3685) -> Option<Value> {
3686 let reference = reference.trim();
3687 if reference.starts_with(':') {
3688 expr_attr_values.get(reference).cloned()
3689 } else {
3690 let attr_name = resolve_attr_name(reference, expr_attr_names);
3691 item.get(&attr_name).cloned()
3692 }
3693}
3694
3695fn extract_number(val: &Option<Value>) -> Option<f64> {
3696 val.as_ref()
3697 .and_then(|v| v.get("N"))
3698 .and_then(|n| n.as_str())
3699 .and_then(|s| s.parse().ok())
3700}
3701
/// Finds the first top-level `+` or `-` in `expr` and splits around it,
/// returning `(left, right, is_addition)`. Operators at position 0 (a leading
/// sign) or inside parentheses are ignored.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut depth = 0i32;
    for (i, c) in expr.char_indices() {
        match c {
            '(' => depth += 1,
            ')' => depth -= 1,
            '+' | '-' if depth == 0 && i > 0 => {
                return Some((&expr[..i], &expr[i + 1..], c == '+'));
            }
            _ => {}
        }
    }
    None
}
3719
/// Applies one `ADD` assignment (`attr :valueRef`) from an UpdateExpression.
///
/// Behavior by value type:
/// * number (`N`) onto an existing number: numeric addition, rendered without
///   a fractional part when the result is integral;
/// * string set (`SS`) / number set (`NS`) onto an existing set of the same
///   type: set union (duplicates skipped);
/// * attribute absent: the operand is stored as-is;
/// * any other combination (e.g. mismatched set types) is silently ignored.
fn apply_add_assignment(
    item: &mut HashMap<String, AttributeValue>,
    assignment: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    // An ADD element must be exactly "attr :valueRef"; anything else is a no-op.
    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
    if parts.len() != 2 {
        return Ok(());
    }

    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
    let val_ref = parts[1].trim();
    let add_val = expr_attr_values.get(val_ref);

    if let Some(add_val) = add_val {
        if let Some(existing) = item.get(&attr) {
            // Numeric add: both sides must carry a parseable "N" member.
            if let (Some(existing_num), Some(add_num)) = (
                extract_number(&Some(existing.clone())),
                extract_number(&Some(add_val.clone())),
            ) {
                let result = existing_num + add_num;
                // Integral results are rendered without a trailing ".0".
                let num_str = if result == result.trunc() {
                    format!("{}", result as i64)
                } else {
                    format!("{result}")
                };
                item.insert(attr, json!({"N": num_str}));
            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
                // String-set union.
                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
                    let mut merged: Vec<Value> = existing_set.clone();
                    for v in add_set {
                        if !merged.contains(v) {
                            merged.push(v.clone());
                        }
                    }
                    item.insert(attr, json!({"SS": merged}));
                }
            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
                // Number-set union.
                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
                    let mut merged: Vec<Value> = existing_set.clone();
                    for v in add_set {
                        if !merged.contains(v) {
                            merged.push(v.clone());
                        }
                    }
                    item.insert(attr, json!({"NS": merged}));
                }
            }
        } else {
            // Attribute missing: ADD behaves like a plain set.
            item.insert(attr, add_val.clone());
        }
    }

    Ok(())
}
3776
3777fn apply_delete_assignment(
3778 item: &mut HashMap<String, AttributeValue>,
3779 assignment: &str,
3780 expr_attr_names: &HashMap<String, String>,
3781 expr_attr_values: &HashMap<String, Value>,
3782) -> Result<(), AwsServiceError> {
3783 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3784 if parts.len() != 2 {
3785 return Ok(());
3786 }
3787
3788 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3789 let val_ref = parts[1].trim();
3790 let del_val = expr_attr_values.get(val_ref);
3791
3792 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
3793 if let (Some(existing_set), Some(del_set)) = (
3794 existing.get("SS").and_then(|v| v.as_array()),
3795 del_val.get("SS").and_then(|v| v.as_array()),
3796 ) {
3797 let filtered: Vec<Value> = existing_set
3798 .iter()
3799 .filter(|v| !del_set.contains(v))
3800 .cloned()
3801 .collect();
3802 if filtered.is_empty() {
3803 item.remove(&attr);
3804 } else {
3805 item.insert(attr, json!({"SS": filtered}));
3806 }
3807 } else if let (Some(existing_set), Some(del_set)) = (
3808 existing.get("NS").and_then(|v| v.as_array()),
3809 del_val.get("NS").and_then(|v| v.as_array()),
3810 ) {
3811 let filtered: Vec<Value> = existing_set
3812 .iter()
3813 .filter(|v| !del_set.contains(v))
3814 .cloned()
3815 .collect();
3816 if filtered.is_empty() {
3817 item.remove(&attr);
3818 } else {
3819 item.insert(attr, json!({"NS": filtered}));
3820 }
3821 }
3822 }
3823
3824 Ok(())
3825}
3826
/// Builds the JSON `TableDescription` structure shared by CreateTable,
/// DescribeTable and related responses.
///
/// The table name is recovered from the final `/` segment of `arn`, and
/// `CreationDateTime` is rendered as fractional epoch seconds (millisecond
/// precision), matching the AWS wire format.
///
/// NOTE(review): `TableId` is a fresh random UUID on every call, so repeated
/// DescribeTable calls return different ids — confirm whether a stable
/// per-table id should be stored on the table instead.
#[allow(clippy::too_many_arguments)]
fn build_table_description_json(
    arn: &str,
    key_schema: &[KeySchemaElement],
    attribute_definitions: &[AttributeDefinition],
    provisioned_throughput: &ProvisionedThroughput,
    gsi: &[GlobalSecondaryIndex],
    lsi: &[LocalSecondaryIndex],
    billing_mode: &str,
    created_at: chrono::DateTime<chrono::Utc>,
    item_count: i64,
    size_bytes: i64,
    status: &str,
) -> Value {
    let table_name = arn.rsplit('/').next().unwrap_or("");
    // Fractional epoch seconds with millisecond precision.
    let creation_timestamp =
        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;

    let ks: Vec<Value> = key_schema
        .iter()
        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
        .collect();

    let ad: Vec<Value> = attribute_definitions
        .iter()
        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
        .collect();

    let mut desc = json!({
        "TableName": table_name,
        "TableArn": arn,
        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
        "TableStatus": status,
        "KeySchema": ks,
        "AttributeDefinitions": ad,
        "CreationDateTime": creation_timestamp,
        "ItemCount": item_count,
        "TableSizeBytes": size_bytes,
        "BillingModeSummary": { "BillingMode": billing_mode },
    });

    // PAY_PER_REQUEST tables still report a ProvisionedThroughput block,
    // just with zeroed capacity units.
    if billing_mode != "PAY_PER_REQUEST" {
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
            "NumberOfDecreasesToday": 0,
        });
    } else {
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": 0,
            "WriteCapacityUnits": 0,
            "NumberOfDecreasesToday": 0,
        });
    }

    // Index sections are emitted only when indexes exist; their ItemCount /
    // IndexSizeBytes are always reported as 0 here.
    if !gsi.is_empty() {
        let gsi_json: Vec<Value> = gsi
            .iter()
            .map(|g| {
                let gks: Vec<Value> = g
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": g.index_name,
                    "KeySchema": gks,
                    "Projection": { "ProjectionType": g.projection.projection_type },
                    "IndexStatus": "ACTIVE",
                    "IndexArn": format!("{arn}/index/{}", g.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                if !g.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
                }
                if let Some(ref pt) = g.provisioned_throughput {
                    idx["ProvisionedThroughput"] = json!({
                        "ReadCapacityUnits": pt.read_capacity_units,
                        "WriteCapacityUnits": pt.write_capacity_units,
                        "NumberOfDecreasesToday": 0,
                    });
                }
                idx
            })
            .collect();
        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
    }

    if !lsi.is_empty() {
        let lsi_json: Vec<Value> = lsi
            .iter()
            .map(|l| {
                let lks: Vec<Value> = l
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": l.index_name,
                    "KeySchema": lks,
                    "Projection": { "ProjectionType": l.projection.projection_type },
                    "IndexArn": format!("{arn}/index/{}", l.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                if !l.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
                }
                idx
            })
            .collect();
        desc["LocalSecondaryIndexes"] = json!(lsi_json);
    }

    desc
}
3944
/// Convenience wrapper: renders `table`'s full `TableDescription` JSON from
/// its stored fields via `build_table_description_json`.
fn build_table_description(table: &DynamoTable) -> Value {
    build_table_description_json(
        &table.arn,
        &table.key_schema,
        &table.attribute_definitions,
        &table.provisioned_throughput,
        &table.gsi,
        &table.lsi,
        &table.billing_mode,
        table.created_at,
        table.item_count,
        table.size_bytes,
        &table.status,
    )
}
3960
3961fn execute_partiql_statement(
3962 state: &SharedDynamoDbState,
3963 statement: &str,
3964 parameters: &[Value],
3965) -> Result<AwsResponse, AwsServiceError> {
3966 let trimmed = statement.trim();
3967 let upper = trimmed.to_ascii_uppercase();
3968
3969 if upper.starts_with("SELECT") {
3970 execute_partiql_select(state, trimmed, parameters)
3971 } else if upper.starts_with("INSERT") {
3972 execute_partiql_insert(state, trimmed, parameters)
3973 } else if upper.starts_with("UPDATE") {
3974 execute_partiql_update(state, trimmed, parameters)
3975 } else if upper.starts_with("DELETE") {
3976 execute_partiql_delete(state, trimmed, parameters)
3977 } else {
3978 Err(AwsServiceError::aws_error(
3979 StatusCode::BAD_REQUEST,
3980 "ValidationException",
3981 format!("Unsupported PartiQL statement: {trimmed}"),
3982 ))
3983 }
3984}
3985
3986fn execute_partiql_select(
3988 state: &SharedDynamoDbState,
3989 statement: &str,
3990 parameters: &[Value],
3991) -> Result<AwsResponse, AwsServiceError> {
3992 let upper = statement.to_ascii_uppercase();
3994 let from_pos = upper.find("FROM").ok_or_else(|| {
3995 AwsServiceError::aws_error(
3996 StatusCode::BAD_REQUEST,
3997 "ValidationException",
3998 "Invalid SELECT statement: missing FROM",
3999 )
4000 })?;
4001
4002 let after_from = statement[from_pos + 4..].trim();
4003 let (table_name, rest) = parse_partiql_table_name(after_from);
4004
4005 let state = state.read();
4006 let table = get_table(&state.tables, &table_name)?;
4007
4008 let rest_upper = rest.trim().to_ascii_uppercase();
4009 if rest_upper.starts_with("WHERE") {
4010 let where_clause = rest.trim()[5..].trim();
4011 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4012 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4013 DynamoDbService::ok_json(json!({ "Items": items }))
4014 } else {
4015 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4017 DynamoDbService::ok_json(json!({ "Items": items }))
4018 }
4019}
4020
4021fn execute_partiql_insert(
4022 state: &SharedDynamoDbState,
4023 statement: &str,
4024 parameters: &[Value],
4025) -> Result<AwsResponse, AwsServiceError> {
4026 let upper = statement.to_ascii_uppercase();
4029 let into_pos = upper.find("INTO").ok_or_else(|| {
4030 AwsServiceError::aws_error(
4031 StatusCode::BAD_REQUEST,
4032 "ValidationException",
4033 "Invalid INSERT statement: missing INTO",
4034 )
4035 })?;
4036
4037 let after_into = statement[into_pos + 4..].trim();
4038 let (table_name, rest) = parse_partiql_table_name(after_into);
4039
4040 let rest_upper = rest.trim().to_ascii_uppercase();
4041 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4042 AwsServiceError::aws_error(
4043 StatusCode::BAD_REQUEST,
4044 "ValidationException",
4045 "Invalid INSERT statement: missing VALUE",
4046 )
4047 })?;
4048
4049 let value_str = rest.trim()[value_pos + 5..].trim();
4050 let item = parse_partiql_value_object(value_str, parameters)?;
4051
4052 let mut state = state.write();
4053 let table = get_table_mut(&mut state.tables, &table_name)?;
4054 let key = extract_key(table, &item);
4055 if table.find_item_index(&key).is_some() {
4056 return Err(AwsServiceError::aws_error(
4058 StatusCode::BAD_REQUEST,
4059 "DuplicateItemException",
4060 "Duplicate primary key exists in table",
4061 ));
4062 } else {
4063 table.items.push(item);
4064 }
4065 table.recalculate_stats();
4066
4067 DynamoDbService::ok_json(json!({}))
4068}
4069
/// Executes a PartiQL `UPDATE table SET a = v, ... [WHERE ...]` statement.
/// Without a WHERE clause every item in the table is updated.
fn execute_partiql_update(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    // Skip the leading "UPDATE" keyword (6 bytes); the dispatcher guarantees
    // the trimmed statement starts with it (case-insensitively).
    let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);

    let rest_upper = rest.trim().to_ascii_uppercase();
    let set_pos = rest_upper.find("SET").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid UPDATE statement: missing SET",
        )
    })?;

    let after_set = rest.trim()[set_pos + 3..].trim();

    // Split the tail into the SET assignments and an optional WHERE clause.
    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
    let (set_clause, where_clause) = if let Some(wp) = where_pos {
        (&after_set[..wp], after_set[wp + 5..].trim())
    } else {
        (after_set, "")
    };

    let mut state = state.write();
    let table = get_table_mut(&mut state.tables, &table_name)?;

    // An empty WHERE clause selects every item.
    let matched_indices = if !where_clause.is_empty() {
        find_partiql_where_indices(table, where_clause, parameters)?
    } else {
        (0..table.items.len()).collect()
    };

    // NOTE(review): SET literals start consuming `parameters` after however
    // many `?` markers the WHERE clause contains, i.e. this assumes WHERE
    // placeholders come FIRST in `parameters` even though SET precedes WHERE
    // in the statement text — confirm against the callers' parameter order.
    let param_offset = count_params_in_str(where_clause);
    let assignments: Vec<&str> = set_clause.split(',').collect();
    for idx in &matched_indices {
        // Each matched item replays the same SET parameters from the offset.
        let mut local_offset = param_offset;
        for assignment in &assignments {
            let assignment = assignment.trim();
            if let Some((attr, val_str)) = assignment.split_once('=') {
                let attr = attr.trim().trim_matches('"');
                let val_str = val_str.trim();
                // Unparseable literals silently skip the assignment.
                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
                if let Some(v) = value {
                    table.items[*idx].insert(attr.to_string(), v);
                }
            }
        }
    }
    table.recalculate_stats();

    DynamoDbService::ok_json(json!({}))
}
4129
4130fn execute_partiql_delete(
4131 state: &SharedDynamoDbState,
4132 statement: &str,
4133 parameters: &[Value],
4134) -> Result<AwsResponse, AwsServiceError> {
4135 let upper = statement.to_ascii_uppercase();
4137 let from_pos = upper.find("FROM").ok_or_else(|| {
4138 AwsServiceError::aws_error(
4139 StatusCode::BAD_REQUEST,
4140 "ValidationException",
4141 "Invalid DELETE statement: missing FROM",
4142 )
4143 })?;
4144
4145 let after_from = statement[from_pos + 4..].trim();
4146 let (table_name, rest) = parse_partiql_table_name(after_from);
4147
4148 let rest_upper = rest.trim().to_ascii_uppercase();
4149 if !rest_upper.starts_with("WHERE") {
4150 return Err(AwsServiceError::aws_error(
4151 StatusCode::BAD_REQUEST,
4152 "ValidationException",
4153 "DELETE requires a WHERE clause",
4154 ));
4155 }
4156 let where_clause = rest.trim()[5..].trim();
4157
4158 let mut state = state.write();
4159 let table = get_table_mut(&mut state.tables, &table_name)?;
4160
4161 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4162 indices.sort_unstable();
4164 indices.reverse();
4165 for idx in indices {
4166 table.items.remove(idx);
4167 }
4168 table.recalculate_stats();
4169
4170 DynamoDbService::ok_json(json!({}))
4171}
4172
/// Parses a table name at the start of `s`, returning `(name, remainder)`.
///
/// The name may be double-quoted (`"My Table" ...`) — the quotes are
/// dropped — or bare, in which case it runs until the first whitespace
/// character. An unterminated quote falls back to space-delimited parsing
/// with quotes stripped.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();

    let Some(after_quote) = s.strip_prefix('"') else {
        // Bare identifier: everything up to the first whitespace.
        let end = s.find(char::is_whitespace).unwrap_or(s.len());
        return (s[..end].to_string(), &s[end..]);
    };

    match after_quote.find('"') {
        Some(close) => (
            after_quote[..close].to_string(),
            &after_quote[close + 1..],
        ),
        None => {
            // No closing quote: split on a plain space and strip quotes.
            let end = s.find(' ').unwrap_or(s.len());
            (s[..end].trim_matches('"').to_string(), &s[end..])
        }
    }
}
4192
4193fn evaluate_partiql_where<'a>(
4196 table: &'a DynamoTable,
4197 where_clause: &str,
4198 parameters: &[Value],
4199) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4200 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4201 Ok(indices.iter().map(|i| &table.items[*i]).collect())
4202}
4203
4204fn find_partiql_where_indices(
4205 table: &DynamoTable,
4206 where_clause: &str,
4207 parameters: &[Value],
4208) -> Result<Vec<usize>, AwsServiceError> {
4209 let upper = where_clause.to_uppercase();
4212 let conditions = if upper.contains(" AND ") {
4213 let mut parts = Vec::new();
4215 let mut last = 0;
4216 for (i, _) in upper.match_indices(" AND ") {
4217 parts.push(where_clause[last..i].trim());
4218 last = i + 5;
4219 }
4220 parts.push(where_clause[last..].trim());
4221 parts
4222 } else {
4223 vec![where_clause.trim()]
4224 };
4225
4226 let mut param_idx = 0usize;
4227 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4228
4229 for cond in &conditions {
4230 let cond = cond.trim();
4231 if let Some((left, right)) = cond.split_once('=') {
4232 let attr = left.trim().trim_matches('"').to_string();
4233 let val_str = right.trim();
4234 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4235 if let Some(v) = value {
4236 parsed_conditions.push((attr, v));
4237 }
4238 }
4239 }
4240
4241 let mut indices = Vec::new();
4242 for (i, item) in table.items.iter().enumerate() {
4243 let all_match = parsed_conditions
4244 .iter()
4245 .all(|(attr, expected)| item.get(attr) == Some(expected));
4246 if all_match {
4247 indices.push(i);
4248 }
4249 }
4250
4251 Ok(indices)
4252}
4253
4254fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4259 let s = s.trim();
4260 if s == "?" {
4261 let idx = *param_idx;
4262 *param_idx += 1;
4263 parameters.get(idx).cloned()
4264 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4265 let inner = &s[1..s.len() - 1];
4266 Some(json!({"S": inner}))
4267 } else if let Ok(n) = s.parse::<f64>() {
4268 let num_str = if n == n.trunc() {
4269 format!("{}", n as i64)
4270 } else {
4271 format!("{n}")
4272 };
4273 Some(json!({"N": num_str}))
4274 } else {
4275 None
4276 }
4277}
4278
4279fn parse_partiql_value_object(
4281 s: &str,
4282 parameters: &[Value],
4283) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4284 let s = s.trim();
4285 let inner = s
4286 .strip_prefix('{')
4287 .and_then(|s| s.strip_suffix('}'))
4288 .ok_or_else(|| {
4289 AwsServiceError::aws_error(
4290 StatusCode::BAD_REQUEST,
4291 "ValidationException",
4292 "Invalid VALUE: expected object literal",
4293 )
4294 })?;
4295
4296 let mut item = HashMap::new();
4297 let mut param_idx = 0usize;
4298
4299 for pair in split_partiql_pairs(inner) {
4301 let pair = pair.trim();
4302 if pair.is_empty() {
4303 continue;
4304 }
4305 if let Some((key_part, val_part)) = pair.split_once(':') {
4306 let key = key_part
4307 .trim()
4308 .trim_matches('\'')
4309 .trim_matches('"')
4310 .to_string();
4311 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4312 item.insert(key, val);
4313 }
4314 }
4315 }
4316
4317 Ok(item)
4318}
4319
/// Splits the interior of a PartiQL object literal on top-level commas.
///
/// Commas inside single-quoted strings or nested `{...}` groups do not
/// split: quote state toggles on every `'`, and braces adjust nesting depth
/// only when outside quotes. The returned slices are untrimmed substrings of
/// `s`; an empty input yields a single empty slice.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut piece_start = 0usize;
    let mut brace_depth = 0i32;
    let mut inside_quote = false;

    for (idx, ch) in s.char_indices() {
        if ch == '\'' {
            inside_quote = !inside_quote;
        } else if inside_quote {
            // Everything inside a quoted string is opaque.
        } else if ch == '{' {
            brace_depth += 1;
        } else if ch == '}' {
            brace_depth -= 1;
        } else if ch == ',' && brace_depth == 0 {
            pieces.push(&s[piece_start..idx]);
            piece_start = idx + 1;
        }
    }

    pieces.push(&s[piece_start..]);
    pieces
}
4343
/// Counts `?` parameter placeholders in a PartiQL statement fragment.
///
/// Question marks inside single-quoted string literals are not placeholders
/// and are excluded; quote state toggles on every `'`, consistent with how
/// `split_partiql_pairs` treats quoted text as opaque.
fn count_params_in_str(s: &str) -> usize {
    let mut in_quote = false;
    let mut count = 0usize;
    for c in s.chars() {
        match c {
            '\'' => in_quote = !in_quote,
            '?' if !in_quote => count += 1,
            _ => {}
        }
    }
    count
}
4348
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- Expression-helper unit tests -----------------------------------

    /// A single SET clause with two comma-separated assignments stays one clause.
    #[test]
    fn test_parse_update_clauses_set() {
        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
        assert_eq!(clauses.len(), 1);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[0].1.len(), 2);
    }

    /// SET followed by REMOVE splits into two clauses, preserving order.
    #[test]
    fn test_parse_update_clauses_set_and_remove() {
        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
        assert_eq!(clauses.len(), 2);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[1].0, "REMOVE");
    }

    /// An equality condition on the hash key matches the stored item.
    #[test]
    fn test_evaluate_key_condition_simple() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user1"}));
        item.insert("sk".to_string(), json!({"S": "order1"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));

        assert!(evaluate_key_condition(
            "pk = :pk",
            &item,
            "pk",
            Some("sk"),
            &HashMap::new(),
            &expr_values,
        ));
    }

    /// "N" attribute values compare numerically, not lexically.
    #[test]
    fn test_compare_attribute_values_numbers() {
        let a = json!({"N": "10"});
        let b = json!({"N": "20"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    /// "S" attribute values compare lexicographically.
    #[test]
    fn test_compare_attribute_values_strings() {
        let a = json!({"S": "apple"});
        let b = json!({"S": "banana"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    /// A top-level AND splits the expression into its two operands.
    #[test]
    fn test_split_on_and() {
        let parts = split_on_and("pk = :pk AND sk > :sk");
        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].trim(), "pk = :pk");
        assert_eq!(parts[1].trim(), "sk > :sk");
    }

    /// An AND nested inside parentheses must not split the expression.
    #[test]
    fn test_split_on_and_respects_parentheses() {
        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
    }

    /// `(A AND B) OR C` is true when only C holds, false when nothing holds.
    #[test]
    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
        let mut item = HashMap::new();
        item.insert("x".to_string(), json!({"S": "no"}));
        item.insert("y".to_string(), json!({"S": "no"}));
        item.insert("z".to_string(), json!({"S": "yes"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));

        let result = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item,
            &HashMap::new(),
            &expr_values,
        );
        assert!(result, "should match because z = :yes is true");

        // Same expression against an item where no branch is satisfied.
        let mut item2 = HashMap::new();
        item2.insert("x".to_string(), json!({"S": "no"}));
        item2.insert("y".to_string(), json!({"S": "no"}));
        item2.insert("z".to_string(), json!({"S": "no"}));

        let result2 = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item2,
            &HashMap::new(),
            &expr_values,
        );
        assert!(!result2, "should not match because nothing is true");
    }

    /// Projecting `data[0].name` keeps only that leaf: sibling keys (age) and
    /// other list elements must be dropped from the projected item.
    #[test]
    fn test_project_item_nested_path() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "key1"}));
        item.insert(
            "data".to_string(),
            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
        );

        let body = json!({
            "ProjectionExpression": "data[0].name"
        });

        let projected = project_item(&item, &body);
        // The projected path must survive intact down to the "S" leaf.
        let name = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("name"))
            .and_then(|v| v.get("S"))
            .and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));

        // Non-projected siblings at the same depth must be gone.
        let age = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("age"));
        assert!(age.is_none(), "age should not be present in projection");
    }

    /// Dotted paths descend through nested "M" maps.
    #[test]
    fn test_resolve_nested_path_map() {
        let mut item = HashMap::new();
        item.insert(
            "info".to_string(),
            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
        );

        let result = resolve_nested_path(&item, "info.address.city");
        assert_eq!(result, Some(json!({"S": "NYC"})));
    }

    /// `[index]` segments descend through "L" lists before map segments.
    #[test]
    fn test_resolve_nested_path_list_then_map() {
        let mut item = HashMap::new();
        item.insert(
            "items".to_string(),
            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
        );

        let result = resolve_nested_path(&item, "items[0].sku");
        assert_eq!(result, Some(json!({"S": "ABC"})));
    }

    // ---- Service-level integration tests --------------------------------

    use crate::state::SharedDynamoDbState;
    use parking_lot::RwLock;
    use std::sync::Arc;

    /// Builds a service backed by fresh in-memory state for a single test.
    fn make_service() -> DynamoDbService {
        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
            "123456789012",
            "us-east-1",
        )));
        DynamoDbService::new(state)
    }

    /// Wraps a JSON body in an AwsRequest with fixed test identity fields.
    fn make_request(action: &str, body: Value) -> AwsRequest {
        AwsRequest {
            service: "dynamodb".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-id".to_string(),
            headers: http::HeaderMap::new(),
            query_params: HashMap::new(),
            body: serde_json::to_vec(&body).unwrap().into(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            method: http::Method::POST,
            is_query_protocol: false,
            access_key_id: None,
        }
    }

    /// Creates the shared "test-table" fixture: hash key "pk" (S), on-demand.
    fn create_test_table(svc: &DynamoDbService) {
        let req = make_request(
            "CreateTable",
            json!({
                "TableName": "test-table",
                "KeySchema": [
                    { "AttributeName": "pk", "KeyType": "HASH" }
                ],
                "AttributeDefinitions": [
                    { "AttributeName": "pk", "AttributeType": "S" }
                ],
                "BillingMode": "PAY_PER_REQUEST"
            }),
        );
        svc.create_table(&req).unwrap();
    }

    /// DeleteItem with ReturnValues=ALL_OLD returns the pre-delete attributes
    /// and actually removes the item.
    #[test]
    fn delete_item_return_values_all_old() {
        let svc = make_service();
        create_test_table(&svc);

        // Seed an item to delete.
        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "key1" },
                    "name": { "S": "Alice" },
                    "age": { "N": "30" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } },
                "ReturnValues": "ALL_OLD"
            }),
        );
        let resp = svc.delete_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();

        // All pre-delete attributes come back under "Attributes".
        let attrs = &body["Attributes"];
        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");

        // The item must no longer be readable.
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none(), "item should be deleted");
    }

    /// TransactGetItems returns one response slot per request; a missing item
    /// yields an empty slot (no "Item" key) rather than an error.
    #[test]
    fn transact_get_items_returns_existing_and_missing() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "exists" },
                    "val": { "S": "hello" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactGetItems",
            json!({
                "TransactItems": [
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
                ]
            }),
        );
        let resp = svc.transact_get_items(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let responses = body["Responses"].as_array().unwrap();
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
        assert!(responses[1].get("Item").is_none());
    }

    /// A TransactWriteItems with one Put and one Delete applies both writes.
    #[test]
    fn transact_write_items_put_and_delete() {
        let svc = make_service();
        create_test_table(&svc);

        // Seed the item the transaction will delete.
        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "to-delete" },
                    "val": { "S": "bye" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "Put": {
                            "TableName": "test-table",
                            "Item": {
                                "pk": { "S": "new-item" },
                                "val": { "S": "hi" }
                            }
                        }
                    },
                    {
                        "Delete": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "to-delete" } }
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::OK);

        // The Put is visible...
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "new-item" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");

        // ...and the Delete took effect.
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "to-delete" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none());
    }

    /// A failing ConditionCheck cancels the transaction with
    /// TransactionCanceledException and per-item CancellationReasons.
    #[test]
    fn transact_write_items_condition_check_failure() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "ConditionCheck": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "nonexistent" } },
                            "ConditionExpression": "attribute_exists(pk)"
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        // The cancellation is delivered as a 400 response body, not an Err.
        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["__type"].as_str().unwrap(),
            "TransactionCanceledException"
        );
        assert!(body["CancellationReasons"].as_array().is_some());
    }

    /// UpdateTimeToLive round-trips through DescribeTimeToLive for both the
    /// enable (ENABLED) and disable (DISABLED) transitions.
    #[test]
    fn update_and_describe_time_to_live() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": true
                }
            }),
        );
        let resp = svc.update_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveSpecification"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );
        assert!(body["TimeToLiveSpecification"]["Enabled"]
            .as_bool()
            .unwrap());

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "ENABLED"
        );
        assert_eq!(
            body["TimeToLiveDescription"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );

        // Now disable TTL and confirm the status flips.
        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": false
                }
            }),
        );
        svc.update_time_to_live(&req).unwrap();

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "DISABLED"
        );
    }

    /// Put/Get/Delete resource policy lifecycle keyed by the table ARN.
    #[test]
    fn resource_policy_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        // Read the generated ARN straight out of the shared state.
        let table_arn = {
            let state = svc.state.read();
            state.tables.get("test-table").unwrap().arn.clone()
        };

        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
        let req = make_request(
            "PutResourcePolicy",
            json!({
                "ResourceArn": table_arn,
                "Policy": policy_doc
            }),
        );
        let resp = svc.put_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["RevisionId"].as_str().is_some());

        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);

        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
        svc.delete_resource_policy(&req).unwrap();

        // After deletion the policy field is null rather than an error.
        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["Policy"].is_null());
    }

    /// DescribeEndpoints returns the standard 1440-minute cache period.
    #[test]
    fn describe_endpoints() {
        let svc = make_service();
        let req = make_request("DescribeEndpoints", json!({}));
        let resp = svc.describe_endpoints(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
    }

    /// DescribeLimits reports the default 40k read-capacity account limit.
    #[test]
    fn describe_limits() {
        let svc = make_service();
        let req = make_request("DescribeLimits", json!({}));
        let resp = svc.describe_limits(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
    }

    /// Create/Describe/List/Restore/Delete backup lifecycle.
    #[test]
    fn backup_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "CreateBackup",
            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
        );
        let resp = svc.create_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let backup_arn = body["BackupDetails"]["BackupArn"]
            .as_str()
            .unwrap()
            .to_string();
        // The fake completes backups synchronously.
        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");

        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
        let resp = svc.describe_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["BackupDescription"]["BackupDetails"]["BackupName"],
            "my-backup"
        );

        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);

        let req = make_request(
            "RestoreTableFromBackup",
            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
        );
        svc.restore_table_from_backup(&req).unwrap();

        // The restored table is immediately ACTIVE.
        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");

        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
        svc.delete_backup(&req).unwrap();

        // Deleting the only backup empties the listing.
        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
    }

    /// Point-in-time recovery defaults to DISABLED and can be enabled via
    /// UpdateContinuousBackups.
    #[test]
    fn continuous_backups() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "DISABLED"
        );

        let req = make_request(
            "UpdateContinuousBackups",
            json!({
                "TableName": "test-table",
                "PointInTimeRecoverySpecification": {
                    "PointInTimeRecoveryEnabled": true
                }
            }),
        );
        svc.update_continuous_backups(&req).unwrap();

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "ENABLED"
        );
    }

    /// RestoreTableToPointInTime clones the source into an ACTIVE target table.
    #[test]
    fn restore_table_to_point_in_time() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "RestoreTableToPointInTime",
            json!({
                "SourceTableName": "test-table",
                "TargetTableName": "pitr-restored"
            }),
        );
        svc.restore_table_to_point_in_time(&req).unwrap();

        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }

    /// Legacy (2017) global-table lifecycle: create with two replicas, add a
    /// third via UpdateGlobalTable, and read settings back.
    #[test]
    fn global_table_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "CreateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicationGroup": [
                    { "RegionName": "us-east-1" },
                    { "RegionName": "eu-west-1" }
                ]
            }),
        );
        let resp = svc.create_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["GlobalTableStatus"],
            "ACTIVE"
        );

        let req = make_request(
            "DescribeGlobalTable",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            2
        );

        let req = make_request("ListGlobalTables", json!({}));
        let resp = svc.list_global_tables(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);

        // Adding a replica should grow the replication group to three.
        let req = make_request(
            "UpdateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicaUpdates": [
                    { "Create": { "RegionName": "ap-southeast-1" } }
                ]
            }),
        );
        let resp = svc.update_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            3
        );

        let req = make_request(
            "DescribeGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table_settings(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);

        // Settings update is accepted (response body not asserted here).
        let req = make_request(
            "UpdateGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        svc.update_global_table_settings(&req).unwrap();
    }

    /// Describe/Update replica auto-scaling succeed for an existing table.
    #[test]
    fn table_replica_auto_scaling() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeTableReplicaAutoScaling",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TableAutoScalingDescription"]["TableName"],
            "test-table"
        );

        let req = make_request(
            "UpdateTableReplicaAutoScaling",
            json!({ "TableName": "test-table" }),
        );
        svc.update_table_replica_auto_scaling(&req).unwrap();
    }

    /// Enable/Describe/Update/Disable Kinesis streaming destination lifecycle.
    #[test]
    fn kinesis_streaming_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "EnableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        // The fake activates the destination synchronously.
        assert_eq!(body["DestinationStatus"], "ACTIVE");

        let req = make_request(
            "DescribeKinesisStreamingDestination",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["KinesisDataStreamDestinations"]
                .as_array()
                .unwrap()
                .len(),
            1
        );

        let req = make_request(
            "UpdateKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
                "UpdateKinesisStreamingConfiguration": {
                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
                }
            }),
        );
        svc.update_kinesis_streaming_destination(&req).unwrap();

        let req = make_request(
            "DisableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["DestinationStatus"], "DISABLED");
    }

    /// Contributor insights defaults to DISABLED, toggles via the ENABLE
    /// action, and appears in the account-wide listing.
    #[test]
    fn contributor_insights_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "ENABLE"
            }),
        );
        let resp = svc.update_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");

        let req = make_request("ListContributorInsights", json!({}));
        let resp = svc.list_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContributorInsightsSummaries"]
                .as_array()
                .unwrap()
                .len(),
            1
        );
    }

    /// Export-to-S3 lifecycle: start, describe, list.
    #[test]
    fn export_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();

        let req = make_request(
            "ExportTableToPointInTime",
            json!({
                "TableArn": table_arn,
                "S3Bucket": "my-bucket"
            }),
        );
        let resp = svc.export_table_to_point_in_time(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let export_arn = body["ExportDescription"]["ExportArn"]
            .as_str()
            .unwrap()
            .to_string();
        // Exports complete synchronously in the fake.
        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");

        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
        let resp = svc.describe_export(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");

        let req = make_request("ListExports", json!({}));
        let resp = svc.list_exports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
    }

    /// Import-from-S3 lifecycle: the import completes synchronously and
    /// materializes an ACTIVE table from TableCreationParameters.
    #[test]
    fn import_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "ImportTable",
            json!({
                "InputFormat": "DYNAMODB_JSON",
                "S3BucketSource": { "S3Bucket": "import-bucket" },
                "TableCreationParameters": {
                    "TableName": "imported-table",
                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
                }
            }),
        );
        let resp = svc.import_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let import_arn = body["ImportTableDescription"]["ImportArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
        let resp = svc.describe_import(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("ListImports", json!({}));
        let resp = svc.list_imports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);

        // The table created by the import is immediately usable.
        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }
}