1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21 ReplicaDescription, SharedDynamoDbState,
22};
23
24pub struct DynamoDbService {
25 state: SharedDynamoDbState,
26 s3_state: Option<SharedS3State>,
27 delivery: Option<Arc<DeliveryBus>>,
28}
29
30impl DynamoDbService {
31 pub fn new(state: SharedDynamoDbState) -> Self {
32 Self {
33 state,
34 s3_state: None,
35 delivery: None,
36 }
37 }
38
39 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40 self.s3_state = Some(s3_state);
41 self
42 }
43
44 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45 self.delivery = Some(delivery);
46 self
47 }
48
49 fn deliver_to_kinesis_destinations(
51 &self,
52 table: &DynamoTable,
53 event_name: &str,
54 keys: &HashMap<String, AttributeValue>,
55 old_image: Option<&HashMap<String, AttributeValue>>,
56 new_image: Option<&HashMap<String, AttributeValue>>,
57 ) {
58 let delivery = match &self.delivery {
59 Some(d) => d,
60 None => return,
61 };
62
63 let active_destinations: Vec<_> = table
64 .kinesis_destinations
65 .iter()
66 .filter(|d| d.destination_status == "ACTIVE")
67 .collect();
68
69 if active_destinations.is_empty() {
70 return;
71 }
72
73 let mut record = json!({
74 "eventID": uuid::Uuid::new_v4().to_string(),
75 "eventName": event_name,
76 "eventVersion": "1.1",
77 "eventSource": "aws:dynamodb",
78 "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79 "dynamodb": {
80 "Keys": keys,
81 "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83 "StreamViewType": "NEW_AND_OLD_IMAGES",
84 },
85 "eventSourceARN": &table.arn,
86 "tableName": &table.name,
87 });
88
89 if let Some(old) = old_image {
90 record["dynamodb"]["OldImage"] = json!(old);
91 }
92 if let Some(new) = new_image {
93 record["dynamodb"]["NewImage"] = json!(new);
94 }
95
96 let record_str = serde_json::to_string(&record).unwrap_or_default();
97 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98 let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100 for dest in active_destinations {
101 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102 }
103 }
104
105 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106 serde_json::from_slice(&req.body).map_err(|e| {
107 AwsServiceError::aws_error(
108 StatusCode::BAD_REQUEST,
109 "SerializationException",
110 format!("Invalid JSON: {e}"),
111 )
112 })
113 }
114
115 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116 Ok(AwsResponse::ok_json(body))
117 }
118
119 fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122 let body = Self::parse_body(req)?;
123
124 let table_name = body["TableName"]
125 .as_str()
126 .ok_or_else(|| {
127 AwsServiceError::aws_error(
128 StatusCode::BAD_REQUEST,
129 "ValidationException",
130 "TableName is required",
131 )
132 })?
133 .to_string();
134
135 let key_schema = parse_key_schema(&body["KeySchema"])?;
136 let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138 for ks in &key_schema {
140 if !attribute_definitions
141 .iter()
142 .any(|ad| ad.attribute_name == ks.attribute_name)
143 {
144 return Err(AwsServiceError::aws_error(
145 StatusCode::BAD_REQUEST,
146 "ValidationException",
147 format!(
148 "One or more parameter values were invalid: \
149 Some index key attributes are not defined in AttributeDefinitions. \
150 Keys: [{}], AttributeDefinitions: [{}]",
151 ks.attribute_name,
152 attribute_definitions
153 .iter()
154 .map(|ad| ad.attribute_name.as_str())
155 .collect::<Vec<_>>()
156 .join(", ")
157 ),
158 ));
159 }
160 }
161
162 let billing_mode = body["BillingMode"]
163 .as_str()
164 .unwrap_or("PROVISIONED")
165 .to_string();
166
167 let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168 ProvisionedThroughput {
169 read_capacity_units: 0,
170 write_capacity_units: 0,
171 }
172 } else {
173 parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174 };
175
176 let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177 let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178 let tags = parse_tags(&body["Tags"]);
179
180 let (stream_enabled, stream_view_type) =
182 if let Some(stream_spec) = body.get("StreamSpecification") {
183 let enabled = stream_spec
184 .get("StreamEnabled")
185 .and_then(|v| v.as_bool())
186 .unwrap_or(false);
187 let view_type = if enabled {
188 stream_spec
189 .get("StreamViewType")
190 .and_then(|v| v.as_str())
191 .map(|s| s.to_string())
192 } else {
193 None
194 };
195 (enabled, view_type)
196 } else {
197 (false, None)
198 };
199
200 let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202 let enabled = sse_spec
203 .get("Enabled")
204 .and_then(|v| v.as_bool())
205 .unwrap_or(false);
206 if enabled {
207 let sse_type = sse_spec
208 .get("SSEType")
209 .and_then(|v| v.as_str())
210 .unwrap_or("KMS")
211 .to_string();
212 let kms_key = sse_spec
213 .get("KMSMasterKeyId")
214 .and_then(|v| v.as_str())
215 .map(|s| s.to_string());
216 (Some(sse_type), kms_key)
217 } else {
218 (None, None)
219 }
220 } else {
221 (None, None)
222 };
223
224 let mut state = self.state.write();
225
226 if state.tables.contains_key(&table_name) {
227 return Err(AwsServiceError::aws_error(
228 StatusCode::BAD_REQUEST,
229 "ResourceInUseException",
230 format!("Table already exists: {table_name}"),
231 ));
232 }
233
234 let now = Utc::now();
235 let arn = format!(
236 "arn:aws:dynamodb:{}:{}:table/{}",
237 state.region, state.account_id, table_name
238 );
239 let stream_arn = if stream_enabled {
240 Some(format!(
241 "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242 state.region,
243 state.account_id,
244 table_name,
245 now.format("%Y-%m-%dT%H:%M:%S.%3f")
246 ))
247 } else {
248 None
249 };
250
251 let table = DynamoTable {
252 name: table_name.clone(),
253 arn: arn.clone(),
254 key_schema: key_schema.clone(),
255 attribute_definitions: attribute_definitions.clone(),
256 provisioned_throughput: provisioned_throughput.clone(),
257 items: Vec::new(),
258 gsi: gsi.clone(),
259 lsi: lsi.clone(),
260 tags,
261 created_at: now,
262 status: "ACTIVE".to_string(),
263 item_count: 0,
264 size_bytes: 0,
265 billing_mode: billing_mode.clone(),
266 ttl_attribute: None,
267 ttl_enabled: false,
268 resource_policy: None,
269 pitr_enabled: false,
270 kinesis_destinations: Vec::new(),
271 contributor_insights_status: "DISABLED".to_string(),
272 contributor_insights_counters: HashMap::new(),
273 stream_enabled,
274 stream_view_type,
275 stream_arn,
276 stream_records: Arc::new(RwLock::new(Vec::new())),
277 sse_type,
278 sse_kms_key_arn,
279 };
280
281 state.tables.insert(table_name, table);
282
283 let table_desc = build_table_description_json(
284 &arn,
285 &key_schema,
286 &attribute_definitions,
287 &provisioned_throughput,
288 &gsi,
289 &lsi,
290 &billing_mode,
291 now,
292 0,
293 0,
294 "ACTIVE",
295 );
296
297 Self::ok_json(json!({ "TableDescription": table_desc }))
298 }
299
300 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = Self::parse_body(req)?;
302 let table_name = require_str(&body, "TableName")?;
303
304 let mut state = self.state.write();
305 let table = state.tables.remove(table_name).ok_or_else(|| {
306 AwsServiceError::aws_error(
307 StatusCode::BAD_REQUEST,
308 "ResourceNotFoundException",
309 format!("Requested resource not found: Table: {table_name} not found"),
310 )
311 })?;
312
313 let table_desc = build_table_description_json(
314 &table.arn,
315 &table.key_schema,
316 &table.attribute_definitions,
317 &table.provisioned_throughput,
318 &table.gsi,
319 &table.lsi,
320 &table.billing_mode,
321 table.created_at,
322 table.item_count,
323 table.size_bytes,
324 "DELETING",
325 );
326
327 Self::ok_json(json!({ "TableDescription": table_desc }))
328 }
329
330 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331 let body = Self::parse_body(req)?;
332 let table_name = require_str(&body, "TableName")?;
333
334 let state = self.state.read();
335 let table = get_table(&state.tables, table_name)?;
336
337 let table_desc = build_table_description(table);
338
339 Self::ok_json(json!({ "Table": table_desc }))
340 }
341
342 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343 let body = Self::parse_body(req)?;
344
345 validate_optional_string_length(
346 "exclusiveStartTableName",
347 body["ExclusiveStartTableName"].as_str(),
348 3,
349 255,
350 )?;
351 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354 let exclusive_start = body["ExclusiveStartTableName"]
355 .as_str()
356 .map(|s| s.to_string());
357
358 let state = self.state.read();
359 let mut names: Vec<&String> = state.tables.keys().collect();
360 names.sort();
361
362 let start_idx = match &exclusive_start {
363 Some(start) => names
364 .iter()
365 .position(|n| n.as_str() > start.as_str())
366 .unwrap_or(names.len()),
367 None => 0,
368 };
369
370 let page: Vec<&str> = names
371 .iter()
372 .skip(start_idx)
373 .take(limit)
374 .map(|n| n.as_str())
375 .collect();
376
377 let mut result = json!({ "TableNames": page });
378
379 if start_idx + limit < names.len() {
380 if let Some(last) = page.last() {
381 result["LastEvaluatedTableName"] = json!(last);
382 }
383 }
384
385 Self::ok_json(result)
386 }
387
388 fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389 let body = Self::parse_body(req)?;
390 let table_name = require_str(&body, "TableName")?;
391
392 let mut state = self.state.write();
393 let table = state.tables.get_mut(table_name).ok_or_else(|| {
394 AwsServiceError::aws_error(
395 StatusCode::BAD_REQUEST,
396 "ResourceNotFoundException",
397 format!("Requested resource not found: Table: {table_name} not found"),
398 )
399 })?;
400
401 if let Some(pt) = body.get("ProvisionedThroughput") {
402 if let Ok(throughput) = parse_provisioned_throughput(pt) {
403 table.provisioned_throughput = throughput;
404 }
405 }
406
407 if let Some(bm) = body["BillingMode"].as_str() {
408 table.billing_mode = bm.to_string();
409 }
410
411 if let Some(sse_spec) = body.get("SSESpecification") {
413 let enabled = sse_spec
414 .get("Enabled")
415 .and_then(|v| v.as_bool())
416 .unwrap_or(false);
417 if enabled {
418 table.sse_type = Some(
419 sse_spec
420 .get("SSEType")
421 .and_then(|v| v.as_str())
422 .unwrap_or("KMS")
423 .to_string(),
424 );
425 table.sse_kms_key_arn = sse_spec
426 .get("KMSMasterKeyId")
427 .and_then(|v| v.as_str())
428 .map(|s| s.to_string());
429 } else {
430 table.sse_type = None;
431 table.sse_kms_key_arn = None;
432 }
433 }
434
435 let table_desc = build_table_description_json(
436 &table.arn,
437 &table.key_schema,
438 &table.attribute_definitions,
439 &table.provisioned_throughput,
440 &table.gsi,
441 &table.lsi,
442 &table.billing_mode,
443 table.created_at,
444 table.item_count,
445 table.size_bytes,
446 &table.status,
447 );
448
449 Self::ok_json(json!({ "TableDescription": table_desc }))
450 }
451
452 fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455 let body = Self::parse_body(req)?;
457 let table_name = require_str(&body, "TableName")?;
458 let item = require_object(&body, "Item")?;
459 let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460 let expr_attr_names = parse_expression_attribute_names(&body);
461 let expr_attr_values = parse_expression_attribute_values(&body);
462 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464 let (old_item, kinesis_info) = {
467 let mut state = self.state.write();
468 let table = get_table_mut(&mut state.tables, table_name)?;
469
470 validate_key_in_item(table, &item)?;
471
472 let key = extract_key(table, &item);
473 let existing_idx = table.find_item_index(&key);
474
475 if let Some(ref cond) = condition {
476 let existing = existing_idx.map(|i| &table.items[i]);
477 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
478 }
479
480 let old_item_for_return = if return_values == "ALL_OLD" {
481 existing_idx.map(|i| table.items[i].clone())
482 } else {
483 None
484 };
485
486 let needs_change_capture = table.stream_enabled
488 || table
489 .kinesis_destinations
490 .iter()
491 .any(|d| d.destination_status == "ACTIVE");
492 let old_item_for_stream = if needs_change_capture {
493 existing_idx.map(|i| table.items[i].clone())
494 } else {
495 None
496 };
497
498 let is_modify = existing_idx.is_some();
499
500 if let Some(idx) = existing_idx {
501 table.items[idx] = item.clone();
502 } else {
503 table.items.push(item.clone());
504 }
505
506 table.record_item_access(&item);
507 table.recalculate_stats();
508
509 let event_name = if is_modify { "MODIFY" } else { "INSERT" };
510 let key = extract_key(table, &item);
511
512 if table.stream_enabled {
514 if let Some(record) = crate::streams::generate_stream_record(
515 table,
516 event_name,
517 key.clone(),
518 old_item_for_stream.clone(),
519 Some(item.clone()),
520 ) {
521 crate::streams::add_stream_record(table, record);
522 }
523 }
524
525 let kinesis_info = if table
527 .kinesis_destinations
528 .iter()
529 .any(|d| d.destination_status == "ACTIVE")
530 {
531 Some((
532 table.clone(),
533 event_name.to_string(),
534 key,
535 old_item_for_stream,
536 Some(item.clone()),
537 ))
538 } else {
539 None
540 };
541
542 (old_item_for_return, kinesis_info)
543 };
544 if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
548 self.deliver_to_kinesis_destinations(
549 &table,
550 &event_name,
551 &keys,
552 old_image.as_ref(),
553 new_image.as_ref(),
554 );
555 }
556
557 let mut result = json!({});
558 if let Some(old) = old_item {
559 result["Attributes"] = json!(old);
560 }
561
562 Self::ok_json(result)
563 }
564
565 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
566 let body = Self::parse_body(req)?;
568 let table_name = require_str(&body, "TableName")?;
569 let key = require_object(&body, "Key")?;
570
571 let (result, needs_insights) = {
573 let state = self.state.read();
574 let table = get_table(&state.tables, table_name)?;
575 let needs_insights = table.contributor_insights_status == "ENABLED";
576
577 let result = match table.find_item_index(&key) {
578 Some(idx) => {
579 let item = &table.items[idx];
580 let projected = project_item(item, &body);
581 json!({ "Item": projected })
582 }
583 None => json!({}),
584 };
585 (result, needs_insights)
586 };
587 if needs_insights {
591 let mut state = self.state.write();
592 if let Some(table) = state.tables.get_mut(table_name) {
593 table.record_key_access(&key);
594 }
595 }
596
597 Self::ok_json(result)
598 }
599
600 fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
601 let body = Self::parse_body(req)?;
602
603 validate_optional_enum_value(
604 "conditionalOperator",
605 &body["ConditionalOperator"],
606 &["AND", "OR"],
607 )?;
608 validate_optional_enum_value(
609 "returnConsumedCapacity",
610 &body["ReturnConsumedCapacity"],
611 &["INDEXES", "TOTAL", "NONE"],
612 )?;
613 validate_optional_enum_value(
614 "returnValues",
615 &body["ReturnValues"],
616 &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
617 )?;
618 validate_optional_enum_value(
619 "returnItemCollectionMetrics",
620 &body["ReturnItemCollectionMetrics"],
621 &["SIZE", "NONE"],
622 )?;
623 validate_optional_enum_value(
624 "returnValuesOnConditionCheckFailure",
625 &body["ReturnValuesOnConditionCheckFailure"],
626 &["ALL_OLD", "NONE"],
627 )?;
628
629 let table_name = require_str(&body, "TableName")?;
630 let key = require_object(&body, "Key")?;
631
632 let (result, kinesis_info) = {
633 let mut state = self.state.write();
634 let table = get_table_mut(&mut state.tables, table_name)?;
635
636 let condition = body["ConditionExpression"].as_str();
637 let expr_attr_names = parse_expression_attribute_names(&body);
638 let expr_attr_values = parse_expression_attribute_values(&body);
639
640 let existing_idx = table.find_item_index(&key);
641
642 if let Some(cond) = condition {
643 let existing = existing_idx.map(|i| &table.items[i]);
644 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
645 }
646
647 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
648
649 let mut result = json!({});
650 let mut kinesis_info = None;
651
652 if let Some(idx) = existing_idx {
653 let old_item = table.items[idx].clone();
654 if return_values == "ALL_OLD" {
655 result["Attributes"] = json!(old_item.clone());
656 }
657
658 if table.stream_enabled {
660 if let Some(record) = crate::streams::generate_stream_record(
661 table,
662 "REMOVE",
663 key.clone(),
664 Some(old_item.clone()),
665 None,
666 ) {
667 crate::streams::add_stream_record(table, record);
668 }
669 }
670
671 if table
673 .kinesis_destinations
674 .iter()
675 .any(|d| d.destination_status == "ACTIVE")
676 {
677 kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
678 }
679
680 table.items.remove(idx);
681 table.recalculate_stats();
682 }
683
684 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
685 let return_icm = body["ReturnItemCollectionMetrics"]
686 .as_str()
687 .unwrap_or("NONE");
688
689 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
690 result["ConsumedCapacity"] = json!({
691 "TableName": table_name,
692 "CapacityUnits": 1.0,
693 });
694 }
695
696 if return_icm == "SIZE" {
697 result["ItemCollectionMetrics"] = json!({});
698 }
699
700 (result, kinesis_info)
701 };
702
703 if let Some((table, keys, old_image)) = kinesis_info {
705 self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
706 }
707
708 Self::ok_json(result)
709 }
710
711 fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
712 let body = Self::parse_body(req)?;
713 let table_name = require_str(&body, "TableName")?;
714 let key = require_object(&body, "Key")?;
715
716 let mut state = self.state.write();
717 let table = get_table_mut(&mut state.tables, table_name)?;
718
719 validate_key_attributes_in_key(table, &key)?;
720
721 let condition = body["ConditionExpression"].as_str();
722 let expr_attr_names = parse_expression_attribute_names(&body);
723 let expr_attr_values = parse_expression_attribute_values(&body);
724 let update_expression = body["UpdateExpression"].as_str();
725
726 let existing_idx = table.find_item_index(&key);
727
728 if let Some(cond) = condition {
729 let existing = existing_idx.map(|i| &table.items[i]);
730 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
731 }
732
733 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
734
735 let is_insert = existing_idx.is_none();
736 let idx = match existing_idx {
737 Some(i) => i,
738 None => {
739 let mut new_item = HashMap::new();
740 for (k, v) in &key {
741 new_item.insert(k.clone(), v.clone());
742 }
743 table.items.push(new_item);
744 table.items.len() - 1
745 }
746 };
747
748 let needs_change_capture = table.stream_enabled
750 || table
751 .kinesis_destinations
752 .iter()
753 .any(|d| d.destination_status == "ACTIVE");
754 let old_item_for_stream = if needs_change_capture {
755 Some(table.items[idx].clone())
756 } else {
757 None
758 };
759
760 let old_item = if return_values == "ALL_OLD" {
761 Some(table.items[idx].clone())
762 } else {
763 None
764 };
765
766 if let Some(expr) = update_expression {
767 apply_update_expression(
768 &mut table.items[idx],
769 expr,
770 &expr_attr_names,
771 &expr_attr_values,
772 )?;
773 }
774
775 let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
776 Some(table.items[idx].clone())
777 } else {
778 None
779 };
780
781 let event_name = if is_insert { "INSERT" } else { "MODIFY" };
782 let new_item_for_stream = table.items[idx].clone();
783
784 if table.stream_enabled {
786 if let Some(record) = crate::streams::generate_stream_record(
787 table,
788 event_name,
789 key.clone(),
790 old_item_for_stream.clone(),
791 Some(new_item_for_stream.clone()),
792 ) {
793 crate::streams::add_stream_record(table, record);
794 }
795 }
796
797 let kinesis_info = if table
799 .kinesis_destinations
800 .iter()
801 .any(|d| d.destination_status == "ACTIVE")
802 {
803 Some((
804 table.clone(),
805 event_name.to_string(),
806 key.clone(),
807 old_item_for_stream,
808 Some(new_item_for_stream),
809 ))
810 } else {
811 None
812 };
813
814 table.recalculate_stats();
815
816 drop(state);
818
819 if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
821 self.deliver_to_kinesis_destinations(
822 &tbl,
823 &ev,
824 &keys,
825 old_image.as_ref(),
826 new_image.as_ref(),
827 );
828 }
829
830 let mut result = json!({});
831 if let Some(old) = old_item {
832 result["Attributes"] = json!(old);
833 } else if let Some(new) = new_item {
834 result["Attributes"] = json!(new);
835 }
836
837 Self::ok_json(result)
838 }
839
840 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843 let body = Self::parse_body(req)?;
844 let table_name = require_str(&body, "TableName")?;
845
846 let state = self.state.read();
847 let table = get_table(&state.tables, table_name)?;
848
849 let expr_attr_names = parse_expression_attribute_names(&body);
850 let expr_attr_values = parse_expression_attribute_values(&body);
851
852 let key_condition = body["KeyConditionExpression"].as_str();
853 let filter_expression = body["FilterExpression"].as_str();
854 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
855 let limit = body["Limit"].as_i64().map(|l| l as usize);
856 let index_name = body["IndexName"].as_str();
857 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
858 parse_key_map(&body["ExclusiveStartKey"]);
859
860 let (items_to_scan, hash_key_name, range_key_name): (
861 &[HashMap<String, AttributeValue>],
862 String,
863 Option<String>,
864 ) = if let Some(idx_name) = index_name {
865 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
866 let hk = gsi
867 .key_schema
868 .iter()
869 .find(|k| k.key_type == "HASH")
870 .map(|k| k.attribute_name.clone())
871 .unwrap_or_default();
872 let rk = gsi
873 .key_schema
874 .iter()
875 .find(|k| k.key_type == "RANGE")
876 .map(|k| k.attribute_name.clone());
877 (&table.items, hk, rk)
878 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
879 let hk = lsi
880 .key_schema
881 .iter()
882 .find(|k| k.key_type == "HASH")
883 .map(|k| k.attribute_name.clone())
884 .unwrap_or_default();
885 let rk = lsi
886 .key_schema
887 .iter()
888 .find(|k| k.key_type == "RANGE")
889 .map(|k| k.attribute_name.clone());
890 (&table.items, hk, rk)
891 } else {
892 return Err(AwsServiceError::aws_error(
893 StatusCode::BAD_REQUEST,
894 "ValidationException",
895 format!("The table does not have the specified index: {idx_name}"),
896 ));
897 }
898 } else {
899 (
900 &table.items[..],
901 table.hash_key_name().to_string(),
902 table.range_key_name().map(|s| s.to_string()),
903 )
904 };
905
906 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
907 .iter()
908 .filter(|item| {
909 if let Some(kc) = key_condition {
910 evaluate_key_condition(
911 kc,
912 item,
913 &hash_key_name,
914 range_key_name.as_deref(),
915 &expr_attr_names,
916 &expr_attr_values,
917 )
918 } else {
919 true
920 }
921 })
922 .collect();
923
924 if let Some(ref rk) = range_key_name {
925 matched.sort_by(|a, b| {
926 let av = a.get(rk.as_str());
927 let bv = b.get(rk.as_str());
928 compare_attribute_values(av, bv)
929 });
930 if !scan_forward {
931 matched.reverse();
932 }
933 }
934
935 let table_pk_hash = table.hash_key_name().to_string();
938 let table_pk_range = table.range_key_name().map(|s| s.to_string());
939 let is_gsi_query = index_name.is_some()
940 && (hash_key_name != table_pk_hash
941 || range_key_name.as_deref() != table_pk_range.as_deref());
942
943 if let Some(ref start_key) = exclusive_start_key {
947 if let Some(pos) = matched.iter().position(|item| {
948 let index_match =
949 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
950 if is_gsi_query {
951 index_match
952 && item_matches_key(
953 item,
954 start_key,
955 &table_pk_hash,
956 table_pk_range.as_deref(),
957 )
958 } else {
959 index_match
960 }
961 }) {
962 matched = matched.split_off(pos + 1);
963 }
964 }
965
966 if let Some(filter) = filter_expression {
967 matched.retain(|item| {
968 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
969 });
970 }
971
972 let scanned_count = matched.len();
973
974 let has_more = if let Some(lim) = limit {
975 let more = matched.len() > lim;
976 matched.truncate(lim);
977 more
978 } else {
979 false
980 };
981
982 let last_evaluated_key = if has_more {
986 matched.last().map(|item| {
987 let mut key =
988 extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
989 if is_gsi_query {
990 let table_key =
991 extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
992 key.extend(table_key);
993 }
994 key
995 })
996 } else {
997 None
998 };
999
1000 let insights_enabled = table.contributor_insights_status == "ENABLED";
1002 let pk_name = table.hash_key_name().to_string();
1003 let accessed_keys: Vec<String> = if insights_enabled {
1004 matched
1005 .iter()
1006 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1007 .collect()
1008 } else {
1009 Vec::new()
1010 };
1011
1012 let items: Vec<Value> = matched
1013 .iter()
1014 .map(|item| {
1015 let projected = project_item(item, &body);
1016 json!(projected)
1017 })
1018 .collect();
1019
1020 let mut result = json!({
1021 "Items": items,
1022 "Count": items.len(),
1023 "ScannedCount": scanned_count,
1024 });
1025
1026 if let Some(lek) = last_evaluated_key {
1027 result["LastEvaluatedKey"] = json!(lek);
1028 }
1029
1030 drop(state);
1031
1032 if !accessed_keys.is_empty() {
1033 let mut state = self.state.write();
1034 if let Some(table) = state.tables.get_mut(table_name) {
1035 if table.contributor_insights_status == "ENABLED" {
1038 for key_str in accessed_keys {
1039 *table
1040 .contributor_insights_counters
1041 .entry(key_str)
1042 .or_insert(0) += 1;
1043 }
1044 }
1045 }
1046 }
1047
1048 Self::ok_json(result)
1049 }
1050
1051 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052 let body = Self::parse_body(req)?;
1053 let table_name = require_str(&body, "TableName")?;
1054
1055 let state = self.state.read();
1056 let table = get_table(&state.tables, table_name)?;
1057
1058 let expr_attr_names = parse_expression_attribute_names(&body);
1059 let expr_attr_values = parse_expression_attribute_values(&body);
1060 let filter_expression = body["FilterExpression"].as_str();
1061 let limit = body["Limit"].as_i64().map(|l| l as usize);
1062 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1063 parse_key_map(&body["ExclusiveStartKey"]);
1064
1065 let hash_key_name = table.hash_key_name().to_string();
1066 let range_key_name = table.range_key_name().map(|s| s.to_string());
1067
1068 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1069
1070 if let Some(ref start_key) = exclusive_start_key {
1072 if let Some(pos) = matched.iter().position(|item| {
1073 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1074 }) {
1075 matched = matched.split_off(pos + 1);
1076 }
1077 }
1078
1079 let scanned_count = matched.len();
1080
1081 if let Some(filter) = filter_expression {
1082 matched.retain(|item| {
1083 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1084 });
1085 }
1086
1087 let has_more = if let Some(lim) = limit {
1088 let more = matched.len() > lim;
1089 matched.truncate(lim);
1090 more
1091 } else {
1092 false
1093 };
1094
1095 let last_evaluated_key = if has_more {
1097 matched
1098 .last()
1099 .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1100 } else {
1101 None
1102 };
1103
1104 let insights_enabled = table.contributor_insights_status == "ENABLED";
1106 let pk_name = table.hash_key_name().to_string();
1107 let accessed_keys: Vec<String> = if insights_enabled {
1108 matched
1109 .iter()
1110 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1111 .collect()
1112 } else {
1113 Vec::new()
1114 };
1115
1116 let items: Vec<Value> = matched
1117 .iter()
1118 .map(|item| {
1119 let projected = project_item(item, &body);
1120 json!(projected)
1121 })
1122 .collect();
1123
1124 let mut result = json!({
1125 "Items": items,
1126 "Count": items.len(),
1127 "ScannedCount": scanned_count,
1128 });
1129
1130 if let Some(lek) = last_evaluated_key {
1131 result["LastEvaluatedKey"] = json!(lek);
1132 }
1133
1134 drop(state);
1135
1136 if !accessed_keys.is_empty() {
1137 let mut state = self.state.write();
1138 if let Some(table) = state.tables.get_mut(table_name) {
1139 if table.contributor_insights_status == "ENABLED" {
1142 for key_str in accessed_keys {
1143 *table
1144 .contributor_insights_counters
1145 .entry(key_str)
1146 .or_insert(0) += 1;
1147 }
1148 }
1149 }
1150 }
1151
1152 Self::ok_json(result)
1153 }
1154
1155 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158 let body = Self::parse_body(req)?;
1159
1160 validate_optional_enum_value(
1161 "returnConsumedCapacity",
1162 &body["ReturnConsumedCapacity"],
1163 &["INDEXES", "TOTAL", "NONE"],
1164 )?;
1165
1166 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1167
1168 let request_items = body["RequestItems"]
1169 .as_object()
1170 .ok_or_else(|| {
1171 AwsServiceError::aws_error(
1172 StatusCode::BAD_REQUEST,
1173 "ValidationException",
1174 "RequestItems is required",
1175 )
1176 })?
1177 .clone();
1178
1179 let state = self.state.read();
1180 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1181 let mut consumed_capacity: Vec<Value> = Vec::new();
1182
1183 for (table_name, params) in &request_items {
1184 let table = get_table(&state.tables, table_name)?;
1185 let keys = params["Keys"].as_array().ok_or_else(|| {
1186 AwsServiceError::aws_error(
1187 StatusCode::BAD_REQUEST,
1188 "ValidationException",
1189 "Keys is required",
1190 )
1191 })?;
1192
1193 let mut items = Vec::new();
1194 for key_val in keys {
1195 let key: HashMap<String, AttributeValue> =
1196 serde_json::from_value(key_val.clone()).unwrap_or_default();
1197 if let Some(idx) = table.find_item_index(&key) {
1198 items.push(json!(table.items[idx]));
1199 }
1200 }
1201 responses.insert(table_name.clone(), items);
1202
1203 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1204 consumed_capacity.push(json!({
1205 "TableName": table_name,
1206 "CapacityUnits": 1.0,
1207 }));
1208 }
1209 }
1210
1211 let mut result = json!({
1212 "Responses": responses,
1213 "UnprocessedKeys": {},
1214 });
1215
1216 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1217 result["ConsumedCapacity"] = json!(consumed_capacity);
1218 }
1219
1220 Self::ok_json(result)
1221 }
1222
1223 fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1224 let body = Self::parse_body(req)?;
1225
1226 validate_optional_enum_value(
1227 "returnConsumedCapacity",
1228 &body["ReturnConsumedCapacity"],
1229 &["INDEXES", "TOTAL", "NONE"],
1230 )?;
1231 validate_optional_enum_value(
1232 "returnItemCollectionMetrics",
1233 &body["ReturnItemCollectionMetrics"],
1234 &["SIZE", "NONE"],
1235 )?;
1236
1237 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1238 let return_icm = body["ReturnItemCollectionMetrics"]
1239 .as_str()
1240 .unwrap_or("NONE");
1241
1242 let request_items = body["RequestItems"]
1243 .as_object()
1244 .ok_or_else(|| {
1245 AwsServiceError::aws_error(
1246 StatusCode::BAD_REQUEST,
1247 "ValidationException",
1248 "RequestItems is required",
1249 )
1250 })?
1251 .clone();
1252
1253 let mut state = self.state.write();
1254 let mut consumed_capacity: Vec<Value> = Vec::new();
1255 let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1256
1257 for (table_name, requests) in &request_items {
1258 let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1259 AwsServiceError::aws_error(
1260 StatusCode::BAD_REQUEST,
1261 "ResourceNotFoundException",
1262 format!("Requested resource not found: Table: {table_name} not found"),
1263 )
1264 })?;
1265
1266 let reqs = requests.as_array().ok_or_else(|| {
1267 AwsServiceError::aws_error(
1268 StatusCode::BAD_REQUEST,
1269 "ValidationException",
1270 "Request list must be an array",
1271 )
1272 })?;
1273
1274 for request in reqs {
1275 if let Some(put_req) = request.get("PutRequest") {
1276 let item: HashMap<String, AttributeValue> =
1277 serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1278 let key = extract_key(table, &item);
1279 if let Some(idx) = table.find_item_index(&key) {
1280 table.items[idx] = item;
1281 } else {
1282 table.items.push(item);
1283 }
1284 } else if let Some(del_req) = request.get("DeleteRequest") {
1285 let key: HashMap<String, AttributeValue> =
1286 serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1287 if let Some(idx) = table.find_item_index(&key) {
1288 table.items.remove(idx);
1289 }
1290 }
1291 }
1292
1293 table.recalculate_stats();
1294
1295 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1296 consumed_capacity.push(json!({
1297 "TableName": table_name,
1298 "CapacityUnits": 1.0,
1299 }));
1300 }
1301
1302 if return_icm == "SIZE" {
1303 item_collection_metrics.insert(table_name.clone(), vec![]);
1304 }
1305 }
1306
1307 let mut result = json!({
1308 "UnprocessedItems": {},
1309 });
1310
1311 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1312 result["ConsumedCapacity"] = json!(consumed_capacity);
1313 }
1314
1315 if return_icm == "SIZE" {
1316 result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1317 }
1318
1319 Self::ok_json(result)
1320 }
1321
1322 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1325 let body = Self::parse_body(req)?;
1326 let resource_arn = require_str(&body, "ResourceArn")?;
1327 validate_required("Tags", &body["Tags"])?;
1328
1329 let mut state = self.state.write();
1330 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1331
1332 fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1333 |f| {
1334 AwsServiceError::aws_error(
1335 StatusCode::BAD_REQUEST,
1336 "ValidationException",
1337 format!("{f} must be a list"),
1338 )
1339 },
1340 )?;
1341
1342 Self::ok_json(json!({}))
1343 }
1344
1345 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1346 let body = Self::parse_body(req)?;
1347 let resource_arn = require_str(&body, "ResourceArn")?;
1348 validate_required("TagKeys", &body["TagKeys"])?;
1349
1350 let mut state = self.state.write();
1351 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1352
1353 fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1354 AwsServiceError::aws_error(
1355 StatusCode::BAD_REQUEST,
1356 "ValidationException",
1357 format!("{f} must be a list"),
1358 )
1359 })?;
1360
1361 Self::ok_json(json!({}))
1362 }
1363
1364 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1365 let body = Self::parse_body(req)?;
1366 let resource_arn = require_str(&body, "ResourceArn")?;
1367
1368 let state = self.state.read();
1369 let table = find_table_by_arn(&state.tables, resource_arn)?;
1370
1371 let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1372
1373 Self::ok_json(json!({ "Tags": tags }))
1374 }
1375
1376 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1379 let body = Self::parse_body(req)?;
1380 validate_optional_enum_value(
1381 "returnConsumedCapacity",
1382 &body["ReturnConsumedCapacity"],
1383 &["INDEXES", "TOTAL", "NONE"],
1384 )?;
1385 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1386 AwsServiceError::aws_error(
1387 StatusCode::BAD_REQUEST,
1388 "ValidationException",
1389 "TransactItems is required",
1390 )
1391 })?;
1392
1393 let state = self.state.read();
1394 let mut responses: Vec<Value> = Vec::new();
1395
1396 for ti in transact_items {
1397 let get = &ti["Get"];
1398 let table_name = get["TableName"].as_str().ok_or_else(|| {
1399 AwsServiceError::aws_error(
1400 StatusCode::BAD_REQUEST,
1401 "ValidationException",
1402 "TableName is required in Get",
1403 )
1404 })?;
1405 let key: HashMap<String, AttributeValue> =
1406 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1407
1408 let table = get_table(&state.tables, table_name)?;
1409 match table.find_item_index(&key) {
1410 Some(idx) => {
1411 responses.push(json!({ "Item": table.items[idx] }));
1412 }
1413 None => {
1414 responses.push(json!({}));
1415 }
1416 }
1417 }
1418
1419 Self::ok_json(json!({ "Responses": responses }))
1420 }
1421
1422 fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1423 let body = Self::parse_body(req)?;
1424 validate_optional_string_length(
1425 "clientRequestToken",
1426 body["ClientRequestToken"].as_str(),
1427 1,
1428 36,
1429 )?;
1430 validate_optional_enum_value(
1431 "returnConsumedCapacity",
1432 &body["ReturnConsumedCapacity"],
1433 &["INDEXES", "TOTAL", "NONE"],
1434 )?;
1435 validate_optional_enum_value(
1436 "returnItemCollectionMetrics",
1437 &body["ReturnItemCollectionMetrics"],
1438 &["SIZE", "NONE"],
1439 )?;
1440 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1441 AwsServiceError::aws_error(
1442 StatusCode::BAD_REQUEST,
1443 "ValidationException",
1444 "TransactItems is required",
1445 )
1446 })?;
1447
1448 let mut state = self.state.write();
1449
1450 let mut cancellation_reasons: Vec<Value> = Vec::new();
1452 let mut any_failed = false;
1453
1454 for ti in transact_items {
1455 if let Some(put) = ti.get("Put") {
1456 let table_name = put["TableName"].as_str().unwrap_or_default();
1457 let item: HashMap<String, AttributeValue> =
1458 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1459 let condition = put["ConditionExpression"].as_str();
1460
1461 if let Some(cond) = condition {
1462 let table = get_table(&state.tables, table_name)?;
1463 let expr_attr_names = parse_expression_attribute_names(put);
1464 let expr_attr_values = parse_expression_attribute_values(put);
1465 let key = extract_key(table, &item);
1466 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1467 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1468 .is_err()
1469 {
1470 cancellation_reasons.push(json!({
1471 "Code": "ConditionalCheckFailed",
1472 "Message": "The conditional request failed"
1473 }));
1474 any_failed = true;
1475 continue;
1476 }
1477 }
1478 cancellation_reasons.push(json!({ "Code": "None" }));
1479 } else if let Some(delete) = ti.get("Delete") {
1480 let table_name = delete["TableName"].as_str().unwrap_or_default();
1481 let key: HashMap<String, AttributeValue> =
1482 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1483 let condition = delete["ConditionExpression"].as_str();
1484
1485 if let Some(cond) = condition {
1486 let table = get_table(&state.tables, table_name)?;
1487 let expr_attr_names = parse_expression_attribute_names(delete);
1488 let expr_attr_values = parse_expression_attribute_values(delete);
1489 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1490 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1491 .is_err()
1492 {
1493 cancellation_reasons.push(json!({
1494 "Code": "ConditionalCheckFailed",
1495 "Message": "The conditional request failed"
1496 }));
1497 any_failed = true;
1498 continue;
1499 }
1500 }
1501 cancellation_reasons.push(json!({ "Code": "None" }));
1502 } else if let Some(update) = ti.get("Update") {
1503 let table_name = update["TableName"].as_str().unwrap_or_default();
1504 let key: HashMap<String, AttributeValue> =
1505 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1506 let condition = update["ConditionExpression"].as_str();
1507
1508 if let Some(cond) = condition {
1509 let table = get_table(&state.tables, table_name)?;
1510 let expr_attr_names = parse_expression_attribute_names(update);
1511 let expr_attr_values = parse_expression_attribute_values(update);
1512 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1513 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1514 .is_err()
1515 {
1516 cancellation_reasons.push(json!({
1517 "Code": "ConditionalCheckFailed",
1518 "Message": "The conditional request failed"
1519 }));
1520 any_failed = true;
1521 continue;
1522 }
1523 }
1524 cancellation_reasons.push(json!({ "Code": "None" }));
1525 } else if let Some(check) = ti.get("ConditionCheck") {
1526 let table_name = check["TableName"].as_str().unwrap_or_default();
1527 let key: HashMap<String, AttributeValue> =
1528 serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1529 let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1530
1531 let table = get_table(&state.tables, table_name)?;
1532 let expr_attr_names = parse_expression_attribute_names(check);
1533 let expr_attr_values = parse_expression_attribute_values(check);
1534 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1535 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1536 {
1537 cancellation_reasons.push(json!({
1538 "Code": "ConditionalCheckFailed",
1539 "Message": "The conditional request failed"
1540 }));
1541 any_failed = true;
1542 continue;
1543 }
1544 cancellation_reasons.push(json!({ "Code": "None" }));
1545 } else {
1546 cancellation_reasons.push(json!({ "Code": "None" }));
1547 }
1548 }
1549
1550 if any_failed {
1551 let error_body = json!({
1552 "__type": "TransactionCanceledException",
1553 "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1554 "CancellationReasons": cancellation_reasons
1555 });
1556 return Ok(AwsResponse::json(
1557 StatusCode::BAD_REQUEST,
1558 serde_json::to_vec(&error_body).unwrap(),
1559 ));
1560 }
1561
1562 for ti in transact_items {
1564 if let Some(put) = ti.get("Put") {
1565 let table_name = put["TableName"].as_str().unwrap_or_default();
1566 let item: HashMap<String, AttributeValue> =
1567 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1568 let table = get_table_mut(&mut state.tables, table_name)?;
1569 let key = extract_key(table, &item);
1570 if let Some(idx) = table.find_item_index(&key) {
1571 table.items[idx] = item;
1572 } else {
1573 table.items.push(item);
1574 }
1575 table.recalculate_stats();
1576 } else if let Some(delete) = ti.get("Delete") {
1577 let table_name = delete["TableName"].as_str().unwrap_or_default();
1578 let key: HashMap<String, AttributeValue> =
1579 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1580 let table = get_table_mut(&mut state.tables, table_name)?;
1581 if let Some(idx) = table.find_item_index(&key) {
1582 table.items.remove(idx);
1583 }
1584 table.recalculate_stats();
1585 } else if let Some(update) = ti.get("Update") {
1586 let table_name = update["TableName"].as_str().unwrap_or_default();
1587 let key: HashMap<String, AttributeValue> =
1588 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1589 let update_expression = update["UpdateExpression"].as_str();
1590 let expr_attr_names = parse_expression_attribute_names(update);
1591 let expr_attr_values = parse_expression_attribute_values(update);
1592
1593 let table = get_table_mut(&mut state.tables, table_name)?;
1594 let idx = match table.find_item_index(&key) {
1595 Some(i) => i,
1596 None => {
1597 let mut new_item = HashMap::new();
1598 for (k, v) in &key {
1599 new_item.insert(k.clone(), v.clone());
1600 }
1601 table.items.push(new_item);
1602 table.items.len() - 1
1603 }
1604 };
1605
1606 if let Some(expr) = update_expression {
1607 apply_update_expression(
1608 &mut table.items[idx],
1609 expr,
1610 &expr_attr_names,
1611 &expr_attr_values,
1612 )?;
1613 }
1614 table.recalculate_stats();
1615 }
1616 }
1618
1619 Self::ok_json(json!({}))
1620 }
1621
1622 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623 let body = Self::parse_body(req)?;
1624 let statement = require_str(&body, "Statement")?;
1625 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1626
1627 execute_partiql_statement(&self.state, statement, ¶meters)
1628 }
1629
1630 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1631 let body = Self::parse_body(req)?;
1632 validate_optional_enum_value(
1633 "returnConsumedCapacity",
1634 &body["ReturnConsumedCapacity"],
1635 &["INDEXES", "TOTAL", "NONE"],
1636 )?;
1637 let statements = body["Statements"].as_array().ok_or_else(|| {
1638 AwsServiceError::aws_error(
1639 StatusCode::BAD_REQUEST,
1640 "ValidationException",
1641 "Statements is required",
1642 )
1643 })?;
1644
1645 let mut responses: Vec<Value> = Vec::new();
1646 for stmt_obj in statements {
1647 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1648 let parameters = stmt_obj["Parameters"]
1649 .as_array()
1650 .cloned()
1651 .unwrap_or_default();
1652
1653 match execute_partiql_statement(&self.state, statement, ¶meters) {
1654 Ok(resp) => {
1655 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1656 responses.push(resp_body);
1657 }
1658 Err(e) => {
1659 responses.push(json!({
1660 "Error": {
1661 "Code": "ValidationException",
1662 "Message": e.to_string()
1663 }
1664 }));
1665 }
1666 }
1667 }
1668
1669 Self::ok_json(json!({ "Responses": responses }))
1670 }
1671
1672 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1673 let body = Self::parse_body(req)?;
1674 validate_optional_string_length(
1675 "clientRequestToken",
1676 body["ClientRequestToken"].as_str(),
1677 1,
1678 36,
1679 )?;
1680 validate_optional_enum_value(
1681 "returnConsumedCapacity",
1682 &body["ReturnConsumedCapacity"],
1683 &["INDEXES", "TOTAL", "NONE"],
1684 )?;
1685 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1686 AwsServiceError::aws_error(
1687 StatusCode::BAD_REQUEST,
1688 "ValidationException",
1689 "TransactStatements is required",
1690 )
1691 })?;
1692
1693 let mut results: Vec<Result<Value, String>> = Vec::new();
1695 for stmt_obj in transact_statements {
1696 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1697 let parameters = stmt_obj["Parameters"]
1698 .as_array()
1699 .cloned()
1700 .unwrap_or_default();
1701
1702 match execute_partiql_statement(&self.state, statement, ¶meters) {
1703 Ok(resp) => {
1704 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1705 results.push(Ok(resp_body));
1706 }
1707 Err(e) => {
1708 results.push(Err(e.to_string()));
1709 }
1710 }
1711 }
1712
1713 let any_failed = results.iter().any(|r| r.is_err());
1714 if any_failed {
1715 let reasons: Vec<Value> = results
1716 .iter()
1717 .map(|r| match r {
1718 Ok(_) => json!({ "Code": "None" }),
1719 Err(msg) => json!({
1720 "Code": "ValidationException",
1721 "Message": msg
1722 }),
1723 })
1724 .collect();
1725 let error_body = json!({
1726 "__type": "TransactionCanceledException",
1727 "message": "Transaction cancelled due to validation errors",
1728 "CancellationReasons": reasons
1729 });
1730 return Ok(AwsResponse::json(
1731 StatusCode::BAD_REQUEST,
1732 serde_json::to_vec(&error_body).unwrap(),
1733 ));
1734 }
1735
1736 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1737 Self::ok_json(json!({ "Responses": responses }))
1738 }
1739
1740 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1743 let body = Self::parse_body(req)?;
1744 let table_name = require_str(&body, "TableName")?;
1745 let spec = &body["TimeToLiveSpecification"];
1746 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1747 AwsServiceError::aws_error(
1748 StatusCode::BAD_REQUEST,
1749 "ValidationException",
1750 "TimeToLiveSpecification.AttributeName is required",
1751 )
1752 })?;
1753 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1754 AwsServiceError::aws_error(
1755 StatusCode::BAD_REQUEST,
1756 "ValidationException",
1757 "TimeToLiveSpecification.Enabled is required",
1758 )
1759 })?;
1760
1761 let mut state = self.state.write();
1762 let table = get_table_mut(&mut state.tables, table_name)?;
1763
1764 if enabled {
1765 table.ttl_attribute = Some(attr_name.to_string());
1766 table.ttl_enabled = true;
1767 } else {
1768 table.ttl_enabled = false;
1769 }
1770
1771 Self::ok_json(json!({
1772 "TimeToLiveSpecification": {
1773 "AttributeName": attr_name,
1774 "Enabled": enabled
1775 }
1776 }))
1777 }
1778
1779 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1780 let body = Self::parse_body(req)?;
1781 let table_name = require_str(&body, "TableName")?;
1782
1783 let state = self.state.read();
1784 let table = get_table(&state.tables, table_name)?;
1785
1786 let status = if table.ttl_enabled {
1787 "ENABLED"
1788 } else {
1789 "DISABLED"
1790 };
1791
1792 let mut desc = json!({
1793 "TimeToLiveDescription": {
1794 "TimeToLiveStatus": status
1795 }
1796 });
1797
1798 if let Some(ref attr) = table.ttl_attribute {
1799 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1800 }
1801
1802 Self::ok_json(desc)
1803 }
1804
1805 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1808 let body = Self::parse_body(req)?;
1809 let resource_arn = require_str(&body, "ResourceArn")?;
1810 let policy = require_str(&body, "Policy")?;
1811
1812 let mut state = self.state.write();
1813 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1814 table.resource_policy = Some(policy.to_string());
1815
1816 let revision_id = uuid::Uuid::new_v4().to_string();
1817 Self::ok_json(json!({ "RevisionId": revision_id }))
1818 }
1819
1820 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821 let body = Self::parse_body(req)?;
1822 let resource_arn = require_str(&body, "ResourceArn")?;
1823
1824 let state = self.state.read();
1825 let table = find_table_by_arn(&state.tables, resource_arn)?;
1826
1827 match &table.resource_policy {
1828 Some(policy) => {
1829 let revision_id = uuid::Uuid::new_v4().to_string();
1830 Self::ok_json(json!({
1831 "Policy": policy,
1832 "RevisionId": revision_id
1833 }))
1834 }
1835 None => Self::ok_json(json!({ "Policy": null })),
1836 }
1837 }
1838
1839 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1840 let body = Self::parse_body(req)?;
1841 let resource_arn = require_str(&body, "ResourceArn")?;
1842
1843 let mut state = self.state.write();
1844 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1845 table.resource_policy = None;
1846
1847 Self::ok_json(json!({}))
1848 }
1849
1850 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1853 Self::ok_json(json!({
1854 "Endpoints": [{
1855 "Address": "dynamodb.us-east-1.amazonaws.com",
1856 "CachePeriodInMinutes": 1440
1857 }]
1858 }))
1859 }
1860
1861 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1862 Self::ok_json(json!({
1863 "AccountMaxReadCapacityUnits": 80000,
1864 "AccountMaxWriteCapacityUnits": 80000,
1865 "TableMaxReadCapacityUnits": 40000,
1866 "TableMaxWriteCapacityUnits": 40000
1867 }))
1868 }
1869
1870 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1873 let body = Self::parse_body(req)?;
1874 let table_name = require_str(&body, "TableName")?;
1875 let backup_name = require_str(&body, "BackupName")?;
1876
1877 let mut state = self.state.write();
1878 let table = get_table(&state.tables, table_name)?;
1879
1880 let backup_arn = format!(
1881 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1882 state.region,
1883 state.account_id,
1884 table_name,
1885 Utc::now().format("%Y%m%d%H%M%S")
1886 );
1887 let now = Utc::now();
1888
1889 let backup = BackupDescription {
1890 backup_arn: backup_arn.clone(),
1891 backup_name: backup_name.to_string(),
1892 table_name: table_name.to_string(),
1893 table_arn: table.arn.clone(),
1894 backup_status: "AVAILABLE".to_string(),
1895 backup_type: "USER".to_string(),
1896 backup_creation_date: now,
1897 key_schema: table.key_schema.clone(),
1898 attribute_definitions: table.attribute_definitions.clone(),
1899 provisioned_throughput: table.provisioned_throughput.clone(),
1900 billing_mode: table.billing_mode.clone(),
1901 item_count: table.item_count,
1902 size_bytes: table.size_bytes,
1903 items: table.items.clone(),
1904 };
1905
1906 state.backups.insert(backup_arn.clone(), backup);
1907
1908 Self::ok_json(json!({
1909 "BackupDetails": {
1910 "BackupArn": backup_arn,
1911 "BackupName": backup_name,
1912 "BackupStatus": "AVAILABLE",
1913 "BackupType": "USER",
1914 "BackupCreationDateTime": now.timestamp() as f64,
1915 "BackupSizeBytes": 0
1916 }
1917 }))
1918 }
1919
1920 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1921 let body = Self::parse_body(req)?;
1922 let backup_arn = require_str(&body, "BackupArn")?;
1923
1924 let mut state = self.state.write();
1925 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1926 AwsServiceError::aws_error(
1927 StatusCode::BAD_REQUEST,
1928 "BackupNotFoundException",
1929 format!("Backup not found: {backup_arn}"),
1930 )
1931 })?;
1932
1933 Self::ok_json(json!({
1934 "BackupDescription": {
1935 "BackupDetails": {
1936 "BackupArn": backup.backup_arn,
1937 "BackupName": backup.backup_name,
1938 "BackupStatus": "DELETED",
1939 "BackupType": backup.backup_type,
1940 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1941 "BackupSizeBytes": backup.size_bytes
1942 },
1943 "SourceTableDetails": {
1944 "TableName": backup.table_name,
1945 "TableArn": backup.table_arn,
1946 "TableId": uuid::Uuid::new_v4().to_string(),
1947 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1948 "AttributeName": ks.attribute_name,
1949 "KeyType": ks.key_type
1950 })).collect::<Vec<_>>(),
1951 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1952 "ProvisionedThroughput": {
1953 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1954 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1955 },
1956 "ItemCount": backup.item_count,
1957 "BillingMode": backup.billing_mode,
1958 "TableSizeBytes": backup.size_bytes
1959 },
1960 "SourceTableFeatureDetails": {}
1961 }
1962 }))
1963 }
1964
1965 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1966 let body = Self::parse_body(req)?;
1967 let backup_arn = require_str(&body, "BackupArn")?;
1968
1969 let state = self.state.read();
1970 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1971 AwsServiceError::aws_error(
1972 StatusCode::BAD_REQUEST,
1973 "BackupNotFoundException",
1974 format!("Backup not found: {backup_arn}"),
1975 )
1976 })?;
1977
1978 Self::ok_json(json!({
1979 "BackupDescription": {
1980 "BackupDetails": {
1981 "BackupArn": backup.backup_arn,
1982 "BackupName": backup.backup_name,
1983 "BackupStatus": backup.backup_status,
1984 "BackupType": backup.backup_type,
1985 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1986 "BackupSizeBytes": backup.size_bytes
1987 },
1988 "SourceTableDetails": {
1989 "TableName": backup.table_name,
1990 "TableArn": backup.table_arn,
1991 "TableId": uuid::Uuid::new_v4().to_string(),
1992 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1993 "AttributeName": ks.attribute_name,
1994 "KeyType": ks.key_type
1995 })).collect::<Vec<_>>(),
1996 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1997 "ProvisionedThroughput": {
1998 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1999 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2000 },
2001 "ItemCount": backup.item_count,
2002 "BillingMode": backup.billing_mode,
2003 "TableSizeBytes": backup.size_bytes
2004 },
2005 "SourceTableFeatureDetails": {}
2006 }
2007 }))
2008 }
2009
2010 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2011 let body = Self::parse_body(req)?;
2012 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2013 validate_optional_string_length(
2014 "exclusiveStartBackupArn",
2015 body["ExclusiveStartBackupArn"].as_str(),
2016 37,
2017 1024,
2018 )?;
2019 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2020 validate_optional_enum_value(
2021 "backupType",
2022 &body["BackupType"],
2023 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2024 )?;
2025 let table_name = body["TableName"].as_str();
2026
2027 let state = self.state.read();
2028 let summaries: Vec<Value> = state
2029 .backups
2030 .values()
2031 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2032 .map(|b| {
2033 json!({
2034 "TableName": b.table_name,
2035 "TableArn": b.table_arn,
2036 "BackupArn": b.backup_arn,
2037 "BackupName": b.backup_name,
2038 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2039 "BackupStatus": b.backup_status,
2040 "BackupType": b.backup_type,
2041 "BackupSizeBytes": b.size_bytes
2042 })
2043 })
2044 .collect();
2045
2046 Self::ok_json(json!({
2047 "BackupSummaries": summaries
2048 }))
2049 }
2050
2051 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2052 let body = Self::parse_body(req)?;
2053 let backup_arn = require_str(&body, "BackupArn")?;
2054 let target_table_name = require_str(&body, "TargetTableName")?;
2055
2056 let mut state = self.state.write();
2057 let backup = state.backups.get(backup_arn).ok_or_else(|| {
2058 AwsServiceError::aws_error(
2059 StatusCode::BAD_REQUEST,
2060 "BackupNotFoundException",
2061 format!("Backup not found: {backup_arn}"),
2062 )
2063 })?;
2064
2065 if state.tables.contains_key(target_table_name) {
2066 return Err(AwsServiceError::aws_error(
2067 StatusCode::BAD_REQUEST,
2068 "TableAlreadyExistsException",
2069 format!("Table already exists: {target_table_name}"),
2070 ));
2071 }
2072
2073 let now = Utc::now();
2074 let arn = format!(
2075 "arn:aws:dynamodb:{}:{}:table/{}",
2076 state.region, state.account_id, target_table_name
2077 );
2078
2079 let restored_items = backup.items.clone();
2080 let mut table = DynamoTable {
2081 name: target_table_name.to_string(),
2082 arn: arn.clone(),
2083 key_schema: backup.key_schema.clone(),
2084 attribute_definitions: backup.attribute_definitions.clone(),
2085 provisioned_throughput: backup.provisioned_throughput.clone(),
2086 items: restored_items,
2087 gsi: Vec::new(),
2088 lsi: Vec::new(),
2089 tags: HashMap::new(),
2090 created_at: now,
2091 status: "ACTIVE".to_string(),
2092 item_count: 0,
2093 size_bytes: 0,
2094 billing_mode: backup.billing_mode.clone(),
2095 ttl_attribute: None,
2096 ttl_enabled: false,
2097 resource_policy: None,
2098 pitr_enabled: false,
2099 kinesis_destinations: Vec::new(),
2100 contributor_insights_status: "DISABLED".to_string(),
2101 contributor_insights_counters: HashMap::new(),
2102 stream_enabled: false,
2103 stream_view_type: None,
2104 stream_arn: None,
2105 stream_records: Arc::new(RwLock::new(Vec::new())),
2106 sse_type: None,
2107 sse_kms_key_arn: None,
2108 };
2109 table.recalculate_stats();
2110
2111 let desc = build_table_description(&table);
2112 state.tables.insert(target_table_name.to_string(), table);
2113
2114 Self::ok_json(json!({
2115 "TableDescription": desc
2116 }))
2117 }
2118
2119 fn restore_table_to_point_in_time(
2120 &self,
2121 req: &AwsRequest,
2122 ) -> Result<AwsResponse, AwsServiceError> {
2123 let body = Self::parse_body(req)?;
2124 let target_table_name = require_str(&body, "TargetTableName")?;
2125 let source_table_name = body["SourceTableName"].as_str();
2126 let source_table_arn = body["SourceTableArn"].as_str();
2127
2128 let mut state = self.state.write();
2129
2130 let source = if let Some(name) = source_table_name {
2132 get_table(&state.tables, name)?.clone()
2133 } else if let Some(arn) = source_table_arn {
2134 find_table_by_arn(&state.tables, arn)?.clone()
2135 } else {
2136 return Err(AwsServiceError::aws_error(
2137 StatusCode::BAD_REQUEST,
2138 "ValidationException",
2139 "SourceTableName or SourceTableArn is required",
2140 ));
2141 };
2142
2143 if state.tables.contains_key(target_table_name) {
2144 return Err(AwsServiceError::aws_error(
2145 StatusCode::BAD_REQUEST,
2146 "TableAlreadyExistsException",
2147 format!("Table already exists: {target_table_name}"),
2148 ));
2149 }
2150
2151 let now = Utc::now();
2152 let arn = format!(
2153 "arn:aws:dynamodb:{}:{}:table/{}",
2154 state.region, state.account_id, target_table_name
2155 );
2156
2157 let mut table = DynamoTable {
2158 name: target_table_name.to_string(),
2159 arn: arn.clone(),
2160 key_schema: source.key_schema.clone(),
2161 attribute_definitions: source.attribute_definitions.clone(),
2162 provisioned_throughput: source.provisioned_throughput.clone(),
2163 items: source.items.clone(),
2164 gsi: Vec::new(),
2165 lsi: Vec::new(),
2166 tags: HashMap::new(),
2167 created_at: now,
2168 status: "ACTIVE".to_string(),
2169 item_count: 0,
2170 size_bytes: 0,
2171 billing_mode: source.billing_mode.clone(),
2172 ttl_attribute: None,
2173 ttl_enabled: false,
2174 resource_policy: None,
2175 pitr_enabled: false,
2176 kinesis_destinations: Vec::new(),
2177 contributor_insights_status: "DISABLED".to_string(),
2178 contributor_insights_counters: HashMap::new(),
2179 stream_enabled: false,
2180 stream_view_type: None,
2181 stream_arn: None,
2182 stream_records: Arc::new(RwLock::new(Vec::new())),
2183 sse_type: None,
2184 sse_kms_key_arn: None,
2185 };
2186 table.recalculate_stats();
2187
2188 let desc = build_table_description(&table);
2189 state.tables.insert(target_table_name.to_string(), table);
2190
2191 Self::ok_json(json!({
2192 "TableDescription": desc
2193 }))
2194 }
2195
2196 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2197 let body = Self::parse_body(req)?;
2198 let table_name = require_str(&body, "TableName")?;
2199
2200 let pitr_spec = body["PointInTimeRecoverySpecification"]
2201 .as_object()
2202 .ok_or_else(|| {
2203 AwsServiceError::aws_error(
2204 StatusCode::BAD_REQUEST,
2205 "ValidationException",
2206 "PointInTimeRecoverySpecification is required",
2207 )
2208 })?;
2209 let enabled = pitr_spec
2210 .get("PointInTimeRecoveryEnabled")
2211 .and_then(|v| v.as_bool())
2212 .unwrap_or(false);
2213
2214 let mut state = self.state.write();
2215 let table = get_table_mut(&mut state.tables, table_name)?;
2216 table.pitr_enabled = enabled;
2217
2218 let status = if enabled { "ENABLED" } else { "DISABLED" };
2219 Self::ok_json(json!({
2220 "ContinuousBackupsDescription": {
2221 "ContinuousBackupsStatus": status,
2222 "PointInTimeRecoveryDescription": {
2223 "PointInTimeRecoveryStatus": status,
2224 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2225 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2226 }
2227 }
2228 }))
2229 }
2230
2231 fn describe_continuous_backups(
2232 &self,
2233 req: &AwsRequest,
2234 ) -> Result<AwsResponse, AwsServiceError> {
2235 let body = Self::parse_body(req)?;
2236 let table_name = require_str(&body, "TableName")?;
2237
2238 let state = self.state.read();
2239 let table = get_table(&state.tables, table_name)?;
2240
2241 let status = if table.pitr_enabled {
2242 "ENABLED"
2243 } else {
2244 "DISABLED"
2245 };
2246 Self::ok_json(json!({
2247 "ContinuousBackupsDescription": {
2248 "ContinuousBackupsStatus": status,
2249 "PointInTimeRecoveryDescription": {
2250 "PointInTimeRecoveryStatus": status,
2251 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2252 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2253 }
2254 }
2255 }))
2256 }
2257
2258 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2261 let body = Self::parse_body(req)?;
2262 let global_table_name = require_str(&body, "GlobalTableName")?;
2263 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2264
2265 let replication_group = body["ReplicationGroup"]
2266 .as_array()
2267 .ok_or_else(|| {
2268 AwsServiceError::aws_error(
2269 StatusCode::BAD_REQUEST,
2270 "ValidationException",
2271 "ReplicationGroup is required",
2272 )
2273 })?
2274 .iter()
2275 .filter_map(|r| {
2276 r["RegionName"].as_str().map(|rn| ReplicaDescription {
2277 region_name: rn.to_string(),
2278 replica_status: "ACTIVE".to_string(),
2279 })
2280 })
2281 .collect::<Vec<_>>();
2282
2283 let mut state = self.state.write();
2284
2285 if state.global_tables.contains_key(global_table_name) {
2286 return Err(AwsServiceError::aws_error(
2287 StatusCode::BAD_REQUEST,
2288 "GlobalTableAlreadyExistsException",
2289 format!("Global table already exists: {global_table_name}"),
2290 ));
2291 }
2292
2293 let arn = format!(
2294 "arn:aws:dynamodb::{}:global-table/{}",
2295 state.account_id, global_table_name
2296 );
2297 let now = Utc::now();
2298
2299 let gt = GlobalTableDescription {
2300 global_table_name: global_table_name.to_string(),
2301 global_table_arn: arn.clone(),
2302 global_table_status: "ACTIVE".to_string(),
2303 creation_date: now,
2304 replication_group: replication_group.clone(),
2305 };
2306
2307 state
2308 .global_tables
2309 .insert(global_table_name.to_string(), gt);
2310
2311 Self::ok_json(json!({
2312 "GlobalTableDescription": {
2313 "GlobalTableName": global_table_name,
2314 "GlobalTableArn": arn,
2315 "GlobalTableStatus": "ACTIVE",
2316 "CreationDateTime": now.timestamp() as f64,
2317 "ReplicationGroup": replication_group.iter().map(|r| json!({
2318 "RegionName": r.region_name,
2319 "ReplicaStatus": r.replica_status
2320 })).collect::<Vec<_>>()
2321 }
2322 }))
2323 }
2324
2325 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2326 let body = Self::parse_body(req)?;
2327 let global_table_name = require_str(&body, "GlobalTableName")?;
2328 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2329
2330 let state = self.state.read();
2331 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2332 AwsServiceError::aws_error(
2333 StatusCode::BAD_REQUEST,
2334 "GlobalTableNotFoundException",
2335 format!("Global table not found: {global_table_name}"),
2336 )
2337 })?;
2338
2339 Self::ok_json(json!({
2340 "GlobalTableDescription": {
2341 "GlobalTableName": gt.global_table_name,
2342 "GlobalTableArn": gt.global_table_arn,
2343 "GlobalTableStatus": gt.global_table_status,
2344 "CreationDateTime": gt.creation_date.timestamp() as f64,
2345 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2346 "RegionName": r.region_name,
2347 "ReplicaStatus": r.replica_status
2348 })).collect::<Vec<_>>()
2349 }
2350 }))
2351 }
2352
2353 fn describe_global_table_settings(
2354 &self,
2355 req: &AwsRequest,
2356 ) -> Result<AwsResponse, AwsServiceError> {
2357 let body = Self::parse_body(req)?;
2358 let global_table_name = require_str(&body, "GlobalTableName")?;
2359 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2360
2361 let state = self.state.read();
2362 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2363 AwsServiceError::aws_error(
2364 StatusCode::BAD_REQUEST,
2365 "GlobalTableNotFoundException",
2366 format!("Global table not found: {global_table_name}"),
2367 )
2368 })?;
2369
2370 let replica_settings: Vec<Value> = gt
2371 .replication_group
2372 .iter()
2373 .map(|r| {
2374 json!({
2375 "RegionName": r.region_name,
2376 "ReplicaStatus": r.replica_status,
2377 "ReplicaProvisionedReadCapacityUnits": 0,
2378 "ReplicaProvisionedWriteCapacityUnits": 0
2379 })
2380 })
2381 .collect();
2382
2383 Self::ok_json(json!({
2384 "GlobalTableName": gt.global_table_name,
2385 "ReplicaSettings": replica_settings
2386 }))
2387 }
2388
2389 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2390 let body = Self::parse_body(req)?;
2391 validate_optional_string_length(
2392 "exclusiveStartGlobalTableName",
2393 body["ExclusiveStartGlobalTableName"].as_str(),
2394 3,
2395 255,
2396 )?;
2397 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2398 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2399
2400 let state = self.state.read();
2401 let tables: Vec<Value> = state
2402 .global_tables
2403 .values()
2404 .take(limit)
2405 .map(|gt| {
2406 json!({
2407 "GlobalTableName": gt.global_table_name,
2408 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2409 "RegionName": r.region_name
2410 })).collect::<Vec<_>>()
2411 })
2412 })
2413 .collect();
2414
2415 Self::ok_json(json!({
2416 "GlobalTables": tables
2417 }))
2418 }
2419
2420 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2421 let body = Self::parse_body(req)?;
2422 let global_table_name = require_str(&body, "GlobalTableName")?;
2423 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2424 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2425
2426 let mut state = self.state.write();
2427 let gt = state
2428 .global_tables
2429 .get_mut(global_table_name)
2430 .ok_or_else(|| {
2431 AwsServiceError::aws_error(
2432 StatusCode::BAD_REQUEST,
2433 "GlobalTableNotFoundException",
2434 format!("Global table not found: {global_table_name}"),
2435 )
2436 })?;
2437
2438 if let Some(updates) = body["ReplicaUpdates"].as_array() {
2439 for update in updates {
2440 if let Some(create) = update["Create"].as_object() {
2441 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2442 gt.replication_group.push(ReplicaDescription {
2443 region_name: region.to_string(),
2444 replica_status: "ACTIVE".to_string(),
2445 });
2446 }
2447 }
2448 if let Some(delete) = update["Delete"].as_object() {
2449 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2450 gt.replication_group.retain(|r| r.region_name != region);
2451 }
2452 }
2453 }
2454 }
2455
2456 Self::ok_json(json!({
2457 "GlobalTableDescription": {
2458 "GlobalTableName": gt.global_table_name,
2459 "GlobalTableArn": gt.global_table_arn,
2460 "GlobalTableStatus": gt.global_table_status,
2461 "CreationDateTime": gt.creation_date.timestamp() as f64,
2462 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2463 "RegionName": r.region_name,
2464 "ReplicaStatus": r.replica_status
2465 })).collect::<Vec<_>>()
2466 }
2467 }))
2468 }
2469
2470 fn update_global_table_settings(
2471 &self,
2472 req: &AwsRequest,
2473 ) -> Result<AwsResponse, AwsServiceError> {
2474 let body = Self::parse_body(req)?;
2475 let global_table_name = require_str(&body, "GlobalTableName")?;
2476 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2477 validate_optional_enum_value(
2478 "globalTableBillingMode",
2479 &body["GlobalTableBillingMode"],
2480 &["PROVISIONED", "PAY_PER_REQUEST"],
2481 )?;
2482 validate_optional_range_i64(
2483 "globalTableProvisionedWriteCapacityUnits",
2484 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2485 1,
2486 i64::MAX,
2487 )?;
2488
2489 let state = self.state.read();
2490 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2491 AwsServiceError::aws_error(
2492 StatusCode::BAD_REQUEST,
2493 "GlobalTableNotFoundException",
2494 format!("Global table not found: {global_table_name}"),
2495 )
2496 })?;
2497
2498 let replica_settings: Vec<Value> = gt
2499 .replication_group
2500 .iter()
2501 .map(|r| {
2502 json!({
2503 "RegionName": r.region_name,
2504 "ReplicaStatus": r.replica_status,
2505 "ReplicaProvisionedReadCapacityUnits": 0,
2506 "ReplicaProvisionedWriteCapacityUnits": 0
2507 })
2508 })
2509 .collect();
2510
2511 Self::ok_json(json!({
2512 "GlobalTableName": gt.global_table_name,
2513 "ReplicaSettings": replica_settings
2514 }))
2515 }
2516
2517 fn describe_table_replica_auto_scaling(
2518 &self,
2519 req: &AwsRequest,
2520 ) -> Result<AwsResponse, AwsServiceError> {
2521 let body = Self::parse_body(req)?;
2522 let table_name = require_str(&body, "TableName")?;
2523
2524 let state = self.state.read();
2525 let table = get_table(&state.tables, table_name)?;
2526
2527 Self::ok_json(json!({
2528 "TableAutoScalingDescription": {
2529 "TableName": table.name,
2530 "TableStatus": table.status,
2531 "Replicas": []
2532 }
2533 }))
2534 }
2535
2536 fn update_table_replica_auto_scaling(
2537 &self,
2538 req: &AwsRequest,
2539 ) -> Result<AwsResponse, AwsServiceError> {
2540 let body = Self::parse_body(req)?;
2541 let table_name = require_str(&body, "TableName")?;
2542
2543 let state = self.state.read();
2544 let table = get_table(&state.tables, table_name)?;
2545
2546 Self::ok_json(json!({
2547 "TableAutoScalingDescription": {
2548 "TableName": table.name,
2549 "TableStatus": table.status,
2550 "Replicas": []
2551 }
2552 }))
2553 }
2554
2555 fn enable_kinesis_streaming_destination(
2558 &self,
2559 req: &AwsRequest,
2560 ) -> Result<AwsResponse, AwsServiceError> {
2561 let body = Self::parse_body(req)?;
2562 let table_name = require_str(&body, "TableName")?;
2563 let stream_arn = require_str(&body, "StreamArn")?;
2564 let precision = body["EnableKinesisStreamingConfiguration"]
2565 ["ApproximateCreationDateTimePrecision"]
2566 .as_str()
2567 .unwrap_or("MILLISECOND");
2568
2569 let mut state = self.state.write();
2570 let table = get_table_mut(&mut state.tables, table_name)?;
2571
2572 table.kinesis_destinations.push(KinesisDestination {
2573 stream_arn: stream_arn.to_string(),
2574 destination_status: "ACTIVE".to_string(),
2575 approximate_creation_date_time_precision: precision.to_string(),
2576 });
2577
2578 Self::ok_json(json!({
2579 "TableName": table_name,
2580 "StreamArn": stream_arn,
2581 "DestinationStatus": "ACTIVE",
2582 "EnableKinesisStreamingConfiguration": {
2583 "ApproximateCreationDateTimePrecision": precision
2584 }
2585 }))
2586 }
2587
2588 fn disable_kinesis_streaming_destination(
2589 &self,
2590 req: &AwsRequest,
2591 ) -> Result<AwsResponse, AwsServiceError> {
2592 let body = Self::parse_body(req)?;
2593 let table_name = require_str(&body, "TableName")?;
2594 let stream_arn = require_str(&body, "StreamArn")?;
2595
2596 let mut state = self.state.write();
2597 let table = get_table_mut(&mut state.tables, table_name)?;
2598
2599 if let Some(dest) = table
2600 .kinesis_destinations
2601 .iter_mut()
2602 .find(|d| d.stream_arn == stream_arn)
2603 {
2604 dest.destination_status = "DISABLED".to_string();
2605 }
2606
2607 Self::ok_json(json!({
2608 "TableName": table_name,
2609 "StreamArn": stream_arn,
2610 "DestinationStatus": "DISABLED"
2611 }))
2612 }
2613
2614 fn describe_kinesis_streaming_destination(
2615 &self,
2616 req: &AwsRequest,
2617 ) -> Result<AwsResponse, AwsServiceError> {
2618 let body = Self::parse_body(req)?;
2619 let table_name = require_str(&body, "TableName")?;
2620
2621 let state = self.state.read();
2622 let table = get_table(&state.tables, table_name)?;
2623
2624 let destinations: Vec<Value> = table
2625 .kinesis_destinations
2626 .iter()
2627 .map(|d| {
2628 json!({
2629 "StreamArn": d.stream_arn,
2630 "DestinationStatus": d.destination_status,
2631 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2632 })
2633 })
2634 .collect();
2635
2636 Self::ok_json(json!({
2637 "TableName": table_name,
2638 "KinesisDataStreamDestinations": destinations
2639 }))
2640 }
2641
2642 fn update_kinesis_streaming_destination(
2643 &self,
2644 req: &AwsRequest,
2645 ) -> Result<AwsResponse, AwsServiceError> {
2646 let body = Self::parse_body(req)?;
2647 let table_name = require_str(&body, "TableName")?;
2648 let stream_arn = require_str(&body, "StreamArn")?;
2649 let precision = body["UpdateKinesisStreamingConfiguration"]
2650 ["ApproximateCreationDateTimePrecision"]
2651 .as_str()
2652 .unwrap_or("MILLISECOND");
2653
2654 let mut state = self.state.write();
2655 let table = get_table_mut(&mut state.tables, table_name)?;
2656
2657 if let Some(dest) = table
2658 .kinesis_destinations
2659 .iter_mut()
2660 .find(|d| d.stream_arn == stream_arn)
2661 {
2662 dest.approximate_creation_date_time_precision = precision.to_string();
2663 }
2664
2665 Self::ok_json(json!({
2666 "TableName": table_name,
2667 "StreamArn": stream_arn,
2668 "DestinationStatus": "ACTIVE",
2669 "UpdateKinesisStreamingConfiguration": {
2670 "ApproximateCreationDateTimePrecision": precision
2671 }
2672 }))
2673 }
2674
2675 fn describe_contributor_insights(
2678 &self,
2679 req: &AwsRequest,
2680 ) -> Result<AwsResponse, AwsServiceError> {
2681 let body = Self::parse_body(req)?;
2682 let table_name = require_str(&body, "TableName")?;
2683 let index_name = body["IndexName"].as_str();
2684
2685 let state = self.state.read();
2686 let table = get_table(&state.tables, table_name)?;
2687
2688 let top = table.top_contributors(10);
2689 let contributors: Vec<Value> = top
2690 .iter()
2691 .map(|(key, count)| {
2692 json!({
2693 "Key": key,
2694 "Count": count
2695 })
2696 })
2697 .collect();
2698
2699 let mut result = json!({
2700 "TableName": table_name,
2701 "ContributorInsightsStatus": table.contributor_insights_status,
2702 "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2703 "TopContributors": contributors
2704 });
2705 if let Some(idx) = index_name {
2706 result["IndexName"] = json!(idx);
2707 }
2708
2709 Self::ok_json(result)
2710 }
2711
2712 fn update_contributor_insights(
2713 &self,
2714 req: &AwsRequest,
2715 ) -> Result<AwsResponse, AwsServiceError> {
2716 let body = Self::parse_body(req)?;
2717 let table_name = require_str(&body, "TableName")?;
2718 let action = require_str(&body, "ContributorInsightsAction")?;
2719 let index_name = body["IndexName"].as_str();
2720
2721 let mut state = self.state.write();
2722 let table = get_table_mut(&mut state.tables, table_name)?;
2723
2724 let status = match action {
2725 "ENABLE" => "ENABLED",
2726 "DISABLE" => "DISABLED",
2727 _ => {
2728 return Err(AwsServiceError::aws_error(
2729 StatusCode::BAD_REQUEST,
2730 "ValidationException",
2731 format!("Invalid ContributorInsightsAction: {action}"),
2732 ))
2733 }
2734 };
2735 table.contributor_insights_status = status.to_string();
2736 if status == "DISABLED" {
2737 table.contributor_insights_counters.clear();
2738 }
2739
2740 let mut result = json!({
2741 "TableName": table_name,
2742 "ContributorInsightsStatus": status
2743 });
2744 if let Some(idx) = index_name {
2745 result["IndexName"] = json!(idx);
2746 }
2747
2748 Self::ok_json(result)
2749 }
2750
2751 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2752 let body = Self::parse_body(req)?;
2753 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2754 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2755 let table_name = body["TableName"].as_str();
2756
2757 let state = self.state.read();
2758 let summaries: Vec<Value> = state
2759 .tables
2760 .values()
2761 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2762 .map(|t| {
2763 json!({
2764 "TableName": t.name,
2765 "ContributorInsightsStatus": t.contributor_insights_status
2766 })
2767 })
2768 .collect();
2769
2770 Self::ok_json(json!({
2771 "ContributorInsightsSummaries": summaries
2772 }))
2773 }
2774
2775 fn export_table_to_point_in_time(
2778 &self,
2779 req: &AwsRequest,
2780 ) -> Result<AwsResponse, AwsServiceError> {
2781 let body = Self::parse_body(req)?;
2782 let table_arn = require_str(&body, "TableArn")?;
2783 let s3_bucket = require_str(&body, "S3Bucket")?;
2784 let s3_prefix = body["S3Prefix"].as_str();
2785 let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2786
2787 let state = self.state.read();
2788 let table = find_table_by_arn(&state.tables, table_arn)?;
2790 let items = table.items.clone();
2791 let item_count = items.len() as i64;
2792
2793 let now = Utc::now();
2794 let export_arn = format!(
2795 "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2796 state.region,
2797 state.account_id,
2798 table_arn.rsplit('/').next().unwrap_or("unknown"),
2799 uuid::Uuid::new_v4()
2800 );
2801
2802 drop(state);
2803
2804 let mut json_lines = String::new();
2806 for item in &items {
2807 let item_json = if export_format == "DYNAMODB_JSON" {
2808 json!({ "Item": item })
2809 } else {
2810 json!(item)
2811 };
2812 json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2813 json_lines.push('\n');
2814 }
2815 let data_size = json_lines.len() as i64;
2816
2817 let s3_key = if let Some(prefix) = s3_prefix {
2819 format!("{prefix}/data/manifest-files.json")
2820 } else {
2821 "data/manifest-files.json".to_string()
2822 };
2823
2824 let mut export_failed = false;
2826 let mut failure_reason = String::new();
2827 if let Some(ref s3_state) = self.s3_state {
2828 let mut s3 = s3_state.write();
2829 if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2830 let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2831 let obj = fakecloud_s3::state::S3Object {
2832 key: s3_key.clone(),
2833 data: bytes::Bytes::from(json_lines),
2834 content_type: "application/json".to_string(),
2835 etag,
2836 size: data_size as u64,
2837 last_modified: now,
2838 metadata: HashMap::new(),
2839 storage_class: "STANDARD".to_string(),
2840 tags: HashMap::new(),
2841 acl_grants: Vec::new(),
2842 acl_owner_id: None,
2843 parts_count: None,
2844 part_sizes: None,
2845 sse_algorithm: None,
2846 sse_kms_key_id: None,
2847 bucket_key_enabled: None,
2848 version_id: None,
2849 is_delete_marker: false,
2850 content_encoding: None,
2851 website_redirect_location: None,
2852 restore_ongoing: None,
2853 restore_expiry: None,
2854 checksum_algorithm: None,
2855 checksum_value: None,
2856 lock_mode: None,
2857 lock_retain_until: None,
2858 lock_legal_hold: None,
2859 };
2860 bucket.objects.insert(s3_key, obj);
2861 } else {
2862 export_failed = true;
2863 failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2864 }
2865 }
2866
2867 let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2868
2869 let export = ExportDescription {
2870 export_arn: export_arn.clone(),
2871 export_status: export_status.to_string(),
2872 table_arn: table_arn.to_string(),
2873 s3_bucket: s3_bucket.to_string(),
2874 s3_prefix: s3_prefix.map(|s| s.to_string()),
2875 export_format: export_format.to_string(),
2876 start_time: now,
2877 end_time: now,
2878 export_time: now,
2879 item_count,
2880 billed_size_bytes: data_size,
2881 };
2882
2883 let mut state = self.state.write();
2884 state.exports.insert(export_arn.clone(), export);
2885
2886 let mut response = json!({
2887 "ExportDescription": {
2888 "ExportArn": export_arn,
2889 "ExportStatus": export_status,
2890 "TableArn": table_arn,
2891 "S3Bucket": s3_bucket,
2892 "S3Prefix": s3_prefix,
2893 "ExportFormat": export_format,
2894 "StartTime": now.timestamp() as f64,
2895 "EndTime": now.timestamp() as f64,
2896 "ExportTime": now.timestamp() as f64,
2897 "ItemCount": item_count,
2898 "BilledSizeBytes": data_size
2899 }
2900 });
2901 if export_failed {
2902 response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2903 response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2904 }
2905 Self::ok_json(response)
2906 }
2907
2908 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2909 let body = Self::parse_body(req)?;
2910 let export_arn = require_str(&body, "ExportArn")?;
2911
2912 let state = self.state.read();
2913 let export = state.exports.get(export_arn).ok_or_else(|| {
2914 AwsServiceError::aws_error(
2915 StatusCode::BAD_REQUEST,
2916 "ExportNotFoundException",
2917 format!("Export not found: {export_arn}"),
2918 )
2919 })?;
2920
2921 Self::ok_json(json!({
2922 "ExportDescription": {
2923 "ExportArn": export.export_arn,
2924 "ExportStatus": export.export_status,
2925 "TableArn": export.table_arn,
2926 "S3Bucket": export.s3_bucket,
2927 "S3Prefix": export.s3_prefix,
2928 "ExportFormat": export.export_format,
2929 "StartTime": export.start_time.timestamp() as f64,
2930 "EndTime": export.end_time.timestamp() as f64,
2931 "ExportTime": export.export_time.timestamp() as f64,
2932 "ItemCount": export.item_count,
2933 "BilledSizeBytes": export.billed_size_bytes
2934 }
2935 }))
2936 }
2937
2938 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2939 let body = Self::parse_body(req)?;
2940 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2941 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2942 let table_arn = body["TableArn"].as_str();
2943
2944 let state = self.state.read();
2945 let summaries: Vec<Value> = state
2946 .exports
2947 .values()
2948 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2949 .map(|e| {
2950 json!({
2951 "ExportArn": e.export_arn,
2952 "ExportStatus": e.export_status,
2953 "TableArn": e.table_arn
2954 })
2955 })
2956 .collect();
2957
2958 Self::ok_json(json!({
2959 "ExportSummaries": summaries
2960 }))
2961 }
2962
2963 fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2964 let body = Self::parse_body(req)?;
2965 let input_format = require_str(&body, "InputFormat")?;
2966 let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2967 AwsServiceError::aws_error(
2968 StatusCode::BAD_REQUEST,
2969 "ValidationException",
2970 "S3BucketSource is required",
2971 )
2972 })?;
2973 let s3_bucket = s3_source
2974 .get("S3Bucket")
2975 .and_then(|v| v.as_str())
2976 .unwrap_or("");
2977 let s3_key_prefix = s3_source
2978 .get("S3KeyPrefix")
2979 .and_then(|v| v.as_str())
2980 .unwrap_or("");
2981
2982 let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2983 AwsServiceError::aws_error(
2984 StatusCode::BAD_REQUEST,
2985 "ValidationException",
2986 "TableCreationParameters is required",
2987 )
2988 })?;
2989 let table_name = table_params
2990 .get("TableName")
2991 .and_then(|v| v.as_str())
2992 .ok_or_else(|| {
2993 AwsServiceError::aws_error(
2994 StatusCode::BAD_REQUEST,
2995 "ValidationException",
2996 "TableCreationParameters.TableName is required",
2997 )
2998 })?;
2999
3000 let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3001 let attribute_definitions = parse_attribute_definitions(
3002 table_params
3003 .get("AttributeDefinitions")
3004 .unwrap_or(&Value::Null),
3005 )?;
3006
3007 let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3009 let mut processed_size_bytes: i64 = 0;
3010 if let Some(ref s3_state) = self.s3_state {
3011 let s3 = s3_state.read();
3012 let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3013 AwsServiceError::aws_error(
3014 StatusCode::BAD_REQUEST,
3015 "ImportConflictException",
3016 format!("S3 bucket does not exist: {s3_bucket}"),
3017 )
3018 })?;
3019 let prefix = if s3_key_prefix.is_empty() {
3021 String::new()
3022 } else {
3023 s3_key_prefix.to_string()
3024 };
3025 for (key, obj) in &bucket.objects {
3026 if !prefix.is_empty() && !key.starts_with(&prefix) {
3027 continue;
3028 }
3029 let data = std::str::from_utf8(&obj.data).unwrap_or("");
3030 processed_size_bytes += obj.size as i64;
3031 for line in data.lines() {
3032 let line = line.trim();
3033 if line.is_empty() {
3034 continue;
3035 }
3036 if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3037 let item = if input_format == "DYNAMODB_JSON" {
3039 if let Some(item_obj) = parsed.get("Item") {
3040 item_obj.as_object().cloned().unwrap_or_default()
3041 } else {
3042 parsed.as_object().cloned().unwrap_or_default()
3043 }
3044 } else {
3045 parsed.as_object().cloned().unwrap_or_default()
3046 };
3047 if !item.is_empty() {
3048 imported_items
3049 .push(item.into_iter().collect::<HashMap<String, Value>>());
3050 }
3051 }
3052 }
3053 }
3054 }
3055
3056 let mut state = self.state.write();
3057
3058 if state.tables.contains_key(table_name) {
3059 return Err(AwsServiceError::aws_error(
3060 StatusCode::BAD_REQUEST,
3061 "ResourceInUseException",
3062 format!("Table already exists: {table_name}"),
3063 ));
3064 }
3065
3066 let now = Utc::now();
3067 let table_arn = format!(
3068 "arn:aws:dynamodb:{}:{}:table/{}",
3069 state.region, state.account_id, table_name
3070 );
3071 let import_arn = format!(
3072 "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3073 state.region,
3074 state.account_id,
3075 table_name,
3076 uuid::Uuid::new_v4()
3077 );
3078
3079 let processed_item_count = imported_items.len() as i64;
3080
3081 let mut table = DynamoTable {
3082 name: table_name.to_string(),
3083 arn: table_arn.clone(),
3084 key_schema,
3085 attribute_definitions,
3086 provisioned_throughput: ProvisionedThroughput {
3087 read_capacity_units: 0,
3088 write_capacity_units: 0,
3089 },
3090 items: imported_items,
3091 gsi: Vec::new(),
3092 lsi: Vec::new(),
3093 tags: HashMap::new(),
3094 created_at: now,
3095 status: "ACTIVE".to_string(),
3096 item_count: 0,
3097 size_bytes: 0,
3098 billing_mode: "PAY_PER_REQUEST".to_string(),
3099 ttl_attribute: None,
3100 ttl_enabled: false,
3101 resource_policy: None,
3102 pitr_enabled: false,
3103 kinesis_destinations: Vec::new(),
3104 contributor_insights_status: "DISABLED".to_string(),
3105 contributor_insights_counters: HashMap::new(),
3106 stream_enabled: false,
3107 stream_view_type: None,
3108 stream_arn: None,
3109 stream_records: Arc::new(RwLock::new(Vec::new())),
3110 sse_type: None,
3111 sse_kms_key_arn: None,
3112 };
3113 table.recalculate_stats();
3114 state.tables.insert(table_name.to_string(), table);
3115
3116 let import_desc = ImportDescription {
3117 import_arn: import_arn.clone(),
3118 import_status: "COMPLETED".to_string(),
3119 table_arn: table_arn.clone(),
3120 table_name: table_name.to_string(),
3121 s3_bucket_source: s3_bucket.to_string(),
3122 input_format: input_format.to_string(),
3123 start_time: now,
3124 end_time: now,
3125 processed_item_count,
3126 processed_size_bytes,
3127 };
3128 state.imports.insert(import_arn.clone(), import_desc);
3129
3130 let table_ref = state.tables.get(table_name).unwrap();
3131 let ks: Vec<Value> = table_ref
3132 .key_schema
3133 .iter()
3134 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3135 .collect();
3136 let ad: Vec<Value> = table_ref
3137 .attribute_definitions
3138 .iter()
3139 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3140 .collect();
3141
3142 Self::ok_json(json!({
3143 "ImportTableDescription": {
3144 "ImportArn": import_arn,
3145 "ImportStatus": "COMPLETED",
3146 "TableArn": table_arn,
3147 "TableId": uuid::Uuid::new_v4().to_string(),
3148 "S3BucketSource": {
3149 "S3Bucket": s3_bucket
3150 },
3151 "InputFormat": input_format,
3152 "TableCreationParameters": {
3153 "TableName": table_name,
3154 "KeySchema": ks,
3155 "AttributeDefinitions": ad
3156 },
3157 "StartTime": now.timestamp() as f64,
3158 "EndTime": now.timestamp() as f64,
3159 "ProcessedItemCount": processed_item_count,
3160 "ProcessedSizeBytes": processed_size_bytes
3161 }
3162 }))
3163 }
3164
3165 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3166 let body = Self::parse_body(req)?;
3167 let import_arn = require_str(&body, "ImportArn")?;
3168
3169 let state = self.state.read();
3170 let import = state.imports.get(import_arn).ok_or_else(|| {
3171 AwsServiceError::aws_error(
3172 StatusCode::BAD_REQUEST,
3173 "ImportNotFoundException",
3174 format!("Import not found: {import_arn}"),
3175 )
3176 })?;
3177
3178 Self::ok_json(json!({
3179 "ImportTableDescription": {
3180 "ImportArn": import.import_arn,
3181 "ImportStatus": import.import_status,
3182 "TableArn": import.table_arn,
3183 "S3BucketSource": {
3184 "S3Bucket": import.s3_bucket_source
3185 },
3186 "InputFormat": import.input_format,
3187 "StartTime": import.start_time.timestamp() as f64,
3188 "EndTime": import.end_time.timestamp() as f64,
3189 "ProcessedItemCount": import.processed_item_count,
3190 "ProcessedSizeBytes": import.processed_size_bytes
3191 }
3192 }))
3193 }
3194
3195 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3196 let body = Self::parse_body(req)?;
3197 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3198 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3199 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3200 let table_arn = body["TableArn"].as_str();
3201
3202 let state = self.state.read();
3203 let summaries: Vec<Value> = state
3204 .imports
3205 .values()
3206 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3207 .map(|i| {
3208 json!({
3209 "ImportArn": i.import_arn,
3210 "ImportStatus": i.import_status,
3211 "TableArn": i.table_arn
3212 })
3213 })
3214 .collect();
3215
3216 Self::ok_json(json!({
3217 "ImportSummaryList": summaries
3218 }))
3219 }
3220}
3221
3222#[async_trait]
3223impl AwsService for DynamoDbService {
3224 fn service_name(&self) -> &str {
3225 "dynamodb"
3226 }
3227
3228 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3229 match req.action.as_str() {
3230 "CreateTable" => self.create_table(&req),
3231 "DeleteTable" => self.delete_table(&req),
3232 "DescribeTable" => self.describe_table(&req),
3233 "ListTables" => self.list_tables(&req),
3234 "UpdateTable" => self.update_table(&req),
3235 "PutItem" => self.put_item(&req),
3236 "GetItem" => self.get_item(&req),
3237 "DeleteItem" => self.delete_item(&req),
3238 "UpdateItem" => self.update_item(&req),
3239 "Query" => self.query(&req),
3240 "Scan" => self.scan(&req),
3241 "BatchGetItem" => self.batch_get_item(&req),
3242 "BatchWriteItem" => self.batch_write_item(&req),
3243 "TagResource" => self.tag_resource(&req),
3244 "UntagResource" => self.untag_resource(&req),
3245 "ListTagsOfResource" => self.list_tags_of_resource(&req),
3246 "TransactGetItems" => self.transact_get_items(&req),
3247 "TransactWriteItems" => self.transact_write_items(&req),
3248 "ExecuteStatement" => self.execute_statement(&req),
3249 "BatchExecuteStatement" => self.batch_execute_statement(&req),
3250 "ExecuteTransaction" => self.execute_transaction(&req),
3251 "UpdateTimeToLive" => self.update_time_to_live(&req),
3252 "DescribeTimeToLive" => self.describe_time_to_live(&req),
3253 "PutResourcePolicy" => self.put_resource_policy(&req),
3254 "GetResourcePolicy" => self.get_resource_policy(&req),
3255 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3256 "DescribeEndpoints" => self.describe_endpoints(&req),
3258 "DescribeLimits" => self.describe_limits(&req),
3259 "CreateBackup" => self.create_backup(&req),
3261 "DeleteBackup" => self.delete_backup(&req),
3262 "DescribeBackup" => self.describe_backup(&req),
3263 "ListBackups" => self.list_backups(&req),
3264 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3265 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3266 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3267 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3268 "CreateGlobalTable" => self.create_global_table(&req),
3270 "DescribeGlobalTable" => self.describe_global_table(&req),
3271 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3272 "ListGlobalTables" => self.list_global_tables(&req),
3273 "UpdateGlobalTable" => self.update_global_table(&req),
3274 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3275 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3276 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3277 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3279 "DisableKinesisStreamingDestination" => {
3280 self.disable_kinesis_streaming_destination(&req)
3281 }
3282 "DescribeKinesisStreamingDestination" => {
3283 self.describe_kinesis_streaming_destination(&req)
3284 }
3285 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3286 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3288 "UpdateContributorInsights" => self.update_contributor_insights(&req),
3289 "ListContributorInsights" => self.list_contributor_insights(&req),
3290 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3292 "DescribeExport" => self.describe_export(&req),
3293 "ListExports" => self.list_exports(&req),
3294 "ImportTable" => self.import_table(&req),
3295 "DescribeImport" => self.describe_import(&req),
3296 "ListImports" => self.list_imports(&req),
3297 _ => Err(AwsServiceError::action_not_implemented(
3298 "dynamodb",
3299 &req.action,
3300 )),
3301 }
3302 }
3303
3304 fn supported_actions(&self) -> &[&str] {
3305 &[
3306 "CreateTable",
3307 "DeleteTable",
3308 "DescribeTable",
3309 "ListTables",
3310 "UpdateTable",
3311 "PutItem",
3312 "GetItem",
3313 "DeleteItem",
3314 "UpdateItem",
3315 "Query",
3316 "Scan",
3317 "BatchGetItem",
3318 "BatchWriteItem",
3319 "TagResource",
3320 "UntagResource",
3321 "ListTagsOfResource",
3322 "TransactGetItems",
3323 "TransactWriteItems",
3324 "ExecuteStatement",
3325 "BatchExecuteStatement",
3326 "ExecuteTransaction",
3327 "UpdateTimeToLive",
3328 "DescribeTimeToLive",
3329 "PutResourcePolicy",
3330 "GetResourcePolicy",
3331 "DeleteResourcePolicy",
3332 "DescribeEndpoints",
3333 "DescribeLimits",
3334 "CreateBackup",
3335 "DeleteBackup",
3336 "DescribeBackup",
3337 "ListBackups",
3338 "RestoreTableFromBackup",
3339 "RestoreTableToPointInTime",
3340 "UpdateContinuousBackups",
3341 "DescribeContinuousBackups",
3342 "CreateGlobalTable",
3343 "DescribeGlobalTable",
3344 "DescribeGlobalTableSettings",
3345 "ListGlobalTables",
3346 "UpdateGlobalTable",
3347 "UpdateGlobalTableSettings",
3348 "DescribeTableReplicaAutoScaling",
3349 "UpdateTableReplicaAutoScaling",
3350 "EnableKinesisStreamingDestination",
3351 "DisableKinesisStreamingDestination",
3352 "DescribeKinesisStreamingDestination",
3353 "UpdateKinesisStreamingDestination",
3354 "DescribeContributorInsights",
3355 "UpdateContributorInsights",
3356 "ListContributorInsights",
3357 "ExportTableToPointInTime",
3358 "DescribeExport",
3359 "ListExports",
3360 "ImportTable",
3361 "DescribeImport",
3362 "ListImports",
3363 ]
3364 }
3365}
3366
3367fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3370 body[field].as_str().ok_or_else(|| {
3371 AwsServiceError::aws_error(
3372 StatusCode::BAD_REQUEST,
3373 "ValidationException",
3374 format!("{field} is required"),
3375 )
3376 })
3377}
3378
3379fn require_object(
3380 body: &Value,
3381 field: &str,
3382) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3383 let obj = body[field].as_object().ok_or_else(|| {
3384 AwsServiceError::aws_error(
3385 StatusCode::BAD_REQUEST,
3386 "ValidationException",
3387 format!("{field} is required"),
3388 )
3389 })?;
3390 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3391}
3392
3393fn get_table<'a>(
3394 tables: &'a HashMap<String, DynamoTable>,
3395 name: &str,
3396) -> Result<&'a DynamoTable, AwsServiceError> {
3397 tables.get(name).ok_or_else(|| {
3398 AwsServiceError::aws_error(
3399 StatusCode::BAD_REQUEST,
3400 "ResourceNotFoundException",
3401 format!("Requested resource not found: Table: {name} not found"),
3402 )
3403 })
3404}
3405
3406fn get_table_mut<'a>(
3407 tables: &'a mut HashMap<String, DynamoTable>,
3408 name: &str,
3409) -> Result<&'a mut DynamoTable, AwsServiceError> {
3410 tables.get_mut(name).ok_or_else(|| {
3411 AwsServiceError::aws_error(
3412 StatusCode::BAD_REQUEST,
3413 "ResourceNotFoundException",
3414 format!("Requested resource not found: Table: {name} not found"),
3415 )
3416 })
3417}
3418
3419fn find_table_by_arn<'a>(
3420 tables: &'a HashMap<String, DynamoTable>,
3421 arn: &str,
3422) -> Result<&'a DynamoTable, AwsServiceError> {
3423 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3424 AwsServiceError::aws_error(
3425 StatusCode::BAD_REQUEST,
3426 "ResourceNotFoundException",
3427 format!("Requested resource not found: {arn}"),
3428 )
3429 })
3430}
3431
3432fn find_table_by_arn_mut<'a>(
3433 tables: &'a mut HashMap<String, DynamoTable>,
3434 arn: &str,
3435) -> Result<&'a mut DynamoTable, AwsServiceError> {
3436 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3437 AwsServiceError::aws_error(
3438 StatusCode::BAD_REQUEST,
3439 "ResourceNotFoundException",
3440 format!("Requested resource not found: {arn}"),
3441 )
3442 })
3443}
3444
3445fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3446 let arr = val.as_array().ok_or_else(|| {
3447 AwsServiceError::aws_error(
3448 StatusCode::BAD_REQUEST,
3449 "ValidationException",
3450 "KeySchema is required",
3451 )
3452 })?;
3453 Ok(arr
3454 .iter()
3455 .map(|elem| KeySchemaElement {
3456 attribute_name: elem["AttributeName"]
3457 .as_str()
3458 .unwrap_or_default()
3459 .to_string(),
3460 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3461 })
3462 .collect())
3463}
3464
3465fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3466 let arr = val.as_array().ok_or_else(|| {
3467 AwsServiceError::aws_error(
3468 StatusCode::BAD_REQUEST,
3469 "ValidationException",
3470 "AttributeDefinitions is required",
3471 )
3472 })?;
3473 Ok(arr
3474 .iter()
3475 .map(|elem| AttributeDefinition {
3476 attribute_name: elem["AttributeName"]
3477 .as_str()
3478 .unwrap_or_default()
3479 .to_string(),
3480 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3481 })
3482 .collect())
3483}
3484
3485fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3486 Ok(ProvisionedThroughput {
3487 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3488 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3489 })
3490}
3491
3492fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3493 let Some(arr) = val.as_array() else {
3494 return Vec::new();
3495 };
3496 arr.iter()
3497 .filter_map(|g| {
3498 Some(GlobalSecondaryIndex {
3499 index_name: g["IndexName"].as_str()?.to_string(),
3500 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3501 projection: parse_projection(&g["Projection"]),
3502 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3503 .ok(),
3504 })
3505 })
3506 .collect()
3507}
3508
3509fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3510 let Some(arr) = val.as_array() else {
3511 return Vec::new();
3512 };
3513 arr.iter()
3514 .filter_map(|l| {
3515 Some(LocalSecondaryIndex {
3516 index_name: l["IndexName"].as_str()?.to_string(),
3517 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3518 projection: parse_projection(&l["Projection"]),
3519 })
3520 })
3521 .collect()
3522}
3523
3524fn parse_projection(val: &Value) -> Projection {
3525 Projection {
3526 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3527 non_key_attributes: val["NonKeyAttributes"]
3528 .as_array()
3529 .map(|arr| {
3530 arr.iter()
3531 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3532 .collect()
3533 })
3534 .unwrap_or_default(),
3535 }
3536}
3537
3538fn parse_tags(val: &Value) -> HashMap<String, String> {
3539 let mut tags = HashMap::new();
3540 if let Some(arr) = val.as_array() {
3541 for tag in arr {
3542 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3543 tags.insert(k.to_string(), v.to_string());
3544 }
3545 }
3546 }
3547 tags
3548}
3549
3550fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3551 let mut names = HashMap::new();
3552 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3553 for (k, v) in obj {
3554 if let Some(s) = v.as_str() {
3555 names.insert(k.clone(), s.to_string());
3556 }
3557 }
3558 }
3559 names
3560}
3561
3562fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3563 let mut values = HashMap::new();
3564 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3565 for (k, v) in obj {
3566 values.insert(k.clone(), v.clone());
3567 }
3568 }
3569 values
3570}
3571
3572fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
3573 if name.starts_with('#') {
3574 expr_attr_names
3575 .get(name)
3576 .cloned()
3577 .unwrap_or_else(|| name.to_string())
3578 } else {
3579 name.to_string()
3580 }
3581}
3582
3583fn extract_key(
3584 table: &DynamoTable,
3585 item: &HashMap<String, AttributeValue>,
3586) -> HashMap<String, AttributeValue> {
3587 let mut key = HashMap::new();
3588 let hash_key = table.hash_key_name();
3589 if let Some(v) = item.get(hash_key) {
3590 key.insert(hash_key.to_string(), v.clone());
3591 }
3592 if let Some(range_key) = table.range_key_name() {
3593 if let Some(v) = item.get(range_key) {
3594 key.insert(range_key.to_string(), v.clone());
3595 }
3596 }
3597 key
3598}
3599
3600fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3602 let obj = value.as_object()?;
3603 if obj.is_empty() {
3604 return None;
3605 }
3606 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3607}
3608
3609fn item_matches_key(
3611 item: &HashMap<String, AttributeValue>,
3612 key: &HashMap<String, AttributeValue>,
3613 hash_key_name: &str,
3614 range_key_name: Option<&str>,
3615) -> bool {
3616 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3617 (Some(a), Some(b)) => a == b,
3618 _ => false,
3619 };
3620 if !hash_match {
3621 return false;
3622 }
3623 match range_key_name {
3624 Some(rk) => match (item.get(rk), key.get(rk)) {
3625 (Some(a), Some(b)) => a == b,
3626 (None, None) => true,
3627 _ => false,
3628 },
3629 None => true,
3630 }
3631}
3632
3633fn extract_key_for_schema(
3635 item: &HashMap<String, AttributeValue>,
3636 hash_key_name: &str,
3637 range_key_name: Option<&str>,
3638) -> HashMap<String, AttributeValue> {
3639 let mut key = HashMap::new();
3640 if let Some(v) = item.get(hash_key_name) {
3641 key.insert(hash_key_name.to_string(), v.clone());
3642 }
3643 if let Some(rk) = range_key_name {
3644 if let Some(v) = item.get(rk) {
3645 key.insert(rk.to_string(), v.clone());
3646 }
3647 }
3648 key
3649}
3650
3651fn validate_key_in_item(
3652 table: &DynamoTable,
3653 item: &HashMap<String, AttributeValue>,
3654) -> Result<(), AwsServiceError> {
3655 let hash_key = table.hash_key_name();
3656 if !item.contains_key(hash_key) {
3657 return Err(AwsServiceError::aws_error(
3658 StatusCode::BAD_REQUEST,
3659 "ValidationException",
3660 format!("Missing the key {hash_key} in the item"),
3661 ));
3662 }
3663 if let Some(range_key) = table.range_key_name() {
3664 if !item.contains_key(range_key) {
3665 return Err(AwsServiceError::aws_error(
3666 StatusCode::BAD_REQUEST,
3667 "ValidationException",
3668 format!("Missing the key {range_key} in the item"),
3669 ));
3670 }
3671 }
3672 Ok(())
3673}
3674
3675fn validate_key_attributes_in_key(
3676 table: &DynamoTable,
3677 key: &HashMap<String, AttributeValue>,
3678) -> Result<(), AwsServiceError> {
3679 let hash_key = table.hash_key_name();
3680 if !key.contains_key(hash_key) {
3681 return Err(AwsServiceError::aws_error(
3682 StatusCode::BAD_REQUEST,
3683 "ValidationException",
3684 format!("Missing the key {hash_key} in the item"),
3685 ));
3686 }
3687 Ok(())
3688}
3689
3690fn project_item(
3691 item: &HashMap<String, AttributeValue>,
3692 body: &Value,
3693) -> HashMap<String, AttributeValue> {
3694 let projection = body["ProjectionExpression"].as_str();
3695 match projection {
3696 Some(proj) if !proj.is_empty() => {
3697 let expr_attr_names = parse_expression_attribute_names(body);
3698 let attrs: Vec<String> = proj
3699 .split(',')
3700 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3701 .collect();
3702 let mut result = HashMap::new();
3703 for attr in &attrs {
3704 if let Some(v) = resolve_nested_path(item, attr) {
3705 insert_nested_value(&mut result, attr, v);
3706 }
3707 }
3708 result
3709 }
3710 _ => item.clone(),
3711 }
3712}
3713
3714fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3717 let mut result = String::new();
3719 for (i, segment) in path.split('.').enumerate() {
3720 if i > 0 {
3721 result.push('.');
3722 }
3723 if let Some(bracket_pos) = segment.find('[') {
3725 let key_part = &segment[..bracket_pos];
3726 let index_part = &segment[bracket_pos..];
3727 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3728 result.push_str(index_part);
3729 } else {
3730 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3731 }
3732 }
3733 result
3734}
3735
3736fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3738 let segments = parse_path_segments(path);
3739 if segments.is_empty() {
3740 return None;
3741 }
3742
3743 let first = &segments[0];
3744 let top_key = match first {
3745 PathSegment::Key(k) => k.as_str(),
3746 _ => return None,
3747 };
3748
3749 let mut current = item.get(top_key)?.clone();
3750
3751 for segment in &segments[1..] {
3752 match segment {
3753 PathSegment::Key(k) => {
3754 current = current.get("M")?.get(k)?.clone();
3756 }
3757 PathSegment::Index(idx) => {
3758 current = current.get("L")?.get(*idx)?.clone();
3760 }
3761 }
3762 }
3763
3764 Some(current)
3765}
3766
3767#[derive(Debug)]
3768enum PathSegment {
3769 Key(String),
3770 Index(usize),
3771}
3772
3773fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3775 let mut segments = Vec::new();
3776 let mut current = String::new();
3777
3778 let chars: Vec<char> = path.chars().collect();
3779 let mut i = 0;
3780 while i < chars.len() {
3781 match chars[i] {
3782 '.' => {
3783 if !current.is_empty() {
3784 segments.push(PathSegment::Key(current.clone()));
3785 current.clear();
3786 }
3787 }
3788 '[' => {
3789 if !current.is_empty() {
3790 segments.push(PathSegment::Key(current.clone()));
3791 current.clear();
3792 }
3793 i += 1;
3794 let mut num = String::new();
3795 while i < chars.len() && chars[i] != ']' {
3796 num.push(chars[i]);
3797 i += 1;
3798 }
3799 if let Ok(idx) = num.parse::<usize>() {
3800 segments.push(PathSegment::Index(idx));
3801 }
3802 }
3804 c => {
3805 current.push(c);
3806 }
3807 }
3808 i += 1;
3809 }
3810 if !current.is_empty() {
3811 segments.push(PathSegment::Key(current));
3812 }
3813 segments
3814}
3815
3816fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3819 if !path.contains('.') && !path.contains('[') {
3821 result.insert(path.to_string(), value);
3822 return;
3823 }
3824
3825 let segments = parse_path_segments(path);
3826 if segments.is_empty() {
3827 return;
3828 }
3829
3830 let top_key = match &segments[0] {
3831 PathSegment::Key(k) => k.clone(),
3832 _ => return,
3833 };
3834
3835 if segments.len() == 1 {
3836 result.insert(top_key, value);
3837 return;
3838 }
3839
3840 let wrapped = wrap_value_in_path(&segments[1..], value);
3842 let existing = result.remove(&top_key);
3844 let merged = match existing {
3845 Some(existing) => merge_attribute_values(existing, wrapped),
3846 None => wrapped,
3847 };
3848 result.insert(top_key, merged);
3849}
3850
3851fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3853 if segments.is_empty() {
3854 return value;
3855 }
3856 let inner = wrap_value_in_path(&segments[1..], value);
3857 match &segments[0] {
3858 PathSegment::Key(k) => {
3859 json!({"M": {k.clone(): inner}})
3860 }
3861 PathSegment::Index(idx) => {
3862 let mut arr = vec![Value::Null; idx + 1];
3863 arr[*idx] = inner;
3864 json!({"L": arr})
3865 }
3866 }
3867}
3868
3869fn merge_attribute_values(a: Value, b: Value) -> Value {
3871 if let (Some(a_map), Some(b_map)) = (
3872 a.get("M").and_then(|v| v.as_object()),
3873 b.get("M").and_then(|v| v.as_object()),
3874 ) {
3875 let mut merged = a_map.clone();
3876 for (k, v) in b_map {
3877 if let Some(existing) = merged.get(k) {
3878 merged.insert(
3879 k.clone(),
3880 merge_attribute_values(existing.clone(), v.clone()),
3881 );
3882 } else {
3883 merged.insert(k.clone(), v.clone());
3884 }
3885 }
3886 json!({"M": merged})
3887 } else {
3888 b
3889 }
3890}
3891
3892fn evaluate_condition(
3893 condition: &str,
3894 existing: Option<&HashMap<String, AttributeValue>>,
3895 expr_attr_names: &HashMap<String, String>,
3896 expr_attr_values: &HashMap<String, Value>,
3897) -> Result<(), AwsServiceError> {
3898 let cond = condition.trim();
3899
3900 if let Some(inner) = extract_function_arg(cond, "attribute_not_exists") {
3901 let attr = resolve_attr_name(inner, expr_attr_names);
3902 match existing {
3903 Some(item) if item.contains_key(&attr) => {
3904 return Err(AwsServiceError::aws_error(
3905 StatusCode::BAD_REQUEST,
3906 "ConditionalCheckFailedException",
3907 "The conditional request failed",
3908 ));
3909 }
3910 _ => return Ok(()),
3911 }
3912 }
3913
3914 if let Some(inner) = extract_function_arg(cond, "attribute_exists") {
3915 let attr = resolve_attr_name(inner, expr_attr_names);
3916 match existing {
3917 Some(item) if item.contains_key(&attr) => return Ok(()),
3918 _ => {
3919 return Err(AwsServiceError::aws_error(
3920 StatusCode::BAD_REQUEST,
3921 "ConditionalCheckFailedException",
3922 "The conditional request failed",
3923 ));
3924 }
3925 }
3926 }
3927
3928 if let Some((left, op, right)) = parse_simple_comparison(cond) {
3929 let attr_name = resolve_attr_name(left.trim(), expr_attr_names);
3930 let expected = expr_attr_values.get(right.trim());
3931 let actual = existing.and_then(|item| item.get(&attr_name));
3932
3933 let result = match op {
3934 "=" => actual == expected,
3935 "<>" => actual != expected,
3936 _ => true,
3937 };
3938
3939 if !result {
3940 return Err(AwsServiceError::aws_error(
3941 StatusCode::BAD_REQUEST,
3942 "ConditionalCheckFailedException",
3943 "The conditional request failed",
3944 ));
3945 }
3946 }
3947
3948 Ok(())
3949}
3950
3951fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
3952 let prefix = format!("{func_name}(");
3953 if let Some(rest) = expr.strip_prefix(&prefix) {
3954 if let Some(inner) = rest.strip_suffix(')') {
3955 return Some(inner.trim());
3956 }
3957 }
3958 None
3959}
3960
3961fn parse_simple_comparison(expr: &str) -> Option<(&str, &str, &str)> {
3962 for op in &["<>", "=", "<", ">", "<=", ">="] {
3963 if let Some(pos) = expr.find(op) {
3964 let left = &expr[..pos];
3965 let right = &expr[pos + op.len()..];
3966 return Some((left, op, right));
3967 }
3968 }
3969 None
3970}
3971
3972fn evaluate_key_condition(
3973 expr: &str,
3974 item: &HashMap<String, AttributeValue>,
3975 hash_key_name: &str,
3976 _range_key_name: Option<&str>,
3977 expr_attr_names: &HashMap<String, String>,
3978 expr_attr_values: &HashMap<String, Value>,
3979) -> bool {
3980 let parts: Vec<&str> = split_on_and(expr);
3981 for part in &parts {
3982 let part = part.trim();
3983 if !evaluate_single_key_condition(
3984 part,
3985 item,
3986 hash_key_name,
3987 expr_attr_names,
3988 expr_attr_values,
3989 ) {
3990 return false;
3991 }
3992 }
3993 true
3994}
3995
3996fn split_on_and(expr: &str) -> Vec<&str> {
3997 let mut parts = Vec::new();
3998 let mut start = 0;
3999 let len = expr.len();
4000 let mut i = 0;
4001 let mut depth = 0;
4002 while i < len {
4003 let ch = expr.as_bytes()[i];
4004 if ch == b'(' {
4005 depth += 1;
4006 } else if ch == b')' {
4007 if depth > 0 {
4008 depth -= 1;
4009 }
4010 } else if depth == 0 && i + 5 <= len && expr[i..i + 5].eq_ignore_ascii_case(" AND ") {
4011 parts.push(&expr[start..i]);
4012 start = i + 5;
4013 i = start;
4014 continue;
4015 }
4016 i += 1;
4017 }
4018 parts.push(&expr[start..]);
4019 parts
4020}
4021
4022fn split_on_or(expr: &str) -> Vec<&str> {
4023 let mut parts = Vec::new();
4024 let mut start = 0;
4025 let len = expr.len();
4026 let mut i = 0;
4027 let mut depth = 0;
4028 while i < len {
4029 let ch = expr.as_bytes()[i];
4030 if ch == b'(' {
4031 depth += 1;
4032 } else if ch == b')' {
4033 if depth > 0 {
4034 depth -= 1;
4035 }
4036 } else if depth == 0 && i + 4 <= len && expr[i..i + 4].eq_ignore_ascii_case(" OR ") {
4037 parts.push(&expr[start..i]);
4038 start = i + 4;
4039 i = start;
4040 continue;
4041 }
4042 i += 1;
4043 }
4044 parts.push(&expr[start..]);
4045 parts
4046}
4047
4048fn evaluate_single_key_condition(
4049 part: &str,
4050 item: &HashMap<String, AttributeValue>,
4051 _hash_key_name: &str,
4052 expr_attr_names: &HashMap<String, String>,
4053 expr_attr_values: &HashMap<String, Value>,
4054) -> bool {
4055 let part = part.trim();
4056
4057 if let Some(rest) = part
4059 .strip_prefix("begins_with(")
4060 .or_else(|| part.strip_prefix("begins_with ("))
4061 {
4062 if let Some(inner) = rest.strip_suffix(')') {
4063 let mut split = inner.splitn(2, ',');
4064 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4065 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4066 let val_ref = val_ref.trim();
4067 let expected = expr_attr_values.get(val_ref);
4068 let actual = item.get(&attr_name);
4069 return match (actual, expected) {
4070 (Some(a), Some(e)) => {
4071 let a_str = extract_string_value(a);
4072 let e_str = extract_string_value(e);
4073 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4074 }
4075 _ => false,
4076 };
4077 }
4078 }
4079 return false;
4080 }
4081
4082 if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4084 let attr_part = part[..between_pos].trim();
4085 let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4086 let range_part = &part[between_pos + 7..];
4087 if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4088 let lo_ref = range_part[..and_pos].trim();
4089 let hi_ref = range_part[and_pos + 5..].trim();
4090 let lo = expr_attr_values.get(lo_ref);
4091 let hi = expr_attr_values.get(hi_ref);
4092 let actual = item.get(&attr_name);
4093 return match (actual, lo, hi) {
4094 (Some(a), Some(l), Some(h)) => {
4095 compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4096 && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4097 }
4098 _ => false,
4099 };
4100 }
4101 }
4102
4103 for op in &["<=", ">=", "<>", "=", "<", ">"] {
4105 if let Some(pos) = part.find(op) {
4106 let left = part[..pos].trim();
4107 let right = part[pos + op.len()..].trim();
4108 let attr_name = resolve_attr_name(left, expr_attr_names);
4109 let expected = expr_attr_values.get(right);
4110 let actual = item.get(&attr_name);
4111
4112 return match *op {
4113 "=" => actual == expected,
4114 "<>" => actual != expected,
4115 "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4116 ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4117 "<=" => {
4118 let cmp = compare_attribute_values(actual, expected);
4119 cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4120 }
4121 ">=" => {
4122 let cmp = compare_attribute_values(actual, expected);
4123 cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4124 }
4125 _ => true,
4126 };
4127 }
4128 }
4129
4130 true
4131}
4132
4133fn extract_string_value(val: &Value) -> Option<String> {
4134 val.get("S")
4135 .and_then(|v| v.as_str())
4136 .map(|s| s.to_string())
4137 .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
4138}
4139
4140fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4141 match (a, b) {
4142 (None, None) => std::cmp::Ordering::Equal,
4143 (None, Some(_)) => std::cmp::Ordering::Less,
4144 (Some(_), None) => std::cmp::Ordering::Greater,
4145 (Some(a), Some(b)) => {
4146 let a_type = attribute_type_and_value(a);
4147 let b_type = attribute_type_and_value(b);
4148 match (a_type, b_type) {
4149 (Some(("S", a_val)), Some(("S", b_val))) => {
4150 let a_str = a_val.as_str().unwrap_or("");
4151 let b_str = b_val.as_str().unwrap_or("");
4152 a_str.cmp(b_str)
4153 }
4154 (Some(("N", a_val)), Some(("N", b_val))) => {
4155 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4156 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4157 a_num
4158 .partial_cmp(&b_num)
4159 .unwrap_or(std::cmp::Ordering::Equal)
4160 }
4161 (Some(("B", a_val)), Some(("B", b_val))) => {
4162 let a_str = a_val.as_str().unwrap_or("");
4163 let b_str = b_val.as_str().unwrap_or("");
4164 a_str.cmp(b_str)
4165 }
4166 _ => std::cmp::Ordering::Equal,
4167 }
4168 }
4169 }
4170}
4171
4172fn evaluate_filter_expression(
4173 expr: &str,
4174 item: &HashMap<String, AttributeValue>,
4175 expr_attr_names: &HashMap<String, String>,
4176 expr_attr_values: &HashMap<String, Value>,
4177) -> bool {
4178 let trimmed = expr.trim();
4179
4180 let or_parts = split_on_or(trimmed);
4182 if or_parts.len() > 1 {
4183 return or_parts.iter().any(|part| {
4184 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4185 });
4186 }
4187
4188 let and_parts = split_on_and(trimmed);
4190 if and_parts.len() > 1 {
4191 return and_parts.iter().all(|part| {
4192 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4193 });
4194 }
4195
4196 let stripped = strip_outer_parens(trimmed);
4198 if stripped != trimmed {
4199 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4200 }
4201
4202 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4203}
4204
4205fn strip_outer_parens(expr: &str) -> &str {
4207 let trimmed = expr.trim();
4208 if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
4209 return trimmed;
4210 }
4211 let inner = &trimmed[1..trimmed.len() - 1];
4213 let mut depth = 0;
4214 for ch in inner.bytes() {
4215 match ch {
4216 b'(' => depth += 1,
4217 b')' => {
4218 if depth == 0 {
4219 return trimmed; }
4221 depth -= 1;
4222 }
4223 _ => {}
4224 }
4225 }
4226 if depth == 0 {
4227 inner
4228 } else {
4229 trimmed
4230 }
4231}
4232
4233fn evaluate_single_filter_condition(
4234 part: &str,
4235 item: &HashMap<String, AttributeValue>,
4236 expr_attr_names: &HashMap<String, String>,
4237 expr_attr_values: &HashMap<String, Value>,
4238) -> bool {
4239 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4240 let attr = resolve_attr_name(inner, expr_attr_names);
4241 return item.contains_key(&attr);
4242 }
4243
4244 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4245 let attr = resolve_attr_name(inner, expr_attr_names);
4246 return !item.contains_key(&attr);
4247 }
4248
4249 if let Some(rest) = part
4250 .strip_prefix("begins_with(")
4251 .or_else(|| part.strip_prefix("begins_with ("))
4252 {
4253 if let Some(inner) = rest.strip_suffix(')') {
4254 let mut split = inner.splitn(2, ',');
4255 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4256 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4257 let expected = expr_attr_values.get(val_ref.trim());
4258 let actual = item.get(&attr_name);
4259 return match (actual, expected) {
4260 (Some(a), Some(e)) => {
4261 let a_str = extract_string_value(a);
4262 let e_str = extract_string_value(e);
4263 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4264 }
4265 _ => false,
4266 };
4267 }
4268 }
4269 }
4270
4271 if let Some(rest) = part
4272 .strip_prefix("contains(")
4273 .or_else(|| part.strip_prefix("contains ("))
4274 {
4275 if let Some(inner) = rest.strip_suffix(')') {
4276 let mut split = inner.splitn(2, ',');
4277 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4278 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4279 let expected = expr_attr_values.get(val_ref.trim());
4280 let actual = item.get(&attr_name);
4281 return match (actual, expected) {
4282 (Some(a), Some(e)) => {
4283 let a_str = extract_string_value(a);
4284 let e_str = extract_string_value(e);
4285 matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
4286 }
4287 _ => false,
4288 };
4289 }
4290 }
4291 }
4292
4293 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4294}
4295
4296fn apply_update_expression(
4297 item: &mut HashMap<String, AttributeValue>,
4298 expr: &str,
4299 expr_attr_names: &HashMap<String, String>,
4300 expr_attr_values: &HashMap<String, Value>,
4301) -> Result<(), AwsServiceError> {
4302 let clauses = parse_update_clauses(expr);
4303 for (action, assignments) in &clauses {
4304 match action.to_ascii_uppercase().as_str() {
4305 "SET" => {
4306 for assignment in assignments {
4307 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4308 }
4309 }
4310 "REMOVE" => {
4311 for attr_ref in assignments {
4312 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4313 item.remove(&attr);
4314 }
4315 }
4316 "ADD" => {
4317 for assignment in assignments {
4318 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4319 }
4320 }
4321 "DELETE" => {
4322 for assignment in assignments {
4323 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4324 }
4325 }
4326 _ => {}
4327 }
4328 }
4329 Ok(())
4330}
4331
4332fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
4333 let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
4334 let upper = expr.to_ascii_uppercase();
4335 let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
4336 let mut positions: Vec<(usize, &str)> = Vec::new();
4337
4338 for kw in &keywords {
4339 let mut search_from = 0;
4340 while let Some(pos) = upper[search_from..].find(kw) {
4341 let abs_pos = search_from + pos;
4342 let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
4343 let after_pos = abs_pos + kw.len();
4344 let after_ok =
4345 after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
4346 if before_ok && after_ok {
4347 positions.push((abs_pos, kw));
4348 }
4349 search_from = abs_pos + kw.len();
4350 }
4351 }
4352
4353 positions.sort_by_key(|(pos, _)| *pos);
4354
4355 for (i, &(pos, kw)) in positions.iter().enumerate() {
4356 let start = pos + kw.len();
4357 let end = if i + 1 < positions.len() {
4358 positions[i + 1].0
4359 } else {
4360 expr.len()
4361 };
4362 let content = expr[start..end].trim();
4363 let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
4364 clauses.push((kw.to_string(), assignments));
4365 }
4366
4367 clauses
4368}
4369
4370fn apply_set_assignment(
4371 item: &mut HashMap<String, AttributeValue>,
4372 assignment: &str,
4373 expr_attr_names: &HashMap<String, String>,
4374 expr_attr_values: &HashMap<String, Value>,
4375) -> Result<(), AwsServiceError> {
4376 let Some((left, right)) = assignment.split_once('=') else {
4377 return Ok(());
4378 };
4379
4380 let attr = resolve_attr_name(left.trim(), expr_attr_names);
4381 let right = right.trim();
4382
4383 if let Some(rest) = right
4385 .strip_prefix("if_not_exists(")
4386 .or_else(|| right.strip_prefix("if_not_exists ("))
4387 {
4388 if let Some(inner) = rest.strip_suffix(')') {
4389 let mut split = inner.splitn(2, ',');
4390 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4391 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4392 if !item.contains_key(&check_name) {
4393 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4394 item.insert(attr, val.clone());
4395 }
4396 }
4397 return Ok(());
4398 }
4399 }
4400 }
4401
4402 if let Some(rest) = right
4404 .strip_prefix("list_append(")
4405 .or_else(|| right.strip_prefix("list_append ("))
4406 {
4407 if let Some(inner) = rest.strip_suffix(')') {
4408 let mut split = inner.splitn(2, ',');
4409 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4410 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4411 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4412
4413 let mut merged = Vec::new();
4414 if let Some(Value::Object(obj)) = &a_val {
4415 if let Some(Value::Array(arr)) = obj.get("L") {
4416 merged.extend(arr.clone());
4417 }
4418 }
4419 if let Some(Value::Object(obj)) = &b_val {
4420 if let Some(Value::Array(arr)) = obj.get("L") {
4421 merged.extend(arr.clone());
4422 }
4423 }
4424
4425 item.insert(attr, json!({"L": merged}));
4426 return Ok(());
4427 }
4428 }
4429 }
4430
4431 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4433 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4434 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4435
4436 let left_num = extract_number(&left_val).unwrap_or(0.0);
4437 let right_num = extract_number(&right_val).unwrap_or(0.0);
4438
4439 let result = if is_add {
4440 left_num + right_num
4441 } else {
4442 left_num - right_num
4443 };
4444
4445 let num_str = if result == result.trunc() {
4446 format!("{}", result as i64)
4447 } else {
4448 format!("{result}")
4449 };
4450
4451 item.insert(attr, json!({"N": num_str}));
4452 return Ok(());
4453 }
4454
4455 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4457 if let Some(v) = val {
4458 item.insert(attr, v);
4459 }
4460
4461 Ok(())
4462}
4463
4464fn resolve_value(
4465 reference: &str,
4466 item: &HashMap<String, AttributeValue>,
4467 expr_attr_names: &HashMap<String, String>,
4468 expr_attr_values: &HashMap<String, Value>,
4469) -> Option<Value> {
4470 let reference = reference.trim();
4471 if reference.starts_with(':') {
4472 expr_attr_values.get(reference).cloned()
4473 } else {
4474 let attr_name = resolve_attr_name(reference, expr_attr_names);
4475 item.get(&attr_name).cloned()
4476 }
4477}
4478
4479fn extract_number(val: &Option<Value>) -> Option<f64> {
4480 val.as_ref()
4481 .and_then(|v| v.get("N"))
4482 .and_then(|n| n.as_str())
4483 .and_then(|s| s.parse().ok())
4484}
4485
4486fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
4487 let mut depth = 0;
4488 for (i, c) in expr.char_indices() {
4489 match c {
4490 '(' => depth += 1,
4491 ')' => depth -= 1,
4492 '+' if depth == 0 && i > 0 => {
4493 return Some((&expr[..i], &expr[i + 1..], true));
4494 }
4495 '-' if depth == 0 && i > 0 => {
4496 return Some((&expr[..i], &expr[i + 1..], false));
4497 }
4498 _ => {}
4499 }
4500 }
4501 None
4502}
4503
4504fn apply_add_assignment(
4505 item: &mut HashMap<String, AttributeValue>,
4506 assignment: &str,
4507 expr_attr_names: &HashMap<String, String>,
4508 expr_attr_values: &HashMap<String, Value>,
4509) -> Result<(), AwsServiceError> {
4510 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4511 if parts.len() != 2 {
4512 return Ok(());
4513 }
4514
4515 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4516 let val_ref = parts[1].trim();
4517 let add_val = expr_attr_values.get(val_ref);
4518
4519 if let Some(add_val) = add_val {
4520 if let Some(existing) = item.get(&attr) {
4521 if let (Some(existing_num), Some(add_num)) = (
4522 extract_number(&Some(existing.clone())),
4523 extract_number(&Some(add_val.clone())),
4524 ) {
4525 let result = existing_num + add_num;
4526 let num_str = if result == result.trunc() {
4527 format!("{}", result as i64)
4528 } else {
4529 format!("{result}")
4530 };
4531 item.insert(attr, json!({"N": num_str}));
4532 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4533 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4534 let mut merged: Vec<Value> = existing_set.clone();
4535 for v in add_set {
4536 if !merged.contains(v) {
4537 merged.push(v.clone());
4538 }
4539 }
4540 item.insert(attr, json!({"SS": merged}));
4541 }
4542 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4543 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4544 let mut merged: Vec<Value> = existing_set.clone();
4545 for v in add_set {
4546 if !merged.contains(v) {
4547 merged.push(v.clone());
4548 }
4549 }
4550 item.insert(attr, json!({"NS": merged}));
4551 }
4552 }
4553 } else {
4554 item.insert(attr, add_val.clone());
4555 }
4556 }
4557
4558 Ok(())
4559}
4560
4561fn apply_delete_assignment(
4562 item: &mut HashMap<String, AttributeValue>,
4563 assignment: &str,
4564 expr_attr_names: &HashMap<String, String>,
4565 expr_attr_values: &HashMap<String, Value>,
4566) -> Result<(), AwsServiceError> {
4567 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4568 if parts.len() != 2 {
4569 return Ok(());
4570 }
4571
4572 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4573 let val_ref = parts[1].trim();
4574 let del_val = expr_attr_values.get(val_ref);
4575
4576 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4577 if let (Some(existing_set), Some(del_set)) = (
4578 existing.get("SS").and_then(|v| v.as_array()),
4579 del_val.get("SS").and_then(|v| v.as_array()),
4580 ) {
4581 let filtered: Vec<Value> = existing_set
4582 .iter()
4583 .filter(|v| !del_set.contains(v))
4584 .cloned()
4585 .collect();
4586 if filtered.is_empty() {
4587 item.remove(&attr);
4588 } else {
4589 item.insert(attr, json!({"SS": filtered}));
4590 }
4591 } else if let (Some(existing_set), Some(del_set)) = (
4592 existing.get("NS").and_then(|v| v.as_array()),
4593 del_val.get("NS").and_then(|v| v.as_array()),
4594 ) {
4595 let filtered: Vec<Value> = existing_set
4596 .iter()
4597 .filter(|v| !del_set.contains(v))
4598 .cloned()
4599 .collect();
4600 if filtered.is_empty() {
4601 item.remove(&attr);
4602 } else {
4603 item.insert(attr, json!({"NS": filtered}));
4604 }
4605 }
4606 }
4607
4608 Ok(())
4609}
4610
4611#[allow(clippy::too_many_arguments)]
4612fn build_table_description_json(
4613 arn: &str,
4614 key_schema: &[KeySchemaElement],
4615 attribute_definitions: &[AttributeDefinition],
4616 provisioned_throughput: &ProvisionedThroughput,
4617 gsi: &[GlobalSecondaryIndex],
4618 lsi: &[LocalSecondaryIndex],
4619 billing_mode: &str,
4620 created_at: chrono::DateTime<chrono::Utc>,
4621 item_count: i64,
4622 size_bytes: i64,
4623 status: &str,
4624) -> Value {
4625 let table_name = arn.rsplit('/').next().unwrap_or("");
4626 let creation_timestamp =
4627 created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4628
4629 let ks: Vec<Value> = key_schema
4630 .iter()
4631 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4632 .collect();
4633
4634 let ad: Vec<Value> = attribute_definitions
4635 .iter()
4636 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4637 .collect();
4638
4639 let mut desc = json!({
4640 "TableName": table_name,
4641 "TableArn": arn,
4642 "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4643 "TableStatus": status,
4644 "KeySchema": ks,
4645 "AttributeDefinitions": ad,
4646 "CreationDateTime": creation_timestamp,
4647 "ItemCount": item_count,
4648 "TableSizeBytes": size_bytes,
4649 "BillingModeSummary": { "BillingMode": billing_mode },
4650 });
4651
4652 if billing_mode != "PAY_PER_REQUEST" {
4653 desc["ProvisionedThroughput"] = json!({
4654 "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4655 "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4656 "NumberOfDecreasesToday": 0,
4657 });
4658 } else {
4659 desc["ProvisionedThroughput"] = json!({
4660 "ReadCapacityUnits": 0,
4661 "WriteCapacityUnits": 0,
4662 "NumberOfDecreasesToday": 0,
4663 });
4664 }
4665
4666 if !gsi.is_empty() {
4667 let gsi_json: Vec<Value> = gsi
4668 .iter()
4669 .map(|g| {
4670 let gks: Vec<Value> = g
4671 .key_schema
4672 .iter()
4673 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4674 .collect();
4675 let mut idx = json!({
4676 "IndexName": g.index_name,
4677 "KeySchema": gks,
4678 "Projection": { "ProjectionType": g.projection.projection_type },
4679 "IndexStatus": "ACTIVE",
4680 "IndexArn": format!("{arn}/index/{}", g.index_name),
4681 "ItemCount": 0,
4682 "IndexSizeBytes": 0,
4683 });
4684 if !g.projection.non_key_attributes.is_empty() {
4685 idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4686 }
4687 if let Some(ref pt) = g.provisioned_throughput {
4688 idx["ProvisionedThroughput"] = json!({
4689 "ReadCapacityUnits": pt.read_capacity_units,
4690 "WriteCapacityUnits": pt.write_capacity_units,
4691 "NumberOfDecreasesToday": 0,
4692 });
4693 }
4694 idx
4695 })
4696 .collect();
4697 desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4698 }
4699
4700 if !lsi.is_empty() {
4701 let lsi_json: Vec<Value> = lsi
4702 .iter()
4703 .map(|l| {
4704 let lks: Vec<Value> = l
4705 .key_schema
4706 .iter()
4707 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4708 .collect();
4709 let mut idx = json!({
4710 "IndexName": l.index_name,
4711 "KeySchema": lks,
4712 "Projection": { "ProjectionType": l.projection.projection_type },
4713 "IndexArn": format!("{arn}/index/{}", l.index_name),
4714 "ItemCount": 0,
4715 "IndexSizeBytes": 0,
4716 });
4717 if !l.projection.non_key_attributes.is_empty() {
4718 idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4719 }
4720 idx
4721 })
4722 .collect();
4723 desc["LocalSecondaryIndexes"] = json!(lsi_json);
4724 }
4725
4726 desc
4727}
4728
4729fn build_table_description(table: &DynamoTable) -> Value {
4730 let mut desc = build_table_description_json(
4731 &table.arn,
4732 &table.key_schema,
4733 &table.attribute_definitions,
4734 &table.provisioned_throughput,
4735 &table.gsi,
4736 &table.lsi,
4737 &table.billing_mode,
4738 table.created_at,
4739 table.item_count,
4740 table.size_bytes,
4741 &table.status,
4742 );
4743
4744 if table.stream_enabled {
4746 if let Some(ref stream_arn) = table.stream_arn {
4747 desc["LatestStreamArn"] = json!(stream_arn);
4748 desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
4749 }
4750 if let Some(ref view_type) = table.stream_view_type {
4751 desc["StreamSpecification"] = json!({
4752 "StreamEnabled": true,
4753 "StreamViewType": view_type,
4754 });
4755 }
4756 }
4757
4758 if let Some(ref sse_type) = table.sse_type {
4760 let mut sse_desc = json!({
4761 "Status": "ENABLED",
4762 "SSEType": sse_type,
4763 });
4764 if let Some(ref key_arn) = table.sse_kms_key_arn {
4765 sse_desc["KMSMasterKeyArn"] = json!(key_arn);
4766 }
4767 desc["SSEDescription"] = sse_desc;
4768 } else {
4769 desc["SSEDescription"] = json!({
4771 "Status": "ENABLED",
4772 "SSEType": "AES256",
4773 });
4774 }
4775
4776 desc
4777}
4778
4779fn execute_partiql_statement(
4780 state: &SharedDynamoDbState,
4781 statement: &str,
4782 parameters: &[Value],
4783) -> Result<AwsResponse, AwsServiceError> {
4784 let trimmed = statement.trim();
4785 let upper = trimmed.to_ascii_uppercase();
4786
4787 if upper.starts_with("SELECT") {
4788 execute_partiql_select(state, trimmed, parameters)
4789 } else if upper.starts_with("INSERT") {
4790 execute_partiql_insert(state, trimmed, parameters)
4791 } else if upper.starts_with("UPDATE") {
4792 execute_partiql_update(state, trimmed, parameters)
4793 } else if upper.starts_with("DELETE") {
4794 execute_partiql_delete(state, trimmed, parameters)
4795 } else {
4796 Err(AwsServiceError::aws_error(
4797 StatusCode::BAD_REQUEST,
4798 "ValidationException",
4799 format!("Unsupported PartiQL statement: {trimmed}"),
4800 ))
4801 }
4802}
4803
4804fn execute_partiql_select(
4806 state: &SharedDynamoDbState,
4807 statement: &str,
4808 parameters: &[Value],
4809) -> Result<AwsResponse, AwsServiceError> {
4810 let upper = statement.to_ascii_uppercase();
4812 let from_pos = upper.find("FROM").ok_or_else(|| {
4813 AwsServiceError::aws_error(
4814 StatusCode::BAD_REQUEST,
4815 "ValidationException",
4816 "Invalid SELECT statement: missing FROM",
4817 )
4818 })?;
4819
4820 let after_from = statement[from_pos + 4..].trim();
4821 let (table_name, rest) = parse_partiql_table_name(after_from);
4822
4823 let state = state.read();
4824 let table = get_table(&state.tables, &table_name)?;
4825
4826 let rest_upper = rest.trim().to_ascii_uppercase();
4827 if rest_upper.starts_with("WHERE") {
4828 let where_clause = rest.trim()[5..].trim();
4829 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4830 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4831 DynamoDbService::ok_json(json!({ "Items": items }))
4832 } else {
4833 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4835 DynamoDbService::ok_json(json!({ "Items": items }))
4836 }
4837}
4838
4839fn execute_partiql_insert(
4840 state: &SharedDynamoDbState,
4841 statement: &str,
4842 parameters: &[Value],
4843) -> Result<AwsResponse, AwsServiceError> {
4844 let upper = statement.to_ascii_uppercase();
4847 let into_pos = upper.find("INTO").ok_or_else(|| {
4848 AwsServiceError::aws_error(
4849 StatusCode::BAD_REQUEST,
4850 "ValidationException",
4851 "Invalid INSERT statement: missing INTO",
4852 )
4853 })?;
4854
4855 let after_into = statement[into_pos + 4..].trim();
4856 let (table_name, rest) = parse_partiql_table_name(after_into);
4857
4858 let rest_upper = rest.trim().to_ascii_uppercase();
4859 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4860 AwsServiceError::aws_error(
4861 StatusCode::BAD_REQUEST,
4862 "ValidationException",
4863 "Invalid INSERT statement: missing VALUE",
4864 )
4865 })?;
4866
4867 let value_str = rest.trim()[value_pos + 5..].trim();
4868 let item = parse_partiql_value_object(value_str, parameters)?;
4869
4870 let mut state = state.write();
4871 let table = get_table_mut(&mut state.tables, &table_name)?;
4872 let key = extract_key(table, &item);
4873 if table.find_item_index(&key).is_some() {
4874 return Err(AwsServiceError::aws_error(
4876 StatusCode::BAD_REQUEST,
4877 "DuplicateItemException",
4878 "Duplicate primary key exists in table",
4879 ));
4880 } else {
4881 table.items.push(item);
4882 }
4883 table.recalculate_stats();
4884
4885 DynamoDbService::ok_json(json!({}))
4886}
4887
4888fn execute_partiql_update(
4889 state: &SharedDynamoDbState,
4890 statement: &str,
4891 parameters: &[Value],
4892) -> Result<AwsResponse, AwsServiceError> {
4893 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
4897
4898 let rest_upper = rest.trim().to_ascii_uppercase();
4899 let set_pos = rest_upper.find("SET").ok_or_else(|| {
4900 AwsServiceError::aws_error(
4901 StatusCode::BAD_REQUEST,
4902 "ValidationException",
4903 "Invalid UPDATE statement: missing SET",
4904 )
4905 })?;
4906
4907 let after_set = rest.trim()[set_pos + 3..].trim();
4908
4909 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4911 let (set_clause, where_clause) = if let Some(wp) = where_pos {
4912 (&after_set[..wp], after_set[wp + 5..].trim())
4913 } else {
4914 (after_set, "")
4915 };
4916
4917 let mut state = state.write();
4918 let table = get_table_mut(&mut state.tables, &table_name)?;
4919
4920 let matched_indices = if !where_clause.is_empty() {
4921 find_partiql_where_indices(table, where_clause, parameters)?
4922 } else {
4923 (0..table.items.len()).collect()
4924 };
4925
4926 let param_offset = count_params_in_str(where_clause);
4928 let assignments: Vec<&str> = set_clause.split(',').collect();
4929 for idx in &matched_indices {
4930 let mut local_offset = param_offset;
4931 for assignment in &assignments {
4932 let assignment = assignment.trim();
4933 if let Some((attr, val_str)) = assignment.split_once('=') {
4934 let attr = attr.trim().trim_matches('"');
4935 let val_str = val_str.trim();
4936 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4937 if let Some(v) = value {
4938 table.items[*idx].insert(attr.to_string(), v);
4939 }
4940 }
4941 }
4942 }
4943 table.recalculate_stats();
4944
4945 DynamoDbService::ok_json(json!({}))
4946}
4947
4948fn execute_partiql_delete(
4949 state: &SharedDynamoDbState,
4950 statement: &str,
4951 parameters: &[Value],
4952) -> Result<AwsResponse, AwsServiceError> {
4953 let upper = statement.to_ascii_uppercase();
4955 let from_pos = upper.find("FROM").ok_or_else(|| {
4956 AwsServiceError::aws_error(
4957 StatusCode::BAD_REQUEST,
4958 "ValidationException",
4959 "Invalid DELETE statement: missing FROM",
4960 )
4961 })?;
4962
4963 let after_from = statement[from_pos + 4..].trim();
4964 let (table_name, rest) = parse_partiql_table_name(after_from);
4965
4966 let rest_upper = rest.trim().to_ascii_uppercase();
4967 if !rest_upper.starts_with("WHERE") {
4968 return Err(AwsServiceError::aws_error(
4969 StatusCode::BAD_REQUEST,
4970 "ValidationException",
4971 "DELETE requires a WHERE clause",
4972 ));
4973 }
4974 let where_clause = rest.trim()[5..].trim();
4975
4976 let mut state = state.write();
4977 let table = get_table_mut(&mut state.tables, &table_name)?;
4978
4979 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4980 indices.sort_unstable();
4982 indices.reverse();
4983 for idx in indices {
4984 table.items.remove(idx);
4985 }
4986 table.recalculate_stats();
4987
4988 DynamoDbService::ok_json(json!({}))
4989}
4990
4991fn parse_partiql_table_name(s: &str) -> (String, &str) {
4994 let s = s.trim();
4995 if let Some(stripped) = s.strip_prefix('"') {
4996 if let Some(end) = stripped.find('"') {
4998 let name = &stripped[..end];
4999 let rest = &stripped[end + 1..];
5000 (name.to_string(), rest)
5001 } else {
5002 let end = s.find(' ').unwrap_or(s.len());
5003 (s[..end].trim_matches('"').to_string(), &s[end..])
5004 }
5005 } else {
5006 let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
5007 (s[..end].to_string(), &s[end..])
5008 }
5009}
5010
5011fn evaluate_partiql_where<'a>(
5014 table: &'a DynamoTable,
5015 where_clause: &str,
5016 parameters: &[Value],
5017) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5018 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5019 Ok(indices.iter().map(|i| &table.items[*i]).collect())
5020}
5021
5022fn find_partiql_where_indices(
5023 table: &DynamoTable,
5024 where_clause: &str,
5025 parameters: &[Value],
5026) -> Result<Vec<usize>, AwsServiceError> {
5027 let upper = where_clause.to_uppercase();
5030 let conditions = if upper.contains(" AND ") {
5031 let mut parts = Vec::new();
5033 let mut last = 0;
5034 for (i, _) in upper.match_indices(" AND ") {
5035 parts.push(where_clause[last..i].trim());
5036 last = i + 5;
5037 }
5038 parts.push(where_clause[last..].trim());
5039 parts
5040 } else {
5041 vec![where_clause.trim()]
5042 };
5043
5044 let mut param_idx = 0usize;
5045 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5046
5047 for cond in &conditions {
5048 let cond = cond.trim();
5049 if let Some((left, right)) = cond.split_once('=') {
5050 let attr = left.trim().trim_matches('"').to_string();
5051 let val_str = right.trim();
5052 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5053 if let Some(v) = value {
5054 parsed_conditions.push((attr, v));
5055 }
5056 }
5057 }
5058
5059 let mut indices = Vec::new();
5060 for (i, item) in table.items.iter().enumerate() {
5061 let all_match = parsed_conditions
5062 .iter()
5063 .all(|(attr, expected)| item.get(attr) == Some(expected));
5064 if all_match {
5065 indices.push(i);
5066 }
5067 }
5068
5069 Ok(indices)
5070}
5071
5072fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5077 let s = s.trim();
5078 if s == "?" {
5079 let idx = *param_idx;
5080 *param_idx += 1;
5081 parameters.get(idx).cloned()
5082 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5083 let inner = &s[1..s.len() - 1];
5084 Some(json!({"S": inner}))
5085 } else if let Ok(n) = s.parse::<f64>() {
5086 let num_str = if n == n.trunc() {
5087 format!("{}", n as i64)
5088 } else {
5089 format!("{n}")
5090 };
5091 Some(json!({"N": num_str}))
5092 } else {
5093 None
5094 }
5095}
5096
5097fn parse_partiql_value_object(
5099 s: &str,
5100 parameters: &[Value],
5101) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5102 let s = s.trim();
5103 let inner = s
5104 .strip_prefix('{')
5105 .and_then(|s| s.strip_suffix('}'))
5106 .ok_or_else(|| {
5107 AwsServiceError::aws_error(
5108 StatusCode::BAD_REQUEST,
5109 "ValidationException",
5110 "Invalid VALUE: expected object literal",
5111 )
5112 })?;
5113
5114 let mut item = HashMap::new();
5115 let mut param_idx = 0usize;
5116
5117 for pair in split_partiql_pairs(inner) {
5119 let pair = pair.trim();
5120 if pair.is_empty() {
5121 continue;
5122 }
5123 if let Some((key_part, val_part)) = pair.split_once(':') {
5124 let key = key_part
5125 .trim()
5126 .trim_matches('\'')
5127 .trim_matches('"')
5128 .to_string();
5129 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5130 item.insert(key, val);
5131 }
5132 }
5133 }
5134
5135 Ok(item)
5136}
5137
5138fn split_partiql_pairs(s: &str) -> Vec<&str> {
5140 let mut parts = Vec::new();
5141 let mut start = 0;
5142 let mut depth = 0;
5143 let mut in_quote = false;
5144
5145 for (i, c) in s.char_indices() {
5146 match c {
5147 '\'' if !in_quote => in_quote = true,
5148 '\'' if in_quote => in_quote = false,
5149 '{' if !in_quote => depth += 1,
5150 '}' if !in_quote => depth -= 1,
5151 ',' if !in_quote && depth == 0 => {
5152 parts.push(&s[start..i]);
5153 start = i + 1;
5154 }
5155 _ => {}
5156 }
5157 }
5158 parts.push(&s[start..]);
5159 parts
5160}
5161
5162fn count_params_in_str(s: &str) -> usize {
5164 s.chars().filter(|c| *c == '?').count()
5165}
5166
5167#[cfg(test)]
5168mod tests {
5169 use super::*;
5170 use serde_json::json;
5171
5172 #[test]
5173 fn test_parse_update_clauses_set() {
5174 let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5175 assert_eq!(clauses.len(), 1);
5176 assert_eq!(clauses[0].0, "SET");
5177 assert_eq!(clauses[0].1.len(), 2);
5178 }
5179
5180 #[test]
5181 fn test_parse_update_clauses_set_and_remove() {
5182 let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
5183 assert_eq!(clauses.len(), 2);
5184 assert_eq!(clauses[0].0, "SET");
5185 assert_eq!(clauses[1].0, "REMOVE");
5186 }
5187
5188 #[test]
5189 fn test_evaluate_key_condition_simple() {
5190 let mut item = HashMap::new();
5191 item.insert("pk".to_string(), json!({"S": "user1"}));
5192 item.insert("sk".to_string(), json!({"S": "order1"}));
5193
5194 let mut expr_values = HashMap::new();
5195 expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
5196
5197 assert!(evaluate_key_condition(
5198 "pk = :pk",
5199 &item,
5200 "pk",
5201 Some("sk"),
5202 &HashMap::new(),
5203 &expr_values,
5204 ));
5205 }
5206
5207 #[test]
5208 fn test_compare_attribute_values_numbers() {
5209 let a = json!({"N": "10"});
5210 let b = json!({"N": "20"});
5211 assert_eq!(
5212 compare_attribute_values(Some(&a), Some(&b)),
5213 std::cmp::Ordering::Less
5214 );
5215 }
5216
5217 #[test]
5218 fn test_compare_attribute_values_strings() {
5219 let a = json!({"S": "apple"});
5220 let b = json!({"S": "banana"});
5221 assert_eq!(
5222 compare_attribute_values(Some(&a), Some(&b)),
5223 std::cmp::Ordering::Less
5224 );
5225 }
5226
5227 #[test]
5228 fn test_split_on_and() {
5229 let parts = split_on_and("pk = :pk AND sk > :sk");
5230 assert_eq!(parts.len(), 2);
5231 assert_eq!(parts[0].trim(), "pk = :pk");
5232 assert_eq!(parts[1].trim(), "sk > :sk");
5233 }
5234
5235 #[test]
5236 fn test_split_on_and_respects_parentheses() {
5237 let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5239 assert_eq!(parts.len(), 1);
5241 assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5242 }
5243
5244 #[test]
5245 fn test_evaluate_filter_expression_parenthesized_and_with_or() {
5246 let mut item = HashMap::new();
5248 item.insert("x".to_string(), json!({"S": "no"}));
5249 item.insert("y".to_string(), json!({"S": "no"}));
5250 item.insert("z".to_string(), json!({"S": "yes"}));
5251
5252 let mut expr_values = HashMap::new();
5253 expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
5254
5255 let result = evaluate_filter_expression(
5257 "(x = :yes AND y = :yes) OR z = :yes",
5258 &item,
5259 &HashMap::new(),
5260 &expr_values,
5261 );
5262 assert!(result, "should match because z = :yes is true");
5263
5264 let mut item2 = HashMap::new();
5266 item2.insert("x".to_string(), json!({"S": "no"}));
5267 item2.insert("y".to_string(), json!({"S": "no"}));
5268 item2.insert("z".to_string(), json!({"S": "no"}));
5269
5270 let result2 = evaluate_filter_expression(
5271 "(x = :yes AND y = :yes) OR z = :yes",
5272 &item2,
5273 &HashMap::new(),
5274 &expr_values,
5275 );
5276 assert!(!result2, "should not match because nothing is true");
5277 }
5278
5279 #[test]
5280 fn test_project_item_nested_path() {
5281 let mut item = HashMap::new();
5283 item.insert("pk".to_string(), json!({"S": "key1"}));
5284 item.insert(
5285 "data".to_string(),
5286 json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
5287 );
5288
5289 let body = json!({
5290 "ProjectionExpression": "data[0].name"
5291 });
5292
5293 let projected = project_item(&item, &body);
5294 let name = projected
5296 .get("data")
5297 .and_then(|v| v.get("L"))
5298 .and_then(|v| v.get(0))
5299 .and_then(|v| v.get("M"))
5300 .and_then(|v| v.get("name"))
5301 .and_then(|v| v.get("S"))
5302 .and_then(|v| v.as_str());
5303 assert_eq!(name, Some("Alice"));
5304
5305 let age = projected
5307 .get("data")
5308 .and_then(|v| v.get("L"))
5309 .and_then(|v| v.get(0))
5310 .and_then(|v| v.get("M"))
5311 .and_then(|v| v.get("age"));
5312 assert!(age.is_none(), "age should not be present in projection");
5313 }
5314
5315 #[test]
5316 fn test_resolve_nested_path_map() {
5317 let mut item = HashMap::new();
5318 item.insert(
5319 "info".to_string(),
5320 json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
5321 );
5322
5323 let result = resolve_nested_path(&item, "info.address.city");
5324 assert_eq!(result, Some(json!({"S": "NYC"})));
5325 }
5326
5327 #[test]
5328 fn test_resolve_nested_path_list_then_map() {
5329 let mut item = HashMap::new();
5330 item.insert(
5331 "items".to_string(),
5332 json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
5333 );
5334
5335 let result = resolve_nested_path(&item, "items[0].sku");
5336 assert_eq!(result, Some(json!({"S": "ABC"})));
5337 }
5338
5339 use crate::state::SharedDynamoDbState;
5342 use parking_lot::RwLock;
5343 use std::sync::Arc;
5344
5345 fn make_service() -> DynamoDbService {
5346 let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5347 "123456789012",
5348 "us-east-1",
5349 )));
5350 DynamoDbService::new(state)
5351 }
5352
5353 fn make_request(action: &str, body: Value) -> AwsRequest {
5354 AwsRequest {
5355 service: "dynamodb".to_string(),
5356 action: action.to_string(),
5357 region: "us-east-1".to_string(),
5358 account_id: "123456789012".to_string(),
5359 request_id: "test-id".to_string(),
5360 headers: http::HeaderMap::new(),
5361 query_params: HashMap::new(),
5362 body: serde_json::to_vec(&body).unwrap().into(),
5363 path_segments: vec![],
5364 raw_path: "/".to_string(),
5365 raw_query: String::new(),
5366 method: http::Method::POST,
5367 is_query_protocol: false,
5368 access_key_id: None,
5369 }
5370 }
5371
5372 fn create_test_table(svc: &DynamoDbService) {
5373 let req = make_request(
5374 "CreateTable",
5375 json!({
5376 "TableName": "test-table",
5377 "KeySchema": [
5378 { "AttributeName": "pk", "KeyType": "HASH" }
5379 ],
5380 "AttributeDefinitions": [
5381 { "AttributeName": "pk", "AttributeType": "S" }
5382 ],
5383 "BillingMode": "PAY_PER_REQUEST"
5384 }),
5385 );
5386 svc.create_table(&req).unwrap();
5387 }
5388
5389 #[test]
5390 fn delete_item_return_values_all_old() {
5391 let svc = make_service();
5392 create_test_table(&svc);
5393
5394 let req = make_request(
5396 "PutItem",
5397 json!({
5398 "TableName": "test-table",
5399 "Item": {
5400 "pk": { "S": "key1" },
5401 "name": { "S": "Alice" },
5402 "age": { "N": "30" }
5403 }
5404 }),
5405 );
5406 svc.put_item(&req).unwrap();
5407
5408 let req = make_request(
5410 "DeleteItem",
5411 json!({
5412 "TableName": "test-table",
5413 "Key": { "pk": { "S": "key1" } },
5414 "ReturnValues": "ALL_OLD"
5415 }),
5416 );
5417 let resp = svc.delete_item(&req).unwrap();
5418 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5419
5420 let attrs = &body["Attributes"];
5422 assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
5423 assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
5424 assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
5425
5426 let req = make_request(
5428 "GetItem",
5429 json!({
5430 "TableName": "test-table",
5431 "Key": { "pk": { "S": "key1" } }
5432 }),
5433 );
5434 let resp = svc.get_item(&req).unwrap();
5435 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5436 assert!(body.get("Item").is_none(), "item should be deleted");
5437 }
5438
5439 #[test]
5440 fn transact_get_items_returns_existing_and_missing() {
5441 let svc = make_service();
5442 create_test_table(&svc);
5443
5444 let req = make_request(
5446 "PutItem",
5447 json!({
5448 "TableName": "test-table",
5449 "Item": {
5450 "pk": { "S": "exists" },
5451 "val": { "S": "hello" }
5452 }
5453 }),
5454 );
5455 svc.put_item(&req).unwrap();
5456
5457 let req = make_request(
5458 "TransactGetItems",
5459 json!({
5460 "TransactItems": [
5461 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
5462 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
5463 ]
5464 }),
5465 );
5466 let resp = svc.transact_get_items(&req).unwrap();
5467 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5468 let responses = body["Responses"].as_array().unwrap();
5469 assert_eq!(responses.len(), 2);
5470 assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
5471 assert!(responses[1].get("Item").is_none());
5472 }
5473
5474 #[test]
5475 fn transact_write_items_put_and_delete() {
5476 let svc = make_service();
5477 create_test_table(&svc);
5478
5479 let req = make_request(
5481 "PutItem",
5482 json!({
5483 "TableName": "test-table",
5484 "Item": {
5485 "pk": { "S": "to-delete" },
5486 "val": { "S": "bye" }
5487 }
5488 }),
5489 );
5490 svc.put_item(&req).unwrap();
5491
5492 let req = make_request(
5494 "TransactWriteItems",
5495 json!({
5496 "TransactItems": [
5497 {
5498 "Put": {
5499 "TableName": "test-table",
5500 "Item": {
5501 "pk": { "S": "new-item" },
5502 "val": { "S": "hi" }
5503 }
5504 }
5505 },
5506 {
5507 "Delete": {
5508 "TableName": "test-table",
5509 "Key": { "pk": { "S": "to-delete" } }
5510 }
5511 }
5512 ]
5513 }),
5514 );
5515 let resp = svc.transact_write_items(&req).unwrap();
5516 assert_eq!(resp.status, StatusCode::OK);
5517
5518 let req = make_request(
5520 "GetItem",
5521 json!({
5522 "TableName": "test-table",
5523 "Key": { "pk": { "S": "new-item" } }
5524 }),
5525 );
5526 let resp = svc.get_item(&req).unwrap();
5527 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5528 assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
5529
5530 let req = make_request(
5532 "GetItem",
5533 json!({
5534 "TableName": "test-table",
5535 "Key": { "pk": { "S": "to-delete" } }
5536 }),
5537 );
5538 let resp = svc.get_item(&req).unwrap();
5539 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5540 assert!(body.get("Item").is_none());
5541 }
5542
5543 #[test]
5544 fn transact_write_items_condition_check_failure() {
5545 let svc = make_service();
5546 create_test_table(&svc);
5547
5548 let req = make_request(
5550 "TransactWriteItems",
5551 json!({
5552 "TransactItems": [
5553 {
5554 "ConditionCheck": {
5555 "TableName": "test-table",
5556 "Key": { "pk": { "S": "nonexistent" } },
5557 "ConditionExpression": "attribute_exists(pk)"
5558 }
5559 }
5560 ]
5561 }),
5562 );
5563 let resp = svc.transact_write_items(&req).unwrap();
5564 assert_eq!(resp.status, StatusCode::BAD_REQUEST);
5566 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5567 assert_eq!(
5568 body["__type"].as_str().unwrap(),
5569 "TransactionCanceledException"
5570 );
5571 assert!(body["CancellationReasons"].as_array().is_some());
5572 }
5573
5574 #[test]
5575 fn update_and_describe_time_to_live() {
5576 let svc = make_service();
5577 create_test_table(&svc);
5578
5579 let req = make_request(
5581 "UpdateTimeToLive",
5582 json!({
5583 "TableName": "test-table",
5584 "TimeToLiveSpecification": {
5585 "AttributeName": "ttl",
5586 "Enabled": true
5587 }
5588 }),
5589 );
5590 let resp = svc.update_time_to_live(&req).unwrap();
5591 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5592 assert_eq!(
5593 body["TimeToLiveSpecification"]["AttributeName"]
5594 .as_str()
5595 .unwrap(),
5596 "ttl"
5597 );
5598 assert!(body["TimeToLiveSpecification"]["Enabled"]
5599 .as_bool()
5600 .unwrap());
5601
5602 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5604 let resp = svc.describe_time_to_live(&req).unwrap();
5605 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5606 assert_eq!(
5607 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5608 .as_str()
5609 .unwrap(),
5610 "ENABLED"
5611 );
5612 assert_eq!(
5613 body["TimeToLiveDescription"]["AttributeName"]
5614 .as_str()
5615 .unwrap(),
5616 "ttl"
5617 );
5618
5619 let req = make_request(
5621 "UpdateTimeToLive",
5622 json!({
5623 "TableName": "test-table",
5624 "TimeToLiveSpecification": {
5625 "AttributeName": "ttl",
5626 "Enabled": false
5627 }
5628 }),
5629 );
5630 svc.update_time_to_live(&req).unwrap();
5631
5632 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5633 let resp = svc.describe_time_to_live(&req).unwrap();
5634 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5635 assert_eq!(
5636 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5637 .as_str()
5638 .unwrap(),
5639 "DISABLED"
5640 );
5641 }
5642
5643 #[test]
5644 fn resource_policy_lifecycle() {
5645 let svc = make_service();
5646 create_test_table(&svc);
5647
5648 let table_arn = {
5649 let state = svc.state.read();
5650 state.tables.get("test-table").unwrap().arn.clone()
5651 };
5652
5653 let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5655 let req = make_request(
5656 "PutResourcePolicy",
5657 json!({
5658 "ResourceArn": table_arn,
5659 "Policy": policy_doc
5660 }),
5661 );
5662 let resp = svc.put_resource_policy(&req).unwrap();
5663 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5664 assert!(body["RevisionId"].as_str().is_some());
5665
5666 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5668 let resp = svc.get_resource_policy(&req).unwrap();
5669 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5670 assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5671
5672 let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5674 svc.delete_resource_policy(&req).unwrap();
5675
5676 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5678 let resp = svc.get_resource_policy(&req).unwrap();
5679 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5680 assert!(body["Policy"].is_null());
5681 }
5682
5683 #[test]
5684 fn describe_endpoints() {
5685 let svc = make_service();
5686 let req = make_request("DescribeEndpoints", json!({}));
5687 let resp = svc.describe_endpoints(&req).unwrap();
5688 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5689 assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5690 }
5691
5692 #[test]
5693 fn describe_limits() {
5694 let svc = make_service();
5695 let req = make_request("DescribeLimits", json!({}));
5696 let resp = svc.describe_limits(&req).unwrap();
5697 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5698 assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5699 }
5700
5701 #[test]
5702 fn backup_lifecycle() {
5703 let svc = make_service();
5704 create_test_table(&svc);
5705
5706 let req = make_request(
5708 "CreateBackup",
5709 json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5710 );
5711 let resp = svc.create_backup(&req).unwrap();
5712 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5713 let backup_arn = body["BackupDetails"]["BackupArn"]
5714 .as_str()
5715 .unwrap()
5716 .to_string();
5717 assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5718
5719 let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5721 let resp = svc.describe_backup(&req).unwrap();
5722 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5723 assert_eq!(
5724 body["BackupDescription"]["BackupDetails"]["BackupName"],
5725 "my-backup"
5726 );
5727
5728 let req = make_request("ListBackups", json!({}));
5730 let resp = svc.list_backups(&req).unwrap();
5731 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5732 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5733
5734 let req = make_request(
5736 "RestoreTableFromBackup",
5737 json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5738 );
5739 svc.restore_table_from_backup(&req).unwrap();
5740
5741 let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5743 let resp = svc.describe_table(&req).unwrap();
5744 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5745 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5746
5747 let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5749 svc.delete_backup(&req).unwrap();
5750
5751 let req = make_request("ListBackups", json!({}));
5753 let resp = svc.list_backups(&req).unwrap();
5754 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5755 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5756 }
5757
5758 #[test]
5759 fn continuous_backups() {
5760 let svc = make_service();
5761 create_test_table(&svc);
5762
5763 let req = make_request(
5765 "DescribeContinuousBackups",
5766 json!({ "TableName": "test-table" }),
5767 );
5768 let resp = svc.describe_continuous_backups(&req).unwrap();
5769 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5770 assert_eq!(
5771 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5772 ["PointInTimeRecoveryStatus"],
5773 "DISABLED"
5774 );
5775
5776 let req = make_request(
5778 "UpdateContinuousBackups",
5779 json!({
5780 "TableName": "test-table",
5781 "PointInTimeRecoverySpecification": {
5782 "PointInTimeRecoveryEnabled": true
5783 }
5784 }),
5785 );
5786 svc.update_continuous_backups(&req).unwrap();
5787
5788 let req = make_request(
5790 "DescribeContinuousBackups",
5791 json!({ "TableName": "test-table" }),
5792 );
5793 let resp = svc.describe_continuous_backups(&req).unwrap();
5794 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5795 assert_eq!(
5796 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5797 ["PointInTimeRecoveryStatus"],
5798 "ENABLED"
5799 );
5800 }
5801
5802 #[test]
5803 fn restore_table_to_point_in_time() {
5804 let svc = make_service();
5805 create_test_table(&svc);
5806
5807 let req = make_request(
5808 "RestoreTableToPointInTime",
5809 json!({
5810 "SourceTableName": "test-table",
5811 "TargetTableName": "pitr-restored"
5812 }),
5813 );
5814 svc.restore_table_to_point_in_time(&req).unwrap();
5815
5816 let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5817 let resp = svc.describe_table(&req).unwrap();
5818 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5819 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5820 }
5821
5822 #[test]
5823 fn global_table_lifecycle() {
5824 let svc = make_service();
5825
5826 let req = make_request(
5828 "CreateGlobalTable",
5829 json!({
5830 "GlobalTableName": "my-global",
5831 "ReplicationGroup": [
5832 { "RegionName": "us-east-1" },
5833 { "RegionName": "eu-west-1" }
5834 ]
5835 }),
5836 );
5837 let resp = svc.create_global_table(&req).unwrap();
5838 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5839 assert_eq!(
5840 body["GlobalTableDescription"]["GlobalTableStatus"],
5841 "ACTIVE"
5842 );
5843
5844 let req = make_request(
5846 "DescribeGlobalTable",
5847 json!({ "GlobalTableName": "my-global" }),
5848 );
5849 let resp = svc.describe_global_table(&req).unwrap();
5850 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5851 assert_eq!(
5852 body["GlobalTableDescription"]["ReplicationGroup"]
5853 .as_array()
5854 .unwrap()
5855 .len(),
5856 2
5857 );
5858
5859 let req = make_request("ListGlobalTables", json!({}));
5861 let resp = svc.list_global_tables(&req).unwrap();
5862 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5863 assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5864
5865 let req = make_request(
5867 "UpdateGlobalTable",
5868 json!({
5869 "GlobalTableName": "my-global",
5870 "ReplicaUpdates": [
5871 { "Create": { "RegionName": "ap-southeast-1" } }
5872 ]
5873 }),
5874 );
5875 let resp = svc.update_global_table(&req).unwrap();
5876 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5877 assert_eq!(
5878 body["GlobalTableDescription"]["ReplicationGroup"]
5879 .as_array()
5880 .unwrap()
5881 .len(),
5882 3
5883 );
5884
5885 let req = make_request(
5887 "DescribeGlobalTableSettings",
5888 json!({ "GlobalTableName": "my-global" }),
5889 );
5890 let resp = svc.describe_global_table_settings(&req).unwrap();
5891 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5892 assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5893
5894 let req = make_request(
5896 "UpdateGlobalTableSettings",
5897 json!({ "GlobalTableName": "my-global" }),
5898 );
5899 svc.update_global_table_settings(&req).unwrap();
5900 }
5901
5902 #[test]
5903 fn table_replica_auto_scaling() {
5904 let svc = make_service();
5905 create_test_table(&svc);
5906
5907 let req = make_request(
5908 "DescribeTableReplicaAutoScaling",
5909 json!({ "TableName": "test-table" }),
5910 );
5911 let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5912 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5913 assert_eq!(
5914 body["TableAutoScalingDescription"]["TableName"],
5915 "test-table"
5916 );
5917
5918 let req = make_request(
5919 "UpdateTableReplicaAutoScaling",
5920 json!({ "TableName": "test-table" }),
5921 );
5922 svc.update_table_replica_auto_scaling(&req).unwrap();
5923 }
5924
5925 #[test]
5926 fn kinesis_streaming_lifecycle() {
5927 let svc = make_service();
5928 create_test_table(&svc);
5929
5930 let req = make_request(
5932 "EnableKinesisStreamingDestination",
5933 json!({
5934 "TableName": "test-table",
5935 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5936 }),
5937 );
5938 let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5939 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5940 assert_eq!(body["DestinationStatus"], "ACTIVE");
5941
5942 let req = make_request(
5944 "DescribeKinesisStreamingDestination",
5945 json!({ "TableName": "test-table" }),
5946 );
5947 let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5948 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5949 assert_eq!(
5950 body["KinesisDataStreamDestinations"]
5951 .as_array()
5952 .unwrap()
5953 .len(),
5954 1
5955 );
5956
5957 let req = make_request(
5959 "UpdateKinesisStreamingDestination",
5960 json!({
5961 "TableName": "test-table",
5962 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5963 "UpdateKinesisStreamingConfiguration": {
5964 "ApproximateCreationDateTimePrecision": "MICROSECOND"
5965 }
5966 }),
5967 );
5968 svc.update_kinesis_streaming_destination(&req).unwrap();
5969
5970 let req = make_request(
5972 "DisableKinesisStreamingDestination",
5973 json!({
5974 "TableName": "test-table",
5975 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5976 }),
5977 );
5978 let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5979 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5980 assert_eq!(body["DestinationStatus"], "DISABLED");
5981 }
5982
5983 #[test]
5984 fn contributor_insights_lifecycle() {
5985 let svc = make_service();
5986 create_test_table(&svc);
5987
5988 let req = make_request(
5990 "DescribeContributorInsights",
5991 json!({ "TableName": "test-table" }),
5992 );
5993 let resp = svc.describe_contributor_insights(&req).unwrap();
5994 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5995 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5996
5997 let req = make_request(
5999 "UpdateContributorInsights",
6000 json!({
6001 "TableName": "test-table",
6002 "ContributorInsightsAction": "ENABLE"
6003 }),
6004 );
6005 let resp = svc.update_contributor_insights(&req).unwrap();
6006 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6007 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6008
6009 let req = make_request("ListContributorInsights", json!({}));
6011 let resp = svc.list_contributor_insights(&req).unwrap();
6012 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013 assert_eq!(
6014 body["ContributorInsightsSummaries"]
6015 .as_array()
6016 .unwrap()
6017 .len(),
6018 1
6019 );
6020 }
6021
6022 #[test]
6023 fn export_lifecycle() {
6024 let svc = make_service();
6025 create_test_table(&svc);
6026
6027 let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
6028
6029 let req = make_request(
6031 "ExportTableToPointInTime",
6032 json!({
6033 "TableArn": table_arn,
6034 "S3Bucket": "my-bucket"
6035 }),
6036 );
6037 let resp = svc.export_table_to_point_in_time(&req).unwrap();
6038 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6039 let export_arn = body["ExportDescription"]["ExportArn"]
6040 .as_str()
6041 .unwrap()
6042 .to_string();
6043 assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6044
6045 let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6047 let resp = svc.describe_export(&req).unwrap();
6048 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6049 assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6050
6051 let req = make_request("ListExports", json!({}));
6053 let resp = svc.list_exports(&req).unwrap();
6054 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6055 assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6056 }
6057
6058 #[test]
6059 fn import_lifecycle() {
6060 let svc = make_service();
6061
6062 let req = make_request(
6063 "ImportTable",
6064 json!({
6065 "InputFormat": "DYNAMODB_JSON",
6066 "S3BucketSource": { "S3Bucket": "import-bucket" },
6067 "TableCreationParameters": {
6068 "TableName": "imported-table",
6069 "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6070 "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6071 }
6072 }),
6073 );
6074 let resp = svc.import_table(&req).unwrap();
6075 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6076 let import_arn = body["ImportTableDescription"]["ImportArn"]
6077 .as_str()
6078 .unwrap()
6079 .to_string();
6080 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6081
6082 let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6084 let resp = svc.describe_import(&req).unwrap();
6085 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6086 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6087
6088 let req = make_request("ListImports", json!({}));
6090 let resp = svc.list_imports(&req).unwrap();
6091 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6092 assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6093
6094 let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6096 let resp = svc.describe_table(&req).unwrap();
6097 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6098 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6099 }
6100
6101 #[test]
6102 fn backup_restore_preserves_items() {
6103 let svc = make_service();
6104 create_test_table(&svc);
6105
6106 for i in 1..=3 {
6108 let req = make_request(
6109 "PutItem",
6110 json!({
6111 "TableName": "test-table",
6112 "Item": {
6113 "pk": { "S": format!("key{i}") },
6114 "data": { "S": format!("value{i}") }
6115 }
6116 }),
6117 );
6118 svc.put_item(&req).unwrap();
6119 }
6120
6121 let req = make_request(
6123 "CreateBackup",
6124 json!({
6125 "TableName": "test-table",
6126 "BackupName": "my-backup"
6127 }),
6128 );
6129 let resp = svc.create_backup(&req).unwrap();
6130 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6131 let backup_arn = body["BackupDetails"]["BackupArn"]
6132 .as_str()
6133 .unwrap()
6134 .to_string();
6135
6136 for i in 1..=3 {
6138 let req = make_request(
6139 "DeleteItem",
6140 json!({
6141 "TableName": "test-table",
6142 "Key": { "pk": { "S": format!("key{i}") } }
6143 }),
6144 );
6145 svc.delete_item(&req).unwrap();
6146 }
6147
6148 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6150 let resp = svc.scan(&req).unwrap();
6151 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6152 assert_eq!(body["Count"], 0);
6153
6154 let req = make_request(
6156 "RestoreTableFromBackup",
6157 json!({
6158 "BackupArn": backup_arn,
6159 "TargetTableName": "restored-table"
6160 }),
6161 );
6162 svc.restore_table_from_backup(&req).unwrap();
6163
6164 let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6166 let resp = svc.scan(&req).unwrap();
6167 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6168 assert_eq!(body["Count"], 3);
6169 assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6170 }
6171
6172 #[test]
6173 fn global_table_replicates_writes() {
6174 let svc = make_service();
6175 create_test_table(&svc);
6176
6177 let req = make_request(
6179 "CreateGlobalTable",
6180 json!({
6181 "GlobalTableName": "test-table",
6182 "ReplicationGroup": [
6183 { "RegionName": "us-east-1" },
6184 { "RegionName": "eu-west-1" }
6185 ]
6186 }),
6187 );
6188 let resp = svc.create_global_table(&req).unwrap();
6189 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6190 assert_eq!(
6191 body["GlobalTableDescription"]["GlobalTableStatus"],
6192 "ACTIVE"
6193 );
6194
6195 let req = make_request(
6197 "PutItem",
6198 json!({
6199 "TableName": "test-table",
6200 "Item": {
6201 "pk": { "S": "replicated-key" },
6202 "data": { "S": "replicated-value" }
6203 }
6204 }),
6205 );
6206 svc.put_item(&req).unwrap();
6207
6208 let req = make_request(
6210 "GetItem",
6211 json!({
6212 "TableName": "test-table",
6213 "Key": { "pk": { "S": "replicated-key" } }
6214 }),
6215 );
6216 let resp = svc.get_item(&req).unwrap();
6217 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6218 assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6219 assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6220 }
6221
6222 #[test]
6223 fn contributor_insights_tracks_access() {
6224 let svc = make_service();
6225 create_test_table(&svc);
6226
6227 let req = make_request(
6229 "UpdateContributorInsights",
6230 json!({
6231 "TableName": "test-table",
6232 "ContributorInsightsAction": "ENABLE"
6233 }),
6234 );
6235 svc.update_contributor_insights(&req).unwrap();
6236
6237 for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6239 let req = make_request(
6240 "PutItem",
6241 json!({
6242 "TableName": "test-table",
6243 "Item": {
6244 "pk": { "S": key },
6245 "data": { "S": "value" }
6246 }
6247 }),
6248 );
6249 svc.put_item(&req).unwrap();
6250 }
6251
6252 for _ in 0..3 {
6254 let req = make_request(
6255 "GetItem",
6256 json!({
6257 "TableName": "test-table",
6258 "Key": { "pk": { "S": "alpha" } }
6259 }),
6260 );
6261 svc.get_item(&req).unwrap();
6262 }
6263
6264 let req = make_request(
6266 "DescribeContributorInsights",
6267 json!({ "TableName": "test-table" }),
6268 );
6269 let resp = svc.describe_contributor_insights(&req).unwrap();
6270 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6271 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6272
6273 let contributors = body["TopContributors"].as_array().unwrap();
6274 assert!(
6275 !contributors.is_empty(),
6276 "TopContributors should not be empty"
6277 );
6278
6279 let top = &contributors[0];
6282 assert!(top["Count"].as_u64().unwrap() > 0);
6283
6284 let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6286 assert!(!rules.is_empty());
6287 }
6288
6289 #[test]
6290 fn contributor_insights_not_tracked_when_disabled() {
6291 let svc = make_service();
6292 create_test_table(&svc);
6293
6294 let req = make_request(
6296 "PutItem",
6297 json!({
6298 "TableName": "test-table",
6299 "Item": {
6300 "pk": { "S": "key1" },
6301 "data": { "S": "value" }
6302 }
6303 }),
6304 );
6305 svc.put_item(&req).unwrap();
6306
6307 let req = make_request(
6309 "DescribeContributorInsights",
6310 json!({ "TableName": "test-table" }),
6311 );
6312 let resp = svc.describe_contributor_insights(&req).unwrap();
6313 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6314 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6315
6316 let contributors = body["TopContributors"].as_array().unwrap();
6317 assert!(contributors.is_empty());
6318 }
6319
6320 #[test]
6321 fn contributor_insights_disabled_table_no_counters_after_scan() {
6322 let svc = make_service();
6323 create_test_table(&svc);
6324
6325 for key in &["alpha", "beta"] {
6327 let req = make_request(
6328 "PutItem",
6329 json!({
6330 "TableName": "test-table",
6331 "Item": { "pk": { "S": key } }
6332 }),
6333 );
6334 svc.put_item(&req).unwrap();
6335 }
6336
6337 let req = make_request(
6339 "UpdateContributorInsights",
6340 json!({
6341 "TableName": "test-table",
6342 "ContributorInsightsAction": "ENABLE"
6343 }),
6344 );
6345 svc.update_contributor_insights(&req).unwrap();
6346
6347 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6349 svc.scan(&req).unwrap();
6350
6351 let req = make_request(
6353 "DescribeContributorInsights",
6354 json!({ "TableName": "test-table" }),
6355 );
6356 let resp = svc.describe_contributor_insights(&req).unwrap();
6357 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6358 let contributors = body["TopContributors"].as_array().unwrap();
6359 assert!(
6360 !contributors.is_empty(),
6361 "counters should be non-empty while enabled"
6362 );
6363
6364 let req = make_request(
6366 "UpdateContributorInsights",
6367 json!({
6368 "TableName": "test-table",
6369 "ContributorInsightsAction": "DISABLE"
6370 }),
6371 );
6372 svc.update_contributor_insights(&req).unwrap();
6373
6374 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6376 svc.scan(&req).unwrap();
6377
6378 let req = make_request(
6380 "DescribeContributorInsights",
6381 json!({ "TableName": "test-table" }),
6382 );
6383 let resp = svc.describe_contributor_insights(&req).unwrap();
6384 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6385 let contributors = body["TopContributors"].as_array().unwrap();
6386 assert!(
6387 contributors.is_empty(),
6388 "counters should be empty after disabling insights"
6389 );
6390 }
6391
6392 #[test]
6393 fn scan_pagination_with_limit() {
6394 let svc = make_service();
6395 create_test_table(&svc);
6396
6397 for i in 0..5 {
6399 let req = make_request(
6400 "PutItem",
6401 json!({
6402 "TableName": "test-table",
6403 "Item": {
6404 "pk": { "S": format!("item{i}") },
6405 "data": { "S": format!("value{i}") }
6406 }
6407 }),
6408 );
6409 svc.put_item(&req).unwrap();
6410 }
6411
6412 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6414 let resp = svc.scan(&req).unwrap();
6415 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6416 assert_eq!(body["Count"], 2);
6417 assert!(
6418 body["LastEvaluatedKey"].is_object(),
6419 "should have LastEvaluatedKey when limit < total items"
6420 );
6421 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6422
6423 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6425 let mut lek = body["LastEvaluatedKey"].clone();
6426
6427 while lek.is_object() {
6428 let req = make_request(
6429 "Scan",
6430 json!({
6431 "TableName": "test-table",
6432 "Limit": 2,
6433 "ExclusiveStartKey": lek
6434 }),
6435 );
6436 let resp = svc.scan(&req).unwrap();
6437 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6438 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6439 lek = body["LastEvaluatedKey"].clone();
6440 }
6441
6442 assert_eq!(
6443 all_items.len(),
6444 5,
6445 "should retrieve all 5 items via pagination"
6446 );
6447 }
6448
6449 #[test]
6450 fn scan_no_pagination_when_all_fit() {
6451 let svc = make_service();
6452 create_test_table(&svc);
6453
6454 for i in 0..3 {
6455 let req = make_request(
6456 "PutItem",
6457 json!({
6458 "TableName": "test-table",
6459 "Item": {
6460 "pk": { "S": format!("item{i}") }
6461 }
6462 }),
6463 );
6464 svc.put_item(&req).unwrap();
6465 }
6466
6467 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6469 let resp = svc.scan(&req).unwrap();
6470 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6471 assert_eq!(body["Count"], 3);
6472 assert!(
6473 body["LastEvaluatedKey"].is_null(),
6474 "should not have LastEvaluatedKey when all items fit"
6475 );
6476
6477 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6479 let resp = svc.scan(&req).unwrap();
6480 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6481 assert_eq!(body["Count"], 3);
6482 assert!(body["LastEvaluatedKey"].is_null());
6483 }
6484
6485 fn create_composite_table(svc: &DynamoDbService) {
6486 let req = make_request(
6487 "CreateTable",
6488 json!({
6489 "TableName": "composite-table",
6490 "KeySchema": [
6491 { "AttributeName": "pk", "KeyType": "HASH" },
6492 { "AttributeName": "sk", "KeyType": "RANGE" }
6493 ],
6494 "AttributeDefinitions": [
6495 { "AttributeName": "pk", "AttributeType": "S" },
6496 { "AttributeName": "sk", "AttributeType": "S" }
6497 ],
6498 "BillingMode": "PAY_PER_REQUEST"
6499 }),
6500 );
6501 svc.create_table(&req).unwrap();
6502 }
6503
6504 #[test]
6505 fn query_pagination_with_composite_key() {
6506 let svc = make_service();
6507 create_composite_table(&svc);
6508
6509 for i in 0..5 {
6511 let req = make_request(
6512 "PutItem",
6513 json!({
6514 "TableName": "composite-table",
6515 "Item": {
6516 "pk": { "S": "user1" },
6517 "sk": { "S": format!("item{i:03}") },
6518 "data": { "S": format!("value{i}") }
6519 }
6520 }),
6521 );
6522 svc.put_item(&req).unwrap();
6523 }
6524
6525 let req = make_request(
6527 "Query",
6528 json!({
6529 "TableName": "composite-table",
6530 "KeyConditionExpression": "pk = :pk",
6531 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6532 "Limit": 2
6533 }),
6534 );
6535 let resp = svc.query(&req).unwrap();
6536 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6537 assert_eq!(body["Count"], 2);
6538 assert!(body["LastEvaluatedKey"].is_object());
6539 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6540 assert!(body["LastEvaluatedKey"]["sk"].is_object());
6541
6542 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6544 let mut lek = body["LastEvaluatedKey"].clone();
6545
6546 while lek.is_object() {
6547 let req = make_request(
6548 "Query",
6549 json!({
6550 "TableName": "composite-table",
6551 "KeyConditionExpression": "pk = :pk",
6552 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6553 "Limit": 2,
6554 "ExclusiveStartKey": lek
6555 }),
6556 );
6557 let resp = svc.query(&req).unwrap();
6558 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6559 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6560 lek = body["LastEvaluatedKey"].clone();
6561 }
6562
6563 assert_eq!(
6564 all_items.len(),
6565 5,
6566 "should retrieve all 5 items via pagination"
6567 );
6568
6569 let sks: Vec<String> = all_items
6571 .iter()
6572 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6573 .collect();
6574 let mut sorted = sks.clone();
6575 sorted.sort();
6576 assert_eq!(sks, sorted, "items should be sorted by sort key");
6577 }
6578
6579 #[test]
6580 fn query_no_pagination_when_all_fit() {
6581 let svc = make_service();
6582 create_composite_table(&svc);
6583
6584 for i in 0..2 {
6585 let req = make_request(
6586 "PutItem",
6587 json!({
6588 "TableName": "composite-table",
6589 "Item": {
6590 "pk": { "S": "user1" },
6591 "sk": { "S": format!("item{i}") }
6592 }
6593 }),
6594 );
6595 svc.put_item(&req).unwrap();
6596 }
6597
6598 let req = make_request(
6599 "Query",
6600 json!({
6601 "TableName": "composite-table",
6602 "KeyConditionExpression": "pk = :pk",
6603 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6604 "Limit": 10
6605 }),
6606 );
6607 let resp = svc.query(&req).unwrap();
6608 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6609 assert_eq!(body["Count"], 2);
6610 assert!(
6611 body["LastEvaluatedKey"].is_null(),
6612 "should not have LastEvaluatedKey when all items fit"
6613 );
6614 }
6615
6616 fn create_gsi_table(svc: &DynamoDbService) {
6617 let req = make_request(
6618 "CreateTable",
6619 json!({
6620 "TableName": "gsi-table",
6621 "KeySchema": [
6622 { "AttributeName": "pk", "KeyType": "HASH" }
6623 ],
6624 "AttributeDefinitions": [
6625 { "AttributeName": "pk", "AttributeType": "S" },
6626 { "AttributeName": "gsi_pk", "AttributeType": "S" },
6627 { "AttributeName": "gsi_sk", "AttributeType": "S" }
6628 ],
6629 "BillingMode": "PAY_PER_REQUEST",
6630 "GlobalSecondaryIndexes": [
6631 {
6632 "IndexName": "gsi-index",
6633 "KeySchema": [
6634 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6635 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6636 ],
6637 "Projection": { "ProjectionType": "ALL" }
6638 }
6639 ]
6640 }),
6641 );
6642 svc.create_table(&req).unwrap();
6643 }
6644
6645 #[test]
6646 fn gsi_query_last_evaluated_key_includes_table_pk() {
6647 let svc = make_service();
6648 create_gsi_table(&svc);
6649
6650 for i in 0..3 {
6652 let req = make_request(
6653 "PutItem",
6654 json!({
6655 "TableName": "gsi-table",
6656 "Item": {
6657 "pk": { "S": format!("item{i}") },
6658 "gsi_pk": { "S": "shared" },
6659 "gsi_sk": { "S": "sort" }
6660 }
6661 }),
6662 );
6663 svc.put_item(&req).unwrap();
6664 }
6665
6666 let req = make_request(
6668 "Query",
6669 json!({
6670 "TableName": "gsi-table",
6671 "IndexName": "gsi-index",
6672 "KeyConditionExpression": "gsi_pk = :v",
6673 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6674 "Limit": 1
6675 }),
6676 );
6677 let resp = svc.query(&req).unwrap();
6678 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6679 assert_eq!(body["Count"], 1);
6680 let lek = &body["LastEvaluatedKey"];
6681 assert!(lek.is_object(), "should have LastEvaluatedKey");
6682 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6684 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6685 assert!(
6687 lek["pk"].is_object(),
6688 "LEK must contain table PK for GSI queries"
6689 );
6690 }
6691
6692 #[test]
6693 fn gsi_query_pagination_returns_all_items() {
6694 let svc = make_service();
6695 create_gsi_table(&svc);
6696
6697 for i in 0..4 {
6699 let req = make_request(
6700 "PutItem",
6701 json!({
6702 "TableName": "gsi-table",
6703 "Item": {
6704 "pk": { "S": format!("item{i:03}") },
6705 "gsi_pk": { "S": "shared" },
6706 "gsi_sk": { "S": "sort" }
6707 }
6708 }),
6709 );
6710 svc.put_item(&req).unwrap();
6711 }
6712
6713 let mut all_pks = Vec::new();
6715 let mut lek: Option<Value> = None;
6716
6717 loop {
6718 let mut query = json!({
6719 "TableName": "gsi-table",
6720 "IndexName": "gsi-index",
6721 "KeyConditionExpression": "gsi_pk = :v",
6722 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6723 "Limit": 2
6724 });
6725 if let Some(ref start_key) = lek {
6726 query["ExclusiveStartKey"] = start_key.clone();
6727 }
6728
6729 let req = make_request("Query", query);
6730 let resp = svc.query(&req).unwrap();
6731 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6732
6733 for item in body["Items"].as_array().unwrap() {
6734 let pk = item["pk"]["S"].as_str().unwrap().to_string();
6735 all_pks.push(pk);
6736 }
6737
6738 if body["LastEvaluatedKey"].is_object() {
6739 lek = Some(body["LastEvaluatedKey"].clone());
6740 } else {
6741 break;
6742 }
6743 }
6744
6745 all_pks.sort();
6746 assert_eq!(
6747 all_pks,
6748 vec!["item000", "item001", "item002", "item003"],
6749 "pagination should return all items without duplicates"
6750 );
6751 }
6752}