1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use chrono::Utc;
5use http::StatusCode;
6use serde_json::{json, Value};
7
8use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
9use fakecloud_core::validation::*;
10
11use crate::state::{
12 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
13 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
14 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
15 ReplicaDescription, SharedDynamoDbState,
16};
17
/// Fake implementation of the AWS DynamoDB service.
///
/// All tables and items live in `state`, a shared in-memory store guarded by
/// a read/write lock; nothing is persisted and no real AWS endpoint is
/// contacted.
pub struct DynamoDbService {
    // Shared, lock-protected in-memory DynamoDB state (tables, region, account id).
    state: SharedDynamoDbState,
}
21
22impl DynamoDbService {
23 pub fn new(state: SharedDynamoDbState) -> Self {
24 Self { state }
25 }
26
27 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
28 serde_json::from_slice(&req.body).map_err(|e| {
29 AwsServiceError::aws_error(
30 StatusCode::BAD_REQUEST,
31 "SerializationException",
32 format!("Invalid JSON: {e}"),
33 )
34 })
35 }
36
37 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
38 Ok(AwsResponse::json(
39 StatusCode::OK,
40 serde_json::to_vec(&body).unwrap(),
41 ))
42 }
43
    /// CreateTable: register a new in-memory table.
    ///
    /// Validates that every key-schema attribute is declared in
    /// `AttributeDefinitions`, derives throughput from `BillingMode`, and
    /// rejects duplicate table names with `ResourceInUseException`. Unlike
    /// real DynamoDB the table is immediately `ACTIVE` (no `CREATING` phase).
    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        let table_name = body["TableName"]
            .as_str()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableName is required",
                )
            })?
            .to_string();

        let key_schema = parse_key_schema(&body["KeySchema"])?;
        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;

        // Every key attribute must have a matching attribute definition,
        // mirroring DynamoDB's own cross-validation of these two lists.
        // NOTE(review): only the table's key schema is checked here; GSI/LSI
        // key attributes are not cross-validated — confirm this is intended.
        for ks in &key_schema {
            if !attribute_definitions
                .iter()
                .any(|ad| ad.attribute_name == ks.attribute_name)
            {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    format!(
                        "One or more parameter values were invalid: \
                         Some index key attributes are not defined in AttributeDefinitions. \
                         Keys: [{}], AttributeDefinitions: [{}]",
                        ks.attribute_name,
                        attribute_definitions
                            .iter()
                            .map(|ad| ad.attribute_name.as_str())
                            .collect::<Vec<_>>()
                            .join(", ")
                    ),
                ));
            }
        }

        let billing_mode = body["BillingMode"]
            .as_str()
            .unwrap_or("PROVISIONED")
            .to_string();

        // On-demand tables carry zeroed throughput; provisioned tables must
        // supply a valid ProvisionedThroughput block.
        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
            ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            }
        } else {
            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
        };

        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
        let tags = parse_tags(&body["Tags"]);

        // All request parsing is done before taking the write lock, keeping
        // the critical section as small as possible.
        let mut state = self.state.write();

        if state.tables.contains_key(&table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        let now = Utc::now();

        let table = DynamoTable {
            name: table_name.clone(),
            arn: arn.clone(),
            key_schema: key_schema.clone(),
            attribute_definitions: attribute_definitions.clone(),
            provisioned_throughput: provisioned_throughput.clone(),
            items: Vec::new(),
            gsi: gsi.clone(),
            lsi: lsi.clone(),
            tags,
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: billing_mode.clone(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
        };

        state.tables.insert(table_name, table);

        // Describe the freshly created (empty) table for the response.
        let table_desc = build_table_description_json(
            &arn,
            &key_schema,
            &attribute_definitions,
            &provisioned_throughput,
            &gsi,
            &lsi,
            &billing_mode,
            now,
            0,
            0,
            "ACTIVE",
        );

        Self::ok_json(json!({ "TableDescription": table_desc }))
    }
162
163 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
164 let body = Self::parse_body(req)?;
165 let table_name = require_str(&body, "TableName")?;
166
167 let mut state = self.state.write();
168 let table = state.tables.remove(table_name).ok_or_else(|| {
169 AwsServiceError::aws_error(
170 StatusCode::BAD_REQUEST,
171 "ResourceNotFoundException",
172 format!("Requested resource not found: Table: {table_name} not found"),
173 )
174 })?;
175
176 let table_desc = build_table_description_json(
177 &table.arn,
178 &table.key_schema,
179 &table.attribute_definitions,
180 &table.provisioned_throughput,
181 &table.gsi,
182 &table.lsi,
183 &table.billing_mode,
184 table.created_at,
185 table.item_count,
186 table.size_bytes,
187 "DELETING",
188 );
189
190 Self::ok_json(json!({ "TableDescription": table_desc }))
191 }
192
193 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
194 let body = Self::parse_body(req)?;
195 let table_name = require_str(&body, "TableName")?;
196
197 let state = self.state.read();
198 let table = get_table(&state.tables, table_name)?;
199
200 let table_desc = build_table_description_json(
201 &table.arn,
202 &table.key_schema,
203 &table.attribute_definitions,
204 &table.provisioned_throughput,
205 &table.gsi,
206 &table.lsi,
207 &table.billing_mode,
208 table.created_at,
209 table.item_count,
210 table.size_bytes,
211 &table.status,
212 );
213
214 Self::ok_json(json!({ "Table": table_desc }))
215 }
216
217 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
218 let body = Self::parse_body(req)?;
219
220 validate_optional_string_length(
221 "exclusiveStartTableName",
222 body["ExclusiveStartTableName"].as_str(),
223 3,
224 255,
225 )?;
226 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
227
228 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
229 let exclusive_start = body["ExclusiveStartTableName"]
230 .as_str()
231 .map(|s| s.to_string());
232
233 let state = self.state.read();
234 let mut names: Vec<&String> = state.tables.keys().collect();
235 names.sort();
236
237 let start_idx = match &exclusive_start {
238 Some(start) => names
239 .iter()
240 .position(|n| n.as_str() > start.as_str())
241 .unwrap_or(names.len()),
242 None => 0,
243 };
244
245 let page: Vec<&str> = names
246 .iter()
247 .skip(start_idx)
248 .take(limit)
249 .map(|n| n.as_str())
250 .collect();
251
252 let mut result = json!({ "TableNames": page });
253
254 if start_idx + limit < names.len() {
255 if let Some(last) = page.last() {
256 result["LastEvaluatedTableName"] = json!(last);
257 }
258 }
259
260 Self::ok_json(result)
261 }
262
    /// UpdateTable: adjust a table's provisioned throughput and/or billing
    /// mode. Index mutations (GSI create/update/delete) are not handled.
    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;

        let mut state = self.state.write();
        let table = state.tables.get_mut(table_name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceNotFoundException",
                format!("Requested resource not found: Table: {table_name} not found"),
            )
        })?;

        // NOTE(review): a malformed ProvisionedThroughput block is silently
        // ignored here (`if let Ok`) instead of surfacing a
        // ValidationException — presumably deliberate leniency for the fake;
        // confirm against expected client behavior.
        if let Some(pt) = body.get("ProvisionedThroughput") {
            if let Ok(throughput) = parse_provisioned_throughput(pt) {
                table.provisioned_throughput = throughput;
            }
        }

        if let Some(bm) = body["BillingMode"].as_str() {
            table.billing_mode = bm.to_string();
        }

        // Echo the (now updated) table back in the standard description shape.
        let table_desc = build_table_description_json(
            &table.arn,
            &table.key_schema,
            &table.attribute_definitions,
            &table.provisioned_throughput,
            &table.gsi,
            &table.lsi,
            &table.billing_mode,
            table.created_at,
            table.item_count,
            table.size_bytes,
            &table.status,
        );

        Self::ok_json(json!({ "TableDescription": table_desc }))
    }
302
303 fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
306 let body = Self::parse_body(req)?;
307 let table_name = require_str(&body, "TableName")?;
308 let item = require_object(&body, "Item")?;
309
310 let mut state = self.state.write();
311 let table = get_table_mut(&mut state.tables, table_name)?;
312
313 validate_key_in_item(table, &item)?;
314
315 let condition = body["ConditionExpression"].as_str();
316 let expr_attr_names = parse_expression_attribute_names(&body);
317 let expr_attr_values = parse_expression_attribute_values(&body);
318
319 let key = extract_key(table, &item);
320 let existing_idx = table.find_item_index(&key);
321
322 if let Some(cond) = condition {
323 let existing = existing_idx.map(|i| &table.items[i]);
324 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
325 }
326
327 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
328 let old_item = if return_values == "ALL_OLD" {
329 existing_idx.map(|i| table.items[i].clone())
330 } else {
331 None
332 };
333
334 if let Some(idx) = existing_idx {
335 table.items[idx] = item;
336 } else {
337 table.items.push(item);
338 }
339
340 table.recalculate_stats();
341
342 let mut result = json!({});
343 if let Some(old) = old_item {
344 result["Attributes"] = json!(old);
345 }
346
347 Self::ok_json(result)
348 }
349
350 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
351 let body = Self::parse_body(req)?;
352 let table_name = require_str(&body, "TableName")?;
353 let key = require_object(&body, "Key")?;
354
355 let state = self.state.read();
356 let table = get_table(&state.tables, table_name)?;
357
358 let result = match table.find_item_index(&key) {
359 Some(idx) => {
360 let item = &table.items[idx];
361 let projected = project_item(item, &body);
362 json!({ "Item": projected })
363 }
364 None => json!({}),
365 };
366
367 Self::ok_json(result)
368 }
369
    /// DeleteItem: remove a single item by primary key.
    ///
    /// Validates enum-valued parameters first, evaluates an optional
    /// `ConditionExpression` against the current item, then honors
    /// `ReturnValues: ALL_OLD`, `ReturnConsumedCapacity`, and
    /// `ReturnItemCollectionMetrics`. Deleting a nonexistent item succeeds
    /// with an empty result, as in real DynamoDB.
    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        // Enum validation runs before the table lookup, so bad parameter
        // values fail fast even when the table does not exist.
        validate_optional_enum_value(
            "conditionalOperator",
            &body["ConditionalOperator"],
            &["AND", "OR"],
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValues",
            &body["ReturnValues"],
            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValuesOnConditionCheckFailure",
            &body["ReturnValuesOnConditionCheckFailure"],
            &["ALL_OLD", "NONE"],
        )?;

        let table_name = require_str(&body, "TableName")?;
        let key = require_object(&body, "Key")?;

        let mut state = self.state.write();
        let table = get_table_mut(&mut state.tables, table_name)?;

        let condition = body["ConditionExpression"].as_str();
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);

        let existing_idx = table.find_item_index(&key);

        // The condition is evaluated against the item as it exists before
        // the delete; a failed condition aborts the whole operation.
        if let Some(cond) = condition {
            let existing = existing_idx.map(|i| &table.items[i]);
            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
        }

        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");

        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
        let return_icm = body["ReturnItemCollectionMetrics"]
            .as_str()
            .unwrap_or("NONE");

        let mut result = json!({});

        if let Some(idx) = existing_idx {
            // Capture the old attributes before the item is removed.
            if return_values == "ALL_OLD" {
                result["Attributes"] = json!(table.items[idx]);
            }
            table.items.remove(idx);
            table.recalculate_stats();
        }

        // Fixed nominal capacity for the fake — no real metering happens.
        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            result["ConsumedCapacity"] = json!({
                "TableName": table_name,
                "CapacityUnits": 1.0,
            });
        }

        if return_icm == "SIZE" {
            result["ItemCollectionMetrics"] = json!({});
        }

        Self::ok_json(result)
    }
446
    /// UpdateItem: apply an `UpdateExpression` to one item, creating the item
    /// from its key attributes (upsert) when it does not exist yet.
    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;
        let key = require_object(&body, "Key")?;

        let mut state = self.state.write();
        let table = get_table_mut(&mut state.tables, table_name)?;

        validate_key_attributes_in_key(table, &key)?;

        let condition = body["ConditionExpression"].as_str();
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);
        let update_expression = body["UpdateExpression"].as_str();

        let existing_idx = table.find_item_index(&key);

        // The condition is checked against the item as it exists *before*
        // the upsert/update below; failure aborts the whole operation.
        if let Some(cond) = condition {
            let existing = existing_idx.map(|i| &table.items[i]);
            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
        }

        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");

        // Upsert: when the item is missing, seed a new one containing only
        // the key attributes and operate on that.
        let idx = match existing_idx {
            Some(i) => i,
            None => {
                let mut new_item = HashMap::new();
                for (k, v) in &key {
                    new_item.insert(k.clone(), v.clone());
                }
                table.items.push(new_item);
                table.items.len() - 1
            }
        };

        // Snapshot before mutation for ALL_OLD.
        let old_item = if return_values == "ALL_OLD" {
            Some(table.items[idx].clone())
        } else {
            None
        };

        if let Some(expr) = update_expression {
            apply_update_expression(
                &mut table.items[idx],
                expr,
                &expr_attr_names,
                &expr_attr_values,
            )?;
        }

        // Snapshot after mutation for ALL_NEW / UPDATED_NEW.
        // NOTE(review): UPDATED_NEW returns the whole item here rather than
        // only the modified attributes — a simplification of real DynamoDB.
        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
            Some(table.items[idx].clone())
        } else {
            None
        };

        table.recalculate_stats();

        let mut result = json!({});
        if let Some(old) = old_item {
            result["Attributes"] = json!(old);
        } else if let Some(new) = new_item {
            result["Attributes"] = json!(new);
        }

        Self::ok_json(result)
    }
515
516 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
519 let body = Self::parse_body(req)?;
520 let table_name = require_str(&body, "TableName")?;
521
522 let state = self.state.read();
523 let table = get_table(&state.tables, table_name)?;
524
525 let expr_attr_names = parse_expression_attribute_names(&body);
526 let expr_attr_values = parse_expression_attribute_values(&body);
527
528 let key_condition = body["KeyConditionExpression"].as_str();
529 let filter_expression = body["FilterExpression"].as_str();
530 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
531 let limit = body["Limit"].as_i64().map(|l| l as usize);
532 let index_name = body["IndexName"].as_str();
533
534 let (items_to_scan, hash_key_name, range_key_name): (
535 &[HashMap<String, AttributeValue>],
536 String,
537 Option<String>,
538 ) = if let Some(idx_name) = index_name {
539 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
540 let hk = gsi
541 .key_schema
542 .iter()
543 .find(|k| k.key_type == "HASH")
544 .map(|k| k.attribute_name.clone())
545 .unwrap_or_default();
546 let rk = gsi
547 .key_schema
548 .iter()
549 .find(|k| k.key_type == "RANGE")
550 .map(|k| k.attribute_name.clone());
551 (&table.items, hk, rk)
552 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
553 let hk = lsi
554 .key_schema
555 .iter()
556 .find(|k| k.key_type == "HASH")
557 .map(|k| k.attribute_name.clone())
558 .unwrap_or_default();
559 let rk = lsi
560 .key_schema
561 .iter()
562 .find(|k| k.key_type == "RANGE")
563 .map(|k| k.attribute_name.clone());
564 (&table.items, hk, rk)
565 } else {
566 return Err(AwsServiceError::aws_error(
567 StatusCode::BAD_REQUEST,
568 "ValidationException",
569 format!("The table does not have the specified index: {idx_name}"),
570 ));
571 }
572 } else {
573 (
574 &table.items[..],
575 table.hash_key_name().to_string(),
576 table.range_key_name().map(|s| s.to_string()),
577 )
578 };
579
580 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
581 .iter()
582 .filter(|item| {
583 if let Some(kc) = key_condition {
584 evaluate_key_condition(
585 kc,
586 item,
587 &hash_key_name,
588 range_key_name.as_deref(),
589 &expr_attr_names,
590 &expr_attr_values,
591 )
592 } else {
593 true
594 }
595 })
596 .collect();
597
598 if let Some(ref rk) = range_key_name {
599 matched.sort_by(|a, b| {
600 let av = a.get(rk.as_str());
601 let bv = b.get(rk.as_str());
602 compare_attribute_values(av, bv)
603 });
604 if !scan_forward {
605 matched.reverse();
606 }
607 }
608
609 if let Some(filter) = filter_expression {
610 matched.retain(|item| {
611 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
612 });
613 }
614
615 let scanned_count = matched.len();
616
617 if let Some(lim) = limit {
618 matched.truncate(lim);
619 }
620
621 let items: Vec<Value> = matched
622 .iter()
623 .map(|item| {
624 let projected = project_item(item, &body);
625 json!(projected)
626 })
627 .collect();
628
629 Self::ok_json(json!({
630 "Items": items,
631 "Count": items.len(),
632 "ScannedCount": scanned_count,
633 }))
634 }
635
636 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
637 let body = Self::parse_body(req)?;
638 let table_name = require_str(&body, "TableName")?;
639
640 let state = self.state.read();
641 let table = get_table(&state.tables, table_name)?;
642
643 let expr_attr_names = parse_expression_attribute_names(&body);
644 let expr_attr_values = parse_expression_attribute_values(&body);
645 let filter_expression = body["FilterExpression"].as_str();
646 let limit = body["Limit"].as_i64().map(|l| l as usize);
647
648 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
649 let scanned_count = matched.len();
650
651 if let Some(filter) = filter_expression {
652 matched.retain(|item| {
653 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
654 });
655 }
656
657 if let Some(lim) = limit {
658 matched.truncate(lim);
659 }
660
661 let items: Vec<Value> = matched
662 .iter()
663 .map(|item| {
664 let projected = project_item(item, &body);
665 json!(projected)
666 })
667 .collect();
668
669 Self::ok_json(json!({
670 "Items": items,
671 "Count": items.len(),
672 "ScannedCount": scanned_count,
673 }))
674 }
675
676 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
679 let body = Self::parse_body(req)?;
680
681 validate_optional_enum_value(
682 "returnConsumedCapacity",
683 &body["ReturnConsumedCapacity"],
684 &["INDEXES", "TOTAL", "NONE"],
685 )?;
686
687 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
688
689 let request_items = body["RequestItems"]
690 .as_object()
691 .ok_or_else(|| {
692 AwsServiceError::aws_error(
693 StatusCode::BAD_REQUEST,
694 "ValidationException",
695 "RequestItems is required",
696 )
697 })?
698 .clone();
699
700 let state = self.state.read();
701 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
702 let mut consumed_capacity: Vec<Value> = Vec::new();
703
704 for (table_name, params) in &request_items {
705 let table = get_table(&state.tables, table_name)?;
706 let keys = params["Keys"].as_array().ok_or_else(|| {
707 AwsServiceError::aws_error(
708 StatusCode::BAD_REQUEST,
709 "ValidationException",
710 "Keys is required",
711 )
712 })?;
713
714 let mut items = Vec::new();
715 for key_val in keys {
716 let key: HashMap<String, AttributeValue> =
717 serde_json::from_value(key_val.clone()).unwrap_or_default();
718 if let Some(idx) = table.find_item_index(&key) {
719 items.push(json!(table.items[idx]));
720 }
721 }
722 responses.insert(table_name.clone(), items);
723
724 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
725 consumed_capacity.push(json!({
726 "TableName": table_name,
727 "CapacityUnits": 1.0,
728 }));
729 }
730 }
731
732 let mut result = json!({
733 "Responses": responses,
734 "UnprocessedKeys": {},
735 });
736
737 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
738 result["ConsumedCapacity"] = json!(consumed_capacity);
739 }
740
741 Self::ok_json(result)
742 }
743
    /// BatchWriteItem: apply mixed Put/Delete requests across tables.
    ///
    /// Requests are applied in order with no conditions and no rollback;
    /// this fake never reports `UnprocessedItems`. Malformed Put/Delete
    /// payloads deserialize to an empty map and are effectively no-ops.
    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;

        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
        let return_icm = body["ReturnItemCollectionMetrics"]
            .as_str()
            .unwrap_or("NONE");

        let request_items = body["RequestItems"]
            .as_object()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "RequestItems is required",
                )
            })?
            .clone();

        let mut state = self.state.write();
        let mut consumed_capacity: Vec<Value> = Vec::new();
        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();

        for (table_name, requests) in &request_items {
            // NOTE: an unknown table aborts the whole batch mid-way; earlier
            // tables' writes are NOT rolled back.
            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ResourceNotFoundException",
                    format!("Requested resource not found: Table: {table_name} not found"),
                )
            })?;

            let reqs = requests.as_array().ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "Request list must be an array",
                )
            })?;

            for request in reqs {
                if let Some(put_req) = request.get("PutRequest") {
                    // Put = full replace by key, or append when new.
                    let item: HashMap<String, AttributeValue> =
                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
                    let key = extract_key(table, &item);
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items[idx] = item;
                    } else {
                        table.items.push(item);
                    }
                } else if let Some(del_req) = request.get("DeleteRequest") {
                    // Delete of a missing key is a silent no-op.
                    let key: HashMap<String, AttributeValue> =
                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items.remove(idx);
                    }
                }
            }

            table.recalculate_stats();

            // Fixed nominal capacity per touched table — no real metering.
            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
                consumed_capacity.push(json!({
                    "TableName": table_name,
                    "CapacityUnits": 1.0,
                }));
            }

            if return_icm == "SIZE" {
                item_collection_metrics.insert(table_name.clone(), vec![]);
            }
        }

        let mut result = json!({
            "UnprocessedItems": {},
        });

        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            result["ConsumedCapacity"] = json!(consumed_capacity);
        }

        if return_icm == "SIZE" {
            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
        }

        Self::ok_json(result)
    }
842
843 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
846 let body = Self::parse_body(req)?;
847 let resource_arn = require_str(&body, "ResourceArn")?;
848 let tags_arr = body["Tags"].as_array().ok_or_else(|| {
849 AwsServiceError::aws_error(
850 StatusCode::BAD_REQUEST,
851 "ValidationException",
852 "Tags is required",
853 )
854 })?;
855
856 let mut state = self.state.write();
857 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
858
859 for tag in tags_arr {
860 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
861 table.tags.insert(k.to_string(), v.to_string());
862 }
863 }
864
865 Self::ok_json(json!({}))
866 }
867
868 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
869 let body = Self::parse_body(req)?;
870 let resource_arn = require_str(&body, "ResourceArn")?;
871 let tag_keys = body["TagKeys"].as_array().ok_or_else(|| {
872 AwsServiceError::aws_error(
873 StatusCode::BAD_REQUEST,
874 "ValidationException",
875 "TagKeys is required",
876 )
877 })?;
878
879 let mut state = self.state.write();
880 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
881
882 for key in tag_keys {
883 if let Some(k) = key.as_str() {
884 table.tags.remove(k);
885 }
886 }
887
888 Self::ok_json(json!({}))
889 }
890
891 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
892 let body = Self::parse_body(req)?;
893 let resource_arn = require_str(&body, "ResourceArn")?;
894
895 let state = self.state.read();
896 let table = find_table_by_arn(&state.tables, resource_arn)?;
897
898 let tags: Vec<Value> = table
899 .tags
900 .iter()
901 .map(|(k, v)| json!({"Key": k, "Value": v}))
902 .collect();
903
904 Self::ok_json(json!({ "Tags": tags }))
905 }
906
907 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
910 let body = Self::parse_body(req)?;
911 validate_optional_enum_value(
912 "returnConsumedCapacity",
913 &body["ReturnConsumedCapacity"],
914 &["INDEXES", "TOTAL", "NONE"],
915 )?;
916 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
917 AwsServiceError::aws_error(
918 StatusCode::BAD_REQUEST,
919 "ValidationException",
920 "TransactItems is required",
921 )
922 })?;
923
924 let state = self.state.read();
925 let mut responses: Vec<Value> = Vec::new();
926
927 for ti in transact_items {
928 let get = &ti["Get"];
929 let table_name = get["TableName"].as_str().ok_or_else(|| {
930 AwsServiceError::aws_error(
931 StatusCode::BAD_REQUEST,
932 "ValidationException",
933 "TableName is required in Get",
934 )
935 })?;
936 let key: HashMap<String, AttributeValue> =
937 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
938
939 let table = get_table(&state.tables, table_name)?;
940 match table.find_item_index(&key) {
941 Some(idx) => {
942 responses.push(json!({ "Item": table.items[idx] }));
943 }
944 None => {
945 responses.push(json!({}));
946 }
947 }
948 }
949
950 Self::ok_json(json!({ "Responses": responses }))
951 }
952
    /// TransactWriteItems: atomically apply Put/Delete/Update/ConditionCheck
    /// operations.
    ///
    /// Two-phase protocol: first every operation's ConditionExpression is
    /// evaluated against current state (collecting per-item cancellation
    /// reasons); only if all pass does the second phase apply the writes.
    /// Any condition failure cancels the whole transaction with a
    /// `TransactionCanceledException` and no writes.
    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        validate_optional_string_length(
            "clientRequestToken",
            body["ClientRequestToken"].as_str(),
            1,
            36,
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;
        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TransactItems is required",
            )
        })?;

        // One write lock spans both phases so the condition checks and the
        // writes observe a consistent snapshot.
        let mut state = self.state.write();

        // Phase 1: evaluate every condition; one reason entry per item,
        // positionally aligned with TransactItems.
        let mut cancellation_reasons: Vec<Value> = Vec::new();
        let mut any_failed = false;

        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let condition = put["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(put);
                    let expr_attr_values = parse_expression_attribute_values(put);
                    let key = extract_key(table, &item);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let condition = delete["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(delete);
                    let expr_attr_values = parse_expression_attribute_values(delete);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let condition = update["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(update);
                    let expr_attr_values = parse_expression_attribute_values(update);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(check) = ti.get("ConditionCheck") {
                // ConditionCheck is check-only: it never writes in phase 2.
                let table_name = check["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
                let cond = check["ConditionExpression"].as_str().unwrap_or_default();

                let table = get_table(&state.tables, table_name)?;
                let expr_attr_names = parse_expression_attribute_names(check);
                let expr_attr_values = parse_expression_attribute_values(check);
                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
                {
                    cancellation_reasons.push(json!({
                        "Code": "ConditionalCheckFailed",
                        "Message": "The conditional request failed"
                    }));
                    any_failed = true;
                    continue;
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else {
                // Unrecognized entry shape: treated as a trivially-passing op.
                cancellation_reasons.push(json!({ "Code": "None" }));
            }
        }

        // Any failure cancels the whole transaction: report per-item reasons
        // and perform no writes at all.
        if any_failed {
            let error_body = json!({
                "__type": "TransactionCanceledException",
                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
                "CancellationReasons": cancellation_reasons
            });
            return Ok(AwsResponse::json(
                StatusCode::BAD_REQUEST,
                serde_json::to_vec(&error_body).unwrap(),
            ));
        }

        // Phase 2: all conditions passed — apply every write in order.
        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                let key = extract_key(table, &item);
                if let Some(idx) = table.find_item_index(&key) {
                    table.items[idx] = item;
                } else {
                    table.items.push(item);
                }
                table.recalculate_stats();
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                if let Some(idx) = table.find_item_index(&key) {
                    table.items.remove(idx);
                }
                table.recalculate_stats();
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let update_expression = update["UpdateExpression"].as_str();
                let expr_attr_names = parse_expression_attribute_names(update);
                let expr_attr_values = parse_expression_attribute_values(update);

                let table = get_table_mut(&mut state.tables, table_name)?;
                // Upsert: seed a new item from the key when none exists.
                let idx = match table.find_item_index(&key) {
                    Some(i) => i,
                    None => {
                        let mut new_item = HashMap::new();
                        for (k, v) in &key {
                            new_item.insert(k.clone(), v.clone());
                        }
                        table.items.push(new_item);
                        table.items.len() - 1
                    }
                };

                if let Some(expr) = update_expression {
                    apply_update_expression(
                        &mut table.items[idx],
                        expr,
                        &expr_attr_names,
                        &expr_attr_values,
                    )?;
                }
                table.recalculate_stats();
            }
        }

        Self::ok_json(json!({}))
    }
1152
1153 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1154 let body = Self::parse_body(req)?;
1155 let statement = require_str(&body, "Statement")?;
1156 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1157
1158 execute_partiql_statement(&self.state, statement, ¶meters)
1159 }
1160
1161 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1162 let body = Self::parse_body(req)?;
1163 validate_optional_enum_value(
1164 "returnConsumedCapacity",
1165 &body["ReturnConsumedCapacity"],
1166 &["INDEXES", "TOTAL", "NONE"],
1167 )?;
1168 let statements = body["Statements"].as_array().ok_or_else(|| {
1169 AwsServiceError::aws_error(
1170 StatusCode::BAD_REQUEST,
1171 "ValidationException",
1172 "Statements is required",
1173 )
1174 })?;
1175
1176 let mut responses: Vec<Value> = Vec::new();
1177 for stmt_obj in statements {
1178 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1179 let parameters = stmt_obj["Parameters"]
1180 .as_array()
1181 .cloned()
1182 .unwrap_or_default();
1183
1184 match execute_partiql_statement(&self.state, statement, ¶meters) {
1185 Ok(resp) => {
1186 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1187 responses.push(resp_body);
1188 }
1189 Err(e) => {
1190 responses.push(json!({
1191 "Error": {
1192 "Code": "ValidationException",
1193 "Message": e.to_string()
1194 }
1195 }));
1196 }
1197 }
1198 }
1199
1200 Self::ok_json(json!({ "Responses": responses }))
1201 }
1202
1203 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1204 let body = Self::parse_body(req)?;
1205 validate_optional_string_length(
1206 "clientRequestToken",
1207 body["ClientRequestToken"].as_str(),
1208 1,
1209 36,
1210 )?;
1211 validate_optional_enum_value(
1212 "returnConsumedCapacity",
1213 &body["ReturnConsumedCapacity"],
1214 &["INDEXES", "TOTAL", "NONE"],
1215 )?;
1216 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1217 AwsServiceError::aws_error(
1218 StatusCode::BAD_REQUEST,
1219 "ValidationException",
1220 "TransactStatements is required",
1221 )
1222 })?;
1223
1224 let mut results: Vec<Result<Value, String>> = Vec::new();
1226 for stmt_obj in transact_statements {
1227 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1228 let parameters = stmt_obj["Parameters"]
1229 .as_array()
1230 .cloned()
1231 .unwrap_or_default();
1232
1233 match execute_partiql_statement(&self.state, statement, ¶meters) {
1234 Ok(resp) => {
1235 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1236 results.push(Ok(resp_body));
1237 }
1238 Err(e) => {
1239 results.push(Err(e.to_string()));
1240 }
1241 }
1242 }
1243
1244 let any_failed = results.iter().any(|r| r.is_err());
1245 if any_failed {
1246 let reasons: Vec<Value> = results
1247 .iter()
1248 .map(|r| match r {
1249 Ok(_) => json!({ "Code": "None" }),
1250 Err(msg) => json!({
1251 "Code": "ValidationException",
1252 "Message": msg
1253 }),
1254 })
1255 .collect();
1256 let error_body = json!({
1257 "__type": "TransactionCanceledException",
1258 "message": "Transaction cancelled due to validation errors",
1259 "CancellationReasons": reasons
1260 });
1261 return Ok(AwsResponse::json(
1262 StatusCode::BAD_REQUEST,
1263 serde_json::to_vec(&error_body).unwrap(),
1264 ));
1265 }
1266
1267 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1268 Self::ok_json(json!({ "Responses": responses }))
1269 }
1270
1271 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1274 let body = Self::parse_body(req)?;
1275 let table_name = require_str(&body, "TableName")?;
1276 let spec = &body["TimeToLiveSpecification"];
1277 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1278 AwsServiceError::aws_error(
1279 StatusCode::BAD_REQUEST,
1280 "ValidationException",
1281 "TimeToLiveSpecification.AttributeName is required",
1282 )
1283 })?;
1284 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1285 AwsServiceError::aws_error(
1286 StatusCode::BAD_REQUEST,
1287 "ValidationException",
1288 "TimeToLiveSpecification.Enabled is required",
1289 )
1290 })?;
1291
1292 let mut state = self.state.write();
1293 let table = get_table_mut(&mut state.tables, table_name)?;
1294
1295 if enabled {
1296 table.ttl_attribute = Some(attr_name.to_string());
1297 table.ttl_enabled = true;
1298 } else {
1299 table.ttl_enabled = false;
1300 }
1301
1302 Self::ok_json(json!({
1303 "TimeToLiveSpecification": {
1304 "AttributeName": attr_name,
1305 "Enabled": enabled
1306 }
1307 }))
1308 }
1309
1310 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1311 let body = Self::parse_body(req)?;
1312 let table_name = require_str(&body, "TableName")?;
1313
1314 let state = self.state.read();
1315 let table = get_table(&state.tables, table_name)?;
1316
1317 let status = if table.ttl_enabled {
1318 "ENABLED"
1319 } else {
1320 "DISABLED"
1321 };
1322
1323 let mut desc = json!({
1324 "TimeToLiveDescription": {
1325 "TimeToLiveStatus": status
1326 }
1327 });
1328
1329 if let Some(ref attr) = table.ttl_attribute {
1330 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1331 }
1332
1333 Self::ok_json(desc)
1334 }
1335
1336 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1339 let body = Self::parse_body(req)?;
1340 let resource_arn = require_str(&body, "ResourceArn")?;
1341 let policy = require_str(&body, "Policy")?;
1342
1343 let mut state = self.state.write();
1344 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1345 table.resource_policy = Some(policy.to_string());
1346
1347 let revision_id = uuid::Uuid::new_v4().to_string();
1348 Self::ok_json(json!({ "RevisionId": revision_id }))
1349 }
1350
1351 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352 let body = Self::parse_body(req)?;
1353 let resource_arn = require_str(&body, "ResourceArn")?;
1354
1355 let state = self.state.read();
1356 let table = find_table_by_arn(&state.tables, resource_arn)?;
1357
1358 match &table.resource_policy {
1359 Some(policy) => {
1360 let revision_id = uuid::Uuid::new_v4().to_string();
1361 Self::ok_json(json!({
1362 "Policy": policy,
1363 "RevisionId": revision_id
1364 }))
1365 }
1366 None => Self::ok_json(json!({ "Policy": null })),
1367 }
1368 }
1369
1370 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371 let body = Self::parse_body(req)?;
1372 let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374 let mut state = self.state.write();
1375 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1376 table.resource_policy = None;
1377
1378 Self::ok_json(json!({}))
1379 }
1380
1381 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1384 Self::ok_json(json!({
1385 "Endpoints": [{
1386 "Address": "dynamodb.us-east-1.amazonaws.com",
1387 "CachePeriodInMinutes": 1440
1388 }]
1389 }))
1390 }
1391
1392 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1393 Self::ok_json(json!({
1394 "AccountMaxReadCapacityUnits": 80000,
1395 "AccountMaxWriteCapacityUnits": 80000,
1396 "TableMaxReadCapacityUnits": 40000,
1397 "TableMaxWriteCapacityUnits": 40000
1398 }))
1399 }
1400
1401 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1404 let body = Self::parse_body(req)?;
1405 let table_name = require_str(&body, "TableName")?;
1406 let backup_name = require_str(&body, "BackupName")?;
1407
1408 let mut state = self.state.write();
1409 let table = get_table(&state.tables, table_name)?;
1410
1411 let backup_arn = format!(
1412 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1413 state.region,
1414 state.account_id,
1415 table_name,
1416 Utc::now().format("%Y%m%d%H%M%S")
1417 );
1418 let now = Utc::now();
1419
1420 let backup = BackupDescription {
1421 backup_arn: backup_arn.clone(),
1422 backup_name: backup_name.to_string(),
1423 table_name: table_name.to_string(),
1424 table_arn: table.arn.clone(),
1425 backup_status: "AVAILABLE".to_string(),
1426 backup_type: "USER".to_string(),
1427 backup_creation_date: now,
1428 key_schema: table.key_schema.clone(),
1429 attribute_definitions: table.attribute_definitions.clone(),
1430 provisioned_throughput: table.provisioned_throughput.clone(),
1431 billing_mode: table.billing_mode.clone(),
1432 item_count: table.item_count,
1433 size_bytes: table.size_bytes,
1434 };
1435
1436 state.backups.insert(backup_arn.clone(), backup);
1437
1438 Self::ok_json(json!({
1439 "BackupDetails": {
1440 "BackupArn": backup_arn,
1441 "BackupName": backup_name,
1442 "BackupStatus": "AVAILABLE",
1443 "BackupType": "USER",
1444 "BackupCreationDateTime": now.timestamp() as f64,
1445 "BackupSizeBytes": 0
1446 }
1447 }))
1448 }
1449
1450 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1451 let body = Self::parse_body(req)?;
1452 let backup_arn = require_str(&body, "BackupArn")?;
1453
1454 let mut state = self.state.write();
1455 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1456 AwsServiceError::aws_error(
1457 StatusCode::BAD_REQUEST,
1458 "BackupNotFoundException",
1459 format!("Backup not found: {backup_arn}"),
1460 )
1461 })?;
1462
1463 Self::ok_json(json!({
1464 "BackupDescription": {
1465 "BackupDetails": {
1466 "BackupArn": backup.backup_arn,
1467 "BackupName": backup.backup_name,
1468 "BackupStatus": "DELETED",
1469 "BackupType": backup.backup_type,
1470 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1471 "BackupSizeBytes": backup.size_bytes
1472 },
1473 "SourceTableDetails": {
1474 "TableName": backup.table_name,
1475 "TableArn": backup.table_arn,
1476 "TableId": uuid::Uuid::new_v4().to_string(),
1477 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1478 "AttributeName": ks.attribute_name,
1479 "KeyType": ks.key_type
1480 })).collect::<Vec<_>>(),
1481 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1482 "ProvisionedThroughput": {
1483 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1484 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1485 },
1486 "ItemCount": backup.item_count,
1487 "BillingMode": backup.billing_mode,
1488 "TableSizeBytes": backup.size_bytes
1489 },
1490 "SourceTableFeatureDetails": {}
1491 }
1492 }))
1493 }
1494
1495 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1496 let body = Self::parse_body(req)?;
1497 let backup_arn = require_str(&body, "BackupArn")?;
1498
1499 let state = self.state.read();
1500 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1501 AwsServiceError::aws_error(
1502 StatusCode::BAD_REQUEST,
1503 "BackupNotFoundException",
1504 format!("Backup not found: {backup_arn}"),
1505 )
1506 })?;
1507
1508 Self::ok_json(json!({
1509 "BackupDescription": {
1510 "BackupDetails": {
1511 "BackupArn": backup.backup_arn,
1512 "BackupName": backup.backup_name,
1513 "BackupStatus": backup.backup_status,
1514 "BackupType": backup.backup_type,
1515 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1516 "BackupSizeBytes": backup.size_bytes
1517 },
1518 "SourceTableDetails": {
1519 "TableName": backup.table_name,
1520 "TableArn": backup.table_arn,
1521 "TableId": uuid::Uuid::new_v4().to_string(),
1522 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1523 "AttributeName": ks.attribute_name,
1524 "KeyType": ks.key_type
1525 })).collect::<Vec<_>>(),
1526 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1527 "ProvisionedThroughput": {
1528 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1529 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1530 },
1531 "ItemCount": backup.item_count,
1532 "BillingMode": backup.billing_mode,
1533 "TableSizeBytes": backup.size_bytes
1534 },
1535 "SourceTableFeatureDetails": {}
1536 }
1537 }))
1538 }
1539
1540 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1541 let body = Self::parse_body(req)?;
1542 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
1543 validate_optional_string_length(
1544 "exclusiveStartBackupArn",
1545 body["ExclusiveStartBackupArn"].as_str(),
1546 37,
1547 1024,
1548 )?;
1549 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
1550 validate_optional_enum_value(
1551 "backupType",
1552 &body["BackupType"],
1553 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
1554 )?;
1555 let table_name = body["TableName"].as_str();
1556
1557 let state = self.state.read();
1558 let summaries: Vec<Value> = state
1559 .backups
1560 .values()
1561 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
1562 .map(|b| {
1563 json!({
1564 "TableName": b.table_name,
1565 "TableArn": b.table_arn,
1566 "BackupArn": b.backup_arn,
1567 "BackupName": b.backup_name,
1568 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
1569 "BackupStatus": b.backup_status,
1570 "BackupType": b.backup_type,
1571 "BackupSizeBytes": b.size_bytes
1572 })
1573 })
1574 .collect();
1575
1576 Self::ok_json(json!({
1577 "BackupSummaries": summaries
1578 }))
1579 }
1580
1581 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1582 let body = Self::parse_body(req)?;
1583 let backup_arn = require_str(&body, "BackupArn")?;
1584 let target_table_name = require_str(&body, "TargetTableName")?;
1585
1586 let mut state = self.state.write();
1587 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1588 AwsServiceError::aws_error(
1589 StatusCode::BAD_REQUEST,
1590 "BackupNotFoundException",
1591 format!("Backup not found: {backup_arn}"),
1592 )
1593 })?;
1594
1595 if state.tables.contains_key(target_table_name) {
1596 return Err(AwsServiceError::aws_error(
1597 StatusCode::BAD_REQUEST,
1598 "TableAlreadyExistsException",
1599 format!("Table already exists: {target_table_name}"),
1600 ));
1601 }
1602
1603 let now = Utc::now();
1604 let arn = format!(
1605 "arn:aws:dynamodb:{}:{}:table/{}",
1606 state.region, state.account_id, target_table_name
1607 );
1608
1609 let table = DynamoTable {
1610 name: target_table_name.to_string(),
1611 arn: arn.clone(),
1612 key_schema: backup.key_schema.clone(),
1613 attribute_definitions: backup.attribute_definitions.clone(),
1614 provisioned_throughput: backup.provisioned_throughput.clone(),
1615 items: Vec::new(),
1616 gsi: Vec::new(),
1617 lsi: Vec::new(),
1618 tags: HashMap::new(),
1619 created_at: now,
1620 status: "ACTIVE".to_string(),
1621 item_count: 0,
1622 size_bytes: 0,
1623 billing_mode: backup.billing_mode.clone(),
1624 ttl_attribute: None,
1625 ttl_enabled: false,
1626 resource_policy: None,
1627 pitr_enabled: false,
1628 kinesis_destinations: Vec::new(),
1629 contributor_insights_status: "DISABLED".to_string(),
1630 };
1631
1632 let desc = build_table_description(&table);
1633 state.tables.insert(target_table_name.to_string(), table);
1634
1635 Self::ok_json(json!({
1636 "TableDescription": desc
1637 }))
1638 }
1639
1640 fn restore_table_to_point_in_time(
1641 &self,
1642 req: &AwsRequest,
1643 ) -> Result<AwsResponse, AwsServiceError> {
1644 let body = Self::parse_body(req)?;
1645 let target_table_name = require_str(&body, "TargetTableName")?;
1646 let source_table_name = body["SourceTableName"].as_str();
1647 let source_table_arn = body["SourceTableArn"].as_str();
1648
1649 let mut state = self.state.write();
1650
1651 let source = if let Some(name) = source_table_name {
1653 get_table(&state.tables, name)?.clone()
1654 } else if let Some(arn) = source_table_arn {
1655 find_table_by_arn(&state.tables, arn)?.clone()
1656 } else {
1657 return Err(AwsServiceError::aws_error(
1658 StatusCode::BAD_REQUEST,
1659 "ValidationException",
1660 "SourceTableName or SourceTableArn is required",
1661 ));
1662 };
1663
1664 if state.tables.contains_key(target_table_name) {
1665 return Err(AwsServiceError::aws_error(
1666 StatusCode::BAD_REQUEST,
1667 "TableAlreadyExistsException",
1668 format!("Table already exists: {target_table_name}"),
1669 ));
1670 }
1671
1672 let now = Utc::now();
1673 let arn = format!(
1674 "arn:aws:dynamodb:{}:{}:table/{}",
1675 state.region, state.account_id, target_table_name
1676 );
1677
1678 let table = DynamoTable {
1679 name: target_table_name.to_string(),
1680 arn: arn.clone(),
1681 key_schema: source.key_schema.clone(),
1682 attribute_definitions: source.attribute_definitions.clone(),
1683 provisioned_throughput: source.provisioned_throughput.clone(),
1684 items: Vec::new(),
1685 gsi: Vec::new(),
1686 lsi: Vec::new(),
1687 tags: HashMap::new(),
1688 created_at: now,
1689 status: "ACTIVE".to_string(),
1690 item_count: 0,
1691 size_bytes: 0,
1692 billing_mode: source.billing_mode.clone(),
1693 ttl_attribute: None,
1694 ttl_enabled: false,
1695 resource_policy: None,
1696 pitr_enabled: false,
1697 kinesis_destinations: Vec::new(),
1698 contributor_insights_status: "DISABLED".to_string(),
1699 };
1700
1701 let desc = build_table_description(&table);
1702 state.tables.insert(target_table_name.to_string(), table);
1703
1704 Self::ok_json(json!({
1705 "TableDescription": desc
1706 }))
1707 }
1708
1709 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1710 let body = Self::parse_body(req)?;
1711 let table_name = require_str(&body, "TableName")?;
1712
1713 let pitr_spec = body["PointInTimeRecoverySpecification"]
1714 .as_object()
1715 .ok_or_else(|| {
1716 AwsServiceError::aws_error(
1717 StatusCode::BAD_REQUEST,
1718 "ValidationException",
1719 "PointInTimeRecoverySpecification is required",
1720 )
1721 })?;
1722 let enabled = pitr_spec
1723 .get("PointInTimeRecoveryEnabled")
1724 .and_then(|v| v.as_bool())
1725 .unwrap_or(false);
1726
1727 let mut state = self.state.write();
1728 let table = get_table_mut(&mut state.tables, table_name)?;
1729 table.pitr_enabled = enabled;
1730
1731 let status = if enabled { "ENABLED" } else { "DISABLED" };
1732 Self::ok_json(json!({
1733 "ContinuousBackupsDescription": {
1734 "ContinuousBackupsStatus": status,
1735 "PointInTimeRecoveryDescription": {
1736 "PointInTimeRecoveryStatus": status,
1737 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1738 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1739 }
1740 }
1741 }))
1742 }
1743
1744 fn describe_continuous_backups(
1745 &self,
1746 req: &AwsRequest,
1747 ) -> Result<AwsResponse, AwsServiceError> {
1748 let body = Self::parse_body(req)?;
1749 let table_name = require_str(&body, "TableName")?;
1750
1751 let state = self.state.read();
1752 let table = get_table(&state.tables, table_name)?;
1753
1754 let status = if table.pitr_enabled {
1755 "ENABLED"
1756 } else {
1757 "DISABLED"
1758 };
1759 Self::ok_json(json!({
1760 "ContinuousBackupsDescription": {
1761 "ContinuousBackupsStatus": status,
1762 "PointInTimeRecoveryDescription": {
1763 "PointInTimeRecoveryStatus": status,
1764 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
1765 "LatestRestorableDateTime": Utc::now().timestamp() as f64
1766 }
1767 }
1768 }))
1769 }
1770
1771 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1774 let body = Self::parse_body(req)?;
1775 let global_table_name = require_str(&body, "GlobalTableName")?;
1776 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1777
1778 let replication_group = body["ReplicationGroup"]
1779 .as_array()
1780 .ok_or_else(|| {
1781 AwsServiceError::aws_error(
1782 StatusCode::BAD_REQUEST,
1783 "ValidationException",
1784 "ReplicationGroup is required",
1785 )
1786 })?
1787 .iter()
1788 .filter_map(|r| {
1789 r["RegionName"].as_str().map(|rn| ReplicaDescription {
1790 region_name: rn.to_string(),
1791 replica_status: "ACTIVE".to_string(),
1792 })
1793 })
1794 .collect::<Vec<_>>();
1795
1796 let mut state = self.state.write();
1797
1798 if state.global_tables.contains_key(global_table_name) {
1799 return Err(AwsServiceError::aws_error(
1800 StatusCode::BAD_REQUEST,
1801 "GlobalTableAlreadyExistsException",
1802 format!("Global table already exists: {global_table_name}"),
1803 ));
1804 }
1805
1806 let arn = format!(
1807 "arn:aws:dynamodb::{}:global-table/{}",
1808 state.account_id, global_table_name
1809 );
1810 let now = Utc::now();
1811
1812 let gt = GlobalTableDescription {
1813 global_table_name: global_table_name.to_string(),
1814 global_table_arn: arn.clone(),
1815 global_table_status: "ACTIVE".to_string(),
1816 creation_date: now,
1817 replication_group: replication_group.clone(),
1818 };
1819
1820 state
1821 .global_tables
1822 .insert(global_table_name.to_string(), gt);
1823
1824 Self::ok_json(json!({
1825 "GlobalTableDescription": {
1826 "GlobalTableName": global_table_name,
1827 "GlobalTableArn": arn,
1828 "GlobalTableStatus": "ACTIVE",
1829 "CreationDateTime": now.timestamp() as f64,
1830 "ReplicationGroup": replication_group.iter().map(|r| json!({
1831 "RegionName": r.region_name,
1832 "ReplicaStatus": r.replica_status
1833 })).collect::<Vec<_>>()
1834 }
1835 }))
1836 }
1837
1838 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1839 let body = Self::parse_body(req)?;
1840 let global_table_name = require_str(&body, "GlobalTableName")?;
1841 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1842
1843 let state = self.state.read();
1844 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1845 AwsServiceError::aws_error(
1846 StatusCode::BAD_REQUEST,
1847 "GlobalTableNotFoundException",
1848 format!("Global table not found: {global_table_name}"),
1849 )
1850 })?;
1851
1852 Self::ok_json(json!({
1853 "GlobalTableDescription": {
1854 "GlobalTableName": gt.global_table_name,
1855 "GlobalTableArn": gt.global_table_arn,
1856 "GlobalTableStatus": gt.global_table_status,
1857 "CreationDateTime": gt.creation_date.timestamp() as f64,
1858 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1859 "RegionName": r.region_name,
1860 "ReplicaStatus": r.replica_status
1861 })).collect::<Vec<_>>()
1862 }
1863 }))
1864 }
1865
1866 fn describe_global_table_settings(
1867 &self,
1868 req: &AwsRequest,
1869 ) -> Result<AwsResponse, AwsServiceError> {
1870 let body = Self::parse_body(req)?;
1871 let global_table_name = require_str(&body, "GlobalTableName")?;
1872 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1873
1874 let state = self.state.read();
1875 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
1876 AwsServiceError::aws_error(
1877 StatusCode::BAD_REQUEST,
1878 "GlobalTableNotFoundException",
1879 format!("Global table not found: {global_table_name}"),
1880 )
1881 })?;
1882
1883 let replica_settings: Vec<Value> = gt
1884 .replication_group
1885 .iter()
1886 .map(|r| {
1887 json!({
1888 "RegionName": r.region_name,
1889 "ReplicaStatus": r.replica_status,
1890 "ReplicaProvisionedReadCapacityUnits": 0,
1891 "ReplicaProvisionedWriteCapacityUnits": 0
1892 })
1893 })
1894 .collect();
1895
1896 Self::ok_json(json!({
1897 "GlobalTableName": gt.global_table_name,
1898 "ReplicaSettings": replica_settings
1899 }))
1900 }
1901
1902 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1903 let body = Self::parse_body(req)?;
1904 validate_optional_string_length(
1905 "exclusiveStartGlobalTableName",
1906 body["ExclusiveStartGlobalTableName"].as_str(),
1907 3,
1908 255,
1909 )?;
1910 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
1911 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
1912
1913 let state = self.state.read();
1914 let tables: Vec<Value> = state
1915 .global_tables
1916 .values()
1917 .take(limit)
1918 .map(|gt| {
1919 json!({
1920 "GlobalTableName": gt.global_table_name,
1921 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1922 "RegionName": r.region_name
1923 })).collect::<Vec<_>>()
1924 })
1925 })
1926 .collect();
1927
1928 Self::ok_json(json!({
1929 "GlobalTables": tables
1930 }))
1931 }
1932
1933 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1934 let body = Self::parse_body(req)?;
1935 let global_table_name = require_str(&body, "GlobalTableName")?;
1936 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1937 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
1938
1939 let mut state = self.state.write();
1940 let gt = state
1941 .global_tables
1942 .get_mut(global_table_name)
1943 .ok_or_else(|| {
1944 AwsServiceError::aws_error(
1945 StatusCode::BAD_REQUEST,
1946 "GlobalTableNotFoundException",
1947 format!("Global table not found: {global_table_name}"),
1948 )
1949 })?;
1950
1951 if let Some(updates) = body["ReplicaUpdates"].as_array() {
1952 for update in updates {
1953 if let Some(create) = update["Create"].as_object() {
1954 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
1955 gt.replication_group.push(ReplicaDescription {
1956 region_name: region.to_string(),
1957 replica_status: "ACTIVE".to_string(),
1958 });
1959 }
1960 }
1961 if let Some(delete) = update["Delete"].as_object() {
1962 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
1963 gt.replication_group.retain(|r| r.region_name != region);
1964 }
1965 }
1966 }
1967 }
1968
1969 Self::ok_json(json!({
1970 "GlobalTableDescription": {
1971 "GlobalTableName": gt.global_table_name,
1972 "GlobalTableArn": gt.global_table_arn,
1973 "GlobalTableStatus": gt.global_table_status,
1974 "CreationDateTime": gt.creation_date.timestamp() as f64,
1975 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
1976 "RegionName": r.region_name,
1977 "ReplicaStatus": r.replica_status
1978 })).collect::<Vec<_>>()
1979 }
1980 }))
1981 }
1982
1983 fn update_global_table_settings(
1984 &self,
1985 req: &AwsRequest,
1986 ) -> Result<AwsResponse, AwsServiceError> {
1987 let body = Self::parse_body(req)?;
1988 let global_table_name = require_str(&body, "GlobalTableName")?;
1989 validate_string_length("globalTableName", global_table_name, 3, 255)?;
1990 validate_optional_enum_value(
1991 "globalTableBillingMode",
1992 &body["GlobalTableBillingMode"],
1993 &["PROVISIONED", "PAY_PER_REQUEST"],
1994 )?;
1995 validate_optional_range_i64(
1996 "globalTableProvisionedWriteCapacityUnits",
1997 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
1998 1,
1999 i64::MAX,
2000 )?;
2001
2002 let state = self.state.read();
2003 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2004 AwsServiceError::aws_error(
2005 StatusCode::BAD_REQUEST,
2006 "GlobalTableNotFoundException",
2007 format!("Global table not found: {global_table_name}"),
2008 )
2009 })?;
2010
2011 let replica_settings: Vec<Value> = gt
2012 .replication_group
2013 .iter()
2014 .map(|r| {
2015 json!({
2016 "RegionName": r.region_name,
2017 "ReplicaStatus": r.replica_status,
2018 "ReplicaProvisionedReadCapacityUnits": 0,
2019 "ReplicaProvisionedWriteCapacityUnits": 0
2020 })
2021 })
2022 .collect();
2023
2024 Self::ok_json(json!({
2025 "GlobalTableName": gt.global_table_name,
2026 "ReplicaSettings": replica_settings
2027 }))
2028 }
2029
2030 fn describe_table_replica_auto_scaling(
2031 &self,
2032 req: &AwsRequest,
2033 ) -> Result<AwsResponse, AwsServiceError> {
2034 let body = Self::parse_body(req)?;
2035 let table_name = require_str(&body, "TableName")?;
2036
2037 let state = self.state.read();
2038 let table = get_table(&state.tables, table_name)?;
2039
2040 Self::ok_json(json!({
2041 "TableAutoScalingDescription": {
2042 "TableName": table.name,
2043 "TableStatus": table.status,
2044 "Replicas": []
2045 }
2046 }))
2047 }
2048
2049 fn update_table_replica_auto_scaling(
2050 &self,
2051 req: &AwsRequest,
2052 ) -> Result<AwsResponse, AwsServiceError> {
2053 let body = Self::parse_body(req)?;
2054 let table_name = require_str(&body, "TableName")?;
2055
2056 let state = self.state.read();
2057 let table = get_table(&state.tables, table_name)?;
2058
2059 Self::ok_json(json!({
2060 "TableAutoScalingDescription": {
2061 "TableName": table.name,
2062 "TableStatus": table.status,
2063 "Replicas": []
2064 }
2065 }))
2066 }
2067
2068 fn enable_kinesis_streaming_destination(
2071 &self,
2072 req: &AwsRequest,
2073 ) -> Result<AwsResponse, AwsServiceError> {
2074 let body = Self::parse_body(req)?;
2075 let table_name = require_str(&body, "TableName")?;
2076 let stream_arn = require_str(&body, "StreamArn")?;
2077 let precision = body["EnableKinesisStreamingConfiguration"]
2078 ["ApproximateCreationDateTimePrecision"]
2079 .as_str()
2080 .unwrap_or("MILLISECOND");
2081
2082 let mut state = self.state.write();
2083 let table = get_table_mut(&mut state.tables, table_name)?;
2084
2085 table.kinesis_destinations.push(KinesisDestination {
2086 stream_arn: stream_arn.to_string(),
2087 destination_status: "ACTIVE".to_string(),
2088 approximate_creation_date_time_precision: precision.to_string(),
2089 });
2090
2091 Self::ok_json(json!({
2092 "TableName": table_name,
2093 "StreamArn": stream_arn,
2094 "DestinationStatus": "ACTIVE",
2095 "EnableKinesisStreamingConfiguration": {
2096 "ApproximateCreationDateTimePrecision": precision
2097 }
2098 }))
2099 }
2100
2101 fn disable_kinesis_streaming_destination(
2102 &self,
2103 req: &AwsRequest,
2104 ) -> Result<AwsResponse, AwsServiceError> {
2105 let body = Self::parse_body(req)?;
2106 let table_name = require_str(&body, "TableName")?;
2107 let stream_arn = require_str(&body, "StreamArn")?;
2108
2109 let mut state = self.state.write();
2110 let table = get_table_mut(&mut state.tables, table_name)?;
2111
2112 if let Some(dest) = table
2113 .kinesis_destinations
2114 .iter_mut()
2115 .find(|d| d.stream_arn == stream_arn)
2116 {
2117 dest.destination_status = "DISABLED".to_string();
2118 }
2119
2120 Self::ok_json(json!({
2121 "TableName": table_name,
2122 "StreamArn": stream_arn,
2123 "DestinationStatus": "DISABLED"
2124 }))
2125 }
2126
2127 fn describe_kinesis_streaming_destination(
2128 &self,
2129 req: &AwsRequest,
2130 ) -> Result<AwsResponse, AwsServiceError> {
2131 let body = Self::parse_body(req)?;
2132 let table_name = require_str(&body, "TableName")?;
2133
2134 let state = self.state.read();
2135 let table = get_table(&state.tables, table_name)?;
2136
2137 let destinations: Vec<Value> = table
2138 .kinesis_destinations
2139 .iter()
2140 .map(|d| {
2141 json!({
2142 "StreamArn": d.stream_arn,
2143 "DestinationStatus": d.destination_status,
2144 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2145 })
2146 })
2147 .collect();
2148
2149 Self::ok_json(json!({
2150 "TableName": table_name,
2151 "KinesisDataStreamDestinations": destinations
2152 }))
2153 }
2154
2155 fn update_kinesis_streaming_destination(
2156 &self,
2157 req: &AwsRequest,
2158 ) -> Result<AwsResponse, AwsServiceError> {
2159 let body = Self::parse_body(req)?;
2160 let table_name = require_str(&body, "TableName")?;
2161 let stream_arn = require_str(&body, "StreamArn")?;
2162 let precision = body["UpdateKinesisStreamingConfiguration"]
2163 ["ApproximateCreationDateTimePrecision"]
2164 .as_str()
2165 .unwrap_or("MILLISECOND");
2166
2167 let mut state = self.state.write();
2168 let table = get_table_mut(&mut state.tables, table_name)?;
2169
2170 if let Some(dest) = table
2171 .kinesis_destinations
2172 .iter_mut()
2173 .find(|d| d.stream_arn == stream_arn)
2174 {
2175 dest.approximate_creation_date_time_precision = precision.to_string();
2176 }
2177
2178 Self::ok_json(json!({
2179 "TableName": table_name,
2180 "StreamArn": stream_arn,
2181 "DestinationStatus": "ACTIVE",
2182 "UpdateKinesisStreamingConfiguration": {
2183 "ApproximateCreationDateTimePrecision": precision
2184 }
2185 }))
2186 }
2187
2188 fn describe_contributor_insights(
2191 &self,
2192 req: &AwsRequest,
2193 ) -> Result<AwsResponse, AwsServiceError> {
2194 let body = Self::parse_body(req)?;
2195 let table_name = require_str(&body, "TableName")?;
2196 let index_name = body["IndexName"].as_str();
2197
2198 let state = self.state.read();
2199 let table = get_table(&state.tables, table_name)?;
2200
2201 let mut result = json!({
2202 "TableName": table_name,
2203 "ContributorInsightsStatus": table.contributor_insights_status,
2204 "ContributorInsightsRuleList": []
2205 });
2206 if let Some(idx) = index_name {
2207 result["IndexName"] = json!(idx);
2208 }
2209
2210 Self::ok_json(result)
2211 }
2212
2213 fn update_contributor_insights(
2214 &self,
2215 req: &AwsRequest,
2216 ) -> Result<AwsResponse, AwsServiceError> {
2217 let body = Self::parse_body(req)?;
2218 let table_name = require_str(&body, "TableName")?;
2219 let action = require_str(&body, "ContributorInsightsAction")?;
2220 let index_name = body["IndexName"].as_str();
2221
2222 let mut state = self.state.write();
2223 let table = get_table_mut(&mut state.tables, table_name)?;
2224
2225 let status = match action {
2226 "ENABLE" => "ENABLED",
2227 "DISABLE" => "DISABLED",
2228 _ => {
2229 return Err(AwsServiceError::aws_error(
2230 StatusCode::BAD_REQUEST,
2231 "ValidationException",
2232 format!("Invalid ContributorInsightsAction: {action}"),
2233 ))
2234 }
2235 };
2236 table.contributor_insights_status = status.to_string();
2237
2238 let mut result = json!({
2239 "TableName": table_name,
2240 "ContributorInsightsStatus": status
2241 });
2242 if let Some(idx) = index_name {
2243 result["IndexName"] = json!(idx);
2244 }
2245
2246 Self::ok_json(result)
2247 }
2248
2249 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2250 let body = Self::parse_body(req)?;
2251 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2252 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2253 let table_name = body["TableName"].as_str();
2254
2255 let state = self.state.read();
2256 let summaries: Vec<Value> = state
2257 .tables
2258 .values()
2259 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2260 .map(|t| {
2261 json!({
2262 "TableName": t.name,
2263 "ContributorInsightsStatus": t.contributor_insights_status
2264 })
2265 })
2266 .collect();
2267
2268 Self::ok_json(json!({
2269 "ContributorInsightsSummaries": summaries
2270 }))
2271 }
2272
2273 fn export_table_to_point_in_time(
2276 &self,
2277 req: &AwsRequest,
2278 ) -> Result<AwsResponse, AwsServiceError> {
2279 let body = Self::parse_body(req)?;
2280 let table_arn = require_str(&body, "TableArn")?;
2281 let s3_bucket = require_str(&body, "S3Bucket")?;
2282 let s3_prefix = body["S3Prefix"].as_str();
2283 let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2284
2285 let state = self.state.read();
2286 find_table_by_arn(&state.tables, table_arn)?;
2288
2289 let now = Utc::now();
2290 let export_arn = format!(
2291 "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2292 state.region,
2293 state.account_id,
2294 table_arn.rsplit('/').next().unwrap_or("unknown"),
2295 uuid::Uuid::new_v4()
2296 );
2297
2298 let export = ExportDescription {
2299 export_arn: export_arn.clone(),
2300 export_status: "COMPLETED".to_string(),
2301 table_arn: table_arn.to_string(),
2302 s3_bucket: s3_bucket.to_string(),
2303 s3_prefix: s3_prefix.map(|s| s.to_string()),
2304 export_format: export_format.to_string(),
2305 start_time: now,
2306 end_time: now,
2307 export_time: now,
2308 item_count: 0,
2309 billed_size_bytes: 0,
2310 };
2311
2312 drop(state);
2313 let mut state = self.state.write();
2314 state.exports.insert(export_arn.clone(), export);
2315
2316 Self::ok_json(json!({
2317 "ExportDescription": {
2318 "ExportArn": export_arn,
2319 "ExportStatus": "COMPLETED",
2320 "TableArn": table_arn,
2321 "S3Bucket": s3_bucket,
2322 "S3Prefix": s3_prefix,
2323 "ExportFormat": export_format,
2324 "StartTime": now.timestamp() as f64,
2325 "EndTime": now.timestamp() as f64,
2326 "ExportTime": now.timestamp() as f64,
2327 "ItemCount": 0,
2328 "BilledSizeBytes": 0
2329 }
2330 }))
2331 }
2332
2333 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2334 let body = Self::parse_body(req)?;
2335 let export_arn = require_str(&body, "ExportArn")?;
2336
2337 let state = self.state.read();
2338 let export = state.exports.get(export_arn).ok_or_else(|| {
2339 AwsServiceError::aws_error(
2340 StatusCode::BAD_REQUEST,
2341 "ExportNotFoundException",
2342 format!("Export not found: {export_arn}"),
2343 )
2344 })?;
2345
2346 Self::ok_json(json!({
2347 "ExportDescription": {
2348 "ExportArn": export.export_arn,
2349 "ExportStatus": export.export_status,
2350 "TableArn": export.table_arn,
2351 "S3Bucket": export.s3_bucket,
2352 "S3Prefix": export.s3_prefix,
2353 "ExportFormat": export.export_format,
2354 "StartTime": export.start_time.timestamp() as f64,
2355 "EndTime": export.end_time.timestamp() as f64,
2356 "ExportTime": export.export_time.timestamp() as f64,
2357 "ItemCount": export.item_count,
2358 "BilledSizeBytes": export.billed_size_bytes
2359 }
2360 }))
2361 }
2362
2363 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2364 let body = Self::parse_body(req)?;
2365 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2366 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2367 let table_arn = body["TableArn"].as_str();
2368
2369 let state = self.state.read();
2370 let summaries: Vec<Value> = state
2371 .exports
2372 .values()
2373 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2374 .map(|e| {
2375 json!({
2376 "ExportArn": e.export_arn,
2377 "ExportStatus": e.export_status,
2378 "TableArn": e.table_arn
2379 })
2380 })
2381 .collect();
2382
2383 Self::ok_json(json!({
2384 "ExportSummaries": summaries
2385 }))
2386 }
2387
2388 fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2389 let body = Self::parse_body(req)?;
2390 let input_format = require_str(&body, "InputFormat")?;
2391 let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2392 AwsServiceError::aws_error(
2393 StatusCode::BAD_REQUEST,
2394 "ValidationException",
2395 "S3BucketSource is required",
2396 )
2397 })?;
2398 let s3_bucket = s3_source
2399 .get("S3Bucket")
2400 .and_then(|v| v.as_str())
2401 .unwrap_or("");
2402
2403 let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2404 AwsServiceError::aws_error(
2405 StatusCode::BAD_REQUEST,
2406 "ValidationException",
2407 "TableCreationParameters is required",
2408 )
2409 })?;
2410 let table_name = table_params
2411 .get("TableName")
2412 .and_then(|v| v.as_str())
2413 .ok_or_else(|| {
2414 AwsServiceError::aws_error(
2415 StatusCode::BAD_REQUEST,
2416 "ValidationException",
2417 "TableCreationParameters.TableName is required",
2418 )
2419 })?;
2420
2421 let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
2422 let attribute_definitions = parse_attribute_definitions(
2423 table_params
2424 .get("AttributeDefinitions")
2425 .unwrap_or(&Value::Null),
2426 )?;
2427
2428 let mut state = self.state.write();
2429
2430 if state.tables.contains_key(table_name) {
2431 return Err(AwsServiceError::aws_error(
2432 StatusCode::BAD_REQUEST,
2433 "ResourceInUseException",
2434 format!("Table already exists: {table_name}"),
2435 ));
2436 }
2437
2438 let now = Utc::now();
2439 let table_arn = format!(
2440 "arn:aws:dynamodb:{}:{}:table/{}",
2441 state.region, state.account_id, table_name
2442 );
2443 let import_arn = format!(
2444 "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
2445 state.region,
2446 state.account_id,
2447 table_name,
2448 uuid::Uuid::new_v4()
2449 );
2450
2451 let table = DynamoTable {
2452 name: table_name.to_string(),
2453 arn: table_arn.clone(),
2454 key_schema,
2455 attribute_definitions,
2456 provisioned_throughput: ProvisionedThroughput {
2457 read_capacity_units: 0,
2458 write_capacity_units: 0,
2459 },
2460 items: Vec::new(),
2461 gsi: Vec::new(),
2462 lsi: Vec::new(),
2463 tags: HashMap::new(),
2464 created_at: now,
2465 status: "ACTIVE".to_string(),
2466 item_count: 0,
2467 size_bytes: 0,
2468 billing_mode: "PAY_PER_REQUEST".to_string(),
2469 ttl_attribute: None,
2470 ttl_enabled: false,
2471 resource_policy: None,
2472 pitr_enabled: false,
2473 kinesis_destinations: Vec::new(),
2474 contributor_insights_status: "DISABLED".to_string(),
2475 };
2476 state.tables.insert(table_name.to_string(), table);
2477
2478 let import_desc = ImportDescription {
2479 import_arn: import_arn.clone(),
2480 import_status: "COMPLETED".to_string(),
2481 table_arn: table_arn.clone(),
2482 table_name: table_name.to_string(),
2483 s3_bucket_source: s3_bucket.to_string(),
2484 input_format: input_format.to_string(),
2485 start_time: now,
2486 end_time: now,
2487 processed_item_count: 0,
2488 processed_size_bytes: 0,
2489 };
2490 state.imports.insert(import_arn.clone(), import_desc);
2491
2492 let table_ref = state.tables.get(table_name).unwrap();
2493 let ks: Vec<Value> = table_ref
2494 .key_schema
2495 .iter()
2496 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2497 .collect();
2498 let ad: Vec<Value> = table_ref
2499 .attribute_definitions
2500 .iter()
2501 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2502 .collect();
2503
2504 Self::ok_json(json!({
2505 "ImportTableDescription": {
2506 "ImportArn": import_arn,
2507 "ImportStatus": "COMPLETED",
2508 "TableArn": table_arn,
2509 "TableId": uuid::Uuid::new_v4().to_string(),
2510 "S3BucketSource": {
2511 "S3Bucket": s3_bucket
2512 },
2513 "InputFormat": input_format,
2514 "TableCreationParameters": {
2515 "TableName": table_name,
2516 "KeySchema": ks,
2517 "AttributeDefinitions": ad
2518 },
2519 "StartTime": now.timestamp() as f64,
2520 "EndTime": now.timestamp() as f64,
2521 "ProcessedItemCount": 0,
2522 "ProcessedSizeBytes": 0
2523 }
2524 }))
2525 }
2526
2527 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2528 let body = Self::parse_body(req)?;
2529 let import_arn = require_str(&body, "ImportArn")?;
2530
2531 let state = self.state.read();
2532 let import = state.imports.get(import_arn).ok_or_else(|| {
2533 AwsServiceError::aws_error(
2534 StatusCode::BAD_REQUEST,
2535 "ImportNotFoundException",
2536 format!("Import not found: {import_arn}"),
2537 )
2538 })?;
2539
2540 Self::ok_json(json!({
2541 "ImportTableDescription": {
2542 "ImportArn": import.import_arn,
2543 "ImportStatus": import.import_status,
2544 "TableArn": import.table_arn,
2545 "S3BucketSource": {
2546 "S3Bucket": import.s3_bucket_source
2547 },
2548 "InputFormat": import.input_format,
2549 "StartTime": import.start_time.timestamp() as f64,
2550 "EndTime": import.end_time.timestamp() as f64,
2551 "ProcessedItemCount": import.processed_item_count,
2552 "ProcessedSizeBytes": import.processed_size_bytes
2553 }
2554 }))
2555 }
2556
2557 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2558 let body = Self::parse_body(req)?;
2559 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2560 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
2561 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
2562 let table_arn = body["TableArn"].as_str();
2563
2564 let state = self.state.read();
2565 let summaries: Vec<Value> = state
2566 .imports
2567 .values()
2568 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
2569 .map(|i| {
2570 json!({
2571 "ImportArn": i.import_arn,
2572 "ImportStatus": i.import_status,
2573 "TableArn": i.table_arn
2574 })
2575 })
2576 .collect();
2577
2578 Self::ok_json(json!({
2579 "ImportSummaryList": summaries
2580 }))
2581 }
2582}
2583
// Service registration: maps the generic AwsService interface onto the
// DynamoDB handlers above.
#[async_trait]
impl AwsService for DynamoDbService {
    fn service_name(&self) -> &str {
        "dynamodb"
    }

    /// Dispatches the request to the handler matching its action name;
    /// unrecognized actions return an "action not implemented" error.
    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        match req.action.as_str() {
            "CreateTable" => self.create_table(&req),
            "DeleteTable" => self.delete_table(&req),
            "DescribeTable" => self.describe_table(&req),
            "ListTables" => self.list_tables(&req),
            "UpdateTable" => self.update_table(&req),
            "PutItem" => self.put_item(&req),
            "GetItem" => self.get_item(&req),
            "DeleteItem" => self.delete_item(&req),
            "UpdateItem" => self.update_item(&req),
            "Query" => self.query(&req),
            "Scan" => self.scan(&req),
            "BatchGetItem" => self.batch_get_item(&req),
            "BatchWriteItem" => self.batch_write_item(&req),
            "TagResource" => self.tag_resource(&req),
            "UntagResource" => self.untag_resource(&req),
            "ListTagsOfResource" => self.list_tags_of_resource(&req),
            "TransactGetItems" => self.transact_get_items(&req),
            "TransactWriteItems" => self.transact_write_items(&req),
            "ExecuteStatement" => self.execute_statement(&req),
            "BatchExecuteStatement" => self.batch_execute_statement(&req),
            "ExecuteTransaction" => self.execute_transaction(&req),
            "UpdateTimeToLive" => self.update_time_to_live(&req),
            "DescribeTimeToLive" => self.describe_time_to_live(&req),
            "PutResourcePolicy" => self.put_resource_policy(&req),
            "GetResourcePolicy" => self.get_resource_policy(&req),
            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
            "DescribeEndpoints" => self.describe_endpoints(&req),
            "DescribeLimits" => self.describe_limits(&req),
            "CreateBackup" => self.create_backup(&req),
            "DeleteBackup" => self.delete_backup(&req),
            "DescribeBackup" => self.describe_backup(&req),
            "ListBackups" => self.list_backups(&req),
            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
            "CreateGlobalTable" => self.create_global_table(&req),
            "DescribeGlobalTable" => self.describe_global_table(&req),
            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
            "ListGlobalTables" => self.list_global_tables(&req),
            "UpdateGlobalTable" => self.update_global_table(&req),
            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
            "DisableKinesisStreamingDestination" => {
                self.disable_kinesis_streaming_destination(&req)
            }
            "DescribeKinesisStreamingDestination" => {
                self.describe_kinesis_streaming_destination(&req)
            }
            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
            "UpdateContributorInsights" => self.update_contributor_insights(&req),
            "ListContributorInsights" => self.list_contributor_insights(&req),
            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
            "DescribeExport" => self.describe_export(&req),
            "ListExports" => self.list_exports(&req),
            "ImportTable" => self.import_table(&req),
            "DescribeImport" => self.describe_import(&req),
            "ListImports" => self.list_imports(&req),
            _ => Err(AwsServiceError::action_not_implemented(
                "dynamodb",
                &req.action,
            )),
        }
    }

    /// Advertised action list; must be kept in sync with the match arms of
    /// `handle` above (the two are maintained by hand).
    fn supported_actions(&self) -> &[&str] {
        &[
            "CreateTable",
            "DeleteTable",
            "DescribeTable",
            "ListTables",
            "UpdateTable",
            "PutItem",
            "GetItem",
            "DeleteItem",
            "UpdateItem",
            "Query",
            "Scan",
            "BatchGetItem",
            "BatchWriteItem",
            "TagResource",
            "UntagResource",
            "ListTagsOfResource",
            "TransactGetItems",
            "TransactWriteItems",
            "ExecuteStatement",
            "BatchExecuteStatement",
            "ExecuteTransaction",
            "UpdateTimeToLive",
            "DescribeTimeToLive",
            "PutResourcePolicy",
            "GetResourcePolicy",
            "DeleteResourcePolicy",
            "DescribeEndpoints",
            "DescribeLimits",
            "CreateBackup",
            "DeleteBackup",
            "DescribeBackup",
            "ListBackups",
            "RestoreTableFromBackup",
            "RestoreTableToPointInTime",
            "UpdateContinuousBackups",
            "DescribeContinuousBackups",
            "CreateGlobalTable",
            "DescribeGlobalTable",
            "DescribeGlobalTableSettings",
            "ListGlobalTables",
            "UpdateGlobalTable",
            "UpdateGlobalTableSettings",
            "DescribeTableReplicaAutoScaling",
            "UpdateTableReplicaAutoScaling",
            "EnableKinesisStreamingDestination",
            "DisableKinesisStreamingDestination",
            "DescribeKinesisStreamingDestination",
            "UpdateKinesisStreamingDestination",
            "DescribeContributorInsights",
            "UpdateContributorInsights",
            "ListContributorInsights",
            "ExportTableToPointInTime",
            "DescribeExport",
            "ListExports",
            "ImportTable",
            "DescribeImport",
            "ListImports",
        ]
    }
}
2728
2729fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
2732 body[field].as_str().ok_or_else(|| {
2733 AwsServiceError::aws_error(
2734 StatusCode::BAD_REQUEST,
2735 "ValidationException",
2736 format!("{field} is required"),
2737 )
2738 })
2739}
2740
2741fn require_object(
2742 body: &Value,
2743 field: &str,
2744) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2745 let obj = body[field].as_object().ok_or_else(|| {
2746 AwsServiceError::aws_error(
2747 StatusCode::BAD_REQUEST,
2748 "ValidationException",
2749 format!("{field} is required"),
2750 )
2751 })?;
2752 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
2753}
2754
2755fn get_table<'a>(
2756 tables: &'a HashMap<String, DynamoTable>,
2757 name: &str,
2758) -> Result<&'a DynamoTable, AwsServiceError> {
2759 tables.get(name).ok_or_else(|| {
2760 AwsServiceError::aws_error(
2761 StatusCode::BAD_REQUEST,
2762 "ResourceNotFoundException",
2763 format!("Requested resource not found: Table: {name} not found"),
2764 )
2765 })
2766}
2767
2768fn get_table_mut<'a>(
2769 tables: &'a mut HashMap<String, DynamoTable>,
2770 name: &str,
2771) -> Result<&'a mut DynamoTable, AwsServiceError> {
2772 tables.get_mut(name).ok_or_else(|| {
2773 AwsServiceError::aws_error(
2774 StatusCode::BAD_REQUEST,
2775 "ResourceNotFoundException",
2776 format!("Requested resource not found: Table: {name} not found"),
2777 )
2778 })
2779}
2780
2781fn find_table_by_arn<'a>(
2782 tables: &'a HashMap<String, DynamoTable>,
2783 arn: &str,
2784) -> Result<&'a DynamoTable, AwsServiceError> {
2785 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
2786 AwsServiceError::aws_error(
2787 StatusCode::BAD_REQUEST,
2788 "ResourceNotFoundException",
2789 format!("Requested resource not found: {arn}"),
2790 )
2791 })
2792}
2793
2794fn find_table_by_arn_mut<'a>(
2795 tables: &'a mut HashMap<String, DynamoTable>,
2796 arn: &str,
2797) -> Result<&'a mut DynamoTable, AwsServiceError> {
2798 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
2799 AwsServiceError::aws_error(
2800 StatusCode::BAD_REQUEST,
2801 "ResourceNotFoundException",
2802 format!("Requested resource not found: {arn}"),
2803 )
2804 })
2805}
2806
2807fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
2808 let arr = val.as_array().ok_or_else(|| {
2809 AwsServiceError::aws_error(
2810 StatusCode::BAD_REQUEST,
2811 "ValidationException",
2812 "KeySchema is required",
2813 )
2814 })?;
2815 Ok(arr
2816 .iter()
2817 .map(|elem| KeySchemaElement {
2818 attribute_name: elem["AttributeName"]
2819 .as_str()
2820 .unwrap_or_default()
2821 .to_string(),
2822 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
2823 })
2824 .collect())
2825}
2826
2827fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
2828 let arr = val.as_array().ok_or_else(|| {
2829 AwsServiceError::aws_error(
2830 StatusCode::BAD_REQUEST,
2831 "ValidationException",
2832 "AttributeDefinitions is required",
2833 )
2834 })?;
2835 Ok(arr
2836 .iter()
2837 .map(|elem| AttributeDefinition {
2838 attribute_name: elem["AttributeName"]
2839 .as_str()
2840 .unwrap_or_default()
2841 .to_string(),
2842 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
2843 })
2844 .collect())
2845}
2846
2847fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
2848 Ok(ProvisionedThroughput {
2849 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
2850 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
2851 })
2852}
2853
2854fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
2855 let Some(arr) = val.as_array() else {
2856 return Vec::new();
2857 };
2858 arr.iter()
2859 .filter_map(|g| {
2860 Some(GlobalSecondaryIndex {
2861 index_name: g["IndexName"].as_str()?.to_string(),
2862 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
2863 projection: parse_projection(&g["Projection"]),
2864 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
2865 .ok(),
2866 })
2867 })
2868 .collect()
2869}
2870
2871fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
2872 let Some(arr) = val.as_array() else {
2873 return Vec::new();
2874 };
2875 arr.iter()
2876 .filter_map(|l| {
2877 Some(LocalSecondaryIndex {
2878 index_name: l["IndexName"].as_str()?.to_string(),
2879 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
2880 projection: parse_projection(&l["Projection"]),
2881 })
2882 })
2883 .collect()
2884}
2885
2886fn parse_projection(val: &Value) -> Projection {
2887 Projection {
2888 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
2889 non_key_attributes: val["NonKeyAttributes"]
2890 .as_array()
2891 .map(|arr| {
2892 arr.iter()
2893 .filter_map(|v| v.as_str().map(|s| s.to_string()))
2894 .collect()
2895 })
2896 .unwrap_or_default(),
2897 }
2898}
2899
2900fn parse_tags(val: &Value) -> HashMap<String, String> {
2901 let mut tags = HashMap::new();
2902 if let Some(arr) = val.as_array() {
2903 for tag in arr {
2904 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
2905 tags.insert(k.to_string(), v.to_string());
2906 }
2907 }
2908 }
2909 tags
2910}
2911
2912fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
2913 let mut names = HashMap::new();
2914 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
2915 for (k, v) in obj {
2916 if let Some(s) = v.as_str() {
2917 names.insert(k.clone(), s.to_string());
2918 }
2919 }
2920 }
2921 names
2922}
2923
2924fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
2925 let mut values = HashMap::new();
2926 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
2927 for (k, v) in obj {
2928 values.insert(k.clone(), v.clone());
2929 }
2930 }
2931 values
2932}
2933
/// Resolves a `#`-prefixed ExpressionAttributeNames placeholder to its real
/// attribute name; non-placeholders and unknown placeholders pass through
/// unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    match expr_attr_names.get(name) {
        Some(resolved) => resolved.clone(),
        // Unknown placeholder: fall through as-is, matching the lenient fake.
        None => name.to_string(),
    }
}
2944
2945fn extract_key(
2946 table: &DynamoTable,
2947 item: &HashMap<String, AttributeValue>,
2948) -> HashMap<String, AttributeValue> {
2949 let mut key = HashMap::new();
2950 let hash_key = table.hash_key_name();
2951 if let Some(v) = item.get(hash_key) {
2952 key.insert(hash_key.to_string(), v.clone());
2953 }
2954 if let Some(range_key) = table.range_key_name() {
2955 if let Some(v) = item.get(range_key) {
2956 key.insert(range_key.to_string(), v.clone());
2957 }
2958 }
2959 key
2960}
2961
2962fn validate_key_in_item(
2963 table: &DynamoTable,
2964 item: &HashMap<String, AttributeValue>,
2965) -> Result<(), AwsServiceError> {
2966 let hash_key = table.hash_key_name();
2967 if !item.contains_key(hash_key) {
2968 return Err(AwsServiceError::aws_error(
2969 StatusCode::BAD_REQUEST,
2970 "ValidationException",
2971 format!("Missing the key {hash_key} in the item"),
2972 ));
2973 }
2974 if let Some(range_key) = table.range_key_name() {
2975 if !item.contains_key(range_key) {
2976 return Err(AwsServiceError::aws_error(
2977 StatusCode::BAD_REQUEST,
2978 "ValidationException",
2979 format!("Missing the key {range_key} in the item"),
2980 ));
2981 }
2982 }
2983 Ok(())
2984}
2985
2986fn validate_key_attributes_in_key(
2987 table: &DynamoTable,
2988 key: &HashMap<String, AttributeValue>,
2989) -> Result<(), AwsServiceError> {
2990 let hash_key = table.hash_key_name();
2991 if !key.contains_key(hash_key) {
2992 return Err(AwsServiceError::aws_error(
2993 StatusCode::BAD_REQUEST,
2994 "ValidationException",
2995 format!("Missing the key {hash_key} in the item"),
2996 ));
2997 }
2998 Ok(())
2999}
3000
3001fn project_item(
3002 item: &HashMap<String, AttributeValue>,
3003 body: &Value,
3004) -> HashMap<String, AttributeValue> {
3005 let projection = body["ProjectionExpression"].as_str();
3006 match projection {
3007 Some(proj) if !proj.is_empty() => {
3008 let expr_attr_names = parse_expression_attribute_names(body);
3009 let attrs: Vec<String> = proj
3010 .split(',')
3011 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3012 .collect();
3013 let mut result = HashMap::new();
3014 for attr in &attrs {
3015 if let Some(v) = resolve_nested_path(item, attr) {
3016 insert_nested_value(&mut result, attr, v);
3017 }
3018 }
3019 result
3020 }
3021 _ => item.clone(),
3022 }
3023}
3024
3025fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3028 let mut result = String::new();
3030 for (i, segment) in path.split('.').enumerate() {
3031 if i > 0 {
3032 result.push('.');
3033 }
3034 if let Some(bracket_pos) = segment.find('[') {
3036 let key_part = &segment[..bracket_pos];
3037 let index_part = &segment[bracket_pos..];
3038 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3039 result.push_str(index_part);
3040 } else {
3041 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3042 }
3043 }
3044 result
3045}
3046
3047fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3049 let segments = parse_path_segments(path);
3050 if segments.is_empty() {
3051 return None;
3052 }
3053
3054 let first = &segments[0];
3055 let top_key = match first {
3056 PathSegment::Key(k) => k.as_str(),
3057 _ => return None,
3058 };
3059
3060 let mut current = item.get(top_key)?.clone();
3061
3062 for segment in &segments[1..] {
3063 match segment {
3064 PathSegment::Key(k) => {
3065 current = current.get("M")?.get(k)?.clone();
3067 }
3068 PathSegment::Index(idx) => {
3069 current = current.get("L")?.get(*idx)?.clone();
3071 }
3072 }
3073 }
3074
3075 Some(current)
3076}
3077
/// One step of a parsed document path such as `a.b[2].c`.
#[derive(Debug)]
enum PathSegment {
    /// A map-key step (`a`, `b`, `c`).
    Key(String),
    /// A zero-based list-index step (`[2]`).
    Index(usize),
}
3083
3084fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3086 let mut segments = Vec::new();
3087 let mut current = String::new();
3088
3089 let chars: Vec<char> = path.chars().collect();
3090 let mut i = 0;
3091 while i < chars.len() {
3092 match chars[i] {
3093 '.' => {
3094 if !current.is_empty() {
3095 segments.push(PathSegment::Key(current.clone()));
3096 current.clear();
3097 }
3098 }
3099 '[' => {
3100 if !current.is_empty() {
3101 segments.push(PathSegment::Key(current.clone()));
3102 current.clear();
3103 }
3104 i += 1;
3105 let mut num = String::new();
3106 while i < chars.len() && chars[i] != ']' {
3107 num.push(chars[i]);
3108 i += 1;
3109 }
3110 if let Ok(idx) = num.parse::<usize>() {
3111 segments.push(PathSegment::Index(idx));
3112 }
3113 }
3115 c => {
3116 current.push(c);
3117 }
3118 }
3119 i += 1;
3120 }
3121 if !current.is_empty() {
3122 segments.push(PathSegment::Key(current));
3123 }
3124 segments
3125}
3126
3127fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3130 if !path.contains('.') && !path.contains('[') {
3132 result.insert(path.to_string(), value);
3133 return;
3134 }
3135
3136 let segments = parse_path_segments(path);
3137 if segments.is_empty() {
3138 return;
3139 }
3140
3141 let top_key = match &segments[0] {
3142 PathSegment::Key(k) => k.clone(),
3143 _ => return,
3144 };
3145
3146 if segments.len() == 1 {
3147 result.insert(top_key, value);
3148 return;
3149 }
3150
3151 let wrapped = wrap_value_in_path(&segments[1..], value);
3153 let existing = result.remove(&top_key);
3155 let merged = match existing {
3156 Some(existing) => merge_attribute_values(existing, wrapped),
3157 None => wrapped,
3158 };
3159 result.insert(top_key, merged);
3160}
3161
3162fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3164 if segments.is_empty() {
3165 return value;
3166 }
3167 let inner = wrap_value_in_path(&segments[1..], value);
3168 match &segments[0] {
3169 PathSegment::Key(k) => {
3170 json!({"M": {k.clone(): inner}})
3171 }
3172 PathSegment::Index(idx) => {
3173 let mut arr = vec![Value::Null; idx + 1];
3174 arr[*idx] = inner;
3175 json!({"L": arr})
3176 }
3177 }
3178}
3179
3180fn merge_attribute_values(a: Value, b: Value) -> Value {
3182 if let (Some(a_map), Some(b_map)) = (
3183 a.get("M").and_then(|v| v.as_object()),
3184 b.get("M").and_then(|v| v.as_object()),
3185 ) {
3186 let mut merged = a_map.clone();
3187 for (k, v) in b_map {
3188 if let Some(existing) = merged.get(k) {
3189 merged.insert(
3190 k.clone(),
3191 merge_attribute_values(existing.clone(), v.clone()),
3192 );
3193 } else {
3194 merged.insert(k.clone(), v.clone());
3195 }
3196 }
3197 json!({"M": merged})
3198 } else {
3199 b
3200 }
3201}
3202
/// Evaluates a deliberately simplified ConditionExpression against the
/// currently stored item (`existing` is None when no item exists).
///
/// Supported forms, checked in order:
/// 1. `attribute_not_exists(path)` — fails only when the item exists AND
///    contains the attribute.
/// 2. `attribute_exists(path)` — fails unless the item exists and contains
///    the attribute.
/// 3. A single `lhs = :val` / `lhs <> :val` comparison against
///    ExpressionAttributeValues.
///
/// Any other expression shape or operator is treated as passing —
/// permissive fake behavior, not full DynamoDB condition semantics.
fn evaluate_condition(
    condition: &str,
    existing: Option<&HashMap<String, AttributeValue>>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    let cond = condition.trim();

    if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        match existing {
            Some(item) if item.contains_key(&attr) => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ConditionalCheckFailedException",
                    "The conditional request failed",
                ));
            }
            _ => return Ok(()),
        }
    }

    if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        match existing {
            Some(item) if item.contains_key(&attr) => return Ok(()),
            _ => {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ConditionalCheckFailedException",
                    "The conditional request failed",
                ));
            }
        }
    }

    if let Some((left, op, right)) = parse_simple_comparison(cond) {
        let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
        // `right` is expected to be a `:placeholder` into the values map;
        // a missing placeholder compares as None.
        let expected = expr_attr_values.get(right.trim());
        let actual = existing.and_then(|item| item.get(&attr_name));

        let result = match op {
            "=" => actual == expected,
            "<>" => actual != expected,
            // Unsupported operators pass unconditionally (permissive fake).
            _ => true,
        };

        if !result {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ConditionalCheckFailedException",
                "The conditional request failed",
            ));
        }
    }

    Ok(())
}
3261
/// If `expr` is exactly a call of `func_name` — i.e. `func_name(<arg>)` —
/// returns the trimmed argument text; otherwise `None`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    expr.strip_prefix(func_name)
        .and_then(|rest| rest.strip_prefix('('))
        .and_then(|rest| rest.strip_suffix(')'))
        .map(str::trim)
}
3271
/// Splits a condition expression of the form `attr <op> :value` into
/// `(left, operator, right)` at the first occurrence of a comparison
/// operator. Returns `None` when no operator is present.
///
/// Multi-character operators are tried before their single-character
/// substrings: previously "=" was tried before "<=" / ">=", so `a >= :v`
/// split at the "=" and left a stray ">" on the attribute side, which made
/// the caller look up a nonexistent attribute and fail the condition.
fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
    // Order matters: "=" is a suffix of "<=" / ">=", and "<" / ">" are
    // prefixes of "<>" / "<=" / ">=".
    for op in &["<>", "<=", ">=", "=", "<", ">"] {
        if let Some(pos) = expr.find(op) {
            let left = &expr[..pos];
            let right = &expr[pos + op.len()..];
            return Some((left, op, right));
        }
    }
    None
}
3282
3283fn evaluate_key_condition(
3284 expr: &str,
3285 item: &HashMap<String, AttributeValue>,
3286 hash_key_name: &str,
3287 _range_key_name: Option<&str>,
3288 expr_attr_names: &HashMap<String, String>,
3289 expr_attr_values: &HashMap<String, Value>,
3290) -> bool {
3291 let parts: Vec<&str> = split_on_and(expr);
3292 for part in &parts {
3293 let part = part.trim();
3294 if !evaluate_single_key_condition(
3295 part,
3296 item,
3297 hash_key_name,
3298 expr_attr_names,
3299 expr_attr_values,
3300 ) {
3301 return false;
3302 }
3303 }
3304 true
3305}
3306
/// Splits `expr` on every top-level (parenthesis depth 0) ` AND `
/// connective, case-insensitively. Always returns at least one part.
fn split_on_and(expr: &str) -> Vec<&str> {
    split_on_keyword(expr, " AND ")
}

/// Splits `expr` on every top-level (parenthesis depth 0) ` OR `
/// connective, case-insensitively. Always returns at least one part.
fn split_on_or(expr: &str) -> Vec<&str> {
    split_on_keyword(expr, " OR ")
}

/// Shared scanner behind `split_on_and` / `split_on_or`: splits `expr` on
/// each occurrence of `keyword` (an ASCII, space-delimited connective such
/// as " AND ") that sits outside parentheses, comparing ASCII
/// case-insensitively. The two previous copies of this loop were identical
/// except for the keyword and its length.
fn split_on_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
    let kw = keyword.as_bytes();
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            // Never go below zero on unbalanced input.
            b')' => depth = depth.saturating_sub(1),
            _ if depth == 0
                && i + kw.len() <= bytes.len()
                && bytes[i..i + kw.len()].eq_ignore_ascii_case(kw) =>
            {
                // The keyword starts/ends with an ASCII space, so these
                // byte positions are valid char boundaries for slicing.
                parts.push(&expr[start..i]);
                start = i + kw.len();
                i = start;
                continue;
            }
            _ => {}
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3358
/// Evaluates one key-condition term against an item.
///
/// Supported forms: `begins_with(attr, :v)`, `attr BETWEEN :lo AND :hi`,
/// and binary comparisons (`= <> < > <= >=`). An unrecognised term
/// evaluates to `true` (permissive). The hash-key parameter is unused;
/// attribute references are resolved directly against the item.
fn evaluate_single_key_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    _hash_key_name: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let part = part.trim();

    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            // Two comma-separated arguments: attribute ref, value ref.
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let val_ref = val_ref.trim();
                let expected = expr_attr_values.get(val_ref);
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        // Prefix match on the S (or stringified N) payload.
                        let a_str = extract_string_value(a);
                        let e_str = extract_string_value(e);
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
                    }
                    _ => false,
                };
            }
        }
        // Malformed begins_with (no closing paren / missing args).
        return false;
    }

    // NOTE(review): byte offsets from the uppercased copy are reused to
    // slice `part`; valid for ASCII input, which expressions are here.
    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
        let attr_part = part[..between_pos].trim();
        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
        let range_part = &part[between_pos + 7..];
        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
            let lo_ref = range_part[..and_pos].trim();
            let hi_ref = range_part[and_pos + 5..].trim();
            let lo = expr_attr_values.get(lo_ref);
            let hi = expr_attr_values.get(hi_ref);
            let actual = item.get(&attr_name);
            // Inclusive on both bounds: lo <= actual <= hi.
            return match (actual, lo, hi) {
                (Some(a), Some(l), Some(h)) => {
                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
                }
                _ => false,
            };
        }
    }

    // Two-character operators first so "<=" is not misread as "<".
    for op in &["<=", ">=", "<>", "=", "<", ">"] {
        if let Some(pos) = part.find(op) {
            let left = part[..pos].trim();
            let right = part[pos + op.len()..].trim();
            let attr_name = resolve_attr_name(left, expr_attr_names);
            let expected = expr_attr_values.get(right);
            let actual = item.get(&attr_name);

            return match *op {
                "=" => actual == expected,
                "<>" => actual != expected,
                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
                "<=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
                }
                ">=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
                }
                _ => true,
            };
        }
    }

    // Unrecognised condition shapes do not filter anything out.
    true
}
3443
3444fn extract_string_value(val: &Value) -> Option<String> {
3445 val.get("S")
3446 .and_then(|v| v.as_str())
3447 .map(|s| s.to_string())
3448 .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
3449}
3450
/// Orders two optional attribute values for range/BETWEEN comparisons.
///
/// `None` sorts before `Some`. For matching types: "S" compares
/// lexicographically; "N" compares as parsed `f64` (unparsable numbers
/// fall back to 0.0); "B" compares the stored string representation
/// lexicographically — NOTE(review): that is the encoded text, not the
/// decoded bytes. Mismatched or unsupported type pairs compare as Equal.
fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
    match (a, b) {
        (None, None) => std::cmp::Ordering::Equal,
        (None, Some(_)) => std::cmp::Ordering::Less,
        (Some(_), None) => std::cmp::Ordering::Greater,
        (Some(a), Some(b)) => {
            let a_type = attribute_type_and_value(a);
            let b_type = attribute_type_and_value(b);
            match (a_type, b_type) {
                (Some(("S", a_val)), Some(("S", b_val))) => {
                    let a_str = a_val.as_str().unwrap_or("");
                    let b_str = b_val.as_str().unwrap_or("");
                    a_str.cmp(b_str)
                }
                (Some(("N", a_val)), Some(("N", b_val))) => {
                    // DynamoDB numbers are stored as strings; parse to f64.
                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
                    a_num
                        .partial_cmp(&b_num)
                        .unwrap_or(std::cmp::Ordering::Equal)
                }
                (Some(("B", a_val)), Some(("B", b_val))) => {
                    let a_str = a_val.as_str().unwrap_or("");
                    let b_str = b_val.as_str().unwrap_or("");
                    a_str.cmp(b_str)
                }
                _ => std::cmp::Ordering::Equal,
            }
        }
    }
}
3482
3483fn evaluate_filter_expression(
3484 expr: &str,
3485 item: &HashMap<String, AttributeValue>,
3486 expr_attr_names: &HashMap<String, String>,
3487 expr_attr_values: &HashMap<String, Value>,
3488) -> bool {
3489 let trimmed = expr.trim();
3490
3491 let or_parts = split_on_or(trimmed);
3493 if or_parts.len() > 1 {
3494 return or_parts.iter().any(|part| {
3495 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3496 });
3497 }
3498
3499 let and_parts = split_on_and(trimmed);
3501 if and_parts.len() > 1 {
3502 return and_parts.iter().all(|part| {
3503 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
3504 });
3505 }
3506
3507 let stripped = strip_outer_parens(trimmed);
3509 if stripped != trimmed {
3510 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
3511 }
3512
3513 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
3514}
3515
/// Removes one pair of enclosing parentheses from `expr` if — and only if —
/// the opening paren at the start actually matches the closing paren at
/// the end. `"(a) AND (b)"` is returned unchanged because its first `(`
/// closes before the final `)`. The input is trimmed either way.
fn strip_outer_parens(expr: &str) -> &str {
    let trimmed = expr.trim();
    let Some(inner) = trimmed
        .strip_prefix('(')
        .and_then(|s| s.strip_suffix(')'))
    else {
        return trimmed;
    };
    // The outer pair matches only if the interior never dips below depth 0
    // and ends balanced.
    let mut depth = 0i32;
    for b in inner.bytes() {
        match b {
            b'(' => depth += 1,
            b')' => {
                depth -= 1;
                if depth < 0 {
                    return trimmed;
                }
            }
            _ => {}
        }
    }
    if depth == 0 {
        inner
    } else {
        trimmed
    }
}
3543
/// Evaluates one filter-expression term against an item.
///
/// Handles `attribute_exists` / `attribute_not_exists`, `begins_with`,
/// and `contains` (substring test on the S — or stringified N — payload).
/// Any other shape is delegated to the key-condition evaluator, which
/// covers the comparison operators.
fn evaluate_single_filter_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        return item.contains_key(&attr);
    }

    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        return !item.contains_key(&attr);
    }

    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            // Arguments: attribute reference, value reference.
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let expected = expr_attr_values.get(val_ref.trim());
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        let a_str = extract_string_value(a);
                        let e_str = extract_string_value(e);
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
                    }
                    _ => false,
                };
            }
        }
    }

    if let Some(rest) = part
        .strip_prefix("contains(")
        .or_else(|| part.strip_prefix("contains ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let expected = expr_attr_values.get(val_ref.trim());
                let actual = item.get(&attr_name);
                // String containment only; set-membership `contains` is
                // not implemented here.
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        let a_str = extract_string_value(a);
                        let e_str = extract_string_value(e);
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
                    }
                    _ => false,
                };
            }
        }
    }

    // Fall back to comparison-operator handling (hash-key name unused).
    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
}
3606
3607fn apply_update_expression(
3608 item: &mut HashMap<String, AttributeValue>,
3609 expr: &str,
3610 expr_attr_names: &HashMap<String, String>,
3611 expr_attr_values: &HashMap<String, Value>,
3612) -> Result<(), AwsServiceError> {
3613 let clauses = parse_update_clauses(expr);
3614 for (action, assignments) in &clauses {
3615 match action.to_ascii_uppercase().as_str() {
3616 "SET" => {
3617 for assignment in assignments {
3618 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3619 }
3620 }
3621 "REMOVE" => {
3622 for attr_ref in assignments {
3623 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
3624 item.remove(&attr);
3625 }
3626 }
3627 "ADD" => {
3628 for assignment in assignments {
3629 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3630 }
3631 }
3632 "DELETE" => {
3633 for assignment in assignments {
3634 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
3635 }
3636 }
3637 _ => {}
3638 }
3639 }
3640 Ok(())
3641}
3642
/// Parses a DynamoDB update expression into `(action, assignments)`
/// clauses by locating the SET / REMOVE / ADD / DELETE keywords
/// (case-insensitively) and taking the text up to the next keyword as each
/// clause's body.
///
/// Two fixes over the previous version:
/// - `_`, `#`, and `:` now count as identifier bytes for the keyword
///   word-boundary test, so the "add" inside `SET my_add = :v` no longer
///   starts a bogus ADD clause (`#name` aliases and `:value` placeholders
///   are likewise protected).
/// - Assignments are split only on commas outside parentheses, so
///   two-argument functions such as `list_append(a, b)` and
///   `if_not_exists(a, :v)` stay intact.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
    let upper = expr.to_ascii_uppercase();
    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
    let bytes = expr.as_bytes();
    // A keyword only counts when it stands alone as a word; identifier
    // characters in update expressions include '_', '#' and ':'.
    let is_word_byte = |b: u8| b.is_ascii_alphanumeric() || b == b'_' || b == b'#' || b == b':';
    let mut positions: Vec<(usize, &str)> = Vec::new();

    for kw in &keywords {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let before_ok = abs_pos == 0 || !is_word_byte(bytes[abs_pos - 1]);
            let after_pos = abs_pos + kw.len();
            let after_ok = after_pos >= expr.len() || !is_word_byte(bytes[after_pos]);
            if before_ok && after_ok {
                positions.push((abs_pos, kw));
            }
            search_from = abs_pos + kw.len();
        }
    }

    positions.sort_by_key(|(pos, _)| *pos);

    for (i, &(pos, kw)) in positions.iter().enumerate() {
        let start = pos + kw.len();
        let end = if i + 1 < positions.len() {
            positions[i + 1].0
        } else {
            expr.len()
        };
        let content = expr[start..end].trim();
        let assignments = split_top_level_commas(content);
        clauses.push((kw.to_string(), assignments));
    }

    clauses
}

/// Splits `s` on commas at parenthesis depth 0, trimming each piece and
/// dropping empty ones.
fn split_top_level_commas(s: &str) -> Vec<String> {
    let mut parts = Vec::new();
    let mut depth = 0usize;
    let mut start = 0;
    for (i, b) in s.bytes().enumerate() {
        match b {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            b',' if depth == 0 => {
                parts.push(s[start..i].trim().to_string());
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(s[start..].trim().to_string());
    parts.retain(|p| !p.is_empty());
    parts
}
3680
3681fn apply_set_assignment(
3682 item: &mut HashMap<String, AttributeValue>,
3683 assignment: &str,
3684 expr_attr_names: &HashMap<String, String>,
3685 expr_attr_values: &HashMap<String, Value>,
3686) -> Result<(), AwsServiceError> {
3687 let Some((left, right)) = assignment.split_once('=') else {
3688 return Ok(());
3689 };
3690
3691 let attr = resolve_attr_name(left.trim(), expr_attr_names);
3692 let right = right.trim();
3693
3694 if let Some(rest) = right
3696 .strip_prefix("if_not_exists(")
3697 .or_else(|| right.strip_prefix("if_not_exists ("))
3698 {
3699 if let Some(inner) = rest.strip_suffix(')') {
3700 let mut split = inner.splitn(2, ',');
3701 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
3702 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
3703 if !item.contains_key(&check_name) {
3704 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
3705 item.insert(attr, val.clone());
3706 }
3707 }
3708 return Ok(());
3709 }
3710 }
3711 }
3712
3713 if let Some(rest) = right
3715 .strip_prefix("list_append(")
3716 .or_else(|| right.strip_prefix("list_append ("))
3717 {
3718 if let Some(inner) = rest.strip_suffix(')') {
3719 let mut split = inner.splitn(2, ',');
3720 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
3721 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
3722 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
3723
3724 let mut merged = Vec::new();
3725 if let Some(Value::Object(obj)) = &a_val {
3726 if let Some(Value::Array(arr)) = obj.get("L") {
3727 merged.extend(arr.clone());
3728 }
3729 }
3730 if let Some(Value::Object(obj)) = &b_val {
3731 if let Some(Value::Array(arr)) = obj.get("L") {
3732 merged.extend(arr.clone());
3733 }
3734 }
3735
3736 item.insert(attr, json!({"L": merged}));
3737 return Ok(());
3738 }
3739 }
3740 }
3741
3742 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
3744 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
3745 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
3746
3747 let left_num = extract_number(&left_val).unwrap_or(0.0);
3748 let right_num = extract_number(&right_val).unwrap_or(0.0);
3749
3750 let result = if is_add {
3751 left_num + right_num
3752 } else {
3753 left_num - right_num
3754 };
3755
3756 let num_str = if result == result.trunc() {
3757 format!("{}", result as i64)
3758 } else {
3759 format!("{result}")
3760 };
3761
3762 item.insert(attr, json!({"N": num_str}));
3763 return Ok(());
3764 }
3765
3766 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
3768 if let Some(v) = val {
3769 item.insert(attr, v);
3770 }
3771
3772 Ok(())
3773}
3774
3775fn resolve_value(
3776 reference: &str,
3777 item: &HashMap<String, AttributeValue>,
3778 expr_attr_names: &HashMap<String, String>,
3779 expr_attr_values: &HashMap<String, Value>,
3780) -> Option<Value> {
3781 let reference = reference.trim();
3782 if reference.starts_with(':') {
3783 expr_attr_values.get(reference).cloned()
3784 } else {
3785 let attr_name = resolve_attr_name(reference, expr_attr_names);
3786 item.get(&attr_name).cloned()
3787 }
3788}
3789
3790fn extract_number(val: &Option<Value>) -> Option<f64> {
3791 val.as_ref()
3792 .and_then(|v| v.get("N"))
3793 .and_then(|n| n.as_str())
3794 .and_then(|s| s.parse().ok())
3795}
3796
/// Finds the first top-level `+` or `-` in `expr` and splits around it,
/// returning `(left, right, is_addition)`. Operators inside parentheses
/// are skipped, as is position 0 (a leading sign is not an operator).
/// Returns `None` when no operator is found.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut depth = 0i32;
    expr.char_indices().find_map(|(i, c)| match c {
        '(' => {
            depth += 1;
            None
        }
        ')' => {
            depth -= 1;
            None
        }
        '+' | '-' if depth == 0 && i > 0 => Some((&expr[..i], &expr[i + 1..], c == '+')),
        _ => None,
    })
}
3814
/// Handles an ADD clause entry of the form `attr :value`.
///
/// - Both numeric ("N"): adds to the existing value; integral results are
///   formatted without a decimal point.
/// - String set ("SS") / number set ("NS"): unions the new members into
///   the existing set, skipping duplicates.
/// - Attribute absent: stores the operand as-is.
///
/// Mixed types (e.g. adding a number to an existing set) and malformed
/// entries are silently ignored.
fn apply_add_assignment(
    item: &mut HashMap<String, AttributeValue>,
    assignment: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> Result<(), AwsServiceError> {
    // Expect exactly "attr valueRef" separated by a single space.
    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
    if parts.len() != 2 {
        return Ok(());
    }

    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
    let val_ref = parts[1].trim();
    let add_val = expr_attr_values.get(val_ref);

    if let Some(add_val) = add_val {
        if let Some(existing) = item.get(&attr) {
            if let (Some(existing_num), Some(add_num)) = (
                extract_number(&Some(existing.clone())),
                extract_number(&Some(add_val.clone())),
            ) {
                let result = existing_num + add_num;
                // Integral results are rendered without a trailing ".0".
                let num_str = if result == result.trunc() {
                    format!("{}", result as i64)
                } else {
                    format!("{result}")
                };
                item.insert(attr, json!({"N": num_str}));
            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
                    // Set union: append only members not already present.
                    let mut merged: Vec<Value> = existing_set.clone();
                    for v in add_set {
                        if !merged.contains(v) {
                            merged.push(v.clone());
                        }
                    }
                    item.insert(attr, json!({"SS": merged}));
                }
            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
                    let mut merged: Vec<Value> = existing_set.clone();
                    for v in add_set {
                        if !merged.contains(v) {
                            merged.push(v.clone());
                        }
                    }
                    item.insert(attr, json!({"NS": merged}));
                }
            }
        } else {
            // No existing attribute: ADD behaves like a plain set.
            item.insert(attr, add_val.clone());
        }
    }

    Ok(())
}
3871
3872fn apply_delete_assignment(
3873 item: &mut HashMap<String, AttributeValue>,
3874 assignment: &str,
3875 expr_attr_names: &HashMap<String, String>,
3876 expr_attr_values: &HashMap<String, Value>,
3877) -> Result<(), AwsServiceError> {
3878 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
3879 if parts.len() != 2 {
3880 return Ok(());
3881 }
3882
3883 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
3884 let val_ref = parts[1].trim();
3885 let del_val = expr_attr_values.get(val_ref);
3886
3887 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
3888 if let (Some(existing_set), Some(del_set)) = (
3889 existing.get("SS").and_then(|v| v.as_array()),
3890 del_val.get("SS").and_then(|v| v.as_array()),
3891 ) {
3892 let filtered: Vec<Value> = existing_set
3893 .iter()
3894 .filter(|v| !del_set.contains(v))
3895 .cloned()
3896 .collect();
3897 if filtered.is_empty() {
3898 item.remove(&attr);
3899 } else {
3900 item.insert(attr, json!({"SS": filtered}));
3901 }
3902 } else if let (Some(existing_set), Some(del_set)) = (
3903 existing.get("NS").and_then(|v| v.as_array()),
3904 del_val.get("NS").and_then(|v| v.as_array()),
3905 ) {
3906 let filtered: Vec<Value> = existing_set
3907 .iter()
3908 .filter(|v| !del_set.contains(v))
3909 .cloned()
3910 .collect();
3911 if filtered.is_empty() {
3912 item.remove(&attr);
3913 } else {
3914 item.insert(attr, json!({"NS": filtered}));
3915 }
3916 }
3917 }
3918
3919 Ok(())
3920}
3921
/// Builds a DescribeTable-style `TableDescription` JSON document from the
/// table's parts.
///
/// The table name is derived from the last `/`-separated segment of the
/// ARN. `CreationDateTime` is a float of epoch seconds with millisecond
/// precision. PAY_PER_REQUEST tables report zeroed provisioned throughput;
/// GSI/LSI sections are emitted only when non-empty. A fresh random
/// `TableId` is generated on every call.
#[allow(clippy::too_many_arguments)]
fn build_table_description_json(
    arn: &str,
    key_schema: &[KeySchemaElement],
    attribute_definitions: &[AttributeDefinition],
    provisioned_throughput: &ProvisionedThroughput,
    gsi: &[GlobalSecondaryIndex],
    lsi: &[LocalSecondaryIndex],
    billing_mode: &str,
    created_at: chrono::DateTime<chrono::Utc>,
    item_count: i64,
    size_bytes: i64,
    status: &str,
) -> Value {
    // Table name is the segment after the final '/' of the ARN.
    let table_name = arn.rsplit('/').next().unwrap_or("");
    let creation_timestamp =
        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;

    let ks: Vec<Value> = key_schema
        .iter()
        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
        .collect();

    let ad: Vec<Value> = attribute_definitions
        .iter()
        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
        .collect();

    let mut desc = json!({
        "TableName": table_name,
        "TableArn": arn,
        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
        "TableStatus": status,
        "KeySchema": ks,
        "AttributeDefinitions": ad,
        "CreationDateTime": creation_timestamp,
        "ItemCount": item_count,
        "TableSizeBytes": size_bytes,
        "BillingModeSummary": { "BillingMode": billing_mode },
    });

    // Provisioned throughput is always present; zeroed for on-demand.
    if billing_mode != "PAY_PER_REQUEST" {
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
            "NumberOfDecreasesToday": 0,
        });
    } else {
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": 0,
            "WriteCapacityUnits": 0,
            "NumberOfDecreasesToday": 0,
        });
    }

    if !gsi.is_empty() {
        let gsi_json: Vec<Value> = gsi
            .iter()
            .map(|g| {
                let gks: Vec<Value> = g
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": g.index_name,
                    "KeySchema": gks,
                    "Projection": { "ProjectionType": g.projection.projection_type },
                    "IndexStatus": "ACTIVE",
                    "IndexArn": format!("{arn}/index/{}", g.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                // Optional fields only appear when configured on the index.
                if !g.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
                }
                if let Some(ref pt) = g.provisioned_throughput {
                    idx["ProvisionedThroughput"] = json!({
                        "ReadCapacityUnits": pt.read_capacity_units,
                        "WriteCapacityUnits": pt.write_capacity_units,
                        "NumberOfDecreasesToday": 0,
                    });
                }
                idx
            })
            .collect();
        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
    }

    if !lsi.is_empty() {
        let lsi_json: Vec<Value> = lsi
            .iter()
            .map(|l| {
                let lks: Vec<Value> = l
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": l.index_name,
                    "KeySchema": lks,
                    "Projection": { "ProjectionType": l.projection.projection_type },
                    "IndexArn": format!("{arn}/index/{}", l.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                if !l.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
                }
                idx
            })
            .collect();
        desc["LocalSecondaryIndexes"] = json!(lsi_json);
    }

    desc
}
4039
/// Convenience wrapper: renders a table's description JSON directly from
/// its stored fields.
fn build_table_description(table: &DynamoTable) -> Value {
    build_table_description_json(
        &table.arn,
        &table.key_schema,
        &table.attribute_definitions,
        &table.provisioned_throughput,
        &table.gsi,
        &table.lsi,
        &table.billing_mode,
        table.created_at,
        table.item_count,
        table.size_bytes,
        &table.status,
    )
}
4055
4056fn execute_partiql_statement(
4057 state: &SharedDynamoDbState,
4058 statement: &str,
4059 parameters: &[Value],
4060) -> Result<AwsResponse, AwsServiceError> {
4061 let trimmed = statement.trim();
4062 let upper = trimmed.to_ascii_uppercase();
4063
4064 if upper.starts_with("SELECT") {
4065 execute_partiql_select(state, trimmed, parameters)
4066 } else if upper.starts_with("INSERT") {
4067 execute_partiql_insert(state, trimmed, parameters)
4068 } else if upper.starts_with("UPDATE") {
4069 execute_partiql_update(state, trimmed, parameters)
4070 } else if upper.starts_with("DELETE") {
4071 execute_partiql_delete(state, trimmed, parameters)
4072 } else {
4073 Err(AwsServiceError::aws_error(
4074 StatusCode::BAD_REQUEST,
4075 "ValidationException",
4076 format!("Unsupported PartiQL statement: {trimmed}"),
4077 ))
4078 }
4079}
4080
4081fn execute_partiql_select(
4083 state: &SharedDynamoDbState,
4084 statement: &str,
4085 parameters: &[Value],
4086) -> Result<AwsResponse, AwsServiceError> {
4087 let upper = statement.to_ascii_uppercase();
4089 let from_pos = upper.find("FROM").ok_or_else(|| {
4090 AwsServiceError::aws_error(
4091 StatusCode::BAD_REQUEST,
4092 "ValidationException",
4093 "Invalid SELECT statement: missing FROM",
4094 )
4095 })?;
4096
4097 let after_from = statement[from_pos + 4..].trim();
4098 let (table_name, rest) = parse_partiql_table_name(after_from);
4099
4100 let state = state.read();
4101 let table = get_table(&state.tables, &table_name)?;
4102
4103 let rest_upper = rest.trim().to_ascii_uppercase();
4104 if rest_upper.starts_with("WHERE") {
4105 let where_clause = rest.trim()[5..].trim();
4106 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4107 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4108 DynamoDbService::ok_json(json!({ "Items": items }))
4109 } else {
4110 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4112 DynamoDbService::ok_json(json!({ "Items": items }))
4113 }
4114}
4115
/// Executes a PartiQL `INSERT INTO <table> VALUE {...}` statement.
///
/// The VALUE document may contain `?` placeholders resolved from
/// `parameters`. Inserting an item whose primary key already exists fails
/// with DuplicateItemException; otherwise the item is appended and the
/// table statistics are refreshed.
fn execute_partiql_insert(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    let upper = statement.to_ascii_uppercase();
    let into_pos = upper.find("INTO").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid INSERT statement: missing INTO",
        )
    })?;

    let after_into = statement[into_pos + 4..].trim();
    let (table_name, rest) = parse_partiql_table_name(after_into);

    let rest_upper = rest.trim().to_ascii_uppercase();
    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid INSERT statement: missing VALUE",
        )
    })?;

    let value_str = rest.trim()[value_pos + 5..].trim();
    let item = parse_partiql_value_object(value_str, parameters)?;

    let mut state = state.write();
    let table = get_table_mut(&mut state.tables, &table_name)?;
    // Reject key collisions, matching DynamoDB's INSERT semantics.
    let key = extract_key(table, &item);
    if table.find_item_index(&key).is_some() {
        return Err(AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "DuplicateItemException",
            "Duplicate primary key exists in table",
        ));
    } else {
        table.items.push(item);
    }
    table.recalculate_stats();

    DynamoDbService::ok_json(json!({}))
}
4164
/// Executes a PartiQL `UPDATE <table> SET a = ?, b = ? [WHERE ...]`.
///
/// Without a WHERE clause every item in the table is updated. `?`
/// placeholders in the WHERE clause consume parameters before those in the
/// SET clause; the SET placeholder offset restarts at the same position
/// for each matched item.
fn execute_partiql_update(
    state: &SharedDynamoDbState,
    statement: &str,
    parameters: &[Value],
) -> Result<AwsResponse, AwsServiceError> {
    // Skip the leading "UPDATE" keyword (6 bytes; dispatcher checked it).
    let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);

    let rest_upper = rest.trim().to_ascii_uppercase();
    let set_pos = rest_upper.find("SET").ok_or_else(|| {
        AwsServiceError::aws_error(
            StatusCode::BAD_REQUEST,
            "ValidationException",
            "Invalid UPDATE statement: missing SET",
        )
    })?;

    let after_set = rest.trim()[set_pos + 3..].trim();

    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
    let (set_clause, where_clause) = if let Some(wp) = where_pos {
        (&after_set[..wp], after_set[wp + 5..].trim())
    } else {
        (after_set, "")
    };

    let mut state = state.write();
    let table = get_table_mut(&mut state.tables, &table_name)?;

    // No WHERE clause means every item matches.
    let matched_indices = if !where_clause.is_empty() {
        find_partiql_where_indices(table, where_clause, parameters)?
    } else {
        (0..table.items.len()).collect()
    };

    // WHERE placeholders come first; SET placeholders are numbered after
    // them and re-counted per matched item.
    let param_offset = count_params_in_str(where_clause);
    let assignments: Vec<&str> = set_clause.split(',').collect();
    for idx in &matched_indices {
        let mut local_offset = param_offset;
        for assignment in &assignments {
            let assignment = assignment.trim();
            if let Some((attr, val_str)) = assignment.split_once('=') {
                let attr = attr.trim().trim_matches('"');
                let val_str = val_str.trim();
                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
                if let Some(v) = value {
                    table.items[*idx].insert(attr.to_string(), v);
                }
            }
        }
    }
    table.recalculate_stats();

    DynamoDbService::ok_json(json!({}))
}
4224
4225fn execute_partiql_delete(
4226 state: &SharedDynamoDbState,
4227 statement: &str,
4228 parameters: &[Value],
4229) -> Result<AwsResponse, AwsServiceError> {
4230 let upper = statement.to_ascii_uppercase();
4232 let from_pos = upper.find("FROM").ok_or_else(|| {
4233 AwsServiceError::aws_error(
4234 StatusCode::BAD_REQUEST,
4235 "ValidationException",
4236 "Invalid DELETE statement: missing FROM",
4237 )
4238 })?;
4239
4240 let after_from = statement[from_pos + 4..].trim();
4241 let (table_name, rest) = parse_partiql_table_name(after_from);
4242
4243 let rest_upper = rest.trim().to_ascii_uppercase();
4244 if !rest_upper.starts_with("WHERE") {
4245 return Err(AwsServiceError::aws_error(
4246 StatusCode::BAD_REQUEST,
4247 "ValidationException",
4248 "DELETE requires a WHERE clause",
4249 ));
4250 }
4251 let where_clause = rest.trim()[5..].trim();
4252
4253 let mut state = state.write();
4254 let table = get_table_mut(&mut state.tables, &table_name)?;
4255
4256 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4257 indices.sort_unstable();
4259 indices.reverse();
4260 for idx in indices {
4261 table.items.remove(idx);
4262 }
4263 table.recalculate_stats();
4264
4265 DynamoDbService::ok_json(json!({}))
4266}
4267
/// Splits a PartiQL target into the (possibly double-quoted) table name
/// and the remainder of the statement text that follows it.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    match s.strip_prefix('"') {
        Some(quoted) => match quoted.find('"') {
            // Properly quoted: everything up to the closing quote is the name.
            Some(close) => (quoted[..close].to_string(), &quoted[close + 1..]),
            // Unterminated quote: fall back to splitting on the first space
            // and stripping any stray quote characters from the name.
            None => {
                let cut = s.find(' ').unwrap_or(s.len());
                (s[..cut].trim_matches('"').to_string(), &s[cut..])
            }
        },
        None => {
            // Bare identifier: the name runs until the first whitespace.
            let cut = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
            (s[..cut].to_string(), &s[cut..])
        }
    }
}
4287
4288fn evaluate_partiql_where<'a>(
4291 table: &'a DynamoTable,
4292 where_clause: &str,
4293 parameters: &[Value],
4294) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4295 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4296 Ok(indices.iter().map(|i| &table.items[*i]).collect())
4297}
4298
4299fn find_partiql_where_indices(
4300 table: &DynamoTable,
4301 where_clause: &str,
4302 parameters: &[Value],
4303) -> Result<Vec<usize>, AwsServiceError> {
4304 let upper = where_clause.to_uppercase();
4307 let conditions = if upper.contains(" AND ") {
4308 let mut parts = Vec::new();
4310 let mut last = 0;
4311 for (i, _) in upper.match_indices(" AND ") {
4312 parts.push(where_clause[last..i].trim());
4313 last = i + 5;
4314 }
4315 parts.push(where_clause[last..].trim());
4316 parts
4317 } else {
4318 vec![where_clause.trim()]
4319 };
4320
4321 let mut param_idx = 0usize;
4322 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
4323
4324 for cond in &conditions {
4325 let cond = cond.trim();
4326 if let Some((left, right)) = cond.split_once('=') {
4327 let attr = left.trim().trim_matches('"').to_string();
4328 let val_str = right.trim();
4329 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
4330 if let Some(v) = value {
4331 parsed_conditions.push((attr, v));
4332 }
4333 }
4334 }
4335
4336 let mut indices = Vec::new();
4337 for (i, item) in table.items.iter().enumerate() {
4338 let all_match = parsed_conditions
4339 .iter()
4340 .all(|(attr, expected)| item.get(attr) == Some(expected));
4341 if all_match {
4342 indices.push(i);
4343 }
4344 }
4345
4346 Ok(indices)
4347}
4348
4349fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
4354 let s = s.trim();
4355 if s == "?" {
4356 let idx = *param_idx;
4357 *param_idx += 1;
4358 parameters.get(idx).cloned()
4359 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
4360 let inner = &s[1..s.len() - 1];
4361 Some(json!({"S": inner}))
4362 } else if let Ok(n) = s.parse::<f64>() {
4363 let num_str = if n == n.trunc() {
4364 format!("{}", n as i64)
4365 } else {
4366 format!("{n}")
4367 };
4368 Some(json!({"N": num_str}))
4369 } else {
4370 None
4371 }
4372}
4373
4374fn parse_partiql_value_object(
4376 s: &str,
4377 parameters: &[Value],
4378) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
4379 let s = s.trim();
4380 let inner = s
4381 .strip_prefix('{')
4382 .and_then(|s| s.strip_suffix('}'))
4383 .ok_or_else(|| {
4384 AwsServiceError::aws_error(
4385 StatusCode::BAD_REQUEST,
4386 "ValidationException",
4387 "Invalid VALUE: expected object literal",
4388 )
4389 })?;
4390
4391 let mut item = HashMap::new();
4392 let mut param_idx = 0usize;
4393
4394 for pair in split_partiql_pairs(inner) {
4396 let pair = pair.trim();
4397 if pair.is_empty() {
4398 continue;
4399 }
4400 if let Some((key_part, val_part)) = pair.split_once(':') {
4401 let key = key_part
4402 .trim()
4403 .trim_matches('\'')
4404 .trim_matches('"')
4405 .to_string();
4406 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
4407 item.insert(key, val);
4408 }
4409 }
4410 }
4411
4412 Ok(item)
4413}
4414
/// Splits the interior of a PartiQL object literal on top-level commas.
///
/// Commas inside single-quoted strings or nested `{...}` groups do not
/// split; the returned slices keep their surrounding whitespace.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut segment_start = 0;
    let mut brace_depth = 0i32;
    let mut in_string = false;

    for (pos, ch) in s.char_indices() {
        if in_string {
            // Inside a string literal only a closing quote is significant.
            if ch == '\'' {
                in_string = false;
            }
            continue;
        }
        match ch {
            '\'' => in_string = true,
            '{' => brace_depth += 1,
            '}' => brace_depth -= 1,
            ',' if brace_depth == 0 => {
                pieces.push(&s[segment_start..pos]);
                segment_start = pos + 1;
            }
            _ => {}
        }
    }
    pieces.push(&s[segment_start..]);
    pieces
}
4438
/// Counts the `?` parameter placeholders in a PartiQL fragment.
///
/// Question marks inside single-quoted string literals (e.g.
/// `name = 'what?'`) are not placeholders and are excluded; counting
/// them would shift the parameter offset computed for SET assignments
/// and misbind subsequent `?` parameters.
fn count_params_in_str(s: &str) -> usize {
    let mut in_string = false;
    let mut count = 0;
    for ch in s.chars() {
        match ch {
            '\'' => in_string = !in_string,
            '?' if !in_string => count += 1,
            _ => {}
        }
    }
    count
}
4443
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // ---- Expression-parsing unit tests ---------------------------------

    // A single SET clause with two comma-separated assignments parses as
    // one clause entry carrying both assignments.
    #[test]
    fn test_parse_update_clauses_set() {
        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
        assert_eq!(clauses.len(), 1);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[0].1.len(), 2);
    }

    // SET followed by REMOVE yields two distinct clause entries, in order.
    #[test]
    fn test_parse_update_clauses_set_and_remove() {
        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
        assert_eq!(clauses.len(), 2);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[1].0, "REMOVE");
    }

    // A partition-key-only condition matches when the item's pk equals the
    // bound expression value; the sort key is present but unconstrained.
    #[test]
    fn test_evaluate_key_condition_simple() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user1"}));
        item.insert("sk".to_string(), json!({"S": "order1"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));

        assert!(evaluate_key_condition(
            "pk = :pk",
            &item,
            "pk",
            Some("sk"),
            &HashMap::new(),
            &expr_values,
        ));
    }

    // Numeric attribute values compare numerically, not lexically.
    #[test]
    fn test_compare_attribute_values_numbers() {
        let a = json!({"N": "10"});
        let b = json!({"N": "20"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    // String attribute values compare lexicographically.
    #[test]
    fn test_compare_attribute_values_strings() {
        let a = json!({"S": "apple"});
        let b = json!({"S": "banana"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    // Top-level AND splits the expression into its two conditions.
    #[test]
    fn test_split_on_and() {
        let parts = split_on_and("pk = :pk AND sk > :sk");
        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].trim(), "pk = :pk");
        assert_eq!(parts[1].trim(), "sk > :sk");
    }

    // An AND nested inside parentheses must NOT split the expression; the
    // whole parenthesized-OR expression stays intact as one part.
    #[test]
    fn test_split_on_and_respects_parentheses() {
        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
    }

    // `(false AND false) OR true` must evaluate true, and all-false must
    // evaluate false — exercises precedence of parenthesized AND with OR.
    #[test]
    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
        let mut item = HashMap::new();
        item.insert("x".to_string(), json!({"S": "no"}));
        item.insert("y".to_string(), json!({"S": "no"}));
        item.insert("z".to_string(), json!({"S": "yes"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));

        let result = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item,
            &HashMap::new(),
            &expr_values,
        );
        assert!(result, "should match because z = :yes is true");

        let mut item2 = HashMap::new();
        item2.insert("x".to_string(), json!({"S": "no"}));
        item2.insert("y".to_string(), json!({"S": "no"}));
        item2.insert("z".to_string(), json!({"S": "no"}));

        let result2 = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item2,
            &HashMap::new(),
            &expr_values,
        );
        assert!(!result2, "should not match because nothing is true");
    }

    // Projecting `data[0].name` keeps only that leaf: the projected list
    // element retains `name` but drops the sibling `age` attribute.
    #[test]
    fn test_project_item_nested_path() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "key1"}));
        item.insert(
            "data".to_string(),
            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
        );

        let body = json!({
            "ProjectionExpression": "data[0].name"
        });

        let projected = project_item(&item, &body);
        let name = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("name"))
            .and_then(|v| v.get("S"))
            .and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));

        let age = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("age"));
        assert!(age.is_none(), "age should not be present in projection");
    }

    // Dotted paths descend through nested M (map) attribute values.
    #[test]
    fn test_resolve_nested_path_map() {
        let mut item = HashMap::new();
        item.insert(
            "info".to_string(),
            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
        );

        let result = resolve_nested_path(&item, "info.address.city");
        assert_eq!(result, Some(json!({"S": "NYC"})));
    }

    // An indexed segment (`items[0]`) descends into an L (list) value
    // before the dotted map lookup continues.
    #[test]
    fn test_resolve_nested_path_list_then_map() {
        let mut item = HashMap::new();
        item.insert(
            "items".to_string(),
            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
        );

        let result = resolve_nested_path(&item, "items[0].sku");
        assert_eq!(result, Some(json!({"S": "ABC"})));
    }

    // ---- Service-level integration tests -------------------------------

    use crate::state::SharedDynamoDbState;
    use parking_lot::RwLock;
    use std::sync::Arc;

    // Builds a service over a fresh in-memory state for one test account.
    fn make_service() -> DynamoDbService {
        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
            "123456789012",
            "us-east-1",
        )));
        DynamoDbService::new(state)
    }

    // Wraps a JSON body into an AwsRequest with fixed test identity fields.
    fn make_request(action: &str, body: Value) -> AwsRequest {
        AwsRequest {
            service: "dynamodb".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-id".to_string(),
            headers: http::HeaderMap::new(),
            query_params: HashMap::new(),
            body: serde_json::to_vec(&body).unwrap().into(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            method: http::Method::POST,
            is_query_protocol: false,
            access_key_id: None,
        }
    }

    // Creates "test-table" with a single S-typed hash key "pk".
    fn create_test_table(svc: &DynamoDbService) {
        let req = make_request(
            "CreateTable",
            json!({
                "TableName": "test-table",
                "KeySchema": [
                    { "AttributeName": "pk", "KeyType": "HASH" }
                ],
                "AttributeDefinitions": [
                    { "AttributeName": "pk", "AttributeType": "S" }
                ],
                "BillingMode": "PAY_PER_REQUEST"
            }),
        );
        svc.create_table(&req).unwrap();
    }

    // DeleteItem with ReturnValues=ALL_OLD must echo the deleted item's
    // attributes, and the item must subsequently be gone.
    #[test]
    fn delete_item_return_values_all_old() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "key1" },
                    "name": { "S": "Alice" },
                    "age": { "N": "30" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } },
                "ReturnValues": "ALL_OLD"
            }),
        );
        let resp = svc.delete_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();

        let attrs = &body["Attributes"];
        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none(), "item should be deleted");
    }

    // TransactGetItems returns one response per request, preserving order;
    // a missing key yields a response entry without an Item.
    #[test]
    fn transact_get_items_returns_existing_and_missing() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "exists" },
                    "val": { "S": "hello" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactGetItems",
            json!({
                "TransactItems": [
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
                ]
            }),
        );
        let resp = svc.transact_get_items(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let responses = body["Responses"].as_array().unwrap();
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
        assert!(responses[1].get("Item").is_none());
    }

    // A transaction mixing Put and Delete applies both effects atomically:
    // the new item exists afterwards and the deleted one does not.
    #[test]
    fn transact_write_items_put_and_delete() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "to-delete" },
                    "val": { "S": "bye" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "Put": {
                            "TableName": "test-table",
                            "Item": {
                                "pk": { "S": "new-item" },
                                "val": { "S": "hi" }
                            }
                        }
                    },
                    {
                        "Delete": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "to-delete" } }
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::OK);

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "new-item" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "to-delete" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none());
    }

    // A failing ConditionCheck cancels the transaction with HTTP 400,
    // a TransactionCanceledException type, and CancellationReasons.
    #[test]
    fn transact_write_items_condition_check_failure() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "ConditionCheck": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "nonexistent" } },
                            "ConditionExpression": "attribute_exists(pk)"
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["__type"].as_str().unwrap(),
            "TransactionCanceledException"
        );
        assert!(body["CancellationReasons"].as_array().is_some());
    }

    // TTL round-trip: enabling reports ENABLED with the attribute name,
    // and disabling flips the described status to DISABLED.
    #[test]
    fn update_and_describe_time_to_live() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": true
                }
            }),
        );
        let resp = svc.update_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveSpecification"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );
        assert!(body["TimeToLiveSpecification"]["Enabled"]
            .as_bool()
            .unwrap());

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "ENABLED"
        );
        assert_eq!(
            body["TimeToLiveDescription"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );

        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": false
                }
            }),
        );
        svc.update_time_to_live(&req).unwrap();

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "DISABLED"
        );
    }

    // Put -> Get -> Delete -> Get for a resource policy on the table ARN;
    // after deletion the described Policy is null.
    #[test]
    fn resource_policy_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let table_arn = {
            let state = svc.state.read();
            state.tables.get("test-table").unwrap().arn.clone()
        };

        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
        let req = make_request(
            "PutResourcePolicy",
            json!({
                "ResourceArn": table_arn,
                "Policy": policy_doc
            }),
        );
        let resp = svc.put_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["RevisionId"].as_str().is_some());

        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);

        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
        svc.delete_resource_policy(&req).unwrap();

        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["Policy"].is_null());
    }

    // DescribeEndpoints returns the fixed endpoint cache period.
    #[test]
    fn describe_endpoints() {
        let svc = make_service();
        let req = make_request("DescribeEndpoints", json!({}));
        let resp = svc.describe_endpoints(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
    }

    // DescribeLimits reports the standard account-level table limits.
    #[test]
    fn describe_limits() {
        let svc = make_service();
        let req = make_request("DescribeLimits", json!({}));
        let resp = svc.describe_limits(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
    }

    // Full backup lifecycle: create, describe, list, restore into a new
    // ACTIVE table, delete, and confirm the listing is empty again.
    #[test]
    fn backup_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "CreateBackup",
            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
        );
        let resp = svc.create_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let backup_arn = body["BackupDetails"]["BackupArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");

        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
        let resp = svc.describe_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["BackupDescription"]["BackupDetails"]["BackupName"],
            "my-backup"
        );

        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);

        let req = make_request(
            "RestoreTableFromBackup",
            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
        );
        svc.restore_table_from_backup(&req).unwrap();

        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");

        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
        svc.delete_backup(&req).unwrap();

        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
    }

    // PITR starts DISABLED on a fresh table and flips to ENABLED after
    // UpdateContinuousBackups.
    #[test]
    fn continuous_backups() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "DISABLED"
        );

        let req = make_request(
            "UpdateContinuousBackups",
            json!({
                "TableName": "test-table",
                "PointInTimeRecoverySpecification": {
                    "PointInTimeRecoveryEnabled": true
                }
            }),
        );
        svc.update_continuous_backups(&req).unwrap();

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "ENABLED"
        );
    }

    // PITR restore creates the target table in ACTIVE state.
    #[test]
    fn restore_table_to_point_in_time() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "RestoreTableToPointInTime",
            json!({
                "SourceTableName": "test-table",
                "TargetTableName": "pitr-restored"
            }),
        );
        svc.restore_table_to_point_in_time(&req).unwrap();

        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }

    // Global table lifecycle: create with two replicas, describe, list,
    // add a third replica via UpdateGlobalTable, then exercise the
    // settings describe/update calls.
    #[test]
    fn global_table_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "CreateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicationGroup": [
                    { "RegionName": "us-east-1" },
                    { "RegionName": "eu-west-1" }
                ]
            }),
        );
        let resp = svc.create_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["GlobalTableStatus"],
            "ACTIVE"
        );

        let req = make_request(
            "DescribeGlobalTable",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            2
        );

        let req = make_request("ListGlobalTables", json!({}));
        let resp = svc.list_global_tables(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);

        let req = make_request(
            "UpdateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicaUpdates": [
                    { "Create": { "RegionName": "ap-southeast-1" } }
                ]
            }),
        );
        let resp = svc.update_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            3
        );

        let req = make_request(
            "DescribeGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table_settings(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);

        let req = make_request(
            "UpdateGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        svc.update_global_table_settings(&req).unwrap();
    }

    // Describe reports the table name; the matching update call succeeds.
    #[test]
    fn table_replica_auto_scaling() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeTableReplicaAutoScaling",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TableAutoScalingDescription"]["TableName"],
            "test-table"
        );

        let req = make_request(
            "UpdateTableReplicaAutoScaling",
            json!({ "TableName": "test-table" }),
        );
        svc.update_table_replica_auto_scaling(&req).unwrap();
    }

    // Kinesis streaming destination: enable -> ACTIVE, describe lists it,
    // update accepts a precision change, disable -> DISABLED.
    #[test]
    fn kinesis_streaming_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "EnableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["DestinationStatus"], "ACTIVE");

        let req = make_request(
            "DescribeKinesisStreamingDestination",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["KinesisDataStreamDestinations"]
                .as_array()
                .unwrap()
                .len(),
            1
        );

        let req = make_request(
            "UpdateKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
                "UpdateKinesisStreamingConfiguration": {
                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
                }
            }),
        );
        svc.update_kinesis_streaming_destination(&req).unwrap();

        let req = make_request(
            "DisableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["DestinationStatus"], "DISABLED");
    }

    // Contributor insights: starts DISABLED, ENABLE action flips it, and
    // the enabled table appears in the summaries listing.
    #[test]
    fn contributor_insights_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "ENABLE"
            }),
        );
        let resp = svc.update_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");

        let req = make_request("ListContributorInsights", json!({}));
        let resp = svc.list_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContributorInsightsSummaries"]
                .as_array()
                .unwrap()
                .len(),
            1
        );
    }

    // Export completes immediately in the fake; describe and list agree.
    #[test]
    fn export_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();

        let req = make_request(
            "ExportTableToPointInTime",
            json!({
                "TableArn": table_arn,
                "S3Bucket": "my-bucket"
            }),
        );
        let resp = svc.export_table_to_point_in_time(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let export_arn = body["ExportDescription"]["ExportArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");

        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
        let resp = svc.describe_export(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");

        let req = make_request("ListExports", json!({}));
        let resp = svc.list_exports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
    }

    // Import completes immediately in the fake and creates the target
    // table in ACTIVE state; describe and list agree.
    #[test]
    fn import_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "ImportTable",
            json!({
                "InputFormat": "DYNAMODB_JSON",
                "S3BucketSource": { "S3Bucket": "import-bucket" },
                "TableCreationParameters": {
                    "TableName": "imported-table",
                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
                }
            }),
        );
        let resp = svc.import_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let import_arn = body["ImportTableDescription"]["ImportArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
        let resp = svc.describe_import(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("ListImports", json!({}));
        let resp = svc.list_imports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);

        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }
}