1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21 ReplicaDescription, SharedDynamoDbState,
22};
23
/// In-memory emulation of the DynamoDB service: control-plane (tables) and
/// data-plane (items) operations backed by shared locked state.
pub struct DynamoDbService {
    // Shared, lock-protected registry of tables and account/region metadata.
    state: SharedDynamoDbState,
    // Optional S3 state handle; presumably used by import/export paths —
    // not exercised in this chunk, confirm against the rest of the file.
    s3_state: Option<SharedS3State>,
    // Optional bus for forwarding change records to Kinesis destinations.
    delivery: Option<Arc<DeliveryBus>>,
}
29
30impl DynamoDbService {
31 pub fn new(state: SharedDynamoDbState) -> Self {
32 Self {
33 state,
34 s3_state: None,
35 delivery: None,
36 }
37 }
38
39 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40 self.s3_state = Some(s3_state);
41 self
42 }
43
44 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45 self.delivery = Some(delivery);
46 self
47 }
48
    /// Fans out a change-data-capture record to every Kinesis destination of
    /// `table` whose status is `ACTIVE`, via the configured delivery bus.
    ///
    /// No-op when no delivery bus is attached or no destination is active.
    /// `old_image` / `new_image` are attached to the record only when
    /// provided (REMOVE events carry no NewImage, INSERTs no OldImage).
    fn deliver_to_kinesis_destinations(
        &self,
        table: &DynamoTable,
        event_name: &str,
        keys: &HashMap<String, AttributeValue>,
        old_image: Option<&HashMap<String, AttributeValue>>,
        new_image: Option<&HashMap<String, AttributeValue>>,
    ) {
        // Without a delivery bus there is nowhere to send records.
        let delivery = match &self.delivery {
            Some(d) => d,
            None => return,
        };

        let active_destinations: Vec<_> = table
            .kinesis_destinations
            .iter()
            .filter(|d| d.destination_status == "ACTIVE")
            .collect();

        if active_destinations.is_empty() {
            return;
        }

        // Record shape mirrors a DynamoDB stream record; the region is
        // recovered from the table ARN (4th `:`-separated field).
        let mut record = json!({
            "eventID": uuid::Uuid::new_v4().to_string(),
            "eventName": event_name,
            "eventVersion": "1.1",
            "eventSource": "aws:dynamodb",
            "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
            "dynamodb": {
                "Keys": keys,
                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
                // Approximation: size of the serialized key map, not the full item.
                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
                "StreamViewType": "NEW_AND_OLD_IMAGES",
            },
            "eventSourceARN": &table.arn,
            "tableName": &table.name,
        });

        if let Some(old) = old_image {
            record["dynamodb"]["OldImage"] = json!(old);
        }
        if let Some(new) = new_image {
            record["dynamodb"]["NewImage"] = json!(new);
        }

        // Records are sent base64-encoded; the serialized key map doubles
        // as the partition key so all writes for a key stay ordered.
        let record_str = serde_json::to_string(&record).unwrap_or_default();
        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
        let partition_key = serde_json::to_string(keys).unwrap_or_default();

        for dest in active_destinations {
            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
        }
    }
104
105 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106 serde_json::from_slice(&req.body).map_err(|e| {
107 AwsServiceError::aws_error(
108 StatusCode::BAD_REQUEST,
109 "SerializationException",
110 format!("Invalid JSON: {e}"),
111 )
112 })
113 }
114
    /// Convenience wrapper: a 200 OK `AwsResponse` carrying `body` as JSON.
    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
        Ok(AwsResponse::ok_json(body))
    }
118
    /// Handles `CreateTable`: validates the key schema against the attribute
    /// definitions, resolves billing/stream/SSE options, and registers a new
    /// in-memory table.
    ///
    /// Errors with `ResourceInUseException` if the name already exists and
    /// `ValidationException` for schema problems.
    fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        let table_name = body["TableName"]
            .as_str()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableName is required",
                )
            })?
            .to_string();

        let key_schema = parse_key_schema(&body["KeySchema"])?;
        let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;

        // Every key-schema attribute must be declared in AttributeDefinitions,
        // mirroring AWS's validation error message.
        for ks in &key_schema {
            if !attribute_definitions
                .iter()
                .any(|ad| ad.attribute_name == ks.attribute_name)
            {
                return Err(AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    format!(
                        "One or more parameter values were invalid: \
                        Some index key attributes are not defined in AttributeDefinitions. \
                        Keys: [{}], AttributeDefinitions: [{}]",
                        ks.attribute_name,
                        attribute_definitions
                            .iter()
                            .map(|ad| ad.attribute_name.as_str())
                            .collect::<Vec<_>>()
                            .join(", ")
                    ),
                ));
            }
        }

        let billing_mode = body["BillingMode"]
            .as_str()
            .unwrap_or("PROVISIONED")
            .to_string();

        // PAY_PER_REQUEST tables carry zeroed throughput; PROVISIONED tables
        // require an explicit, valid ProvisionedThroughput block.
        let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
            ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            }
        } else {
            parse_provisioned_throughput(&body["ProvisionedThroughput"])?
        };

        let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
        let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
        let tags = parse_tags(&body["Tags"]);

        // StreamViewType is only meaningful when the stream is enabled.
        let (stream_enabled, stream_view_type) =
            if let Some(stream_spec) = body.get("StreamSpecification") {
                let enabled = stream_spec
                    .get("StreamEnabled")
                    .and_then(|v| v.as_bool())
                    .unwrap_or(false);
                let view_type = if enabled {
                    stream_spec
                        .get("StreamViewType")
                        .and_then(|v| v.as_str())
                        .map(|s| s.to_string())
                } else {
                    None
                };
                (enabled, view_type)
            } else {
                (false, None)
            };

        // SSE defaults to type "KMS" when enabled without an explicit type.
        let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
            let enabled = sse_spec
                .get("Enabled")
                .and_then(|v| v.as_bool())
                .unwrap_or(false);
            if enabled {
                let sse_type = sse_spec
                    .get("SSEType")
                    .and_then(|v| v.as_str())
                    .unwrap_or("KMS")
                    .to_string();
                let kms_key = sse_spec
                    .get("KMSMasterKeyId")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());
                (Some(sse_type), kms_key)
            } else {
                (None, None)
            }
        } else {
            (None, None)
        };

        let mut state = self.state.write();

        // Duplicate check happens under the write lock so concurrent
        // CreateTable calls cannot both succeed.
        if state.tables.contains_key(&table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let now = Utc::now();
        let arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        // Stream ARNs embed the creation timestamp, like real DynamoDB.
        let stream_arn = if stream_enabled {
            Some(format!(
                "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
                state.region,
                state.account_id,
                table_name,
                now.format("%Y-%m-%dT%H:%M:%S.%3f")
            ))
        } else {
            None
        };

        // Tables become ACTIVE immediately; this emulation has no CREATING
        // phase.
        let table = DynamoTable {
            name: table_name.clone(),
            arn: arn.clone(),
            key_schema: key_schema.clone(),
            attribute_definitions: attribute_definitions.clone(),
            provisioned_throughput: provisioned_throughput.clone(),
            items: Vec::new(),
            gsi: gsi.clone(),
            lsi: lsi.clone(),
            tags,
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: billing_mode.clone(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: HashMap::new(),
            stream_enabled,
            stream_view_type,
            stream_arn,
            stream_records: Arc::new(RwLock::new(Vec::new())),
            sse_type,
            sse_kms_key_arn,
        };

        state.tables.insert(table_name, table);

        let table_desc = build_table_description_json(
            &arn,
            &key_schema,
            &attribute_definitions,
            &provisioned_throughput,
            &gsi,
            &lsi,
            &billing_mode,
            now,
            0,
            0,
            "ACTIVE",
        );

        Self::ok_json(json!({ "TableDescription": table_desc }))
    }
299
300 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = Self::parse_body(req)?;
302 let table_name = require_str(&body, "TableName")?;
303
304 let mut state = self.state.write();
305 let table = state.tables.remove(table_name).ok_or_else(|| {
306 AwsServiceError::aws_error(
307 StatusCode::BAD_REQUEST,
308 "ResourceNotFoundException",
309 format!("Requested resource not found: Table: {table_name} not found"),
310 )
311 })?;
312
313 let table_desc = build_table_description_json(
314 &table.arn,
315 &table.key_schema,
316 &table.attribute_definitions,
317 &table.provisioned_throughput,
318 &table.gsi,
319 &table.lsi,
320 &table.billing_mode,
321 table.created_at,
322 table.item_count,
323 table.size_bytes,
324 "DELETING",
325 );
326
327 Self::ok_json(json!({ "TableDescription": table_desc }))
328 }
329
330 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331 let body = Self::parse_body(req)?;
332 let table_name = require_str(&body, "TableName")?;
333
334 let state = self.state.read();
335 let table = get_table(&state.tables, table_name)?;
336
337 let table_desc = build_table_description(table);
338
339 Self::ok_json(json!({ "Table": table_desc }))
340 }
341
342 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343 let body = Self::parse_body(req)?;
344
345 validate_optional_string_length(
346 "exclusiveStartTableName",
347 body["ExclusiveStartTableName"].as_str(),
348 3,
349 255,
350 )?;
351 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354 let exclusive_start = body["ExclusiveStartTableName"]
355 .as_str()
356 .map(|s| s.to_string());
357
358 let state = self.state.read();
359 let mut names: Vec<&String> = state.tables.keys().collect();
360 names.sort();
361
362 let start_idx = match &exclusive_start {
363 Some(start) => names
364 .iter()
365 .position(|n| n.as_str() > start.as_str())
366 .unwrap_or(names.len()),
367 None => 0,
368 };
369
370 let page: Vec<&str> = names
371 .iter()
372 .skip(start_idx)
373 .take(limit)
374 .map(|n| n.as_str())
375 .collect();
376
377 let mut result = json!({ "TableNames": page });
378
379 if start_idx + limit < names.len() {
380 if let Some(last) = page.last() {
381 result["LastEvaluatedTableName"] = json!(last);
382 }
383 }
384
385 Self::ok_json(result)
386 }
387
    /// Handles `UpdateTable`: applies in-place changes to provisioned
    /// throughput, billing mode, and SSE settings on an existing table,
    /// then returns the updated table description.
    fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;

        let mut state = self.state.write();
        let table = state.tables.get_mut(table_name).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceNotFoundException",
                format!("Requested resource not found: Table: {table_name} not found"),
            )
        })?;

        // NOTE: an invalid ProvisionedThroughput payload is silently ignored
        // here rather than rejected; the table keeps its previous settings.
        if let Some(pt) = body.get("ProvisionedThroughput") {
            if let Ok(throughput) = parse_provisioned_throughput(pt) {
                table.provisioned_throughput = throughput;
            }
        }

        if let Some(bm) = body["BillingMode"].as_str() {
            table.billing_mode = bm.to_string();
        }

        // SSESpecification: enabling defaults the type to "KMS"; disabling
        // clears both the SSE type and the KMS key ARN.
        if let Some(sse_spec) = body.get("SSESpecification") {
            let enabled = sse_spec
                .get("Enabled")
                .and_then(|v| v.as_bool())
                .unwrap_or(false);
            if enabled {
                table.sse_type = Some(
                    sse_spec
                        .get("SSEType")
                        .and_then(|v| v.as_str())
                        .unwrap_or("KMS")
                        .to_string(),
                );
                table.sse_kms_key_arn = sse_spec
                    .get("KMSMasterKeyId")
                    .and_then(|v| v.as_str())
                    .map(|s| s.to_string());
            } else {
                table.sse_type = None;
                table.sse_kms_key_arn = None;
            }
        }

        let table_desc = build_table_description_json(
            &table.arn,
            &table.key_schema,
            &table.attribute_definitions,
            &table.provisioned_throughput,
            &table.gsi,
            &table.lsi,
            &table.billing_mode,
            table.created_at,
            table.item_count,
            table.size_bytes,
            &table.status,
        );

        Self::ok_json(json!({ "TableDescription": table_desc }))
    }
451
452 fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455 let body = Self::parse_body(req)?;
457 let table_name = require_str(&body, "TableName")?;
458 let item = require_object(&body, "Item")?;
459 let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460 let expr_attr_names = parse_expression_attribute_names(&body);
461 let expr_attr_values = parse_expression_attribute_values(&body);
462 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464 let (old_item, kinesis_info) = {
467 let mut state = self.state.write();
468 let region = state.region.clone();
469 let table = get_table_mut(&mut state.tables, table_name)?;
470
471 validate_key_in_item(table, &item)?;
472
473 let key = extract_key(table, &item);
474 let existing_idx = table.find_item_index(&key);
475
476 if let Some(ref cond) = condition {
477 let existing = existing_idx.map(|i| &table.items[i]);
478 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
479 }
480
481 let old_item_for_return = if return_values == "ALL_OLD" {
482 existing_idx.map(|i| table.items[i].clone())
483 } else {
484 None
485 };
486
487 let needs_change_capture = table.stream_enabled
489 || table
490 .kinesis_destinations
491 .iter()
492 .any(|d| d.destination_status == "ACTIVE");
493 let old_item_for_stream = if needs_change_capture {
494 existing_idx.map(|i| table.items[i].clone())
495 } else {
496 None
497 };
498
499 let is_modify = existing_idx.is_some();
500
501 if let Some(idx) = existing_idx {
502 table.items[idx] = item.clone();
503 } else {
504 table.items.push(item.clone());
505 }
506
507 table.record_item_access(&item);
508 table.recalculate_stats();
509
510 let event_name = if is_modify { "MODIFY" } else { "INSERT" };
511 let key = extract_key(table, &item);
512
513 if table.stream_enabled {
515 if let Some(record) = crate::streams::generate_stream_record(
516 table,
517 event_name,
518 key.clone(),
519 old_item_for_stream.clone(),
520 Some(item.clone()),
521 ®ion,
522 ) {
523 crate::streams::add_stream_record(table, record);
524 }
525 }
526
527 let kinesis_info = if table
529 .kinesis_destinations
530 .iter()
531 .any(|d| d.destination_status == "ACTIVE")
532 {
533 Some((
534 table.clone(),
535 event_name.to_string(),
536 key,
537 old_item_for_stream,
538 Some(item.clone()),
539 ))
540 } else {
541 None
542 };
543
544 (old_item_for_return, kinesis_info)
545 };
546 if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
550 self.deliver_to_kinesis_destinations(
551 &table,
552 &event_name,
553 &keys,
554 old_image.as_ref(),
555 new_image.as_ref(),
556 );
557 }
558
559 let mut result = json!({});
560 if let Some(old) = old_item {
561 result["Attributes"] = json!(old);
562 }
563
564 Self::ok_json(result)
565 }
566
567 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568 let body = Self::parse_body(req)?;
570 let table_name = require_str(&body, "TableName")?;
571 let key = require_object(&body, "Key")?;
572
573 let (result, needs_insights) = {
575 let state = self.state.read();
576 let table = get_table(&state.tables, table_name)?;
577 let needs_insights = table.contributor_insights_status == "ENABLED";
578
579 let result = match table.find_item_index(&key) {
580 Some(idx) => {
581 let item = &table.items[idx];
582 let projected = project_item(item, &body);
583 json!({ "Item": projected })
584 }
585 None => json!({}),
586 };
587 (result, needs_insights)
588 };
589 if needs_insights {
593 let mut state = self.state.write();
594 if let Some(table) = state.tables.get_mut(table_name) {
595 table.record_key_access(&key);
596 }
597 }
598
599 Self::ok_json(result)
600 }
601
602 fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603 let body = Self::parse_body(req)?;
604
605 validate_optional_enum_value(
606 "conditionalOperator",
607 &body["ConditionalOperator"],
608 &["AND", "OR"],
609 )?;
610 validate_optional_enum_value(
611 "returnConsumedCapacity",
612 &body["ReturnConsumedCapacity"],
613 &["INDEXES", "TOTAL", "NONE"],
614 )?;
615 validate_optional_enum_value(
616 "returnValues",
617 &body["ReturnValues"],
618 &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
619 )?;
620 validate_optional_enum_value(
621 "returnItemCollectionMetrics",
622 &body["ReturnItemCollectionMetrics"],
623 &["SIZE", "NONE"],
624 )?;
625 validate_optional_enum_value(
626 "returnValuesOnConditionCheckFailure",
627 &body["ReturnValuesOnConditionCheckFailure"],
628 &["ALL_OLD", "NONE"],
629 )?;
630
631 let table_name = require_str(&body, "TableName")?;
632 let key = require_object(&body, "Key")?;
633
634 let (result, kinesis_info) = {
635 let mut state = self.state.write();
636 let region = state.region.clone();
637 let table = get_table_mut(&mut state.tables, table_name)?;
638
639 let condition = body["ConditionExpression"].as_str();
640 let expr_attr_names = parse_expression_attribute_names(&body);
641 let expr_attr_values = parse_expression_attribute_values(&body);
642
643 let existing_idx = table.find_item_index(&key);
644
645 if let Some(cond) = condition {
646 let existing = existing_idx.map(|i| &table.items[i]);
647 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
648 }
649
650 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
651
652 let mut result = json!({});
653 let mut kinesis_info = None;
654
655 if let Some(idx) = existing_idx {
656 let old_item = table.items[idx].clone();
657 if return_values == "ALL_OLD" {
658 result["Attributes"] = json!(old_item.clone());
659 }
660
661 if table.stream_enabled {
663 if let Some(record) = crate::streams::generate_stream_record(
664 table,
665 "REMOVE",
666 key.clone(),
667 Some(old_item.clone()),
668 None,
669 ®ion,
670 ) {
671 crate::streams::add_stream_record(table, record);
672 }
673 }
674
675 if table
677 .kinesis_destinations
678 .iter()
679 .any(|d| d.destination_status == "ACTIVE")
680 {
681 kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
682 }
683
684 table.items.remove(idx);
685 table.recalculate_stats();
686 }
687
688 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
689 let return_icm = body["ReturnItemCollectionMetrics"]
690 .as_str()
691 .unwrap_or("NONE");
692
693 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
694 result["ConsumedCapacity"] = json!({
695 "TableName": table_name,
696 "CapacityUnits": 1.0,
697 });
698 }
699
700 if return_icm == "SIZE" {
701 result["ItemCollectionMetrics"] = json!({});
702 }
703
704 (result, kinesis_info)
705 };
706
707 if let Some((table, keys, old_image)) = kinesis_info {
709 self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
710 }
711
712 Self::ok_json(result)
713 }
714
715 fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
716 let body = Self::parse_body(req)?;
717 let table_name = require_str(&body, "TableName")?;
718 let key = require_object(&body, "Key")?;
719
720 let mut state = self.state.write();
721 let region = state.region.clone();
722 let table = get_table_mut(&mut state.tables, table_name)?;
723
724 validate_key_attributes_in_key(table, &key)?;
725
726 let condition = body["ConditionExpression"].as_str();
727 let expr_attr_names = parse_expression_attribute_names(&body);
728 let expr_attr_values = parse_expression_attribute_values(&body);
729 let update_expression = body["UpdateExpression"].as_str();
730
731 let existing_idx = table.find_item_index(&key);
732
733 if let Some(cond) = condition {
734 let existing = existing_idx.map(|i| &table.items[i]);
735 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
736 }
737
738 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
739
740 let is_insert = existing_idx.is_none();
741 let idx = match existing_idx {
742 Some(i) => i,
743 None => {
744 let mut new_item = HashMap::new();
745 for (k, v) in &key {
746 new_item.insert(k.clone(), v.clone());
747 }
748 table.items.push(new_item);
749 table.items.len() - 1
750 }
751 };
752
753 let needs_change_capture = table.stream_enabled
755 || table
756 .kinesis_destinations
757 .iter()
758 .any(|d| d.destination_status == "ACTIVE");
759 let old_item_for_stream = if needs_change_capture {
760 Some(table.items[idx].clone())
761 } else {
762 None
763 };
764
765 let old_item = if return_values == "ALL_OLD" {
766 Some(table.items[idx].clone())
767 } else {
768 None
769 };
770
771 if let Some(expr) = update_expression {
772 apply_update_expression(
773 &mut table.items[idx],
774 expr,
775 &expr_attr_names,
776 &expr_attr_values,
777 )?;
778 }
779
780 let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
781 Some(table.items[idx].clone())
782 } else {
783 None
784 };
785
786 let event_name = if is_insert { "INSERT" } else { "MODIFY" };
787 let new_item_for_stream = table.items[idx].clone();
788
789 if table.stream_enabled {
791 if let Some(record) = crate::streams::generate_stream_record(
792 table,
793 event_name,
794 key.clone(),
795 old_item_for_stream.clone(),
796 Some(new_item_for_stream.clone()),
797 ®ion,
798 ) {
799 crate::streams::add_stream_record(table, record);
800 }
801 }
802
803 let kinesis_info = if table
805 .kinesis_destinations
806 .iter()
807 .any(|d| d.destination_status == "ACTIVE")
808 {
809 Some((
810 table.clone(),
811 event_name.to_string(),
812 key.clone(),
813 old_item_for_stream,
814 Some(new_item_for_stream),
815 ))
816 } else {
817 None
818 };
819
820 table.recalculate_stats();
821
822 drop(state);
824
825 if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
827 self.deliver_to_kinesis_destinations(
828 &tbl,
829 &ev,
830 &keys,
831 old_image.as_ref(),
832 new_image.as_ref(),
833 );
834 }
835
836 let mut result = json!({});
837 if let Some(old) = old_item {
838 result["Attributes"] = json!(old);
839 } else if let Some(new) = new_item {
840 result["Attributes"] = json!(new);
841 }
842
843 Self::ok_json(result)
844 }
845
846 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849 let body = Self::parse_body(req)?;
850 let table_name = require_str(&body, "TableName")?;
851
852 let state = self.state.read();
853 let table = get_table(&state.tables, table_name)?;
854
855 let expr_attr_names = parse_expression_attribute_names(&body);
856 let expr_attr_values = parse_expression_attribute_values(&body);
857
858 let key_condition = body["KeyConditionExpression"].as_str();
859 let filter_expression = body["FilterExpression"].as_str();
860 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
861 let limit = body["Limit"].as_i64().map(|l| l as usize);
862 let index_name = body["IndexName"].as_str();
863 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
864 parse_key_map(&body["ExclusiveStartKey"]);
865
866 let (items_to_scan, hash_key_name, range_key_name): (
867 &[HashMap<String, AttributeValue>],
868 String,
869 Option<String>,
870 ) = if let Some(idx_name) = index_name {
871 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
872 let hk = gsi
873 .key_schema
874 .iter()
875 .find(|k| k.key_type == "HASH")
876 .map(|k| k.attribute_name.clone())
877 .unwrap_or_default();
878 let rk = gsi
879 .key_schema
880 .iter()
881 .find(|k| k.key_type == "RANGE")
882 .map(|k| k.attribute_name.clone());
883 (&table.items, hk, rk)
884 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
885 let hk = lsi
886 .key_schema
887 .iter()
888 .find(|k| k.key_type == "HASH")
889 .map(|k| k.attribute_name.clone())
890 .unwrap_or_default();
891 let rk = lsi
892 .key_schema
893 .iter()
894 .find(|k| k.key_type == "RANGE")
895 .map(|k| k.attribute_name.clone());
896 (&table.items, hk, rk)
897 } else {
898 return Err(AwsServiceError::aws_error(
899 StatusCode::BAD_REQUEST,
900 "ValidationException",
901 format!("The table does not have the specified index: {idx_name}"),
902 ));
903 }
904 } else {
905 (
906 &table.items[..],
907 table.hash_key_name().to_string(),
908 table.range_key_name().map(|s| s.to_string()),
909 )
910 };
911
912 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
913 .iter()
914 .filter(|item| {
915 if let Some(kc) = key_condition {
916 evaluate_key_condition(
917 kc,
918 item,
919 &hash_key_name,
920 range_key_name.as_deref(),
921 &expr_attr_names,
922 &expr_attr_values,
923 )
924 } else {
925 true
926 }
927 })
928 .collect();
929
930 if let Some(ref rk) = range_key_name {
931 matched.sort_by(|a, b| {
932 let av = a.get(rk.as_str());
933 let bv = b.get(rk.as_str());
934 compare_attribute_values(av, bv)
935 });
936 if !scan_forward {
937 matched.reverse();
938 }
939 }
940
941 let table_pk_hash = table.hash_key_name().to_string();
944 let table_pk_range = table.range_key_name().map(|s| s.to_string());
945 let is_gsi_query = index_name.is_some()
946 && (hash_key_name != table_pk_hash
947 || range_key_name.as_deref() != table_pk_range.as_deref());
948
949 if let Some(ref start_key) = exclusive_start_key {
953 if let Some(pos) = matched.iter().position(|item| {
954 let index_match =
955 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
956 if is_gsi_query {
957 index_match
958 && item_matches_key(
959 item,
960 start_key,
961 &table_pk_hash,
962 table_pk_range.as_deref(),
963 )
964 } else {
965 index_match
966 }
967 }) {
968 matched = matched.split_off(pos + 1);
969 }
970 }
971
972 if let Some(filter) = filter_expression {
973 matched.retain(|item| {
974 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
975 });
976 }
977
978 let scanned_count = matched.len();
979
980 let has_more = if let Some(lim) = limit {
981 let more = matched.len() > lim;
982 matched.truncate(lim);
983 more
984 } else {
985 false
986 };
987
988 let last_evaluated_key = if has_more {
992 matched.last().map(|item| {
993 let mut key =
994 extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
995 if is_gsi_query {
996 let table_key =
997 extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
998 key.extend(table_key);
999 }
1000 key
1001 })
1002 } else {
1003 None
1004 };
1005
1006 let insights_enabled = table.contributor_insights_status == "ENABLED";
1008 let pk_name = table.hash_key_name().to_string();
1009 let accessed_keys: Vec<String> = if insights_enabled {
1010 matched
1011 .iter()
1012 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1013 .collect()
1014 } else {
1015 Vec::new()
1016 };
1017
1018 let items: Vec<Value> = matched
1019 .iter()
1020 .map(|item| {
1021 let projected = project_item(item, &body);
1022 json!(projected)
1023 })
1024 .collect();
1025
1026 let mut result = json!({
1027 "Items": items,
1028 "Count": items.len(),
1029 "ScannedCount": scanned_count,
1030 });
1031
1032 if let Some(lek) = last_evaluated_key {
1033 result["LastEvaluatedKey"] = json!(lek);
1034 }
1035
1036 drop(state);
1037
1038 if !accessed_keys.is_empty() {
1039 let mut state = self.state.write();
1040 if let Some(table) = state.tables.get_mut(table_name) {
1041 if table.contributor_insights_status == "ENABLED" {
1044 for key_str in accessed_keys {
1045 *table
1046 .contributor_insights_counters
1047 .entry(key_str)
1048 .or_insert(0) += 1;
1049 }
1050 }
1051 }
1052 }
1053
1054 Self::ok_json(result)
1055 }
1056
    /// Handles `Scan`: walks every item in the table, applying
    /// `ExclusiveStartKey` paging, an optional `FilterExpression`, and
    /// `Limit`, then returns the matching items (with projection applied).
    ///
    /// `ScannedCount` is taken before the filter runs, so it reports items
    /// examined rather than items returned.
    fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;

        let state = self.state.read();
        let table = get_table(&state.tables, table_name)?;

        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);
        let filter_expression = body["FilterExpression"].as_str();
        let limit = body["Limit"].as_i64().map(|l| l as usize);
        let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
            parse_key_map(&body["ExclusiveStartKey"]);

        let hash_key_name = table.hash_key_name().to_string();
        let range_key_name = table.range_key_name().map(|s| s.to_string());

        let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();

        // Resume strictly after the item matching ExclusiveStartKey.
        if let Some(ref start_key) = exclusive_start_key {
            if let Some(pos) = matched.iter().position(|item| {
                item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
            }) {
                matched = matched.split_off(pos + 1);
            }
        }

        // Count before filtering: items scanned, not items returned.
        let scanned_count = matched.len();

        if let Some(filter) = filter_expression {
            matched.retain(|item| {
                evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
            });
        }

        let has_more = if let Some(lim) = limit {
            let more = matched.len() > lim;
            matched.truncate(lim);
            more
        } else {
            false
        };

        let last_evaluated_key = if has_more {
            matched
                .last()
                .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
        } else {
            None
        };

        // Collect hash-key values for Contributor Insights while still
        // under the read lock; counters are bumped later under a write lock.
        let insights_enabled = table.contributor_insights_status == "ENABLED";
        let pk_name = table.hash_key_name().to_string();
        let accessed_keys: Vec<String> = if insights_enabled {
            matched
                .iter()
                .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
                .collect()
        } else {
            Vec::new()
        };

        let items: Vec<Value> = matched
            .iter()
            .map(|item| {
                let projected = project_item(item, &body);
                json!(projected)
            })
            .collect();

        let mut result = json!({
            "Items": items,
            "Count": items.len(),
            "ScannedCount": scanned_count,
        });

        if let Some(lek) = last_evaluated_key {
            result["LastEvaluatedKey"] = json!(lek);
        }

        drop(state);

        if !accessed_keys.is_empty() {
            let mut state = self.state.write();
            if let Some(table) = state.tables.get_mut(table_name) {
                if table.contributor_insights_status == "ENABLED" {
                    for key_str in accessed_keys {
                        *table
                            .contributor_insights_counters
                            .entry(key_str)
                            .or_insert(0) += 1;
                    }
                }
            }
        }

        Self::ok_json(result)
    }
1160
    /// Handles `BatchGetItem`: resolves each requested key per table and
    /// returns the found items grouped by table name.
    ///
    /// Simplifications vs. AWS: missing keys are silently omitted,
    /// `UnprocessedKeys` is always empty, and capacity is a fixed stub.
    /// An unknown table name fails the whole request.
    fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;

        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");

        let request_items = body["RequestItems"]
            .as_object()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "RequestItems is required",
                )
            })?
            .clone();

        let state = self.state.read();
        let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
        let mut consumed_capacity: Vec<Value> = Vec::new();

        for (table_name, params) in &request_items {
            let table = get_table(&state.tables, table_name)?;
            let keys = params["Keys"].as_array().ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "Keys is required",
                )
            })?;

            let mut items = Vec::new();
            for key_val in keys {
                // Malformed keys deserialize to an empty map and simply
                // fail to match any item.
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(key_val.clone()).unwrap_or_default();
                if let Some(idx) = table.find_item_index(&key) {
                    items.push(json!(table.items[idx]));
                }
            }
            responses.insert(table_name.clone(), items);

            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
                consumed_capacity.push(json!({
                    "TableName": table_name,
                    "CapacityUnits": 1.0,
                }));
            }
        }

        let mut result = json!({
            "Responses": responses,
            "UnprocessedKeys": {},
        });

        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            result["ConsumedCapacity"] = json!(consumed_capacity);
        }

        Self::ok_json(result)
    }
1228
1229 fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230 let body = Self::parse_body(req)?;
1231
1232 validate_optional_enum_value(
1233 "returnConsumedCapacity",
1234 &body["ReturnConsumedCapacity"],
1235 &["INDEXES", "TOTAL", "NONE"],
1236 )?;
1237 validate_optional_enum_value(
1238 "returnItemCollectionMetrics",
1239 &body["ReturnItemCollectionMetrics"],
1240 &["SIZE", "NONE"],
1241 )?;
1242
1243 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1244 let return_icm = body["ReturnItemCollectionMetrics"]
1245 .as_str()
1246 .unwrap_or("NONE");
1247
1248 let request_items = body["RequestItems"]
1249 .as_object()
1250 .ok_or_else(|| {
1251 AwsServiceError::aws_error(
1252 StatusCode::BAD_REQUEST,
1253 "ValidationException",
1254 "RequestItems is required",
1255 )
1256 })?
1257 .clone();
1258
1259 let mut state = self.state.write();
1260 let mut consumed_capacity: Vec<Value> = Vec::new();
1261 let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1262
1263 for (table_name, requests) in &request_items {
1264 let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1265 AwsServiceError::aws_error(
1266 StatusCode::BAD_REQUEST,
1267 "ResourceNotFoundException",
1268 format!("Requested resource not found: Table: {table_name} not found"),
1269 )
1270 })?;
1271
1272 let reqs = requests.as_array().ok_or_else(|| {
1273 AwsServiceError::aws_error(
1274 StatusCode::BAD_REQUEST,
1275 "ValidationException",
1276 "Request list must be an array",
1277 )
1278 })?;
1279
1280 for request in reqs {
1281 if let Some(put_req) = request.get("PutRequest") {
1282 let item: HashMap<String, AttributeValue> =
1283 serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1284 let key = extract_key(table, &item);
1285 if let Some(idx) = table.find_item_index(&key) {
1286 table.items[idx] = item;
1287 } else {
1288 table.items.push(item);
1289 }
1290 } else if let Some(del_req) = request.get("DeleteRequest") {
1291 let key: HashMap<String, AttributeValue> =
1292 serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1293 if let Some(idx) = table.find_item_index(&key) {
1294 table.items.remove(idx);
1295 }
1296 }
1297 }
1298
1299 table.recalculate_stats();
1300
1301 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1302 consumed_capacity.push(json!({
1303 "TableName": table_name,
1304 "CapacityUnits": 1.0,
1305 }));
1306 }
1307
1308 if return_icm == "SIZE" {
1309 item_collection_metrics.insert(table_name.clone(), vec![]);
1310 }
1311 }
1312
1313 let mut result = json!({
1314 "UnprocessedItems": {},
1315 });
1316
1317 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1318 result["ConsumedCapacity"] = json!(consumed_capacity);
1319 }
1320
1321 if return_icm == "SIZE" {
1322 result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1323 }
1324
1325 Self::ok_json(result)
1326 }
1327
1328 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331 let body = Self::parse_body(req)?;
1332 let resource_arn = require_str(&body, "ResourceArn")?;
1333 validate_required("Tags", &body["Tags"])?;
1334
1335 let mut state = self.state.write();
1336 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1337
1338 fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1339 |f| {
1340 AwsServiceError::aws_error(
1341 StatusCode::BAD_REQUEST,
1342 "ValidationException",
1343 format!("{f} must be a list"),
1344 )
1345 },
1346 )?;
1347
1348 Self::ok_json(json!({}))
1349 }
1350
1351 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352 let body = Self::parse_body(req)?;
1353 let resource_arn = require_str(&body, "ResourceArn")?;
1354 validate_required("TagKeys", &body["TagKeys"])?;
1355
1356 let mut state = self.state.write();
1357 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1358
1359 fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1360 AwsServiceError::aws_error(
1361 StatusCode::BAD_REQUEST,
1362 "ValidationException",
1363 format!("{f} must be a list"),
1364 )
1365 })?;
1366
1367 Self::ok_json(json!({}))
1368 }
1369
1370 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371 let body = Self::parse_body(req)?;
1372 let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374 let state = self.state.read();
1375 let table = find_table_by_arn(&state.tables, resource_arn)?;
1376
1377 let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1378
1379 Self::ok_json(json!({ "Tags": tags }))
1380 }
1381
1382 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385 let body = Self::parse_body(req)?;
1386 validate_optional_enum_value(
1387 "returnConsumedCapacity",
1388 &body["ReturnConsumedCapacity"],
1389 &["INDEXES", "TOTAL", "NONE"],
1390 )?;
1391 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1392 AwsServiceError::aws_error(
1393 StatusCode::BAD_REQUEST,
1394 "ValidationException",
1395 "TransactItems is required",
1396 )
1397 })?;
1398
1399 let state = self.state.read();
1400 let mut responses: Vec<Value> = Vec::new();
1401
1402 for ti in transact_items {
1403 let get = &ti["Get"];
1404 let table_name = get["TableName"].as_str().ok_or_else(|| {
1405 AwsServiceError::aws_error(
1406 StatusCode::BAD_REQUEST,
1407 "ValidationException",
1408 "TableName is required in Get",
1409 )
1410 })?;
1411 let key: HashMap<String, AttributeValue> =
1412 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1413
1414 let table = get_table(&state.tables, table_name)?;
1415 match table.find_item_index(&key) {
1416 Some(idx) => {
1417 responses.push(json!({ "Item": table.items[idx] }));
1418 }
1419 None => {
1420 responses.push(json!({}));
1421 }
1422 }
1423 }
1424
1425 Self::ok_json(json!({ "Responses": responses }))
1426 }
1427
1428 fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1429 let body = Self::parse_body(req)?;
1430 validate_optional_string_length(
1431 "clientRequestToken",
1432 body["ClientRequestToken"].as_str(),
1433 1,
1434 36,
1435 )?;
1436 validate_optional_enum_value(
1437 "returnConsumedCapacity",
1438 &body["ReturnConsumedCapacity"],
1439 &["INDEXES", "TOTAL", "NONE"],
1440 )?;
1441 validate_optional_enum_value(
1442 "returnItemCollectionMetrics",
1443 &body["ReturnItemCollectionMetrics"],
1444 &["SIZE", "NONE"],
1445 )?;
1446 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1447 AwsServiceError::aws_error(
1448 StatusCode::BAD_REQUEST,
1449 "ValidationException",
1450 "TransactItems is required",
1451 )
1452 })?;
1453
1454 let mut state = self.state.write();
1455
1456 let mut cancellation_reasons: Vec<Value> = Vec::new();
1458 let mut any_failed = false;
1459
1460 for ti in transact_items {
1461 if let Some(put) = ti.get("Put") {
1462 let table_name = put["TableName"].as_str().unwrap_or_default();
1463 let item: HashMap<String, AttributeValue> =
1464 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1465 let condition = put["ConditionExpression"].as_str();
1466
1467 if let Some(cond) = condition {
1468 let table = get_table(&state.tables, table_name)?;
1469 let expr_attr_names = parse_expression_attribute_names(put);
1470 let expr_attr_values = parse_expression_attribute_values(put);
1471 let key = extract_key(table, &item);
1472 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1473 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1474 .is_err()
1475 {
1476 cancellation_reasons.push(json!({
1477 "Code": "ConditionalCheckFailed",
1478 "Message": "The conditional request failed"
1479 }));
1480 any_failed = true;
1481 continue;
1482 }
1483 }
1484 cancellation_reasons.push(json!({ "Code": "None" }));
1485 } else if let Some(delete) = ti.get("Delete") {
1486 let table_name = delete["TableName"].as_str().unwrap_or_default();
1487 let key: HashMap<String, AttributeValue> =
1488 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1489 let condition = delete["ConditionExpression"].as_str();
1490
1491 if let Some(cond) = condition {
1492 let table = get_table(&state.tables, table_name)?;
1493 let expr_attr_names = parse_expression_attribute_names(delete);
1494 let expr_attr_values = parse_expression_attribute_values(delete);
1495 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1496 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1497 .is_err()
1498 {
1499 cancellation_reasons.push(json!({
1500 "Code": "ConditionalCheckFailed",
1501 "Message": "The conditional request failed"
1502 }));
1503 any_failed = true;
1504 continue;
1505 }
1506 }
1507 cancellation_reasons.push(json!({ "Code": "None" }));
1508 } else if let Some(update) = ti.get("Update") {
1509 let table_name = update["TableName"].as_str().unwrap_or_default();
1510 let key: HashMap<String, AttributeValue> =
1511 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1512 let condition = update["ConditionExpression"].as_str();
1513
1514 if let Some(cond) = condition {
1515 let table = get_table(&state.tables, table_name)?;
1516 let expr_attr_names = parse_expression_attribute_names(update);
1517 let expr_attr_values = parse_expression_attribute_values(update);
1518 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1519 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1520 .is_err()
1521 {
1522 cancellation_reasons.push(json!({
1523 "Code": "ConditionalCheckFailed",
1524 "Message": "The conditional request failed"
1525 }));
1526 any_failed = true;
1527 continue;
1528 }
1529 }
1530 cancellation_reasons.push(json!({ "Code": "None" }));
1531 } else if let Some(check) = ti.get("ConditionCheck") {
1532 let table_name = check["TableName"].as_str().unwrap_or_default();
1533 let key: HashMap<String, AttributeValue> =
1534 serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1535 let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1536
1537 let table = get_table(&state.tables, table_name)?;
1538 let expr_attr_names = parse_expression_attribute_names(check);
1539 let expr_attr_values = parse_expression_attribute_values(check);
1540 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1541 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1542 {
1543 cancellation_reasons.push(json!({
1544 "Code": "ConditionalCheckFailed",
1545 "Message": "The conditional request failed"
1546 }));
1547 any_failed = true;
1548 continue;
1549 }
1550 cancellation_reasons.push(json!({ "Code": "None" }));
1551 } else {
1552 cancellation_reasons.push(json!({ "Code": "None" }));
1553 }
1554 }
1555
1556 if any_failed {
1557 let error_body = json!({
1558 "__type": "TransactionCanceledException",
1559 "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1560 "CancellationReasons": cancellation_reasons
1561 });
1562 return Ok(AwsResponse::json(
1563 StatusCode::BAD_REQUEST,
1564 serde_json::to_vec(&error_body).unwrap(),
1565 ));
1566 }
1567
1568 for ti in transact_items {
1570 if let Some(put) = ti.get("Put") {
1571 let table_name = put["TableName"].as_str().unwrap_or_default();
1572 let item: HashMap<String, AttributeValue> =
1573 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1574 let table = get_table_mut(&mut state.tables, table_name)?;
1575 let key = extract_key(table, &item);
1576 if let Some(idx) = table.find_item_index(&key) {
1577 table.items[idx] = item;
1578 } else {
1579 table.items.push(item);
1580 }
1581 table.recalculate_stats();
1582 } else if let Some(delete) = ti.get("Delete") {
1583 let table_name = delete["TableName"].as_str().unwrap_or_default();
1584 let key: HashMap<String, AttributeValue> =
1585 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1586 let table = get_table_mut(&mut state.tables, table_name)?;
1587 if let Some(idx) = table.find_item_index(&key) {
1588 table.items.remove(idx);
1589 }
1590 table.recalculate_stats();
1591 } else if let Some(update) = ti.get("Update") {
1592 let table_name = update["TableName"].as_str().unwrap_or_default();
1593 let key: HashMap<String, AttributeValue> =
1594 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1595 let update_expression = update["UpdateExpression"].as_str();
1596 let expr_attr_names = parse_expression_attribute_names(update);
1597 let expr_attr_values = parse_expression_attribute_values(update);
1598
1599 let table = get_table_mut(&mut state.tables, table_name)?;
1600 let idx = match table.find_item_index(&key) {
1601 Some(i) => i,
1602 None => {
1603 let mut new_item = HashMap::new();
1604 for (k, v) in &key {
1605 new_item.insert(k.clone(), v.clone());
1606 }
1607 table.items.push(new_item);
1608 table.items.len() - 1
1609 }
1610 };
1611
1612 if let Some(expr) = update_expression {
1613 apply_update_expression(
1614 &mut table.items[idx],
1615 expr,
1616 &expr_attr_names,
1617 &expr_attr_values,
1618 )?;
1619 }
1620 table.recalculate_stats();
1621 }
1622 }
1624
1625 Self::ok_json(json!({}))
1626 }
1627
1628 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1629 let body = Self::parse_body(req)?;
1630 let statement = require_str(&body, "Statement")?;
1631 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1632
1633 execute_partiql_statement(&self.state, statement, ¶meters)
1634 }
1635
1636 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1637 let body = Self::parse_body(req)?;
1638 validate_optional_enum_value(
1639 "returnConsumedCapacity",
1640 &body["ReturnConsumedCapacity"],
1641 &["INDEXES", "TOTAL", "NONE"],
1642 )?;
1643 let statements = body["Statements"].as_array().ok_or_else(|| {
1644 AwsServiceError::aws_error(
1645 StatusCode::BAD_REQUEST,
1646 "ValidationException",
1647 "Statements is required",
1648 )
1649 })?;
1650
1651 let mut responses: Vec<Value> = Vec::new();
1652 for stmt_obj in statements {
1653 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1654 let parameters = stmt_obj["Parameters"]
1655 .as_array()
1656 .cloned()
1657 .unwrap_or_default();
1658
1659 match execute_partiql_statement(&self.state, statement, ¶meters) {
1660 Ok(resp) => {
1661 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1662 responses.push(resp_body);
1663 }
1664 Err(e) => {
1665 responses.push(json!({
1666 "Error": {
1667 "Code": "ValidationException",
1668 "Message": e.to_string()
1669 }
1670 }));
1671 }
1672 }
1673 }
1674
1675 Self::ok_json(json!({ "Responses": responses }))
1676 }
1677
1678 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1679 let body = Self::parse_body(req)?;
1680 validate_optional_string_length(
1681 "clientRequestToken",
1682 body["ClientRequestToken"].as_str(),
1683 1,
1684 36,
1685 )?;
1686 validate_optional_enum_value(
1687 "returnConsumedCapacity",
1688 &body["ReturnConsumedCapacity"],
1689 &["INDEXES", "TOTAL", "NONE"],
1690 )?;
1691 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1692 AwsServiceError::aws_error(
1693 StatusCode::BAD_REQUEST,
1694 "ValidationException",
1695 "TransactStatements is required",
1696 )
1697 })?;
1698
1699 let mut results: Vec<Result<Value, String>> = Vec::new();
1701 for stmt_obj in transact_statements {
1702 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1703 let parameters = stmt_obj["Parameters"]
1704 .as_array()
1705 .cloned()
1706 .unwrap_or_default();
1707
1708 match execute_partiql_statement(&self.state, statement, ¶meters) {
1709 Ok(resp) => {
1710 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1711 results.push(Ok(resp_body));
1712 }
1713 Err(e) => {
1714 results.push(Err(e.to_string()));
1715 }
1716 }
1717 }
1718
1719 let any_failed = results.iter().any(|r| r.is_err());
1720 if any_failed {
1721 let reasons: Vec<Value> = results
1722 .iter()
1723 .map(|r| match r {
1724 Ok(_) => json!({ "Code": "None" }),
1725 Err(msg) => json!({
1726 "Code": "ValidationException",
1727 "Message": msg
1728 }),
1729 })
1730 .collect();
1731 let error_body = json!({
1732 "__type": "TransactionCanceledException",
1733 "message": "Transaction cancelled due to validation errors",
1734 "CancellationReasons": reasons
1735 });
1736 return Ok(AwsResponse::json(
1737 StatusCode::BAD_REQUEST,
1738 serde_json::to_vec(&error_body).unwrap(),
1739 ));
1740 }
1741
1742 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1743 Self::ok_json(json!({ "Responses": responses }))
1744 }
1745
1746 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749 let body = Self::parse_body(req)?;
1750 let table_name = require_str(&body, "TableName")?;
1751 let spec = &body["TimeToLiveSpecification"];
1752 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1753 AwsServiceError::aws_error(
1754 StatusCode::BAD_REQUEST,
1755 "ValidationException",
1756 "TimeToLiveSpecification.AttributeName is required",
1757 )
1758 })?;
1759 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1760 AwsServiceError::aws_error(
1761 StatusCode::BAD_REQUEST,
1762 "ValidationException",
1763 "TimeToLiveSpecification.Enabled is required",
1764 )
1765 })?;
1766
1767 let mut state = self.state.write();
1768 let table = get_table_mut(&mut state.tables, table_name)?;
1769
1770 if enabled {
1771 table.ttl_attribute = Some(attr_name.to_string());
1772 table.ttl_enabled = true;
1773 } else {
1774 table.ttl_enabled = false;
1775 }
1776
1777 Self::ok_json(json!({
1778 "TimeToLiveSpecification": {
1779 "AttributeName": attr_name,
1780 "Enabled": enabled
1781 }
1782 }))
1783 }
1784
1785 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1786 let body = Self::parse_body(req)?;
1787 let table_name = require_str(&body, "TableName")?;
1788
1789 let state = self.state.read();
1790 let table = get_table(&state.tables, table_name)?;
1791
1792 let status = if table.ttl_enabled {
1793 "ENABLED"
1794 } else {
1795 "DISABLED"
1796 };
1797
1798 let mut desc = json!({
1799 "TimeToLiveDescription": {
1800 "TimeToLiveStatus": status
1801 }
1802 });
1803
1804 if let Some(ref attr) = table.ttl_attribute {
1805 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1806 }
1807
1808 Self::ok_json(desc)
1809 }
1810
1811 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1814 let body = Self::parse_body(req)?;
1815 let resource_arn = require_str(&body, "ResourceArn")?;
1816 let policy = require_str(&body, "Policy")?;
1817
1818 let mut state = self.state.write();
1819 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1820 table.resource_policy = Some(policy.to_string());
1821
1822 let revision_id = uuid::Uuid::new_v4().to_string();
1823 Self::ok_json(json!({ "RevisionId": revision_id }))
1824 }
1825
1826 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1827 let body = Self::parse_body(req)?;
1828 let resource_arn = require_str(&body, "ResourceArn")?;
1829
1830 let state = self.state.read();
1831 let table = find_table_by_arn(&state.tables, resource_arn)?;
1832
1833 match &table.resource_policy {
1834 Some(policy) => {
1835 let revision_id = uuid::Uuid::new_v4().to_string();
1836 Self::ok_json(json!({
1837 "Policy": policy,
1838 "RevisionId": revision_id
1839 }))
1840 }
1841 None => Self::ok_json(json!({ "Policy": null })),
1842 }
1843 }
1844
1845 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1846 let body = Self::parse_body(req)?;
1847 let resource_arn = require_str(&body, "ResourceArn")?;
1848
1849 let mut state = self.state.write();
1850 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1851 table.resource_policy = None;
1852
1853 Self::ok_json(json!({}))
1854 }
1855
1856 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1859 Self::ok_json(json!({
1860 "Endpoints": [{
1861 "Address": "dynamodb.us-east-1.amazonaws.com",
1862 "CachePeriodInMinutes": 1440
1863 }]
1864 }))
1865 }
1866
1867 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868 Self::ok_json(json!({
1869 "AccountMaxReadCapacityUnits": 80000,
1870 "AccountMaxWriteCapacityUnits": 80000,
1871 "TableMaxReadCapacityUnits": 40000,
1872 "TableMaxWriteCapacityUnits": 40000
1873 }))
1874 }
1875
1876 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1879 let body = Self::parse_body(req)?;
1880 let table_name = require_str(&body, "TableName")?;
1881 let backup_name = require_str(&body, "BackupName")?;
1882
1883 let mut state = self.state.write();
1884 let table = get_table(&state.tables, table_name)?;
1885
1886 let backup_arn = format!(
1887 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1888 state.region,
1889 state.account_id,
1890 table_name,
1891 Utc::now().format("%Y%m%d%H%M%S")
1892 );
1893 let now = Utc::now();
1894
1895 let backup = BackupDescription {
1896 backup_arn: backup_arn.clone(),
1897 backup_name: backup_name.to_string(),
1898 table_name: table_name.to_string(),
1899 table_arn: table.arn.clone(),
1900 backup_status: "AVAILABLE".to_string(),
1901 backup_type: "USER".to_string(),
1902 backup_creation_date: now,
1903 key_schema: table.key_schema.clone(),
1904 attribute_definitions: table.attribute_definitions.clone(),
1905 provisioned_throughput: table.provisioned_throughput.clone(),
1906 billing_mode: table.billing_mode.clone(),
1907 item_count: table.item_count,
1908 size_bytes: table.size_bytes,
1909 items: table.items.clone(),
1910 };
1911
1912 state.backups.insert(backup_arn.clone(), backup);
1913
1914 Self::ok_json(json!({
1915 "BackupDetails": {
1916 "BackupArn": backup_arn,
1917 "BackupName": backup_name,
1918 "BackupStatus": "AVAILABLE",
1919 "BackupType": "USER",
1920 "BackupCreationDateTime": now.timestamp() as f64,
1921 "BackupSizeBytes": 0
1922 }
1923 }))
1924 }
1925
1926 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1927 let body = Self::parse_body(req)?;
1928 let backup_arn = require_str(&body, "BackupArn")?;
1929
1930 let mut state = self.state.write();
1931 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1932 AwsServiceError::aws_error(
1933 StatusCode::BAD_REQUEST,
1934 "BackupNotFoundException",
1935 format!("Backup not found: {backup_arn}"),
1936 )
1937 })?;
1938
1939 Self::ok_json(json!({
1940 "BackupDescription": {
1941 "BackupDetails": {
1942 "BackupArn": backup.backup_arn,
1943 "BackupName": backup.backup_name,
1944 "BackupStatus": "DELETED",
1945 "BackupType": backup.backup_type,
1946 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1947 "BackupSizeBytes": backup.size_bytes
1948 },
1949 "SourceTableDetails": {
1950 "TableName": backup.table_name,
1951 "TableArn": backup.table_arn,
1952 "TableId": uuid::Uuid::new_v4().to_string(),
1953 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1954 "AttributeName": ks.attribute_name,
1955 "KeyType": ks.key_type
1956 })).collect::<Vec<_>>(),
1957 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1958 "ProvisionedThroughput": {
1959 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1960 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1961 },
1962 "ItemCount": backup.item_count,
1963 "BillingMode": backup.billing_mode,
1964 "TableSizeBytes": backup.size_bytes
1965 },
1966 "SourceTableFeatureDetails": {}
1967 }
1968 }))
1969 }
1970
1971 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1972 let body = Self::parse_body(req)?;
1973 let backup_arn = require_str(&body, "BackupArn")?;
1974
1975 let state = self.state.read();
1976 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1977 AwsServiceError::aws_error(
1978 StatusCode::BAD_REQUEST,
1979 "BackupNotFoundException",
1980 format!("Backup not found: {backup_arn}"),
1981 )
1982 })?;
1983
1984 Self::ok_json(json!({
1985 "BackupDescription": {
1986 "BackupDetails": {
1987 "BackupArn": backup.backup_arn,
1988 "BackupName": backup.backup_name,
1989 "BackupStatus": backup.backup_status,
1990 "BackupType": backup.backup_type,
1991 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1992 "BackupSizeBytes": backup.size_bytes
1993 },
1994 "SourceTableDetails": {
1995 "TableName": backup.table_name,
1996 "TableArn": backup.table_arn,
1997 "TableId": uuid::Uuid::new_v4().to_string(),
1998 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1999 "AttributeName": ks.attribute_name,
2000 "KeyType": ks.key_type
2001 })).collect::<Vec<_>>(),
2002 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
2003 "ProvisionedThroughput": {
2004 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
2005 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2006 },
2007 "ItemCount": backup.item_count,
2008 "BillingMode": backup.billing_mode,
2009 "TableSizeBytes": backup.size_bytes
2010 },
2011 "SourceTableFeatureDetails": {}
2012 }
2013 }))
2014 }
2015
2016 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2017 let body = Self::parse_body(req)?;
2018 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2019 validate_optional_string_length(
2020 "exclusiveStartBackupArn",
2021 body["ExclusiveStartBackupArn"].as_str(),
2022 37,
2023 1024,
2024 )?;
2025 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2026 validate_optional_enum_value(
2027 "backupType",
2028 &body["BackupType"],
2029 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2030 )?;
2031 let table_name = body["TableName"].as_str();
2032
2033 let state = self.state.read();
2034 let summaries: Vec<Value> = state
2035 .backups
2036 .values()
2037 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2038 .map(|b| {
2039 json!({
2040 "TableName": b.table_name,
2041 "TableArn": b.table_arn,
2042 "BackupArn": b.backup_arn,
2043 "BackupName": b.backup_name,
2044 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2045 "BackupStatus": b.backup_status,
2046 "BackupType": b.backup_type,
2047 "BackupSizeBytes": b.size_bytes
2048 })
2049 })
2050 .collect();
2051
2052 Self::ok_json(json!({
2053 "BackupSummaries": summaries
2054 }))
2055 }
2056
2057 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058 let body = Self::parse_body(req)?;
2059 let backup_arn = require_str(&body, "BackupArn")?;
2060 let target_table_name = require_str(&body, "TargetTableName")?;
2061
2062 let mut state = self.state.write();
2063 let backup = state.backups.get(backup_arn).ok_or_else(|| {
2064 AwsServiceError::aws_error(
2065 StatusCode::BAD_REQUEST,
2066 "BackupNotFoundException",
2067 format!("Backup not found: {backup_arn}"),
2068 )
2069 })?;
2070
2071 if state.tables.contains_key(target_table_name) {
2072 return Err(AwsServiceError::aws_error(
2073 StatusCode::BAD_REQUEST,
2074 "TableAlreadyExistsException",
2075 format!("Table already exists: {target_table_name}"),
2076 ));
2077 }
2078
2079 let now = Utc::now();
2080 let arn = format!(
2081 "arn:aws:dynamodb:{}:{}:table/{}",
2082 state.region, state.account_id, target_table_name
2083 );
2084
2085 let restored_items = backup.items.clone();
2086 let mut table = DynamoTable {
2087 name: target_table_name.to_string(),
2088 arn: arn.clone(),
2089 key_schema: backup.key_schema.clone(),
2090 attribute_definitions: backup.attribute_definitions.clone(),
2091 provisioned_throughput: backup.provisioned_throughput.clone(),
2092 items: restored_items,
2093 gsi: Vec::new(),
2094 lsi: Vec::new(),
2095 tags: HashMap::new(),
2096 created_at: now,
2097 status: "ACTIVE".to_string(),
2098 item_count: 0,
2099 size_bytes: 0,
2100 billing_mode: backup.billing_mode.clone(),
2101 ttl_attribute: None,
2102 ttl_enabled: false,
2103 resource_policy: None,
2104 pitr_enabled: false,
2105 kinesis_destinations: Vec::new(),
2106 contributor_insights_status: "DISABLED".to_string(),
2107 contributor_insights_counters: HashMap::new(),
2108 stream_enabled: false,
2109 stream_view_type: None,
2110 stream_arn: None,
2111 stream_records: Arc::new(RwLock::new(Vec::new())),
2112 sse_type: None,
2113 sse_kms_key_arn: None,
2114 };
2115 table.recalculate_stats();
2116
2117 let desc = build_table_description(&table);
2118 state.tables.insert(target_table_name.to_string(), table);
2119
2120 Self::ok_json(json!({
2121 "TableDescription": desc
2122 }))
2123 }
2124
2125 fn restore_table_to_point_in_time(
2126 &self,
2127 req: &AwsRequest,
2128 ) -> Result<AwsResponse, AwsServiceError> {
2129 let body = Self::parse_body(req)?;
2130 let target_table_name = require_str(&body, "TargetTableName")?;
2131 let source_table_name = body["SourceTableName"].as_str();
2132 let source_table_arn = body["SourceTableArn"].as_str();
2133
2134 let mut state = self.state.write();
2135
2136 let source = if let Some(name) = source_table_name {
2138 get_table(&state.tables, name)?.clone()
2139 } else if let Some(arn) = source_table_arn {
2140 find_table_by_arn(&state.tables, arn)?.clone()
2141 } else {
2142 return Err(AwsServiceError::aws_error(
2143 StatusCode::BAD_REQUEST,
2144 "ValidationException",
2145 "SourceTableName or SourceTableArn is required",
2146 ));
2147 };
2148
2149 if state.tables.contains_key(target_table_name) {
2150 return Err(AwsServiceError::aws_error(
2151 StatusCode::BAD_REQUEST,
2152 "TableAlreadyExistsException",
2153 format!("Table already exists: {target_table_name}"),
2154 ));
2155 }
2156
2157 let now = Utc::now();
2158 let arn = format!(
2159 "arn:aws:dynamodb:{}:{}:table/{}",
2160 state.region, state.account_id, target_table_name
2161 );
2162
2163 let mut table = DynamoTable {
2164 name: target_table_name.to_string(),
2165 arn: arn.clone(),
2166 key_schema: source.key_schema.clone(),
2167 attribute_definitions: source.attribute_definitions.clone(),
2168 provisioned_throughput: source.provisioned_throughput.clone(),
2169 items: source.items.clone(),
2170 gsi: Vec::new(),
2171 lsi: Vec::new(),
2172 tags: HashMap::new(),
2173 created_at: now,
2174 status: "ACTIVE".to_string(),
2175 item_count: 0,
2176 size_bytes: 0,
2177 billing_mode: source.billing_mode.clone(),
2178 ttl_attribute: None,
2179 ttl_enabled: false,
2180 resource_policy: None,
2181 pitr_enabled: false,
2182 kinesis_destinations: Vec::new(),
2183 contributor_insights_status: "DISABLED".to_string(),
2184 contributor_insights_counters: HashMap::new(),
2185 stream_enabled: false,
2186 stream_view_type: None,
2187 stream_arn: None,
2188 stream_records: Arc::new(RwLock::new(Vec::new())),
2189 sse_type: None,
2190 sse_kms_key_arn: None,
2191 };
2192 table.recalculate_stats();
2193
2194 let desc = build_table_description(&table);
2195 state.tables.insert(target_table_name.to_string(), table);
2196
2197 Self::ok_json(json!({
2198 "TableDescription": desc
2199 }))
2200 }
2201
2202 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2203 let body = Self::parse_body(req)?;
2204 let table_name = require_str(&body, "TableName")?;
2205
2206 let pitr_spec = body["PointInTimeRecoverySpecification"]
2207 .as_object()
2208 .ok_or_else(|| {
2209 AwsServiceError::aws_error(
2210 StatusCode::BAD_REQUEST,
2211 "ValidationException",
2212 "PointInTimeRecoverySpecification is required",
2213 )
2214 })?;
2215 let enabled = pitr_spec
2216 .get("PointInTimeRecoveryEnabled")
2217 .and_then(|v| v.as_bool())
2218 .unwrap_or(false);
2219
2220 let mut state = self.state.write();
2221 let table = get_table_mut(&mut state.tables, table_name)?;
2222 table.pitr_enabled = enabled;
2223
2224 let status = if enabled { "ENABLED" } else { "DISABLED" };
2225 Self::ok_json(json!({
2226 "ContinuousBackupsDescription": {
2227 "ContinuousBackupsStatus": status,
2228 "PointInTimeRecoveryDescription": {
2229 "PointInTimeRecoveryStatus": status,
2230 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2231 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2232 }
2233 }
2234 }))
2235 }
2236
2237 fn describe_continuous_backups(
2238 &self,
2239 req: &AwsRequest,
2240 ) -> Result<AwsResponse, AwsServiceError> {
2241 let body = Self::parse_body(req)?;
2242 let table_name = require_str(&body, "TableName")?;
2243
2244 let state = self.state.read();
2245 let table = get_table(&state.tables, table_name)?;
2246
2247 let status = if table.pitr_enabled {
2248 "ENABLED"
2249 } else {
2250 "DISABLED"
2251 };
2252 Self::ok_json(json!({
2253 "ContinuousBackupsDescription": {
2254 "ContinuousBackupsStatus": status,
2255 "PointInTimeRecoveryDescription": {
2256 "PointInTimeRecoveryStatus": status,
2257 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2258 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2259 }
2260 }
2261 }))
2262 }
2263
2264 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2267 let body = Self::parse_body(req)?;
2268 let global_table_name = require_str(&body, "GlobalTableName")?;
2269 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2270
2271 let replication_group = body["ReplicationGroup"]
2272 .as_array()
2273 .ok_or_else(|| {
2274 AwsServiceError::aws_error(
2275 StatusCode::BAD_REQUEST,
2276 "ValidationException",
2277 "ReplicationGroup is required",
2278 )
2279 })?
2280 .iter()
2281 .filter_map(|r| {
2282 r["RegionName"].as_str().map(|rn| ReplicaDescription {
2283 region_name: rn.to_string(),
2284 replica_status: "ACTIVE".to_string(),
2285 })
2286 })
2287 .collect::<Vec<_>>();
2288
2289 let mut state = self.state.write();
2290
2291 if state.global_tables.contains_key(global_table_name) {
2292 return Err(AwsServiceError::aws_error(
2293 StatusCode::BAD_REQUEST,
2294 "GlobalTableAlreadyExistsException",
2295 format!("Global table already exists: {global_table_name}"),
2296 ));
2297 }
2298
2299 let arn = format!(
2300 "arn:aws:dynamodb::{}:global-table/{}",
2301 state.account_id, global_table_name
2302 );
2303 let now = Utc::now();
2304
2305 let gt = GlobalTableDescription {
2306 global_table_name: global_table_name.to_string(),
2307 global_table_arn: arn.clone(),
2308 global_table_status: "ACTIVE".to_string(),
2309 creation_date: now,
2310 replication_group: replication_group.clone(),
2311 };
2312
2313 state
2314 .global_tables
2315 .insert(global_table_name.to_string(), gt);
2316
2317 Self::ok_json(json!({
2318 "GlobalTableDescription": {
2319 "GlobalTableName": global_table_name,
2320 "GlobalTableArn": arn,
2321 "GlobalTableStatus": "ACTIVE",
2322 "CreationDateTime": now.timestamp() as f64,
2323 "ReplicationGroup": replication_group.iter().map(|r| json!({
2324 "RegionName": r.region_name,
2325 "ReplicaStatus": r.replica_status
2326 })).collect::<Vec<_>>()
2327 }
2328 }))
2329 }
2330
2331 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2332 let body = Self::parse_body(req)?;
2333 let global_table_name = require_str(&body, "GlobalTableName")?;
2334 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2335
2336 let state = self.state.read();
2337 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2338 AwsServiceError::aws_error(
2339 StatusCode::BAD_REQUEST,
2340 "GlobalTableNotFoundException",
2341 format!("Global table not found: {global_table_name}"),
2342 )
2343 })?;
2344
2345 Self::ok_json(json!({
2346 "GlobalTableDescription": {
2347 "GlobalTableName": gt.global_table_name,
2348 "GlobalTableArn": gt.global_table_arn,
2349 "GlobalTableStatus": gt.global_table_status,
2350 "CreationDateTime": gt.creation_date.timestamp() as f64,
2351 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2352 "RegionName": r.region_name,
2353 "ReplicaStatus": r.replica_status
2354 })).collect::<Vec<_>>()
2355 }
2356 }))
2357 }
2358
2359 fn describe_global_table_settings(
2360 &self,
2361 req: &AwsRequest,
2362 ) -> Result<AwsResponse, AwsServiceError> {
2363 let body = Self::parse_body(req)?;
2364 let global_table_name = require_str(&body, "GlobalTableName")?;
2365 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2366
2367 let state = self.state.read();
2368 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2369 AwsServiceError::aws_error(
2370 StatusCode::BAD_REQUEST,
2371 "GlobalTableNotFoundException",
2372 format!("Global table not found: {global_table_name}"),
2373 )
2374 })?;
2375
2376 let replica_settings: Vec<Value> = gt
2377 .replication_group
2378 .iter()
2379 .map(|r| {
2380 json!({
2381 "RegionName": r.region_name,
2382 "ReplicaStatus": r.replica_status,
2383 "ReplicaProvisionedReadCapacityUnits": 0,
2384 "ReplicaProvisionedWriteCapacityUnits": 0
2385 })
2386 })
2387 .collect();
2388
2389 Self::ok_json(json!({
2390 "GlobalTableName": gt.global_table_name,
2391 "ReplicaSettings": replica_settings
2392 }))
2393 }
2394
2395 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396 let body = Self::parse_body(req)?;
2397 validate_optional_string_length(
2398 "exclusiveStartGlobalTableName",
2399 body["ExclusiveStartGlobalTableName"].as_str(),
2400 3,
2401 255,
2402 )?;
2403 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2404 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2405
2406 let state = self.state.read();
2407 let tables: Vec<Value> = state
2408 .global_tables
2409 .values()
2410 .take(limit)
2411 .map(|gt| {
2412 json!({
2413 "GlobalTableName": gt.global_table_name,
2414 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2415 "RegionName": r.region_name
2416 })).collect::<Vec<_>>()
2417 })
2418 })
2419 .collect();
2420
2421 Self::ok_json(json!({
2422 "GlobalTables": tables
2423 }))
2424 }
2425
2426 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2427 let body = Self::parse_body(req)?;
2428 let global_table_name = require_str(&body, "GlobalTableName")?;
2429 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2430 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2431
2432 let mut state = self.state.write();
2433 let gt = state
2434 .global_tables
2435 .get_mut(global_table_name)
2436 .ok_or_else(|| {
2437 AwsServiceError::aws_error(
2438 StatusCode::BAD_REQUEST,
2439 "GlobalTableNotFoundException",
2440 format!("Global table not found: {global_table_name}"),
2441 )
2442 })?;
2443
2444 if let Some(updates) = body["ReplicaUpdates"].as_array() {
2445 for update in updates {
2446 if let Some(create) = update["Create"].as_object() {
2447 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2448 gt.replication_group.push(ReplicaDescription {
2449 region_name: region.to_string(),
2450 replica_status: "ACTIVE".to_string(),
2451 });
2452 }
2453 }
2454 if let Some(delete) = update["Delete"].as_object() {
2455 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2456 gt.replication_group.retain(|r| r.region_name != region);
2457 }
2458 }
2459 }
2460 }
2461
2462 Self::ok_json(json!({
2463 "GlobalTableDescription": {
2464 "GlobalTableName": gt.global_table_name,
2465 "GlobalTableArn": gt.global_table_arn,
2466 "GlobalTableStatus": gt.global_table_status,
2467 "CreationDateTime": gt.creation_date.timestamp() as f64,
2468 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2469 "RegionName": r.region_name,
2470 "ReplicaStatus": r.replica_status
2471 })).collect::<Vec<_>>()
2472 }
2473 }))
2474 }
2475
2476 fn update_global_table_settings(
2477 &self,
2478 req: &AwsRequest,
2479 ) -> Result<AwsResponse, AwsServiceError> {
2480 let body = Self::parse_body(req)?;
2481 let global_table_name = require_str(&body, "GlobalTableName")?;
2482 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2483 validate_optional_enum_value(
2484 "globalTableBillingMode",
2485 &body["GlobalTableBillingMode"],
2486 &["PROVISIONED", "PAY_PER_REQUEST"],
2487 )?;
2488 validate_optional_range_i64(
2489 "globalTableProvisionedWriteCapacityUnits",
2490 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2491 1,
2492 i64::MAX,
2493 )?;
2494
2495 let state = self.state.read();
2496 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2497 AwsServiceError::aws_error(
2498 StatusCode::BAD_REQUEST,
2499 "GlobalTableNotFoundException",
2500 format!("Global table not found: {global_table_name}"),
2501 )
2502 })?;
2503
2504 let replica_settings: Vec<Value> = gt
2505 .replication_group
2506 .iter()
2507 .map(|r| {
2508 json!({
2509 "RegionName": r.region_name,
2510 "ReplicaStatus": r.replica_status,
2511 "ReplicaProvisionedReadCapacityUnits": 0,
2512 "ReplicaProvisionedWriteCapacityUnits": 0
2513 })
2514 })
2515 .collect();
2516
2517 Self::ok_json(json!({
2518 "GlobalTableName": gt.global_table_name,
2519 "ReplicaSettings": replica_settings
2520 }))
2521 }
2522
2523 fn describe_table_replica_auto_scaling(
2524 &self,
2525 req: &AwsRequest,
2526 ) -> Result<AwsResponse, AwsServiceError> {
2527 let body = Self::parse_body(req)?;
2528 let table_name = require_str(&body, "TableName")?;
2529
2530 let state = self.state.read();
2531 let table = get_table(&state.tables, table_name)?;
2532
2533 Self::ok_json(json!({
2534 "TableAutoScalingDescription": {
2535 "TableName": table.name,
2536 "TableStatus": table.status,
2537 "Replicas": []
2538 }
2539 }))
2540 }
2541
2542 fn update_table_replica_auto_scaling(
2543 &self,
2544 req: &AwsRequest,
2545 ) -> Result<AwsResponse, AwsServiceError> {
2546 let body = Self::parse_body(req)?;
2547 let table_name = require_str(&body, "TableName")?;
2548
2549 let state = self.state.read();
2550 let table = get_table(&state.tables, table_name)?;
2551
2552 Self::ok_json(json!({
2553 "TableAutoScalingDescription": {
2554 "TableName": table.name,
2555 "TableStatus": table.status,
2556 "Replicas": []
2557 }
2558 }))
2559 }
2560
2561 fn enable_kinesis_streaming_destination(
2564 &self,
2565 req: &AwsRequest,
2566 ) -> Result<AwsResponse, AwsServiceError> {
2567 let body = Self::parse_body(req)?;
2568 let table_name = require_str(&body, "TableName")?;
2569 let stream_arn = require_str(&body, "StreamArn")?;
2570 let precision = body["EnableKinesisStreamingConfiguration"]
2571 ["ApproximateCreationDateTimePrecision"]
2572 .as_str()
2573 .unwrap_or("MILLISECOND");
2574
2575 let mut state = self.state.write();
2576 let table = get_table_mut(&mut state.tables, table_name)?;
2577
2578 table.kinesis_destinations.push(KinesisDestination {
2579 stream_arn: stream_arn.to_string(),
2580 destination_status: "ACTIVE".to_string(),
2581 approximate_creation_date_time_precision: precision.to_string(),
2582 });
2583
2584 Self::ok_json(json!({
2585 "TableName": table_name,
2586 "StreamArn": stream_arn,
2587 "DestinationStatus": "ACTIVE",
2588 "EnableKinesisStreamingConfiguration": {
2589 "ApproximateCreationDateTimePrecision": precision
2590 }
2591 }))
2592 }
2593
2594 fn disable_kinesis_streaming_destination(
2595 &self,
2596 req: &AwsRequest,
2597 ) -> Result<AwsResponse, AwsServiceError> {
2598 let body = Self::parse_body(req)?;
2599 let table_name = require_str(&body, "TableName")?;
2600 let stream_arn = require_str(&body, "StreamArn")?;
2601
2602 let mut state = self.state.write();
2603 let table = get_table_mut(&mut state.tables, table_name)?;
2604
2605 if let Some(dest) = table
2606 .kinesis_destinations
2607 .iter_mut()
2608 .find(|d| d.stream_arn == stream_arn)
2609 {
2610 dest.destination_status = "DISABLED".to_string();
2611 }
2612
2613 Self::ok_json(json!({
2614 "TableName": table_name,
2615 "StreamArn": stream_arn,
2616 "DestinationStatus": "DISABLED"
2617 }))
2618 }
2619
2620 fn describe_kinesis_streaming_destination(
2621 &self,
2622 req: &AwsRequest,
2623 ) -> Result<AwsResponse, AwsServiceError> {
2624 let body = Self::parse_body(req)?;
2625 let table_name = require_str(&body, "TableName")?;
2626
2627 let state = self.state.read();
2628 let table = get_table(&state.tables, table_name)?;
2629
2630 let destinations: Vec<Value> = table
2631 .kinesis_destinations
2632 .iter()
2633 .map(|d| {
2634 json!({
2635 "StreamArn": d.stream_arn,
2636 "DestinationStatus": d.destination_status,
2637 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2638 })
2639 })
2640 .collect();
2641
2642 Self::ok_json(json!({
2643 "TableName": table_name,
2644 "KinesisDataStreamDestinations": destinations
2645 }))
2646 }
2647
2648 fn update_kinesis_streaming_destination(
2649 &self,
2650 req: &AwsRequest,
2651 ) -> Result<AwsResponse, AwsServiceError> {
2652 let body = Self::parse_body(req)?;
2653 let table_name = require_str(&body, "TableName")?;
2654 let stream_arn = require_str(&body, "StreamArn")?;
2655 let precision = body["UpdateKinesisStreamingConfiguration"]
2656 ["ApproximateCreationDateTimePrecision"]
2657 .as_str()
2658 .unwrap_or("MILLISECOND");
2659
2660 let mut state = self.state.write();
2661 let table = get_table_mut(&mut state.tables, table_name)?;
2662
2663 if let Some(dest) = table
2664 .kinesis_destinations
2665 .iter_mut()
2666 .find(|d| d.stream_arn == stream_arn)
2667 {
2668 dest.approximate_creation_date_time_precision = precision.to_string();
2669 }
2670
2671 Self::ok_json(json!({
2672 "TableName": table_name,
2673 "StreamArn": stream_arn,
2674 "DestinationStatus": "ACTIVE",
2675 "UpdateKinesisStreamingConfiguration": {
2676 "ApproximateCreationDateTimePrecision": precision
2677 }
2678 }))
2679 }
2680
2681 fn describe_contributor_insights(
2684 &self,
2685 req: &AwsRequest,
2686 ) -> Result<AwsResponse, AwsServiceError> {
2687 let body = Self::parse_body(req)?;
2688 let table_name = require_str(&body, "TableName")?;
2689 let index_name = body["IndexName"].as_str();
2690
2691 let state = self.state.read();
2692 let table = get_table(&state.tables, table_name)?;
2693
2694 let top = table.top_contributors(10);
2695 let contributors: Vec<Value> = top
2696 .iter()
2697 .map(|(key, count)| {
2698 json!({
2699 "Key": key,
2700 "Count": count
2701 })
2702 })
2703 .collect();
2704
2705 let mut result = json!({
2706 "TableName": table_name,
2707 "ContributorInsightsStatus": table.contributor_insights_status,
2708 "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2709 "TopContributors": contributors
2710 });
2711 if let Some(idx) = index_name {
2712 result["IndexName"] = json!(idx);
2713 }
2714
2715 Self::ok_json(result)
2716 }
2717
2718 fn update_contributor_insights(
2719 &self,
2720 req: &AwsRequest,
2721 ) -> Result<AwsResponse, AwsServiceError> {
2722 let body = Self::parse_body(req)?;
2723 let table_name = require_str(&body, "TableName")?;
2724 let action = require_str(&body, "ContributorInsightsAction")?;
2725 let index_name = body["IndexName"].as_str();
2726
2727 let mut state = self.state.write();
2728 let table = get_table_mut(&mut state.tables, table_name)?;
2729
2730 let status = match action {
2731 "ENABLE" => "ENABLED",
2732 "DISABLE" => "DISABLED",
2733 _ => {
2734 return Err(AwsServiceError::aws_error(
2735 StatusCode::BAD_REQUEST,
2736 "ValidationException",
2737 format!("Invalid ContributorInsightsAction: {action}"),
2738 ))
2739 }
2740 };
2741 table.contributor_insights_status = status.to_string();
2742 if status == "DISABLED" {
2743 table.contributor_insights_counters.clear();
2744 }
2745
2746 let mut result = json!({
2747 "TableName": table_name,
2748 "ContributorInsightsStatus": status
2749 });
2750 if let Some(idx) = index_name {
2751 result["IndexName"] = json!(idx);
2752 }
2753
2754 Self::ok_json(result)
2755 }
2756
2757 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2758 let body = Self::parse_body(req)?;
2759 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2760 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2761 let table_name = body["TableName"].as_str();
2762
2763 let state = self.state.read();
2764 let summaries: Vec<Value> = state
2765 .tables
2766 .values()
2767 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2768 .map(|t| {
2769 json!({
2770 "TableName": t.name,
2771 "ContributorInsightsStatus": t.contributor_insights_status
2772 })
2773 })
2774 .collect();
2775
2776 Self::ok_json(json!({
2777 "ContributorInsightsSummaries": summaries
2778 }))
2779 }
2780
    /// Handles `ExportTableToPointInTime`: serializes a table's items as
    /// newline-delimited JSON, writes them into the fake S3 backend (when one
    /// is configured), and records an `ExportDescription` in state.
    ///
    /// The export completes synchronously: status is "COMPLETED" unless the
    /// target S3 bucket does not exist, in which case it is recorded as
    /// "FAILED" with failure code "S3NoSuchBucket". When `self.s3_state` is
    /// `None`, nothing is written but the export is still reported COMPLETED.
    fn export_table_to_point_in_time(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_arn = require_str(&body, "TableArn")?;
        let s3_bucket = require_str(&body, "S3Bucket")?;
        let s3_prefix = body["S3Prefix"].as_str();
        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");

        let state = self.state.read();
        let table = find_table_by_arn(&state.tables, table_arn)?;
        // Clone the items so the DynamoDB read lock can be dropped before the
        // S3 write below; the export is a point-in-time snapshot anyway.
        let items = table.items.clone();
        let item_count = items.len() as i64;

        let now = Utc::now();
        // Export ARN embeds the table name (last ARN path segment) plus a
        // fresh UUID so every export is uniquely addressable.
        let export_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
            state.region,
            state.account_id,
            table_arn.rsplit('/').next().unwrap_or("unknown"),
            uuid::Uuid::new_v4()
        );

        // Release the read lock: the S3 state lock and a DynamoDB *write*
        // lock are both taken further down, and holding this guard across
        // them would risk a deadlock.
        drop(state);

        // Build newline-delimited JSON. DYNAMODB_JSON wraps each item in
        // {"Item": ...}; any other requested format emits the raw attribute
        // map per line (no true ION support).
        let mut json_lines = String::new();
        for item in &items {
            let item_json = if export_format == "DYNAMODB_JSON" {
                json!({ "Item": item })
            } else {
                json!(item)
            };
            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
            json_lines.push('\n');
        }
        let data_size = json_lines.len() as i64;

        // Object key mirrors the real export layout (data/manifest-files.json),
        // nested under the caller's prefix when one was given.
        let s3_key = if let Some(prefix) = s3_prefix {
            format!("{prefix}/data/manifest-files.json")
        } else {
            "data/manifest-files.json".to_string()
        };

        let mut export_failed = false;
        let mut failure_reason = String::new();
        if let Some(ref s3_state) = self.s3_state {
            let mut s3 = s3_state.write();
            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
                // S3-style ETag: UUID hex without dashes.
                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
                let obj = fakecloud_s3::state::S3Object {
                    key: s3_key.clone(),
                    data: bytes::Bytes::from(json_lines),
                    content_type: "application/json".to_string(),
                    etag,
                    size: data_size as u64,
                    last_modified: now,
                    metadata: HashMap::new(),
                    storage_class: "STANDARD".to_string(),
                    tags: HashMap::new(),
                    acl_grants: Vec::new(),
                    acl_owner_id: None,
                    parts_count: None,
                    part_sizes: None,
                    sse_algorithm: None,
                    sse_kms_key_id: None,
                    bucket_key_enabled: None,
                    version_id: None,
                    is_delete_marker: false,
                    content_encoding: None,
                    website_redirect_location: None,
                    restore_ongoing: None,
                    restore_expiry: None,
                    checksum_algorithm: None,
                    checksum_value: None,
                    lock_mode: None,
                    lock_retain_until: None,
                    lock_legal_hold: None,
                };
                bucket.objects.insert(s3_key, obj);
            } else {
                // Missing bucket doesn't abort the request; the export record
                // is still created, just marked FAILED below.
                export_failed = true;
                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
            }
        }

        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };

        // Persist the export so Describe/ListExports can find it later.
        let export = ExportDescription {
            export_arn: export_arn.clone(),
            export_status: export_status.to_string(),
            table_arn: table_arn.to_string(),
            s3_bucket: s3_bucket.to_string(),
            s3_prefix: s3_prefix.map(|s| s.to_string()),
            export_format: export_format.to_string(),
            start_time: now,
            end_time: now,
            export_time: now,
            item_count,
            billed_size_bytes: data_size,
        };

        let mut state = self.state.write();
        state.exports.insert(export_arn.clone(), export);

        let mut response = json!({
            "ExportDescription": {
                "ExportArn": export_arn,
                "ExportStatus": export_status,
                "TableArn": table_arn,
                "S3Bucket": s3_bucket,
                "S3Prefix": s3_prefix,
                "ExportFormat": export_format,
                "StartTime": now.timestamp() as f64,
                "EndTime": now.timestamp() as f64,
                "ExportTime": now.timestamp() as f64,
                "ItemCount": item_count,
                "BilledSizeBytes": data_size
            }
        });
        if export_failed {
            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
        }
        Self::ok_json(response)
    }
2913
2914 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2915 let body = Self::parse_body(req)?;
2916 let export_arn = require_str(&body, "ExportArn")?;
2917
2918 let state = self.state.read();
2919 let export = state.exports.get(export_arn).ok_or_else(|| {
2920 AwsServiceError::aws_error(
2921 StatusCode::BAD_REQUEST,
2922 "ExportNotFoundException",
2923 format!("Export not found: {export_arn}"),
2924 )
2925 })?;
2926
2927 Self::ok_json(json!({
2928 "ExportDescription": {
2929 "ExportArn": export.export_arn,
2930 "ExportStatus": export.export_status,
2931 "TableArn": export.table_arn,
2932 "S3Bucket": export.s3_bucket,
2933 "S3Prefix": export.s3_prefix,
2934 "ExportFormat": export.export_format,
2935 "StartTime": export.start_time.timestamp() as f64,
2936 "EndTime": export.end_time.timestamp() as f64,
2937 "ExportTime": export.export_time.timestamp() as f64,
2938 "ItemCount": export.item_count,
2939 "BilledSizeBytes": export.billed_size_bytes
2940 }
2941 }))
2942 }
2943
2944 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2945 let body = Self::parse_body(req)?;
2946 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2947 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2948 let table_arn = body["TableArn"].as_str();
2949
2950 let state = self.state.read();
2951 let summaries: Vec<Value> = state
2952 .exports
2953 .values()
2954 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2955 .map(|e| {
2956 json!({
2957 "ExportArn": e.export_arn,
2958 "ExportStatus": e.export_status,
2959 "TableArn": e.table_arn
2960 })
2961 })
2962 .collect();
2963
2964 Self::ok_json(json!({
2965 "ExportSummaries": summaries
2966 }))
2967 }
2968
    /// Handles `ImportTable`: reads newline-delimited JSON objects from a
    /// (fake) S3 bucket/prefix, creates a new table from
    /// `TableCreationParameters`, and records an `ImportDescription`.
    ///
    /// The import completes synchronously and is always reported COMPLETED.
    /// When no S3 backend is configured (`self.s3_state` is `None`), the
    /// table is still created — just with zero imported items.
    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let input_format = require_str(&body, "InputFormat")?;
        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "S3BucketSource is required",
            )
        })?;
        // Bucket/prefix default to "" when absent; the missing-bucket case is
        // surfaced below as ImportConflictException during the S3 lookup.
        let s3_bucket = s3_source
            .get("S3Bucket")
            .and_then(|v| v.as_str())
            .unwrap_or("");
        let s3_key_prefix = s3_source
            .get("S3KeyPrefix")
            .and_then(|v| v.as_str())
            .unwrap_or("");

        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TableCreationParameters is required",
            )
        })?;
        let table_name = table_params
            .get("TableName")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableCreationParameters.TableName is required",
                )
            })?;

        // Key schema / attribute definitions are validated by the shared
        // parsers before any state is touched.
        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
        let attribute_definitions = parse_attribute_definitions(
            table_params
                .get("AttributeDefinitions")
                .unwrap_or(&Value::Null),
        )?;

        // Phase 1: read and parse source objects while holding only the S3
        // read lock (the DynamoDB write lock is taken afterwards).
        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
        let mut processed_size_bytes: i64 = 0;
        if let Some(ref s3_state) = self.s3_state {
            let s3 = s3_state.read();
            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ImportConflictException",
                    format!("S3 bucket does not exist: {s3_bucket}"),
                )
            })?;
            let prefix = if s3_key_prefix.is_empty() {
                String::new()
            } else {
                s3_key_prefix.to_string()
            };
            for (key, obj) in &bucket.objects {
                // An empty prefix matches every object in the bucket.
                if !prefix.is_empty() && !key.starts_with(&prefix) {
                    continue;
                }
                // Non-UTF-8 objects degrade to "" and contribute no lines,
                // but their size still counts toward the processed total.
                let data = std::str::from_utf8(&obj.data).unwrap_or("");
                processed_size_bytes += obj.size as i64;
                for line in data.lines() {
                    let line = line.trim();
                    if line.is_empty() {
                        continue;
                    }
                    // Unparseable lines are silently skipped.
                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
                        // DYNAMODB_JSON lines may wrap the attribute map in an
                        // "Item" envelope; other formats use the object as-is.
                        let item = if input_format == "DYNAMODB_JSON" {
                            if let Some(item_obj) = parsed.get("Item") {
                                item_obj.as_object().cloned().unwrap_or_default()
                            } else {
                                parsed.as_object().cloned().unwrap_or_default()
                            }
                        } else {
                            parsed.as_object().cloned().unwrap_or_default()
                        };
                        if !item.is_empty() {
                            imported_items
                                .push(item.into_iter().collect::<HashMap<String, Value>>());
                        }
                    }
                }
            }
        }

        // Phase 2: create the table and the import record under the write lock.
        let mut state = self.state.write();

        if state.tables.contains_key(table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let now = Utc::now();
        let table_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        let import_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
            state.region,
            state.account_id,
            table_name,
            uuid::Uuid::new_v4()
        );

        let processed_item_count = imported_items.len() as i64;

        // Imported tables default to PAY_PER_REQUEST with zero provisioned
        // capacity; item_count/size_bytes are filled in by recalculate_stats.
        let mut table = DynamoTable {
            name: table_name.to_string(),
            arn: table_arn.clone(),
            key_schema,
            attribute_definitions,
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            },
            items: imported_items,
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: HashMap::new(),
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PAY_PER_REQUEST".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: HashMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: Arc::new(RwLock::new(Vec::new())),
            sse_type: None,
            sse_kms_key_arn: None,
        };
        table.recalculate_stats();
        state.tables.insert(table_name.to_string(), table);

        let import_desc = ImportDescription {
            import_arn: import_arn.clone(),
            import_status: "COMPLETED".to_string(),
            table_arn: table_arn.clone(),
            table_name: table_name.to_string(),
            s3_bucket_source: s3_bucket.to_string(),
            input_format: input_format.to_string(),
            start_time: now,
            end_time: now,
            processed_item_count,
            processed_size_bytes,
        };
        state.imports.insert(import_arn.clone(), import_desc);

        // Echo the stored key schema / attribute definitions back in the
        // response (re-read from the table just inserted above).
        let table_ref = state.tables.get(table_name).unwrap();
        let ks: Vec<Value> = table_ref
            .key_schema
            .iter()
            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
            .collect();
        let ad: Vec<Value> = table_ref
            .attribute_definitions
            .iter()
            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
            .collect();

        Self::ok_json(json!({
            "ImportTableDescription": {
                "ImportArn": import_arn,
                "ImportStatus": "COMPLETED",
                "TableArn": table_arn,
                "TableId": uuid::Uuid::new_v4().to_string(),
                "S3BucketSource": {
                    "S3Bucket": s3_bucket
                },
                "InputFormat": input_format,
                "TableCreationParameters": {
                    "TableName": table_name,
                    "KeySchema": ks,
                    "AttributeDefinitions": ad
                },
                "StartTime": now.timestamp() as f64,
                "EndTime": now.timestamp() as f64,
                "ProcessedItemCount": processed_item_count,
                "ProcessedSizeBytes": processed_size_bytes
            }
        }))
    }
3170
3171 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3172 let body = Self::parse_body(req)?;
3173 let import_arn = require_str(&body, "ImportArn")?;
3174
3175 let state = self.state.read();
3176 let import = state.imports.get(import_arn).ok_or_else(|| {
3177 AwsServiceError::aws_error(
3178 StatusCode::BAD_REQUEST,
3179 "ImportNotFoundException",
3180 format!("Import not found: {import_arn}"),
3181 )
3182 })?;
3183
3184 Self::ok_json(json!({
3185 "ImportTableDescription": {
3186 "ImportArn": import.import_arn,
3187 "ImportStatus": import.import_status,
3188 "TableArn": import.table_arn,
3189 "S3BucketSource": {
3190 "S3Bucket": import.s3_bucket_source
3191 },
3192 "InputFormat": import.input_format,
3193 "StartTime": import.start_time.timestamp() as f64,
3194 "EndTime": import.end_time.timestamp() as f64,
3195 "ProcessedItemCount": import.processed_item_count,
3196 "ProcessedSizeBytes": import.processed_size_bytes
3197 }
3198 }))
3199 }
3200
3201 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3202 let body = Self::parse_body(req)?;
3203 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3204 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3205 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3206 let table_arn = body["TableArn"].as_str();
3207
3208 let state = self.state.read();
3209 let summaries: Vec<Value> = state
3210 .imports
3211 .values()
3212 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3213 .map(|i| {
3214 json!({
3215 "ImportArn": i.import_arn,
3216 "ImportStatus": i.import_status,
3217 "TableArn": i.table_arn
3218 })
3219 })
3220 .collect();
3221
3222 Self::ok_json(json!({
3223 "ImportSummaryList": summaries
3224 }))
3225 }
3226}
3227
3228#[async_trait]
3229impl AwsService for DynamoDbService {
    /// Service identifier used by the dispatcher for routing/signing.
    fn service_name(&self) -> &str {
        "dynamodb"
    }
3233
3234 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3235 match req.action.as_str() {
3236 "CreateTable" => self.create_table(&req),
3237 "DeleteTable" => self.delete_table(&req),
3238 "DescribeTable" => self.describe_table(&req),
3239 "ListTables" => self.list_tables(&req),
3240 "UpdateTable" => self.update_table(&req),
3241 "PutItem" => self.put_item(&req),
3242 "GetItem" => self.get_item(&req),
3243 "DeleteItem" => self.delete_item(&req),
3244 "UpdateItem" => self.update_item(&req),
3245 "Query" => self.query(&req),
3246 "Scan" => self.scan(&req),
3247 "BatchGetItem" => self.batch_get_item(&req),
3248 "BatchWriteItem" => self.batch_write_item(&req),
3249 "TagResource" => self.tag_resource(&req),
3250 "UntagResource" => self.untag_resource(&req),
3251 "ListTagsOfResource" => self.list_tags_of_resource(&req),
3252 "TransactGetItems" => self.transact_get_items(&req),
3253 "TransactWriteItems" => self.transact_write_items(&req),
3254 "ExecuteStatement" => self.execute_statement(&req),
3255 "BatchExecuteStatement" => self.batch_execute_statement(&req),
3256 "ExecuteTransaction" => self.execute_transaction(&req),
3257 "UpdateTimeToLive" => self.update_time_to_live(&req),
3258 "DescribeTimeToLive" => self.describe_time_to_live(&req),
3259 "PutResourcePolicy" => self.put_resource_policy(&req),
3260 "GetResourcePolicy" => self.get_resource_policy(&req),
3261 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3262 "DescribeEndpoints" => self.describe_endpoints(&req),
3264 "DescribeLimits" => self.describe_limits(&req),
3265 "CreateBackup" => self.create_backup(&req),
3267 "DeleteBackup" => self.delete_backup(&req),
3268 "DescribeBackup" => self.describe_backup(&req),
3269 "ListBackups" => self.list_backups(&req),
3270 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3271 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3272 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3273 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3274 "CreateGlobalTable" => self.create_global_table(&req),
3276 "DescribeGlobalTable" => self.describe_global_table(&req),
3277 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3278 "ListGlobalTables" => self.list_global_tables(&req),
3279 "UpdateGlobalTable" => self.update_global_table(&req),
3280 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3281 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3282 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3283 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3285 "DisableKinesisStreamingDestination" => {
3286 self.disable_kinesis_streaming_destination(&req)
3287 }
3288 "DescribeKinesisStreamingDestination" => {
3289 self.describe_kinesis_streaming_destination(&req)
3290 }
3291 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3292 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3294 "UpdateContributorInsights" => self.update_contributor_insights(&req),
3295 "ListContributorInsights" => self.list_contributor_insights(&req),
3296 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3298 "DescribeExport" => self.describe_export(&req),
3299 "ListExports" => self.list_exports(&req),
3300 "ImportTable" => self.import_table(&req),
3301 "DescribeImport" => self.describe_import(&req),
3302 "ListImports" => self.list_imports(&req),
3303 _ => Err(AwsServiceError::action_not_implemented(
3304 "dynamodb",
3305 &req.action,
3306 )),
3307 }
3308 }
3309
3310 fn supported_actions(&self) -> &[&str] {
3311 &[
3312 "CreateTable",
3313 "DeleteTable",
3314 "DescribeTable",
3315 "ListTables",
3316 "UpdateTable",
3317 "PutItem",
3318 "GetItem",
3319 "DeleteItem",
3320 "UpdateItem",
3321 "Query",
3322 "Scan",
3323 "BatchGetItem",
3324 "BatchWriteItem",
3325 "TagResource",
3326 "UntagResource",
3327 "ListTagsOfResource",
3328 "TransactGetItems",
3329 "TransactWriteItems",
3330 "ExecuteStatement",
3331 "BatchExecuteStatement",
3332 "ExecuteTransaction",
3333 "UpdateTimeToLive",
3334 "DescribeTimeToLive",
3335 "PutResourcePolicy",
3336 "GetResourcePolicy",
3337 "DeleteResourcePolicy",
3338 "DescribeEndpoints",
3339 "DescribeLimits",
3340 "CreateBackup",
3341 "DeleteBackup",
3342 "DescribeBackup",
3343 "ListBackups",
3344 "RestoreTableFromBackup",
3345 "RestoreTableToPointInTime",
3346 "UpdateContinuousBackups",
3347 "DescribeContinuousBackups",
3348 "CreateGlobalTable",
3349 "DescribeGlobalTable",
3350 "DescribeGlobalTableSettings",
3351 "ListGlobalTables",
3352 "UpdateGlobalTable",
3353 "UpdateGlobalTableSettings",
3354 "DescribeTableReplicaAutoScaling",
3355 "UpdateTableReplicaAutoScaling",
3356 "EnableKinesisStreamingDestination",
3357 "DisableKinesisStreamingDestination",
3358 "DescribeKinesisStreamingDestination",
3359 "UpdateKinesisStreamingDestination",
3360 "DescribeContributorInsights",
3361 "UpdateContributorInsights",
3362 "ListContributorInsights",
3363 "ExportTableToPointInTime",
3364 "DescribeExport",
3365 "ListExports",
3366 "ImportTable",
3367 "DescribeImport",
3368 "ListImports",
3369 ]
3370 }
3371}
3372
3373fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3376 body[field].as_str().ok_or_else(|| {
3377 AwsServiceError::aws_error(
3378 StatusCode::BAD_REQUEST,
3379 "ValidationException",
3380 format!("{field} is required"),
3381 )
3382 })
3383}
3384
3385fn require_object(
3386 body: &Value,
3387 field: &str,
3388) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3389 let obj = body[field].as_object().ok_or_else(|| {
3390 AwsServiceError::aws_error(
3391 StatusCode::BAD_REQUEST,
3392 "ValidationException",
3393 format!("{field} is required"),
3394 )
3395 })?;
3396 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3397}
3398
3399fn get_table<'a>(
3400 tables: &'a HashMap<String, DynamoTable>,
3401 name: &str,
3402) -> Result<&'a DynamoTable, AwsServiceError> {
3403 tables.get(name).ok_or_else(|| {
3404 AwsServiceError::aws_error(
3405 StatusCode::BAD_REQUEST,
3406 "ResourceNotFoundException",
3407 format!("Requested resource not found: Table: {name} not found"),
3408 )
3409 })
3410}
3411
3412fn get_table_mut<'a>(
3413 tables: &'a mut HashMap<String, DynamoTable>,
3414 name: &str,
3415) -> Result<&'a mut DynamoTable, AwsServiceError> {
3416 tables.get_mut(name).ok_or_else(|| {
3417 AwsServiceError::aws_error(
3418 StatusCode::BAD_REQUEST,
3419 "ResourceNotFoundException",
3420 format!("Requested resource not found: Table: {name} not found"),
3421 )
3422 })
3423}
3424
3425fn find_table_by_arn<'a>(
3426 tables: &'a HashMap<String, DynamoTable>,
3427 arn: &str,
3428) -> Result<&'a DynamoTable, AwsServiceError> {
3429 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3430 AwsServiceError::aws_error(
3431 StatusCode::BAD_REQUEST,
3432 "ResourceNotFoundException",
3433 format!("Requested resource not found: {arn}"),
3434 )
3435 })
3436}
3437
3438fn find_table_by_arn_mut<'a>(
3439 tables: &'a mut HashMap<String, DynamoTable>,
3440 arn: &str,
3441) -> Result<&'a mut DynamoTable, AwsServiceError> {
3442 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3443 AwsServiceError::aws_error(
3444 StatusCode::BAD_REQUEST,
3445 "ResourceNotFoundException",
3446 format!("Requested resource not found: {arn}"),
3447 )
3448 })
3449}
3450
3451fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3452 let arr = val.as_array().ok_or_else(|| {
3453 AwsServiceError::aws_error(
3454 StatusCode::BAD_REQUEST,
3455 "ValidationException",
3456 "KeySchema is required",
3457 )
3458 })?;
3459 Ok(arr
3460 .iter()
3461 .map(|elem| KeySchemaElement {
3462 attribute_name: elem["AttributeName"]
3463 .as_str()
3464 .unwrap_or_default()
3465 .to_string(),
3466 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3467 })
3468 .collect())
3469}
3470
3471fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3472 let arr = val.as_array().ok_or_else(|| {
3473 AwsServiceError::aws_error(
3474 StatusCode::BAD_REQUEST,
3475 "ValidationException",
3476 "AttributeDefinitions is required",
3477 )
3478 })?;
3479 Ok(arr
3480 .iter()
3481 .map(|elem| AttributeDefinition {
3482 attribute_name: elem["AttributeName"]
3483 .as_str()
3484 .unwrap_or_default()
3485 .to_string(),
3486 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3487 })
3488 .collect())
3489}
3490
3491fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3492 Ok(ProvisionedThroughput {
3493 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3494 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3495 })
3496}
3497
3498fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3499 let Some(arr) = val.as_array() else {
3500 return Vec::new();
3501 };
3502 arr.iter()
3503 .filter_map(|g| {
3504 Some(GlobalSecondaryIndex {
3505 index_name: g["IndexName"].as_str()?.to_string(),
3506 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3507 projection: parse_projection(&g["Projection"]),
3508 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3509 .ok(),
3510 })
3511 })
3512 .collect()
3513}
3514
3515fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3516 let Some(arr) = val.as_array() else {
3517 return Vec::new();
3518 };
3519 arr.iter()
3520 .filter_map(|l| {
3521 Some(LocalSecondaryIndex {
3522 index_name: l["IndexName"].as_str()?.to_string(),
3523 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3524 projection: parse_projection(&l["Projection"]),
3525 })
3526 })
3527 .collect()
3528}
3529
3530fn parse_projection(val: &Value) -> Projection {
3531 Projection {
3532 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3533 non_key_attributes: val["NonKeyAttributes"]
3534 .as_array()
3535 .map(|arr| {
3536 arr.iter()
3537 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3538 .collect()
3539 })
3540 .unwrap_or_default(),
3541 }
3542}
3543
3544fn parse_tags(val: &Value) -> HashMap<String, String> {
3545 let mut tags = HashMap::new();
3546 if let Some(arr) = val.as_array() {
3547 for tag in arr {
3548 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3549 tags.insert(k.to_string(), v.to_string());
3550 }
3551 }
3552 }
3553 tags
3554}
3555
3556fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3557 let mut names = HashMap::new();
3558 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3559 for (k, v) in obj {
3560 if let Some(s) = v.as_str() {
3561 names.insert(k.clone(), s.to_string());
3562 }
3563 }
3564 }
3565 names
3566}
3567
3568fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3569 let mut values = HashMap::new();
3570 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3571 for (k, v) in obj {
3572 values.insert(k.clone(), v.clone());
3573 }
3574 }
3575 values
3576}
3577
/// Resolve a `#placeholder` through `ExpressionAttributeNames`.
///
/// Names without the `#` prefix — and placeholders with no mapping — are
/// returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    expr_attr_names
        .get(name)
        .map(String::clone)
        .unwrap_or_else(|| name.to_string())
}
3588
3589fn extract_key(
3590 table: &DynamoTable,
3591 item: &HashMap<String, AttributeValue>,
3592) -> HashMap<String, AttributeValue> {
3593 let mut key = HashMap::new();
3594 let hash_key = table.hash_key_name();
3595 if let Some(v) = item.get(hash_key) {
3596 key.insert(hash_key.to_string(), v.clone());
3597 }
3598 if let Some(range_key) = table.range_key_name() {
3599 if let Some(v) = item.get(range_key) {
3600 key.insert(range_key.to_string(), v.clone());
3601 }
3602 }
3603 key
3604}
3605
3606fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3608 let obj = value.as_object()?;
3609 if obj.is_empty() {
3610 return None;
3611 }
3612 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3613}
3614
3615fn item_matches_key(
3617 item: &HashMap<String, AttributeValue>,
3618 key: &HashMap<String, AttributeValue>,
3619 hash_key_name: &str,
3620 range_key_name: Option<&str>,
3621) -> bool {
3622 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3623 (Some(a), Some(b)) => a == b,
3624 _ => false,
3625 };
3626 if !hash_match {
3627 return false;
3628 }
3629 match range_key_name {
3630 Some(rk) => match (item.get(rk), key.get(rk)) {
3631 (Some(a), Some(b)) => a == b,
3632 (None, None) => true,
3633 _ => false,
3634 },
3635 None => true,
3636 }
3637}
3638
3639fn extract_key_for_schema(
3641 item: &HashMap<String, AttributeValue>,
3642 hash_key_name: &str,
3643 range_key_name: Option<&str>,
3644) -> HashMap<String, AttributeValue> {
3645 let mut key = HashMap::new();
3646 if let Some(v) = item.get(hash_key_name) {
3647 key.insert(hash_key_name.to_string(), v.clone());
3648 }
3649 if let Some(rk) = range_key_name {
3650 if let Some(v) = item.get(rk) {
3651 key.insert(rk.to_string(), v.clone());
3652 }
3653 }
3654 key
3655}
3656
3657fn validate_key_in_item(
3658 table: &DynamoTable,
3659 item: &HashMap<String, AttributeValue>,
3660) -> Result<(), AwsServiceError> {
3661 let hash_key = table.hash_key_name();
3662 if !item.contains_key(hash_key) {
3663 return Err(AwsServiceError::aws_error(
3664 StatusCode::BAD_REQUEST,
3665 "ValidationException",
3666 format!("Missing the key {hash_key} in the item"),
3667 ));
3668 }
3669 if let Some(range_key) = table.range_key_name() {
3670 if !item.contains_key(range_key) {
3671 return Err(AwsServiceError::aws_error(
3672 StatusCode::BAD_REQUEST,
3673 "ValidationException",
3674 format!("Missing the key {range_key} in the item"),
3675 ));
3676 }
3677 }
3678 Ok(())
3679}
3680
3681fn validate_key_attributes_in_key(
3682 table: &DynamoTable,
3683 key: &HashMap<String, AttributeValue>,
3684) -> Result<(), AwsServiceError> {
3685 let hash_key = table.hash_key_name();
3686 if !key.contains_key(hash_key) {
3687 return Err(AwsServiceError::aws_error(
3688 StatusCode::BAD_REQUEST,
3689 "ValidationException",
3690 format!("Missing the key {hash_key} in the item"),
3691 ));
3692 }
3693 Ok(())
3694}
3695
3696fn project_item(
3697 item: &HashMap<String, AttributeValue>,
3698 body: &Value,
3699) -> HashMap<String, AttributeValue> {
3700 let projection = body["ProjectionExpression"].as_str();
3701 match projection {
3702 Some(proj) if !proj.is_empty() => {
3703 let expr_attr_names = parse_expression_attribute_names(body);
3704 let attrs: Vec<String> = proj
3705 .split(',')
3706 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3707 .collect();
3708 let mut result = HashMap::new();
3709 for attr in &attrs {
3710 if let Some(v) = resolve_nested_path(item, attr) {
3711 insert_nested_value(&mut result, attr, v);
3712 }
3713 }
3714 result
3715 }
3716 _ => item.clone(),
3717 }
3718}
3719
3720fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3723 let mut result = String::new();
3725 for (i, segment) in path.split('.').enumerate() {
3726 if i > 0 {
3727 result.push('.');
3728 }
3729 if let Some(bracket_pos) = segment.find('[') {
3731 let key_part = &segment[..bracket_pos];
3732 let index_part = &segment[bracket_pos..];
3733 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3734 result.push_str(index_part);
3735 } else {
3736 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3737 }
3738 }
3739 result
3740}
3741
3742fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3744 let segments = parse_path_segments(path);
3745 if segments.is_empty() {
3746 return None;
3747 }
3748
3749 let first = &segments[0];
3750 let top_key = match first {
3751 PathSegment::Key(k) => k.as_str(),
3752 _ => return None,
3753 };
3754
3755 let mut current = item.get(top_key)?.clone();
3756
3757 for segment in &segments[1..] {
3758 match segment {
3759 PathSegment::Key(k) => {
3760 current = current.get("M")?.get(k)?.clone();
3762 }
3763 PathSegment::Index(idx) => {
3764 current = current.get("L")?.get(*idx)?.clone();
3766 }
3767 }
3768 }
3769
3770 Some(current)
3771}
3772
/// One step in a DynamoDB document path: a map key (`a.b`) or a list index
/// (`a[3]`).
// Clone/PartialEq/Eq derived so segments can be compared and reused by
// callers (and asserted on in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
enum PathSegment {
    Key(String),
    Index(usize),
}

/// Split a document path such as `a.b[2].c` into its segments.
///
/// `.` separates map keys and `[n]` denotes a list index. Empty key
/// segments (`a..b`) and non-numeric indexes (`a[x]`) are silently dropped,
/// preserving the previous behavior. An unterminated `[` consumes the rest
/// of the path.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut current = String::new();
    let mut chars = path.chars().peekable();

    // Helper-free scan: '.' and '[' both flush the pending key text.
    while let Some(ch) = chars.next() {
        match ch {
            '.' => {
                if !current.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut current)));
                }
            }
            '[' => {
                if !current.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut current)));
                }
                // Collect everything up to the closing bracket, then consume
                // the bracket itself (if present).
                let mut digits = String::new();
                while let Some(&c) = chars.peek() {
                    if c == ']' {
                        break;
                    }
                    digits.push(c);
                    chars.next();
                }
                chars.next();
                if let Ok(idx) = digits.parse::<usize>() {
                    segments.push(PathSegment::Index(idx));
                }
            }
            other => current.push(other),
        }
    }
    if !current.is_empty() {
        segments.push(PathSegment::Key(current));
    }
    segments
}
3821
3822fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3825 if !path.contains('.') && !path.contains('[') {
3827 result.insert(path.to_string(), value);
3828 return;
3829 }
3830
3831 let segments = parse_path_segments(path);
3832 if segments.is_empty() {
3833 return;
3834 }
3835
3836 let top_key = match &segments[0] {
3837 PathSegment::Key(k) => k.clone(),
3838 _ => return,
3839 };
3840
3841 if segments.len() == 1 {
3842 result.insert(top_key, value);
3843 return;
3844 }
3845
3846 let wrapped = wrap_value_in_path(&segments[1..], value);
3848 let existing = result.remove(&top_key);
3850 let merged = match existing {
3851 Some(existing) => merge_attribute_values(existing, wrapped),
3852 None => wrapped,
3853 };
3854 result.insert(top_key, merged);
3855}
3856
3857fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3859 if segments.is_empty() {
3860 return value;
3861 }
3862 let inner = wrap_value_in_path(&segments[1..], value);
3863 match &segments[0] {
3864 PathSegment::Key(k) => {
3865 json!({"M": {k.clone(): inner}})
3866 }
3867 PathSegment::Index(idx) => {
3868 let mut arr = vec![Value::Null; idx + 1];
3869 arr[*idx] = inner;
3870 json!({"L": arr})
3871 }
3872 }
3873}
3874
3875fn merge_attribute_values(a: Value, b: Value) -> Value {
3877 if let (Some(a_map), Some(b_map)) = (
3878 a.get("M").and_then(|v| v.as_object()),
3879 b.get("M").and_then(|v| v.as_object()),
3880 ) {
3881 let mut merged = a_map.clone();
3882 for (k, v) in b_map {
3883 if let Some(existing) = merged.get(k) {
3884 merged.insert(
3885 k.clone(),
3886 merge_attribute_values(existing.clone(), v.clone()),
3887 );
3888 } else {
3889 merged.insert(k.clone(), v.clone());
3890 }
3891 }
3892 json!({"M": merged})
3893 } else {
3894 b
3895 }
3896}
3897
3898fn evaluate_condition(
3899 condition: &str,
3900 existing: Option<&HashMap<String, AttributeValue>>,
3901 expr_attr_names: &HashMap<String, String>,
3902 expr_attr_values: &HashMap<String, Value>,
3903) -> Result<(), AwsServiceError> {
3904 let empty = HashMap::new();
3909 let item = existing.unwrap_or(&empty);
3910 if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3911 Ok(())
3912 } else {
3913 Err(AwsServiceError::aws_error(
3914 StatusCode::BAD_REQUEST,
3915 "ConditionalCheckFailedException",
3916 "The conditional request failed",
3917 ))
3918 }
3919}
3920
/// If `expr` is a call of `func_name` (with or without a single space before
/// the opening parenthesis) ending in `)`, return the trimmed argument text;
/// otherwise `None`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    let body = after_name
        .strip_prefix('(')
        .or_else(|| after_name.strip_prefix(" ("))?;
    Some(body.strip_suffix(')')?.trim())
}
3933
3934fn evaluate_key_condition(
3935 expr: &str,
3936 item: &HashMap<String, AttributeValue>,
3937 hash_key_name: &str,
3938 _range_key_name: Option<&str>,
3939 expr_attr_names: &HashMap<String, String>,
3940 expr_attr_values: &HashMap<String, Value>,
3941) -> bool {
3942 let parts: Vec<&str> = split_on_and(expr);
3943 for part in &parts {
3944 let part = part.trim();
3945 if !evaluate_single_key_condition(
3946 part,
3947 item,
3948 hash_key_name,
3949 expr_attr_names,
3950 expr_attr_values,
3951 ) {
3952 return false;
3953 }
3954 }
3955 true
3956}
3957
/// Split `expr` on a case-insensitive keyword (e.g. `" AND "`) at paren
/// depth zero, returning the raw (untrimmed) pieces.
///
/// The two public wrappers below were byte-for-byte duplicates except for
/// the keyword; this helper deduplicates them. Byte slicing is guarded by
/// `is_char_boundary` so multi-byte input cannot panic.
fn split_on_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
    let klen = keyword.len();
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            // Unbalanced ')' never drives depth negative.
            b')' => depth = depth.saturating_sub(1),
            _ => {
                if depth == 0
                    && i + klen <= expr.len()
                    && expr.is_char_boundary(i)
                    && expr.is_char_boundary(i + klen)
                    && expr[i..i + klen].eq_ignore_ascii_case(keyword)
                {
                    parts.push(&expr[start..i]);
                    start = i + klen;
                    i = start;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}

/// Split on top-level (paren-depth-zero) case-insensitive ` AND `.
fn split_on_and(expr: &str) -> Vec<&str> {
    split_on_keyword(expr, " AND ")
}

/// Split on top-level (paren-depth-zero) case-insensitive ` OR `.
fn split_on_or(expr: &str) -> Vec<&str> {
    split_on_keyword(expr, " OR ")
}
4019
/// Evaluate a single key-condition clause against an item.
///
/// Supported forms: `begins_with(attr, :v)` (string values only),
/// `attr BETWEEN :lo AND :hi` (inclusive), and binary comparisons
/// (`=`, `<>`, `<`, `>`, `<=`, `>=`) whose right side is an
/// expression-attribute-value placeholder. Anything else evaluates false.
fn evaluate_single_key_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    _hash_key_name: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let part = part.trim();

    // begins_with(attr, :val) — only S-typed values can match.
    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let val_ref = val_ref.trim();
                let expected = expr_attr_values.get(val_ref);
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        let a_str = a.get("S").and_then(|v| v.as_str());
                        let e_str = e.get("S").and_then(|v| v.as_str());
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
                    }
                    _ => false,
                };
            }
        }
        // Malformed begins_with (no closing paren / missing args).
        return false;
    }

    // attr BETWEEN :lo AND :hi — inclusive on both ends. The uppercase copy
    // is only used to locate keywords; slicing happens on the original
    // string (ASCII-case mapping preserves byte offsets).
    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
        let attr_part = part[..between_pos].trim();
        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
        let range_part = &part[between_pos + 7..];
        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
            let lo_ref = range_part[..and_pos].trim();
            let hi_ref = range_part[and_pos + 5..].trim();
            let lo = expr_attr_values.get(lo_ref);
            let hi = expr_attr_values.get(hi_ref);
            let actual = item.get(&attr_name);
            return match (actual, lo, hi) {
                (Some(a), Some(l), Some(h)) => {
                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
                }
                _ => false,
            };
        }
    }

    // Binary comparison. Order matters: two-character operators come before
    // their one-character prefixes so "<=" is not mis-read as "<" or "=".
    for op in &["<=", ">=", "<>", "=", "<", ">"] {
        if let Some(pos) = part.find(op) {
            let left = part[..pos].trim();
            let right = part[pos + op.len()..].trim();
            let attr_name = resolve_attr_name(left, expr_attr_names);
            let expected = expr_attr_values.get(right);
            let actual = item.get(&attr_name);

            return match *op {
                "=" => actual == expected,
                "<>" => actual != expected,
                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
                "<=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
                }
                ">=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
                }
                _ => false,
            };
        }
    }

    // No recognizable operator: the clause cannot match anything.
    false
}
4104
4105fn attribute_size(val: &Value) -> Option<usize> {
4110 if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
4111 return Some(s.len());
4112 }
4113 if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
4114 let decoded_len = base64::engine::general_purpose::STANDARD
4116 .decode(b)
4117 .map(|v| v.len())
4118 .unwrap_or(b.len());
4119 return Some(decoded_len);
4120 }
4121 if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
4122 return Some(arr.len());
4123 }
4124 if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
4125 return Some(arr.len());
4126 }
4127 if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
4128 return Some(arr.len());
4129 }
4130 if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
4131 return Some(arr.len());
4132 }
4133 if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
4134 return Some(obj.len());
4135 }
4136 if val.get("N").is_some() {
4137 return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
4139 }
4140 if val.get("BOOL").is_some() || val.get("NULL").is_some() {
4141 return Some(1);
4142 }
4143 None
4144}
4145
4146fn evaluate_size_comparison(
4148 part: &str,
4149 item: &HashMap<String, AttributeValue>,
4150 expr_attr_names: &HashMap<String, String>,
4151 expr_attr_values: &HashMap<String, Value>,
4152) -> Option<bool> {
4153 let open = part.find('(')?;
4155 let close = part[open..].find(')')? + open;
4156 let path = part[open + 1..close].trim();
4157 let remainder = part[close + 1..].trim();
4158
4159 let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
4161 ("<=", rest.trim())
4162 } else if let Some(rest) = remainder.strip_prefix(">=") {
4163 (">=", rest.trim())
4164 } else if let Some(rest) = remainder.strip_prefix("<>") {
4165 ("<>", rest.trim())
4166 } else if let Some(rest) = remainder.strip_prefix('<') {
4167 ("<", rest.trim())
4168 } else if let Some(rest) = remainder.strip_prefix('>') {
4169 (">", rest.trim())
4170 } else if let Some(rest) = remainder.strip_prefix('=') {
4171 ("=", rest.trim())
4172 } else {
4173 return None;
4174 };
4175
4176 let attr_name = resolve_attr_name(path, expr_attr_names);
4177 let actual = item.get(&attr_name)?;
4178 let size = attribute_size(actual)? as f64;
4179
4180 let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
4181
4182 Some(match op {
4183 "=" => (size - expected).abs() < f64::EPSILON,
4184 "<>" => (size - expected).abs() >= f64::EPSILON,
4185 "<" => size < expected,
4186 ">" => size > expected,
4187 "<=" => size <= expected,
4188 ">=" => size >= expected,
4189 _ => false,
4190 })
4191}
4192
4193fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4194 match (a, b) {
4195 (None, None) => std::cmp::Ordering::Equal,
4196 (None, Some(_)) => std::cmp::Ordering::Less,
4197 (Some(_), None) => std::cmp::Ordering::Greater,
4198 (Some(a), Some(b)) => {
4199 let a_type = attribute_type_and_value(a);
4200 let b_type = attribute_type_and_value(b);
4201 match (a_type, b_type) {
4202 (Some(("S", a_val)), Some(("S", b_val))) => {
4203 let a_str = a_val.as_str().unwrap_or("");
4204 let b_str = b_val.as_str().unwrap_or("");
4205 a_str.cmp(b_str)
4206 }
4207 (Some(("N", a_val)), Some(("N", b_val))) => {
4208 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4209 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4210 a_num
4211 .partial_cmp(&b_num)
4212 .unwrap_or(std::cmp::Ordering::Equal)
4213 }
4214 (Some(("B", a_val)), Some(("B", b_val))) => {
4215 let a_str = a_val.as_str().unwrap_or("");
4216 let b_str = b_val.as_str().unwrap_or("");
4217 a_str.cmp(b_str)
4218 }
4219 _ => std::cmp::Ordering::Equal,
4220 }
4221 }
4222 }
4223}
4224
4225fn evaluate_filter_expression(
4226 expr: &str,
4227 item: &HashMap<String, AttributeValue>,
4228 expr_attr_names: &HashMap<String, String>,
4229 expr_attr_values: &HashMap<String, Value>,
4230) -> bool {
4231 let trimmed = expr.trim();
4232
4233 let or_parts = split_on_or(trimmed);
4235 if or_parts.len() > 1 {
4236 return or_parts.iter().any(|part| {
4237 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4238 });
4239 }
4240
4241 let and_parts = split_on_and(trimmed);
4243 if and_parts.len() > 1 {
4244 return and_parts.iter().all(|part| {
4245 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4246 });
4247 }
4248
4249 let stripped = strip_outer_parens(trimmed);
4251 if stripped != trimmed {
4252 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4253 }
4254
4255 if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4257 return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4258 }
4259
4260 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4261}
4262
/// Remove one pair of outer parentheses when they wrap the entire (trimmed)
/// expression; otherwise return the trimmed input unchanged.
fn strip_outer_parens(expr: &str) -> &str {
    let expr = expr.trim();
    // Must start with '(' and end with ')' to be a candidate at all.
    let Some(inner) = expr
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return expr;
    };
    // The outer pair only wraps the whole expression if the inner text never
    // closes more parens than it has opened (e.g. "(a) AND (b)" fails here).
    let mut depth = 0i32;
    for b in inner.bytes() {
        match b {
            b'(' => depth += 1,
            b')' => {
                depth -= 1;
                if depth < 0 {
                    return expr;
                }
            }
            _ => {}
        }
    }
    if depth == 0 {
        inner
    } else {
        expr
    }
}
4290
/// Evaluate one leaf clause of a filter/condition expression.
///
/// Recognized forms, tried in order: `attribute_exists`,
/// `attribute_not_exists`, `begins_with`, `contains`, `size(...) <op> :v`,
/// `attribute_type`, `attr IN (:a, :b, ...)`. Anything else falls through
/// to the binary-comparison parser shared with key conditions.
fn evaluate_single_filter_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        return item.contains_key(&attr);
    }

    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
        let attr = resolve_attr_name(inner, expr_attr_names);
        return !item.contains_key(&attr);
    }

    // begins_with(attr, :val) — string (S) values only.
    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let expected = expr_attr_values.get(val_ref.trim());
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        let a_str = a.get("S").and_then(|v| v.as_str());
                        let e_str = e.get("S").and_then(|v| v.as_str());
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
                    }
                    _ => false,
                };
            }
        }
    }

    // contains(attr, :val) — substring for S, membership for SS/NS/BS sets
    // and L lists.
    if let Some(rest) = part
        .strip_prefix("contains(")
        .or_else(|| part.strip_prefix("contains ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let expected = expr_attr_values.get(val_ref.trim());
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        // String containment when both sides are S.
                        if let (Some(a_s), Some(e_s)) = (
                            a.get("S").and_then(|v| v.as_str()),
                            e.get("S").and_then(|v| v.as_str()),
                        ) {
                            return a_s.contains(e_s);
                        }
                        // Set membership: the operand's scalar type must
                        // match the set's element type.
                        if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
                            if let Some(val) = e.get("S") {
                                return set.contains(val);
                            }
                        }
                        if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
                            if let Some(val) = e.get("N") {
                                return set.contains(val);
                            }
                        }
                        if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
                            if let Some(val) = e.get("B") {
                                return set.contains(val);
                            }
                        }
                        // List membership compares whole attribute values.
                        if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
                            return list.contains(e);
                        }
                        false
                    }
                    _ => false,
                };
            }
        }
    }

    // size(attr) <op> :val — falls through when malformed.
    if part.starts_with("size(") || part.starts_with("size (") {
        if let Some(result) =
            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
        {
            return result;
        }
    }

    // attribute_type(attr, :t) — :t is an S value naming the expected type.
    if part.starts_with("attribute_type(") || part.starts_with("attribute_type (") {
        if let Some(rest) = part
            .strip_prefix("attribute_type(")
            .or_else(|| part.strip_prefix("attribute_type ("))
        {
            if let Some(inner) = rest.strip_suffix(')') {
                let mut split = inner.splitn(2, ',');
                if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                    let expected_type = expr_attr_values
                        .get(val_ref.trim())
                        .and_then(|v| v.get("S"))
                        .and_then(|v| v.as_str());
                    let actual = item.get(&attr_name);
                    return match (actual, expected_type) {
                        (Some(val), Some(t)) => match t {
                            "S" => val.get("S").is_some(),
                            "N" => val.get("N").is_some(),
                            "B" => val.get("B").is_some(),
                            "BOOL" => val.get("BOOL").is_some(),
                            "NULL" => val.get("NULL").is_some(),
                            "SS" => val.get("SS").is_some(),
                            "NS" => val.get("NS").is_some(),
                            "BS" => val.get("BS").is_some(),
                            "L" => val.get("L").is_some(),
                            "M" => val.get("M").is_some(),
                            _ => false,
                        },
                        _ => false,
                    };
                }
            }
        }
    }

    // attr IN (:a, :b, ...) membership.
    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
        let actual = item.get(&attr_name);
        return evaluate_in_match(actual, &value_refs, expr_attr_values);
    }

    // Fallback: plain binary comparison (hash-key name is unused there).
    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
}
4430
/// Splits an `IN` expression of the form `attr IN (:v1, :v2, ...)` into the
/// attribute reference and its list of value references.
///
/// Returns `None` when the input is not a well-formed `IN` test: no ` IN `
/// keyword, empty attribute part, missing parentheses, or an empty value
/// list. Keyword matching is case-insensitive.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    // Locate the keyword case-insensitively; byte offsets remain valid on the
    // original string because ASCII uppercasing preserves lengths.
    let in_pos = expr.to_ascii_uppercase().find(" IN ")?;
    let attr_ref = expr[..in_pos].trim();
    if attr_ref.is_empty() {
        return None;
    }
    let list = expr[in_pos + 4..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;
    let values: Vec<&str> = list
        .split(',')
        .map(str::trim)
        .filter(|v| !v.is_empty())
        .collect();
    (!values.is_empty()).then_some((attr_ref, values))
}
4457
4458fn evaluate_in_match(
4462 actual: Option<&AttributeValue>,
4463 value_refs: &[&str],
4464 expr_attr_values: &HashMap<String, Value>,
4465) -> bool {
4466 value_refs.iter().any(|v_ref| {
4467 let expected = expr_attr_values.get(*v_ref);
4468 matches!((actual, expected), (Some(a), Some(e)) if a == e)
4469 })
4470}
4471
4472fn apply_update_expression(
4473 item: &mut HashMap<String, AttributeValue>,
4474 expr: &str,
4475 expr_attr_names: &HashMap<String, String>,
4476 expr_attr_values: &HashMap<String, Value>,
4477) -> Result<(), AwsServiceError> {
4478 let clauses = parse_update_clauses(expr);
4479 if clauses.is_empty() && !expr.trim().is_empty() {
4480 return Err(AwsServiceError::aws_error(
4481 StatusCode::BAD_REQUEST,
4482 "ValidationException",
4483 "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
4484 ));
4485 }
4486 for (action, assignments) in &clauses {
4487 match action.to_ascii_uppercase().as_str() {
4488 "SET" => {
4489 for assignment in assignments {
4490 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4491 }
4492 }
4493 "REMOVE" => {
4494 for attr_ref in assignments {
4495 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4496 item.remove(&attr);
4497 }
4498 }
4499 "ADD" => {
4500 for assignment in assignments {
4501 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4502 }
4503 }
4504 "DELETE" => {
4505 for assignment in assignments {
4506 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4507 }
4508 }
4509 other => {
4510 return Err(AwsServiceError::aws_error(
4511 StatusCode::BAD_REQUEST,
4512 "ValidationException",
4513 format!("Invalid UpdateExpression: Invalid action: {}", other),
4514 ));
4515 }
4516 }
4517 }
4518 Ok(())
4519}
4520
/// Splits an `UpdateExpression` into `(ACTION, assignments)` pairs, one per
/// `SET` / `REMOVE` / `ADD` / `DELETE` clause, ordered by position in the
/// expression.
///
/// Keyword matching is case-insensitive and only fires on whole words: a
/// keyword embedded in an identifier-like token (`attr_add`, `#add`,
/// `:addval`) is not a clause boundary. The previous implementation treated
/// `_`, `#`, and `:` as boundaries and so mis-split such names.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    // Bytes that can precede a keyword as part of an identifier-like token:
    // `_` joins words in attribute names, while `#`/`:` prefix expression
    // name/value references. A keyword directly after any of these is part
    // of a name, not a clause marker.
    fn is_name_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_' || b == b'#' || b == b':'
    }

    let upper = expr.to_ascii_uppercase();
    let bytes = expr.as_bytes();
    let mut positions: Vec<(usize, &str)> = Vec::new();

    for kw in ["SET", "REMOVE", "ADD", "DELETE"] {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let after_pos = abs_pos + kw.len();
            let before_ok = abs_pos == 0 || !is_name_byte(bytes[abs_pos - 1]);
            // After the keyword, only alphanumerics and `_` continue a word
            // (e.g. `ADD_X` is an identifier, not an ADD clause).
            let after_ok = after_pos >= expr.len() || {
                let b = bytes[after_pos];
                !(b.is_ascii_alphanumeric() || b == b'_')
            };
            if before_ok && after_ok {
                positions.push((abs_pos, kw));
            }
            search_from = after_pos;
        }
    }

    positions.sort_by_key(|(pos, _)| *pos);

    positions
        .iter()
        .enumerate()
        .map(|(i, &(pos, kw))| {
            let start = pos + kw.len();
            // Each clause runs to the start of the next keyword (or the end).
            let end = positions.get(i + 1).map_or(expr.len(), |&(p, _)| p);
            let assignments = expr[start..end]
                .trim()
                .split(',')
                .map(|s| s.trim().to_string())
                .collect();
            (kw.to_string(), assignments)
        })
        .collect()
}
4558
4559fn apply_set_assignment(
4560 item: &mut HashMap<String, AttributeValue>,
4561 assignment: &str,
4562 expr_attr_names: &HashMap<String, String>,
4563 expr_attr_values: &HashMap<String, Value>,
4564) -> Result<(), AwsServiceError> {
4565 let Some((left, right)) = assignment.split_once('=') else {
4566 return Ok(());
4567 };
4568
4569 let left_trimmed = left.trim();
4570 let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
4574 Some((name, idx)) => (name, Some(idx)),
4575 None => (left_trimmed, None),
4576 };
4577 let attr = resolve_attr_name(attr_ref, expr_attr_names);
4578 let right = right.trim();
4579
4580 if let Some(rest) = right
4582 .strip_prefix("if_not_exists(")
4583 .or_else(|| right.strip_prefix("if_not_exists ("))
4584 {
4585 if let Some(inner) = rest.strip_suffix(')') {
4586 let mut split = inner.splitn(2, ',');
4587 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4588 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4589 if !item.contains_key(&check_name) {
4590 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4591 item.insert(attr, val.clone());
4592 }
4593 }
4594 return Ok(());
4595 }
4596 }
4597 }
4598
4599 if let Some(rest) = right
4601 .strip_prefix("list_append(")
4602 .or_else(|| right.strip_prefix("list_append ("))
4603 {
4604 if let Some(inner) = rest.strip_suffix(')') {
4605 let mut split = inner.splitn(2, ',');
4606 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4607 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4608 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4609
4610 let mut merged = Vec::new();
4611 if let Some(Value::Object(obj)) = &a_val {
4612 if let Some(Value::Array(arr)) = obj.get("L") {
4613 merged.extend(arr.clone());
4614 }
4615 }
4616 if let Some(Value::Object(obj)) = &b_val {
4617 if let Some(Value::Array(arr)) = obj.get("L") {
4618 merged.extend(arr.clone());
4619 }
4620 }
4621
4622 item.insert(attr, json!({"L": merged}));
4623 return Ok(());
4624 }
4625 }
4626 }
4627
4628 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4630 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4631 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4632
4633 let left_num = match extract_number(&left_val) {
4635 Some(n) => n,
4636 None if left_val.is_some() => {
4637 return Err(AwsServiceError::aws_error(
4638 StatusCode::BAD_REQUEST,
4639 "ValidationException",
4640 "An operand in the update expression has an incorrect data type",
4641 ));
4642 }
4643 None => 0.0, };
4645 let right_num = match extract_number(&right_val) {
4646 Some(n) => n,
4647 None => {
4648 return Err(AwsServiceError::aws_error(
4649 StatusCode::BAD_REQUEST,
4650 "ValidationException",
4651 "An operand in the update expression has an incorrect data type",
4652 ));
4653 }
4654 };
4655
4656 let result = if is_add {
4657 left_num + right_num
4658 } else {
4659 left_num - right_num
4660 };
4661
4662 let num_str = if result == result.trunc() {
4663 format!("{}", result as i64)
4664 } else {
4665 format!("{result}")
4666 };
4667
4668 item.insert(attr, json!({"N": num_str}));
4669 return Ok(());
4670 }
4671
4672 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4674 if let Some(v) = val {
4675 match list_index {
4676 Some(idx) => assign_list_index(item, &attr, idx, v)?,
4677 None => {
4678 item.insert(attr, v);
4679 }
4680 }
4681 }
4682
4683 Ok(())
4684}
4685
/// Parses a simple list-index path of the form `name[3]`, returning the bare
/// attribute name and the numeric index.
///
/// Only a single trailing index on a top-level attribute is recognized:
/// nested paths (`a.b[0]`) and multi-index paths (`a[0][1]`) yield `None`.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let path = path.trim();
    let body = path.strip_suffix(']')?;
    let (name, idx_str) = body.rsplit_once('[')?;
    let idx = idx_str.parse::<usize>().ok()?;
    let is_simple =
        !name.is_empty() && !name.contains('[') && !name.contains(']') && !name.contains('.');
    is_simple.then_some((name, idx))
}
4706
4707fn assign_list_index(
4712 item: &mut HashMap<String, AttributeValue>,
4713 attr: &str,
4714 idx: usize,
4715 value: Value,
4716) -> Result<(), AwsServiceError> {
4717 let Some(existing) = item.get_mut(attr) else {
4718 return Err(AwsServiceError::aws_error(
4719 StatusCode::BAD_REQUEST,
4720 "ValidationException",
4721 "The document path provided in the update expression is invalid for update",
4722 ));
4723 };
4724 let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
4725 return Err(AwsServiceError::aws_error(
4726 StatusCode::BAD_REQUEST,
4727 "ValidationException",
4728 "The document path provided in the update expression is invalid for update",
4729 ));
4730 };
4731 if idx < list.len() {
4732 list[idx] = value;
4733 } else if idx == list.len() {
4734 list.push(value);
4735 } else {
4736 return Err(AwsServiceError::aws_error(
4737 StatusCode::BAD_REQUEST,
4738 "ValidationException",
4739 "The document path provided in the update expression is invalid for update",
4740 ));
4741 }
4742 Ok(())
4743}
4744
4745fn resolve_value(
4746 reference: &str,
4747 item: &HashMap<String, AttributeValue>,
4748 expr_attr_names: &HashMap<String, String>,
4749 expr_attr_values: &HashMap<String, Value>,
4750) -> Option<Value> {
4751 let reference = reference.trim();
4752 if reference.starts_with(':') {
4753 expr_attr_values.get(reference).cloned()
4754 } else {
4755 let attr_name = resolve_attr_name(reference, expr_attr_names);
4756 item.get(&attr_name).cloned()
4757 }
4758}
4759
4760fn extract_number(val: &Option<Value>) -> Option<f64> {
4761 val.as_ref()
4762 .and_then(|v| v.get("N"))
4763 .and_then(|n| n.as_str())
4764 .and_then(|s| s.parse().ok())
4765}
4766
/// Splits a top-level `a + b` / `a - b` expression at the first `+`/`-` that
/// sits outside any parentheses, returning the two operand substrings and
/// whether the operation is addition.
///
/// An operator at position 0 is ignored so a leading sign never splits the
/// expression; returns `None` when no top-level operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut depth = 0i32;
    expr.char_indices().find_map(|(i, c)| match c {
        '(' => {
            depth += 1;
            None
        }
        ')' => {
            depth -= 1;
            None
        }
        '+' | '-' if depth == 0 && i > 0 => Some((&expr[..i], &expr[i + 1..], c == '+')),
        _ => None,
    })
}
4784
4785fn apply_add_assignment(
4786 item: &mut HashMap<String, AttributeValue>,
4787 assignment: &str,
4788 expr_attr_names: &HashMap<String, String>,
4789 expr_attr_values: &HashMap<String, Value>,
4790) -> Result<(), AwsServiceError> {
4791 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4792 if parts.len() != 2 {
4793 return Ok(());
4794 }
4795
4796 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4797 let val_ref = parts[1].trim();
4798 let add_val = expr_attr_values.get(val_ref);
4799
4800 if let Some(add_val) = add_val {
4801 if let Some(existing) = item.get(&attr) {
4802 if let (Some(existing_num), Some(add_num)) = (
4803 extract_number(&Some(existing.clone())),
4804 extract_number(&Some(add_val.clone())),
4805 ) {
4806 let result = existing_num + add_num;
4807 let num_str = if result == result.trunc() {
4808 format!("{}", result as i64)
4809 } else {
4810 format!("{result}")
4811 };
4812 item.insert(attr, json!({"N": num_str}));
4813 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4814 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4815 let mut merged: Vec<Value> = existing_set.clone();
4816 for v in add_set {
4817 if !merged.contains(v) {
4818 merged.push(v.clone());
4819 }
4820 }
4821 item.insert(attr, json!({"SS": merged}));
4822 }
4823 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4824 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4825 let mut merged: Vec<Value> = existing_set.clone();
4826 for v in add_set {
4827 if !merged.contains(v) {
4828 merged.push(v.clone());
4829 }
4830 }
4831 item.insert(attr, json!({"NS": merged}));
4832 }
4833 } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
4834 if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
4835 let mut merged: Vec<Value> = existing_set.clone();
4836 for v in add_set {
4837 if !merged.contains(v) {
4838 merged.push(v.clone());
4839 }
4840 }
4841 item.insert(attr, json!({"BS": merged}));
4842 }
4843 }
4844 } else {
4845 item.insert(attr, add_val.clone());
4846 }
4847 }
4848
4849 Ok(())
4850}
4851
4852fn apply_delete_assignment(
4853 item: &mut HashMap<String, AttributeValue>,
4854 assignment: &str,
4855 expr_attr_names: &HashMap<String, String>,
4856 expr_attr_values: &HashMap<String, Value>,
4857) -> Result<(), AwsServiceError> {
4858 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4859 if parts.len() != 2 {
4860 return Ok(());
4861 }
4862
4863 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4864 let val_ref = parts[1].trim();
4865 let del_val = expr_attr_values.get(val_ref);
4866
4867 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4868 if let (Some(existing_set), Some(del_set)) = (
4869 existing.get("SS").and_then(|v| v.as_array()),
4870 del_val.get("SS").and_then(|v| v.as_array()),
4871 ) {
4872 let filtered: Vec<Value> = existing_set
4873 .iter()
4874 .filter(|v| !del_set.contains(v))
4875 .cloned()
4876 .collect();
4877 if filtered.is_empty() {
4878 item.remove(&attr);
4879 } else {
4880 item.insert(attr, json!({"SS": filtered}));
4881 }
4882 } else if let (Some(existing_set), Some(del_set)) = (
4883 existing.get("NS").and_then(|v| v.as_array()),
4884 del_val.get("NS").and_then(|v| v.as_array()),
4885 ) {
4886 let filtered: Vec<Value> = existing_set
4887 .iter()
4888 .filter(|v| !del_set.contains(v))
4889 .cloned()
4890 .collect();
4891 if filtered.is_empty() {
4892 item.remove(&attr);
4893 } else {
4894 item.insert(attr, json!({"NS": filtered}));
4895 }
4896 } else if let (Some(existing_set), Some(del_set)) = (
4897 existing.get("BS").and_then(|v| v.as_array()),
4898 del_val.get("BS").and_then(|v| v.as_array()),
4899 ) {
4900 let filtered: Vec<Value> = existing_set
4901 .iter()
4902 .filter(|v| !del_set.contains(v))
4903 .cloned()
4904 .collect();
4905 if filtered.is_empty() {
4906 item.remove(&attr);
4907 } else {
4908 item.insert(attr, json!({"BS": filtered}));
4909 }
4910 }
4911 }
4912
4913 Ok(())
4914}
4915
/// Builds the common `TableDescription` JSON shared by create/describe
/// style responses.
///
/// The table name is derived from the last `/`-separated segment of `arn`.
/// A `ProvisionedThroughput` section is always emitted: real capacity values
/// for provisioned tables, zeros for `PAY_PER_REQUEST`. GSI/LSI sections are
/// included only when the corresponding slice is non-empty.
#[allow(clippy::too_many_arguments)]
fn build_table_description_json(
    arn: &str,
    key_schema: &[KeySchemaElement],
    attribute_definitions: &[AttributeDefinition],
    provisioned_throughput: &ProvisionedThroughput,
    gsi: &[GlobalSecondaryIndex],
    lsi: &[LocalSecondaryIndex],
    billing_mode: &str,
    created_at: chrono::DateTime<chrono::Utc>,
    item_count: i64,
    size_bytes: i64,
    status: &str,
) -> Value {
    let table_name = arn.rsplit('/').next().unwrap_or("");
    // Epoch seconds with millisecond precision, expressed as a float.
    let creation_timestamp =
        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;

    let ks: Vec<Value> = key_schema
        .iter()
        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
        .collect();

    let ad: Vec<Value> = attribute_definitions
        .iter()
        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
        .collect();

    let mut desc = json!({
        "TableName": table_name,
        "TableArn": arn,
        // A fresh dashless UUID is generated on every call, so repeated
        // describes of the same table report different TableIds.
        "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
        "TableStatus": status,
        "KeySchema": ks,
        "AttributeDefinitions": ad,
        "CreationDateTime": creation_timestamp,
        "ItemCount": item_count,
        "TableSizeBytes": size_bytes,
        "BillingModeSummary": { "BillingMode": billing_mode },
    });

    if billing_mode != "PAY_PER_REQUEST" {
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
            "NumberOfDecreasesToday": 0,
        });
    } else {
        // On-demand tables still report a throughput section, zeroed out.
        desc["ProvisionedThroughput"] = json!({
            "ReadCapacityUnits": 0,
            "WriteCapacityUnits": 0,
            "NumberOfDecreasesToday": 0,
        });
    }

    if !gsi.is_empty() {
        // NOTE(review): index ItemCount/IndexSizeBytes are hard-coded to 0
        // rather than derived from table contents.
        let gsi_json: Vec<Value> = gsi
            .iter()
            .map(|g| {
                let gks: Vec<Value> = g
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": g.index_name,
                    "KeySchema": gks,
                    "Projection": { "ProjectionType": g.projection.projection_type },
                    "IndexStatus": "ACTIVE",
                    "IndexArn": format!("{arn}/index/{}", g.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                if !g.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
                }
                if let Some(ref pt) = g.provisioned_throughput {
                    idx["ProvisionedThroughput"] = json!({
                        "ReadCapacityUnits": pt.read_capacity_units,
                        "WriteCapacityUnits": pt.write_capacity_units,
                        "NumberOfDecreasesToday": 0,
                    });
                }
                idx
            })
            .collect();
        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
    }

    if !lsi.is_empty() {
        // Local secondary indexes carry no status or throughput of their own.
        let lsi_json: Vec<Value> = lsi
            .iter()
            .map(|l| {
                let lks: Vec<Value> = l
                    .key_schema
                    .iter()
                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
                    .collect();
                let mut idx = json!({
                    "IndexName": l.index_name,
                    "KeySchema": lks,
                    "Projection": { "ProjectionType": l.projection.projection_type },
                    "IndexArn": format!("{arn}/index/{}", l.index_name),
                    "ItemCount": 0,
                    "IndexSizeBytes": 0,
                });
                if !l.projection.non_key_attributes.is_empty() {
                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
                }
                idx
            })
            .collect();
        desc["LocalSecondaryIndexes"] = json!(lsi_json);
    }

    desc
}
5033
5034fn build_table_description(table: &DynamoTable) -> Value {
5035 let mut desc = build_table_description_json(
5036 &table.arn,
5037 &table.key_schema,
5038 &table.attribute_definitions,
5039 &table.provisioned_throughput,
5040 &table.gsi,
5041 &table.lsi,
5042 &table.billing_mode,
5043 table.created_at,
5044 table.item_count,
5045 table.size_bytes,
5046 &table.status,
5047 );
5048
5049 if table.stream_enabled {
5051 if let Some(ref stream_arn) = table.stream_arn {
5052 desc["LatestStreamArn"] = json!(stream_arn);
5053 desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
5054 }
5055 if let Some(ref view_type) = table.stream_view_type {
5056 desc["StreamSpecification"] = json!({
5057 "StreamEnabled": true,
5058 "StreamViewType": view_type,
5059 });
5060 }
5061 }
5062
5063 if let Some(ref sse_type) = table.sse_type {
5065 let mut sse_desc = json!({
5066 "Status": "ENABLED",
5067 "SSEType": sse_type,
5068 });
5069 if let Some(ref key_arn) = table.sse_kms_key_arn {
5070 sse_desc["KMSMasterKeyArn"] = json!(key_arn);
5071 }
5072 desc["SSEDescription"] = sse_desc;
5073 } else {
5074 desc["SSEDescription"] = json!({
5076 "Status": "ENABLED",
5077 "SSEType": "AES256",
5078 });
5079 }
5080
5081 desc
5082}
5083
5084fn execute_partiql_statement(
5085 state: &SharedDynamoDbState,
5086 statement: &str,
5087 parameters: &[Value],
5088) -> Result<AwsResponse, AwsServiceError> {
5089 let trimmed = statement.trim();
5090 let upper = trimmed.to_ascii_uppercase();
5091
5092 if upper.starts_with("SELECT") {
5093 execute_partiql_select(state, trimmed, parameters)
5094 } else if upper.starts_with("INSERT") {
5095 execute_partiql_insert(state, trimmed, parameters)
5096 } else if upper.starts_with("UPDATE") {
5097 execute_partiql_update(state, trimmed, parameters)
5098 } else if upper.starts_with("DELETE") {
5099 execute_partiql_delete(state, trimmed, parameters)
5100 } else {
5101 Err(AwsServiceError::aws_error(
5102 StatusCode::BAD_REQUEST,
5103 "ValidationException",
5104 format!("Unsupported PartiQL statement: {trimmed}"),
5105 ))
5106 }
5107}
5108
5109fn execute_partiql_select(
5111 state: &SharedDynamoDbState,
5112 statement: &str,
5113 parameters: &[Value],
5114) -> Result<AwsResponse, AwsServiceError> {
5115 let upper = statement.to_ascii_uppercase();
5117 let from_pos = upper.find("FROM").ok_or_else(|| {
5118 AwsServiceError::aws_error(
5119 StatusCode::BAD_REQUEST,
5120 "ValidationException",
5121 "Invalid SELECT statement: missing FROM",
5122 )
5123 })?;
5124
5125 let after_from = statement[from_pos + 4..].trim();
5126 let (table_name, rest) = parse_partiql_table_name(after_from);
5127
5128 let state = state.read();
5129 let table = get_table(&state.tables, &table_name)?;
5130
5131 let rest_upper = rest.trim().to_ascii_uppercase();
5132 if rest_upper.starts_with("WHERE") {
5133 let where_clause = rest.trim()[5..].trim();
5134 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
5135 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
5136 DynamoDbService::ok_json(json!({ "Items": items }))
5137 } else {
5138 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
5140 DynamoDbService::ok_json(json!({ "Items": items }))
5141 }
5142}
5143
5144fn execute_partiql_insert(
5145 state: &SharedDynamoDbState,
5146 statement: &str,
5147 parameters: &[Value],
5148) -> Result<AwsResponse, AwsServiceError> {
5149 let upper = statement.to_ascii_uppercase();
5152 let into_pos = upper.find("INTO").ok_or_else(|| {
5153 AwsServiceError::aws_error(
5154 StatusCode::BAD_REQUEST,
5155 "ValidationException",
5156 "Invalid INSERT statement: missing INTO",
5157 )
5158 })?;
5159
5160 let after_into = statement[into_pos + 4..].trim();
5161 let (table_name, rest) = parse_partiql_table_name(after_into);
5162
5163 let rest_upper = rest.trim().to_ascii_uppercase();
5164 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
5165 AwsServiceError::aws_error(
5166 StatusCode::BAD_REQUEST,
5167 "ValidationException",
5168 "Invalid INSERT statement: missing VALUE",
5169 )
5170 })?;
5171
5172 let value_str = rest.trim()[value_pos + 5..].trim();
5173 let item = parse_partiql_value_object(value_str, parameters)?;
5174
5175 let mut state = state.write();
5176 let table = get_table_mut(&mut state.tables, &table_name)?;
5177 let key = extract_key(table, &item);
5178 if table.find_item_index(&key).is_some() {
5179 return Err(AwsServiceError::aws_error(
5181 StatusCode::BAD_REQUEST,
5182 "DuplicateItemException",
5183 "Duplicate primary key exists in table",
5184 ));
5185 } else {
5186 table.items.push(item);
5187 }
5188 table.recalculate_stats();
5189
5190 DynamoDbService::ok_json(json!({}))
5191}
5192
5193fn execute_partiql_update(
5194 state: &SharedDynamoDbState,
5195 statement: &str,
5196 parameters: &[Value],
5197) -> Result<AwsResponse, AwsServiceError> {
5198 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
5202
5203 let rest_upper = rest.trim().to_ascii_uppercase();
5204 let set_pos = rest_upper.find("SET").ok_or_else(|| {
5205 AwsServiceError::aws_error(
5206 StatusCode::BAD_REQUEST,
5207 "ValidationException",
5208 "Invalid UPDATE statement: missing SET",
5209 )
5210 })?;
5211
5212 let after_set = rest.trim()[set_pos + 3..].trim();
5213
5214 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
5216 let (set_clause, where_clause) = if let Some(wp) = where_pos {
5217 (&after_set[..wp], after_set[wp + 5..].trim())
5218 } else {
5219 (after_set, "")
5220 };
5221
5222 let mut state = state.write();
5223 let table = get_table_mut(&mut state.tables, &table_name)?;
5224
5225 let matched_indices = if !where_clause.is_empty() {
5226 find_partiql_where_indices(table, where_clause, parameters)?
5227 } else {
5228 (0..table.items.len()).collect()
5229 };
5230
5231 let param_offset = count_params_in_str(where_clause);
5233 let assignments: Vec<&str> = set_clause.split(',').collect();
5234 for idx in &matched_indices {
5235 let mut local_offset = param_offset;
5236 for assignment in &assignments {
5237 let assignment = assignment.trim();
5238 if let Some((attr, val_str)) = assignment.split_once('=') {
5239 let attr = attr.trim().trim_matches('"');
5240 let val_str = val_str.trim();
5241 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
5242 if let Some(v) = value {
5243 table.items[*idx].insert(attr.to_string(), v);
5244 }
5245 }
5246 }
5247 }
5248 table.recalculate_stats();
5249
5250 DynamoDbService::ok_json(json!({}))
5251}
5252
5253fn execute_partiql_delete(
5254 state: &SharedDynamoDbState,
5255 statement: &str,
5256 parameters: &[Value],
5257) -> Result<AwsResponse, AwsServiceError> {
5258 let upper = statement.to_ascii_uppercase();
5260 let from_pos = upper.find("FROM").ok_or_else(|| {
5261 AwsServiceError::aws_error(
5262 StatusCode::BAD_REQUEST,
5263 "ValidationException",
5264 "Invalid DELETE statement: missing FROM",
5265 )
5266 })?;
5267
5268 let after_from = statement[from_pos + 4..].trim();
5269 let (table_name, rest) = parse_partiql_table_name(after_from);
5270
5271 let rest_upper = rest.trim().to_ascii_uppercase();
5272 if !rest_upper.starts_with("WHERE") {
5273 return Err(AwsServiceError::aws_error(
5274 StatusCode::BAD_REQUEST,
5275 "ValidationException",
5276 "DELETE requires a WHERE clause",
5277 ));
5278 }
5279 let where_clause = rest.trim()[5..].trim();
5280
5281 let mut state = state.write();
5282 let table = get_table_mut(&mut state.tables, &table_name)?;
5283
5284 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
5285 indices.sort_unstable();
5287 indices.reverse();
5288 for idx in indices {
5289 table.items.remove(idx);
5290 }
5291 table.recalculate_stats();
5292
5293 DynamoDbService::ok_json(json!({}))
5294}
5295
/// Extracts a table name from the front of a PartiQL statement fragment.
///
/// Handles both `"quoted name"` (which may contain spaces) and bare names
/// (terminated by the first whitespace). Returns the name together with the
/// unconsumed remainder of the input.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    if let Some(stripped) = s.strip_prefix('"') {
        match stripped.find('"') {
            Some(end) => (stripped[..end].to_string(), &stripped[end + 1..]),
            None => {
                // Unterminated quote: fall back to splitting on the first
                // space and discarding stray quote characters.
                let end = s.find(' ').unwrap_or(s.len());
                (s[..end].trim_matches('"').to_string(), &s[end..])
            }
        }
    } else {
        let end = s.find(char::is_whitespace).unwrap_or(s.len());
        (s[..end].to_string(), &s[end..])
    }
}
5315
5316fn evaluate_partiql_where<'a>(
5319 table: &'a DynamoTable,
5320 where_clause: &str,
5321 parameters: &[Value],
5322) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5323 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5324 Ok(indices.iter().map(|i| &table.items[*i]).collect())
5325}
5326
5327fn find_partiql_where_indices(
5328 table: &DynamoTable,
5329 where_clause: &str,
5330 parameters: &[Value],
5331) -> Result<Vec<usize>, AwsServiceError> {
5332 let upper = where_clause.to_uppercase();
5335 let conditions = if upper.contains(" AND ") {
5336 let mut parts = Vec::new();
5338 let mut last = 0;
5339 for (i, _) in upper.match_indices(" AND ") {
5340 parts.push(where_clause[last..i].trim());
5341 last = i + 5;
5342 }
5343 parts.push(where_clause[last..].trim());
5344 parts
5345 } else {
5346 vec![where_clause.trim()]
5347 };
5348
5349 let mut param_idx = 0usize;
5350 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5351
5352 for cond in &conditions {
5353 let cond = cond.trim();
5354 if let Some((left, right)) = cond.split_once('=') {
5355 let attr = left.trim().trim_matches('"').to_string();
5356 let val_str = right.trim();
5357 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5358 if let Some(v) = value {
5359 parsed_conditions.push((attr, v));
5360 }
5361 }
5362 }
5363
5364 let mut indices = Vec::new();
5365 for (i, item) in table.items.iter().enumerate() {
5366 let all_match = parsed_conditions
5367 .iter()
5368 .all(|(attr, expected)| item.get(attr) == Some(expected));
5369 if all_match {
5370 indices.push(i);
5371 }
5372 }
5373
5374 Ok(indices)
5375}
5376
5377fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5382 let s = s.trim();
5383 if s == "?" {
5384 let idx = *param_idx;
5385 *param_idx += 1;
5386 parameters.get(idx).cloned()
5387 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5388 let inner = &s[1..s.len() - 1];
5389 Some(json!({"S": inner}))
5390 } else if let Ok(n) = s.parse::<f64>() {
5391 let num_str = if n == n.trunc() {
5392 format!("{}", n as i64)
5393 } else {
5394 format!("{n}")
5395 };
5396 Some(json!({"N": num_str}))
5397 } else {
5398 None
5399 }
5400}
5401
5402fn parse_partiql_value_object(
5404 s: &str,
5405 parameters: &[Value],
5406) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5407 let s = s.trim();
5408 let inner = s
5409 .strip_prefix('{')
5410 .and_then(|s| s.strip_suffix('}'))
5411 .ok_or_else(|| {
5412 AwsServiceError::aws_error(
5413 StatusCode::BAD_REQUEST,
5414 "ValidationException",
5415 "Invalid VALUE: expected object literal",
5416 )
5417 })?;
5418
5419 let mut item = HashMap::new();
5420 let mut param_idx = 0usize;
5421
5422 for pair in split_partiql_pairs(inner) {
5424 let pair = pair.trim();
5425 if pair.is_empty() {
5426 continue;
5427 }
5428 if let Some((key_part, val_part)) = pair.split_once(':') {
5429 let key = key_part
5430 .trim()
5431 .trim_matches('\'')
5432 .trim_matches('"')
5433 .to_string();
5434 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5435 item.insert(key, val);
5436 }
5437 }
5438 }
5439
5440 Ok(item)
5441}
5442
/// Splits the interior of a PartiQL object literal on top-level commas,
/// ignoring commas inside single-quoted strings and nested `{...}` objects.
/// The returned slices keep their surrounding whitespace.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            '\'' => in_quote = !in_quote,
            '{' if !in_quote => depth += 1,
            '}' if !in_quote => depth -= 1,
            ',' if !in_quote && depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
5466
/// Counts the positional `?` parameter placeholders in a statement fragment.
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
5471
5472#[cfg(test)]
5473mod tests {
5474 use super::*;
5475 use serde_json::json;
5476
    // A single SET clause with two comma-separated actions parses as one
    // ("SET", [action, action]) entry.
    #[test]
    fn test_parse_update_clauses_set() {
        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
        assert_eq!(clauses.len(), 1);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[0].1.len(), 2);
    }
5484
    // SET and REMOVE in one expression parse into two separate clause groups.
    #[test]
    fn test_parse_update_clauses_set_and_remove() {
        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
        assert_eq!(clauses.len(), 2);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[1].0, "REMOVE");
    }
5492
    // A partition-key-only condition matches an item whose pk equals the
    // bound expression value; the sort key is present but unconstrained.
    #[test]
    fn test_evaluate_key_condition_simple() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user1"}));
        item.insert("sk".to_string(), json!({"S": "order1"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));

        assert!(evaluate_key_condition(
            "pk = :pk",
            &item,
            "pk",
            Some("sk"),
            &HashMap::new(),
            &expr_values,
        ));
    }
5511
    // N attribute values compare numerically, not lexically.
    #[test]
    fn test_compare_attribute_values_numbers() {
        let a = json!({"N": "10"});
        let b = json!({"N": "20"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }
5521
    // S attribute values compare lexicographically.
    #[test]
    fn test_compare_attribute_values_strings() {
        let a = json!({"S": "apple"});
        let b = json!({"S": "banana"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }
5531
5532 #[test]
5533 fn test_split_on_and() {
5534 let parts = split_on_and("pk = :pk AND sk > :sk");
5535 assert_eq!(parts.len(), 2);
5536 assert_eq!(parts[0].trim(), "pk = :pk");
5537 assert_eq!(parts[1].trim(), "sk > :sk");
5538 }
5539
5540 #[test]
5541 fn test_split_on_and_respects_parentheses() {
5542 let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
5544 assert_eq!(parts.len(), 1);
5546 assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
5547 }
5548
    // OR at the top level: a false parenthesized AND group must not mask a
    // true right-hand operand, and the whole expression is false when every
    // operand is false.
    #[test]
    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
        let mut item = HashMap::new();
        item.insert("x".to_string(), json!({"S": "no"}));
        item.insert("y".to_string(), json!({"S": "no"}));
        item.insert("z".to_string(), json!({"S": "yes"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));

        let result = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item,
            &HashMap::new(),
            &expr_values,
        );
        assert!(result, "should match because z = :yes is true");

        // Same expression, but now no operand is satisfied.
        let mut item2 = HashMap::new();
        item2.insert("x".to_string(), json!({"S": "no"}));
        item2.insert("y".to_string(), json!({"S": "no"}));
        item2.insert("z".to_string(), json!({"S": "no"}));

        let result2 = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item2,
            &HashMap::new(),
            &expr_values,
        );
        assert!(!result2, "should not match because nothing is true");
    }
5583
    // ProjectionExpression "data[0].name" keeps only the named field of the
    // first list element; sibling fields (age) are dropped from the result.
    #[test]
    fn test_project_item_nested_path() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "key1"}));
        item.insert(
            "data".to_string(),
            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
        );

        let body = json!({
            "ProjectionExpression": "data[0].name"
        });

        let projected = project_item(&item, &body);
        let name = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("name"))
            .and_then(|v| v.get("S"))
            .and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));

        let age = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("age"));
        assert!(age.is_none(), "age should not be present in projection");
    }
5619
    // Dotted paths descend through nested M (map) attributes.
    #[test]
    fn test_resolve_nested_path_map() {
        let mut item = HashMap::new();
        item.insert(
            "info".to_string(),
            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
        );

        let result = resolve_nested_path(&item, "info.address.city");
        assert_eq!(result, Some(json!({"S": "NYC"})));
    }
5631
    // "[0]" indexes into an L (list) attribute before descending into its
    // M (map) element.
    #[test]
    fn test_resolve_nested_path_list_then_map() {
        let mut item = HashMap::new();
        item.insert(
            "items".to_string(),
            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
        );

        let result = resolve_nested_path(&item, "items[0].sku");
        assert_eq!(result, Some(json!({"S": "ABC"})));
    }
5643
5644 use crate::state::SharedDynamoDbState;
5647 use parking_lot::RwLock;
5648 use std::sync::Arc;
5649
    // Builds a DynamoDbService backed by fresh in-memory state for the
    // fixed test account/region pair (no S3 backend, no delivery bus).
    fn make_service() -> DynamoDbService {
        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
            "123456789012",
            "us-east-1",
        )));
        DynamoDbService::new(state)
    }
5657
    // Builds a minimal JSON-protocol AwsRequest for the given action: the
    // body is `body` serialized to bytes, all other fields are fixed test
    // defaults matching make_service's account/region.
    fn make_request(action: &str, body: Value) -> AwsRequest {
        AwsRequest {
            service: "dynamodb".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-id".to_string(),
            headers: http::HeaderMap::new(),
            query_params: HashMap::new(),
            body: serde_json::to_vec(&body).unwrap().into(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: http::Method::POST,
            is_query_protocol: false,
            access_key_id: None,
        }
    }
5676
    // Creates "test-table" with a single string HASH key "pk" in
    // PAY_PER_REQUEST mode; panics if creation fails.
    fn create_test_table(svc: &DynamoDbService) {
        let req = make_request(
            "CreateTable",
            json!({
                "TableName": "test-table",
                "KeySchema": [
                    { "AttributeName": "pk", "KeyType": "HASH" }
                ],
                "AttributeDefinitions": [
                    { "AttributeName": "pk", "AttributeType": "S" }
                ],
                "BillingMode": "PAY_PER_REQUEST"
            }),
        );
        svc.create_table(&req).unwrap();
    }
5693
    // DeleteItem with ReturnValues=ALL_OLD returns the full pre-delete item
    // in Attributes, and the item is actually gone afterwards.
    #[test]
    fn delete_item_return_values_all_old() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "key1" },
                    "name": { "S": "Alice" },
                    "age": { "N": "30" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } },
                "ReturnValues": "ALL_OLD"
            }),
        );
        let resp = svc.delete_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();

        let attrs = &body["Attributes"];
        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");

        // The returned attributes are a snapshot; the item itself is gone.
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none(), "item should be deleted");
    }
5743
    // TransactGetItems returns one response per request, with Item omitted
    // for keys that do not exist.
    #[test]
    fn transact_get_items_returns_existing_and_missing() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "exists" },
                    "val": { "S": "hello" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactGetItems",
            json!({
                "TransactItems": [
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
                ]
            }),
        );
        let resp = svc.transact_get_items(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let responses = body["Responses"].as_array().unwrap();
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
        assert!(responses[1].get("Item").is_none());
    }
5778
    // A transaction mixing a Put and a Delete applies both: the new item is
    // readable afterwards and the deleted one is gone.
    #[test]
    fn transact_write_items_put_and_delete() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "to-delete" },
                    "val": { "S": "bye" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "Put": {
                            "TableName": "test-table",
                            "Item": {
                                "pk": { "S": "new-item" },
                                "val": { "S": "hi" }
                            }
                        }
                    },
                    {
                        "Delete": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "to-delete" } }
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::OK);

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "new-item" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "to-delete" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none());
    }
5847
    // A failing ConditionCheck cancels the transaction: 400 status with a
    // TransactionCanceledException body carrying CancellationReasons.
    #[test]
    fn transact_write_items_condition_check_failure() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "ConditionCheck": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "nonexistent" } },
                            "ConditionExpression": "attribute_exists(pk)"
                        }
                    }
                ]
            }),
        );
        // Note: the service returns Ok with an error-shaped response rather
        // than Err — the failure is expressed in status and body.
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["__type"].as_str().unwrap(),
            "TransactionCanceledException"
        );
        assert!(body["CancellationReasons"].as_array().is_some());
    }
5878
    // UpdateTimeToLive echoes the specification back, and DescribeTimeToLive
    // reports ENABLED/DISABLED matching the most recent update.
    #[test]
    fn update_and_describe_time_to_live() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": true
                }
            }),
        );
        let resp = svc.update_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveSpecification"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );
        assert!(body["TimeToLiveSpecification"]["Enabled"]
            .as_bool()
            .unwrap());

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "ENABLED"
        );
        assert_eq!(
            body["TimeToLiveDescription"]["AttributeName"]
                .as_str()
                .unwrap(),
            "ttl"
        );

        // Disabling flips the described status back to DISABLED.
        let req = make_request(
            "UpdateTimeToLive",
            json!({
                "TableName": "test-table",
                "TimeToLiveSpecification": {
                    "AttributeName": "ttl",
                    "Enabled": false
                }
            }),
        );
        svc.update_time_to_live(&req).unwrap();

        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let resp = svc.describe_time_to_live(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["TimeToLiveDescription"]["TimeToLiveStatus"]
                .as_str()
                .unwrap(),
            "DISABLED"
        );
    }
5947
    // Put/Get/Delete resource policy round-trip: Get returns the stored
    // document verbatim, and a null Policy after deletion.
    #[test]
    fn resource_policy_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let table_arn = {
            let state = svc.state.read();
            state.tables.get("test-table").unwrap().arn.clone()
        };

        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
        let req = make_request(
            "PutResourcePolicy",
            json!({
                "ResourceArn": table_arn,
                "Policy": policy_doc
            }),
        );
        let resp = svc.put_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["RevisionId"].as_str().is_some());

        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);

        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
        svc.delete_resource_policy(&req).unwrap();

        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let resp = svc.get_resource_policy(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body["Policy"].is_null());
    }
5987
5988 #[test]
5989 fn describe_endpoints() {
5990 let svc = make_service();
5991 let req = make_request("DescribeEndpoints", json!({}));
5992 let resp = svc.describe_endpoints(&req).unwrap();
5993 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5994 assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5995 }
5996
5997 #[test]
5998 fn describe_limits() {
5999 let svc = make_service();
6000 let req = make_request("DescribeLimits", json!({}));
6001 let resp = svc.describe_limits(&req).unwrap();
6002 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6003 assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
6004 }
6005
    // CreateBackup -> Describe/List -> RestoreTableFromBackup -> DeleteBackup:
    // the backup is immediately AVAILABLE, the restored table is ACTIVE, and
    // the deleted backup disappears from listings.
    #[test]
    fn backup_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "CreateBackup",
            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
        );
        let resp = svc.create_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let backup_arn = body["BackupDetails"]["BackupArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");

        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
        let resp = svc.describe_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["BackupDescription"]["BackupDetails"]["BackupName"],
            "my-backup"
        );

        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);

        let req = make_request(
            "RestoreTableFromBackup",
            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
        );
        svc.restore_table_from_backup(&req).unwrap();

        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");

        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
        svc.delete_backup(&req).unwrap();

        let req = make_request("ListBackups", json!({}));
        let resp = svc.list_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
    }
6062
    // Point-in-time recovery starts DISABLED and flips to ENABLED after
    // UpdateContinuousBackups.
    #[test]
    fn continuous_backups() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "DISABLED"
        );

        let req = make_request(
            "UpdateContinuousBackups",
            json!({
                "TableName": "test-table",
                "PointInTimeRecoverySpecification": {
                    "PointInTimeRecoveryEnabled": true
                }
            }),
        );
        svc.update_continuous_backups(&req).unwrap();

        let req = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_continuous_backups(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
                ["PointInTimeRecoveryStatus"],
            "ENABLED"
        );
    }
6106
    // RestoreTableToPointInTime creates the target table in ACTIVE state.
    #[test]
    fn restore_table_to_point_in_time() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "RestoreTableToPointInTime",
            json!({
                "SourceTableName": "test-table",
                "TargetTableName": "pitr-restored"
            }),
        );
        svc.restore_table_to_point_in_time(&req).unwrap();

        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }
6126
    // Global table lifecycle: create with two replicas, describe, list, add
    // a third replica via UpdateGlobalTable, then read/update settings.
    #[test]
    fn global_table_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "CreateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicationGroup": [
                    { "RegionName": "us-east-1" },
                    { "RegionName": "eu-west-1" }
                ]
            }),
        );
        let resp = svc.create_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["GlobalTableStatus"],
            "ACTIVE"
        );

        let req = make_request(
            "DescribeGlobalTable",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            2
        );

        let req = make_request("ListGlobalTables", json!({}));
        let resp = svc.list_global_tables(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);

        // Adding a replica grows the replication group to three regions.
        let req = make_request(
            "UpdateGlobalTable",
            json!({
                "GlobalTableName": "my-global",
                "ReplicaUpdates": [
                    { "Create": { "RegionName": "ap-southeast-1" } }
                ]
            }),
        );
        let resp = svc.update_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["ReplicationGroup"]
                .as_array()
                .unwrap()
                .len(),
            3
        );

        let req = make_request(
            "DescribeGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        let resp = svc.describe_global_table_settings(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);

        let req = make_request(
            "UpdateGlobalTableSettings",
            json!({ "GlobalTableName": "my-global" }),
        );
        svc.update_global_table_settings(&req).unwrap();
    }
6206
6207 #[test]
6208 fn table_replica_auto_scaling() {
6209 let svc = make_service();
6210 create_test_table(&svc);
6211
6212 let req = make_request(
6213 "DescribeTableReplicaAutoScaling",
6214 json!({ "TableName": "test-table" }),
6215 );
6216 let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
6217 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6218 assert_eq!(
6219 body["TableAutoScalingDescription"]["TableName"],
6220 "test-table"
6221 );
6222
6223 let req = make_request(
6224 "UpdateTableReplicaAutoScaling",
6225 json!({ "TableName": "test-table" }),
6226 );
6227 svc.update_table_replica_auto_scaling(&req).unwrap();
6228 }
6229
    // Enable -> Describe -> Update -> Disable for a Kinesis streaming
    // destination; status moves ACTIVE -> DISABLED.
    #[test]
    fn kinesis_streaming_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "EnableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["DestinationStatus"], "ACTIVE");

        let req = make_request(
            "DescribeKinesisStreamingDestination",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["KinesisDataStreamDestinations"]
                .as_array()
                .unwrap()
                .len(),
            1
        );

        let req = make_request(
            "UpdateKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
                "UpdateKinesisStreamingConfiguration": {
                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
                }
            }),
        );
        svc.update_kinesis_streaming_destination(&req).unwrap();

        let req = make_request(
            "DisableKinesisStreamingDestination",
            json!({
                "TableName": "test-table",
                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
            }),
        );
        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["DestinationStatus"], "DISABLED");
    }
6287
    // Contributor insights start DISABLED, flip to ENABLED on update, and
    // the table then appears in ListContributorInsights.
    #[test]
    fn contributor_insights_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "ENABLE"
            }),
        );
        let resp = svc.update_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");

        let req = make_request("ListContributorInsights", json!({}));
        let resp = svc.list_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["ContributorInsightsSummaries"]
                .as_array()
                .unwrap()
                .len(),
            1
        );
    }
6326
    // ExportTableToPointInTime completes synchronously in this fake; the
    // export is then visible via DescribeExport and ListExports.
    #[test]
    fn export_lifecycle() {
        let svc = make_service();
        create_test_table(&svc);

        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();

        let req = make_request(
            "ExportTableToPointInTime",
            json!({
                "TableArn": table_arn,
                "S3Bucket": "my-bucket"
            }),
        );
        let resp = svc.export_table_to_point_in_time(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let export_arn = body["ExportDescription"]["ExportArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");

        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
        let resp = svc.describe_export(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");

        let req = make_request("ListExports", json!({}));
        let resp = svc.list_exports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
    }
6362
    // ImportTable completes synchronously in this fake, is listed, and
    // creates the target table in ACTIVE state.
    #[test]
    fn import_lifecycle() {
        let svc = make_service();

        let req = make_request(
            "ImportTable",
            json!({
                "InputFormat": "DYNAMODB_JSON",
                "S3BucketSource": { "S3Bucket": "import-bucket" },
                "TableCreationParameters": {
                    "TableName": "imported-table",
                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
                }
            }),
        );
        let resp = svc.import_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let import_arn = body["ImportTableDescription"]["ImportArn"]
            .as_str()
            .unwrap()
            .to_string();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
        let resp = svc.describe_import(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");

        let req = make_request("ListImports", json!({}));
        let resp = svc.list_imports(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);

        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
        let resp = svc.describe_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
    }
6405
    // A backup snapshots items at creation time: deleting every item from
    // the source afterwards does not affect what the restore brings back.
    #[test]
    fn backup_restore_preserves_items() {
        let svc = make_service();
        create_test_table(&svc);

        for i in 1..=3 {
            let req = make_request(
                "PutItem",
                json!({
                    "TableName": "test-table",
                    "Item": {
                        "pk": { "S": format!("key{i}") },
                        "data": { "S": format!("value{i}") }
                    }
                }),
            );
            svc.put_item(&req).unwrap();
        }

        let req = make_request(
            "CreateBackup",
            json!({
                "TableName": "test-table",
                "BackupName": "my-backup"
            }),
        );
        let resp = svc.create_backup(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let backup_arn = body["BackupDetails"]["BackupArn"]
            .as_str()
            .unwrap()
            .to_string();

        // Wipe the source table after the backup was taken.
        for i in 1..=3 {
            let req = make_request(
                "DeleteItem",
                json!({
                    "TableName": "test-table",
                    "Key": { "pk": { "S": format!("key{i}") } }
                }),
            );
            svc.delete_item(&req).unwrap();
        }

        let req = make_request("Scan", json!({ "TableName": "test-table" }));
        let resp = svc.scan(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Count"], 0);

        let req = make_request(
            "RestoreTableFromBackup",
            json!({
                "BackupArn": backup_arn,
                "TargetTableName": "restored-table"
            }),
        );
        svc.restore_table_from_backup(&req).unwrap();

        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
        let resp = svc.scan(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Count"], 3);
        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
    }
6476
    // A table enrolled in a global table keeps accepting writes, and those
    // writes remain readable through the same table name.
    #[test]
    fn global_table_replicates_writes() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "CreateGlobalTable",
            json!({
                "GlobalTableName": "test-table",
                "ReplicationGroup": [
                    { "RegionName": "us-east-1" },
                    { "RegionName": "eu-west-1" }
                ]
            }),
        );
        let resp = svc.create_global_table(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["GlobalTableDescription"]["GlobalTableStatus"],
            "ACTIVE"
        );

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "replicated-key" },
                    "data": { "S": "replicated-value" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "replicated-key" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
    }
6526
    // With insights ENABLED, item writes and reads populate TopContributors
    // and the ContributorInsightsRuleList.
    #[test]
    fn contributor_insights_tracks_access() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "ENABLE"
            }),
        );
        svc.update_contributor_insights(&req).unwrap();

        // "alpha" is written three times, "beta" twice.
        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
            let req = make_request(
                "PutItem",
                json!({
                    "TableName": "test-table",
                    "Item": {
                        "pk": { "S": key },
                        "data": { "S": "value" }
                    }
                }),
            );
            svc.put_item(&req).unwrap();
        }

        // Plus three reads of "alpha".
        for _ in 0..3 {
            let req = make_request(
                "GetItem",
                json!({
                    "TableName": "test-table",
                    "Key": { "pk": { "S": "alpha" } }
                }),
            );
            svc.get_item(&req).unwrap();
        }

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");

        let contributors = body["TopContributors"].as_array().unwrap();
        assert!(
            !contributors.is_empty(),
            "TopContributors should not be empty"
        );

        let top = &contributors[0];
        assert!(top["Count"].as_u64().unwrap() > 0);

        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
        assert!(!rules.is_empty());
    }
6593
    // With insights DISABLED (the default), accesses leave TopContributors
    // empty.
    #[test]
    fn contributor_insights_not_tracked_when_disabled() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "key1" },
                    "data": { "S": "value" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");

        let contributors = body["TopContributors"].as_array().unwrap();
        assert!(contributors.is_empty());
    }
6624
    // Counters accumulate while insights are ENABLED and are cleared once
    // insights are DISABLED again — a later Scan must not repopulate them.
    #[test]
    fn contributor_insights_disabled_table_no_counters_after_scan() {
        let svc = make_service();
        create_test_table(&svc);

        for key in &["alpha", "beta"] {
            let req = make_request(
                "PutItem",
                json!({
                    "TableName": "test-table",
                    "Item": { "pk": { "S": key } }
                }),
            );
            svc.put_item(&req).unwrap();
        }

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "ENABLE"
            }),
        );
        svc.update_contributor_insights(&req).unwrap();

        let req = make_request("Scan", json!({ "TableName": "test-table" }));
        svc.scan(&req).unwrap();

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let contributors = body["TopContributors"].as_array().unwrap();
        assert!(
            !contributors.is_empty(),
            "counters should be non-empty while enabled"
        );

        let req = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": "DISABLE"
            }),
        );
        svc.update_contributor_insights(&req).unwrap();

        let req = make_request("Scan", json!({ "TableName": "test-table" }));
        svc.scan(&req).unwrap();

        let req = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let resp = svc.describe_contributor_insights(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let contributors = body["TopContributors"].as_array().unwrap();
        assert!(
            contributors.is_empty(),
            "counters should be empty after disabling insights"
        );
    }
6696
6697 #[test]
6698 fn scan_pagination_with_limit() {
6699 let svc = make_service();
6700 create_test_table(&svc);
6701
6702 for i in 0..5 {
6704 let req = make_request(
6705 "PutItem",
6706 json!({
6707 "TableName": "test-table",
6708 "Item": {
6709 "pk": { "S": format!("item{i}") },
6710 "data": { "S": format!("value{i}") }
6711 }
6712 }),
6713 );
6714 svc.put_item(&req).unwrap();
6715 }
6716
6717 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6719 let resp = svc.scan(&req).unwrap();
6720 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6721 assert_eq!(body["Count"], 2);
6722 assert!(
6723 body["LastEvaluatedKey"].is_object(),
6724 "should have LastEvaluatedKey when limit < total items"
6725 );
6726 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6727
6728 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6730 let mut lek = body["LastEvaluatedKey"].clone();
6731
6732 while lek.is_object() {
6733 let req = make_request(
6734 "Scan",
6735 json!({
6736 "TableName": "test-table",
6737 "Limit": 2,
6738 "ExclusiveStartKey": lek
6739 }),
6740 );
6741 let resp = svc.scan(&req).unwrap();
6742 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6743 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6744 lek = body["LastEvaluatedKey"].clone();
6745 }
6746
6747 assert_eq!(
6748 all_items.len(),
6749 5,
6750 "should retrieve all 5 items via pagination"
6751 );
6752 }
6753
6754 #[test]
6755 fn scan_no_pagination_when_all_fit() {
6756 let svc = make_service();
6757 create_test_table(&svc);
6758
6759 for i in 0..3 {
6760 let req = make_request(
6761 "PutItem",
6762 json!({
6763 "TableName": "test-table",
6764 "Item": {
6765 "pk": { "S": format!("item{i}") }
6766 }
6767 }),
6768 );
6769 svc.put_item(&req).unwrap();
6770 }
6771
6772 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6774 let resp = svc.scan(&req).unwrap();
6775 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6776 assert_eq!(body["Count"], 3);
6777 assert!(
6778 body["LastEvaluatedKey"].is_null(),
6779 "should not have LastEvaluatedKey when all items fit"
6780 );
6781
6782 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6784 let resp = svc.scan(&req).unwrap();
6785 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6786 assert_eq!(body["Count"], 3);
6787 assert!(body["LastEvaluatedKey"].is_null());
6788 }
6789
6790 fn create_composite_table(svc: &DynamoDbService) {
6791 let req = make_request(
6792 "CreateTable",
6793 json!({
6794 "TableName": "composite-table",
6795 "KeySchema": [
6796 { "AttributeName": "pk", "KeyType": "HASH" },
6797 { "AttributeName": "sk", "KeyType": "RANGE" }
6798 ],
6799 "AttributeDefinitions": [
6800 { "AttributeName": "pk", "AttributeType": "S" },
6801 { "AttributeName": "sk", "AttributeType": "S" }
6802 ],
6803 "BillingMode": "PAY_PER_REQUEST"
6804 }),
6805 );
6806 svc.create_table(&req).unwrap();
6807 }
6808
6809 #[test]
6810 fn query_pagination_with_composite_key() {
6811 let svc = make_service();
6812 create_composite_table(&svc);
6813
6814 for i in 0..5 {
6816 let req = make_request(
6817 "PutItem",
6818 json!({
6819 "TableName": "composite-table",
6820 "Item": {
6821 "pk": { "S": "user1" },
6822 "sk": { "S": format!("item{i:03}") },
6823 "data": { "S": format!("value{i}") }
6824 }
6825 }),
6826 );
6827 svc.put_item(&req).unwrap();
6828 }
6829
6830 let req = make_request(
6832 "Query",
6833 json!({
6834 "TableName": "composite-table",
6835 "KeyConditionExpression": "pk = :pk",
6836 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6837 "Limit": 2
6838 }),
6839 );
6840 let resp = svc.query(&req).unwrap();
6841 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6842 assert_eq!(body["Count"], 2);
6843 assert!(body["LastEvaluatedKey"].is_object());
6844 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6845 assert!(body["LastEvaluatedKey"]["sk"].is_object());
6846
6847 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6849 let mut lek = body["LastEvaluatedKey"].clone();
6850
6851 while lek.is_object() {
6852 let req = make_request(
6853 "Query",
6854 json!({
6855 "TableName": "composite-table",
6856 "KeyConditionExpression": "pk = :pk",
6857 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6858 "Limit": 2,
6859 "ExclusiveStartKey": lek
6860 }),
6861 );
6862 let resp = svc.query(&req).unwrap();
6863 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6864 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6865 lek = body["LastEvaluatedKey"].clone();
6866 }
6867
6868 assert_eq!(
6869 all_items.len(),
6870 5,
6871 "should retrieve all 5 items via pagination"
6872 );
6873
6874 let sks: Vec<String> = all_items
6876 .iter()
6877 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6878 .collect();
6879 let mut sorted = sks.clone();
6880 sorted.sort();
6881 assert_eq!(sks, sorted, "items should be sorted by sort key");
6882 }
6883
6884 #[test]
6885 fn query_no_pagination_when_all_fit() {
6886 let svc = make_service();
6887 create_composite_table(&svc);
6888
6889 for i in 0..2 {
6890 let req = make_request(
6891 "PutItem",
6892 json!({
6893 "TableName": "composite-table",
6894 "Item": {
6895 "pk": { "S": "user1" },
6896 "sk": { "S": format!("item{i}") }
6897 }
6898 }),
6899 );
6900 svc.put_item(&req).unwrap();
6901 }
6902
6903 let req = make_request(
6904 "Query",
6905 json!({
6906 "TableName": "composite-table",
6907 "KeyConditionExpression": "pk = :pk",
6908 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6909 "Limit": 10
6910 }),
6911 );
6912 let resp = svc.query(&req).unwrap();
6913 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6914 assert_eq!(body["Count"], 2);
6915 assert!(
6916 body["LastEvaluatedKey"].is_null(),
6917 "should not have LastEvaluatedKey when all items fit"
6918 );
6919 }
6920
6921 fn create_gsi_table(svc: &DynamoDbService) {
6922 let req = make_request(
6923 "CreateTable",
6924 json!({
6925 "TableName": "gsi-table",
6926 "KeySchema": [
6927 { "AttributeName": "pk", "KeyType": "HASH" }
6928 ],
6929 "AttributeDefinitions": [
6930 { "AttributeName": "pk", "AttributeType": "S" },
6931 { "AttributeName": "gsi_pk", "AttributeType": "S" },
6932 { "AttributeName": "gsi_sk", "AttributeType": "S" }
6933 ],
6934 "BillingMode": "PAY_PER_REQUEST",
6935 "GlobalSecondaryIndexes": [
6936 {
6937 "IndexName": "gsi-index",
6938 "KeySchema": [
6939 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6940 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6941 ],
6942 "Projection": { "ProjectionType": "ALL" }
6943 }
6944 ]
6945 }),
6946 );
6947 svc.create_table(&req).unwrap();
6948 }
6949
6950 #[test]
6951 fn gsi_query_last_evaluated_key_includes_table_pk() {
6952 let svc = make_service();
6953 create_gsi_table(&svc);
6954
6955 for i in 0..3 {
6957 let req = make_request(
6958 "PutItem",
6959 json!({
6960 "TableName": "gsi-table",
6961 "Item": {
6962 "pk": { "S": format!("item{i}") },
6963 "gsi_pk": { "S": "shared" },
6964 "gsi_sk": { "S": "sort" }
6965 }
6966 }),
6967 );
6968 svc.put_item(&req).unwrap();
6969 }
6970
6971 let req = make_request(
6973 "Query",
6974 json!({
6975 "TableName": "gsi-table",
6976 "IndexName": "gsi-index",
6977 "KeyConditionExpression": "gsi_pk = :v",
6978 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6979 "Limit": 1
6980 }),
6981 );
6982 let resp = svc.query(&req).unwrap();
6983 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6984 assert_eq!(body["Count"], 1);
6985 let lek = &body["LastEvaluatedKey"];
6986 assert!(lek.is_object(), "should have LastEvaluatedKey");
6987 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6989 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6990 assert!(
6992 lek["pk"].is_object(),
6993 "LEK must contain table PK for GSI queries"
6994 );
6995 }
6996
6997 #[test]
6998 fn gsi_query_pagination_returns_all_items() {
6999 let svc = make_service();
7000 create_gsi_table(&svc);
7001
7002 for i in 0..4 {
7004 let req = make_request(
7005 "PutItem",
7006 json!({
7007 "TableName": "gsi-table",
7008 "Item": {
7009 "pk": { "S": format!("item{i:03}") },
7010 "gsi_pk": { "S": "shared" },
7011 "gsi_sk": { "S": "sort" }
7012 }
7013 }),
7014 );
7015 svc.put_item(&req).unwrap();
7016 }
7017
7018 let mut all_pks = Vec::new();
7020 let mut lek: Option<Value> = None;
7021
7022 loop {
7023 let mut query = json!({
7024 "TableName": "gsi-table",
7025 "IndexName": "gsi-index",
7026 "KeyConditionExpression": "gsi_pk = :v",
7027 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
7028 "Limit": 2
7029 });
7030 if let Some(ref start_key) = lek {
7031 query["ExclusiveStartKey"] = start_key.clone();
7032 }
7033
7034 let req = make_request("Query", query);
7035 let resp = svc.query(&req).unwrap();
7036 let body: Value = serde_json::from_slice(&resp.body).unwrap();
7037
7038 for item in body["Items"].as_array().unwrap() {
7039 let pk = item["pk"]["S"].as_str().unwrap().to_string();
7040 all_pks.push(pk);
7041 }
7042
7043 if body["LastEvaluatedKey"].is_object() {
7044 lek = Some(body["LastEvaluatedKey"].clone());
7045 } else {
7046 break;
7047 }
7048 }
7049
7050 all_pks.sort();
7051 assert_eq!(
7052 all_pks,
7053 vec!["item000", "item001", "item002", "item003"],
7054 "pagination should return all items without duplicates"
7055 );
7056 }
7057
7058 fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
7059 pairs
7060 .iter()
7061 .map(|(k, v)| (k.to_string(), json!({"S": v})))
7062 .collect()
7063 }
7064
7065 fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
7066 pairs
7067 .iter()
7068 .map(|(k, v)| (k.to_string(), v.to_string()))
7069 .collect()
7070 }
7071
7072 fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
7073 pairs
7074 .iter()
7075 .map(|(k, v)| (k.to_string(), json!({"S": v})))
7076 .collect()
7077 }
7078
7079 #[test]
7080 fn test_evaluate_condition_bare_not_equal() {
7081 let item = cond_item(&[("state", "active")]);
7082 let names = cond_names(&[("#s", "state")]);
7083 let values = cond_values(&[(":c", "complete")]);
7084
7085 assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
7086
7087 let item2 = cond_item(&[("state", "complete")]);
7088 assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
7089 }
7090
7091 #[test]
7092 fn test_evaluate_condition_parenthesized_not_equal() {
7093 let item = cond_item(&[("state", "active")]);
7094 let names = cond_names(&[("#s", "state")]);
7095 let values = cond_values(&[(":c", "complete")]);
7096
7097 assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
7098 }
7099
7100 #[test]
7101 fn test_evaluate_condition_parenthesized_equal_mismatch() {
7102 let item = cond_item(&[("state", "active")]);
7103 let names = cond_names(&[("#s", "state")]);
7104 let values = cond_values(&[(":c", "complete")]);
7105
7106 assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
7107 }
7108
7109 #[test]
7110 fn test_evaluate_condition_compound_and() {
7111 let item = cond_item(&[("state", "active")]);
7112 let names = cond_names(&[("#s", "state")]);
7113 let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
7114
7115 assert!(
7117 evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
7118 );
7119 }
7120
7121 #[test]
7122 fn test_evaluate_condition_compound_and_mismatch() {
7123 let item = cond_item(&[("state", "inactive")]);
7124 let names = cond_names(&[("#s", "state")]);
7125 let values = cond_values(&[(":a", "active"), (":b", "active")]);
7126
7127 assert!(
7129 evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
7130 );
7131 }
7132
7133 #[test]
7134 fn test_evaluate_condition_compound_or() {
7135 let item = cond_item(&[("state", "running")]);
7136 let names = cond_names(&[("#s", "state")]);
7137 let values = cond_values(&[(":a", "active"), (":b", "idle")]);
7138
7139 assert!(
7141 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
7142 );
7143
7144 let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
7146 assert!(
7147 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
7148 );
7149 }
7150
7151 #[test]
7152 fn test_evaluate_condition_not_operator() {
7153 let item = cond_item(&[("state", "active")]);
7154 let names = cond_names(&[("#s", "state")]);
7155 let values = cond_values(&[(":c", "complete")]);
7156
7157 assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
7159
7160 assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
7162
7163 assert!(
7165 evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
7166 );
7167
7168 assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
7170 }
7171
7172 #[test]
7173 fn test_evaluate_condition_begins_with() {
7174 let item = cond_item(&[("name", "fakecloud-dynamodb")]);
7177 let names = cond_names(&[("#n", "name")]);
7178 let values = cond_values(&[(":p", "fakecloud")]);
7179
7180 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
7181
7182 let values2 = cond_values(&[(":p", "realcloud")]);
7183 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
7184 }
7185
7186 #[test]
7187 fn test_evaluate_condition_contains() {
7188 let item = cond_item(&[("tags", "alpha,beta,gamma")]);
7189 let names = cond_names(&[("#t", "tags")]);
7190 let values = cond_values(&[(":v", "beta")]);
7191
7192 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
7193
7194 let values2 = cond_values(&[(":v", "delta")]);
7195 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
7196 }
7197
7198 #[test]
7199 fn test_evaluate_condition_no_existing_item() {
7200 let names = cond_names(&[("#s", "state")]);
7203 let values = cond_values(&[(":v", "active")]);
7204
7205 assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
7206 assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
7207 assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
7209 assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
7211 }
7212
7213 #[test]
7214 fn test_evaluate_filter_not_operator() {
7215 let item = cond_item(&[("status", "pending")]);
7216 let names = cond_names(&[("#s", "status")]);
7217 let values = cond_values(&[(":v", "pending")]);
7218
7219 assert!(!evaluate_filter_expression(
7220 "NOT (#s = :v)",
7221 &item,
7222 &names,
7223 &values
7224 ));
7225 assert!(evaluate_filter_expression(
7226 "NOT (#s <> :v)",
7227 &item,
7228 &names,
7229 &values
7230 ));
7231 }
7232
7233 #[test]
7234 fn test_evaluate_filter_expression_in_match() {
7235 let item = cond_item(&[("state", "active")]);
7241 let names = cond_names(&[("#s", "state")]);
7242 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7243
7244 assert!(
7245 evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7246 "state=active should match IN (active, pending)"
7247 );
7248 }
7249
7250 #[test]
7251 fn test_evaluate_filter_expression_in_no_match() {
7252 let item = cond_item(&[("state", "complete")]);
7253 let names = cond_names(&[("#s", "state")]);
7254 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7255
7256 assert!(
7257 !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7258 "state=complete should not match IN (active, pending)"
7259 );
7260 }
7261
7262 #[test]
7263 fn test_evaluate_filter_expression_in_no_spaces() {
7264 let item = cond_item(&[("status", "shipped")]);
7268 let names = cond_names(&[("#s", "status")]);
7269 let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
7270
7271 assert!(
7272 evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
7273 "no-space IN list should still parse"
7274 );
7275 }
7276
7277 #[test]
7278 fn test_evaluate_filter_expression_in_missing_attribute() {
7279 let item: HashMap<String, AttributeValue> = HashMap::new();
7282 let names = cond_names(&[("#s", "state")]);
7283 let values = cond_values(&[(":a", "active")]);
7284
7285 assert!(
7286 !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
7287 "missing attribute should not match any IN list"
7288 );
7289 }
7290
7291 #[test]
7292 fn test_evaluate_filter_expression_compound_in_and_eq() {
7293 let item = cond_item(&[("state", "active"), ("priority", "high")]);
7299 let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
7300 let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
7301
7302 assert!(
7303 evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
7304 "(active IN (active, pending)) AND (high = high) should match"
7305 );
7306
7307 let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
7308 assert!(
7309 !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
7310 "(complete IN (active, pending)) AND (high = high) should not match"
7311 );
7312 }
7313
7314 #[test]
7315 fn test_evaluate_condition_attribute_exists_with_space() {
7316 let item = cond_item(&[("store_id", "s-1")]);
7324 let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
7325 let values = cond_values(&[(":0", "tab-A")]);
7326
7327 assert!(
7330 evaluate_condition(
7331 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7332 Some(&item),
7333 &names,
7334 &values,
7335 )
7336 .is_ok(),
7337 "claim-lease compound on free item should succeed"
7338 );
7339
7340 assert!(
7342 evaluate_condition(
7343 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7344 None,
7345 &names,
7346 &values,
7347 )
7348 .is_err(),
7349 "claim-lease compound on missing item must fail attribute_exists branch"
7350 );
7351
7352 let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
7355 assert!(
7356 evaluate_condition(
7357 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7358 Some(&held),
7359 &names,
7360 &values,
7361 )
7362 .is_err(),
7363 "claim-lease compound on item held by another tab must fail"
7364 );
7365
7366 let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
7369 assert!(
7370 evaluate_condition(
7371 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7372 Some(&self_held),
7373 &names,
7374 &values,
7375 )
7376 .is_ok(),
7377 "same-tab re-claim must succeed"
7378 );
7379 }
7380
7381 #[test]
7382 fn test_evaluate_condition_in_match() {
7383 let item = cond_item(&[("state", "active")]);
7386 let names = cond_names(&[("#s", "state")]);
7387 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7388
7389 assert!(
7390 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
7391 "IN should succeed when actual value is in the list"
7392 );
7393 }
7394
7395 #[test]
7396 fn test_evaluate_condition_in_no_match() {
7397 let item = cond_item(&[("state", "complete")]);
7401 let names = cond_names(&[("#s", "state")]);
7402 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7403
7404 assert!(
7405 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
7406 "IN should fail when actual value is not in the list"
7407 );
7408 }
7409
7410 #[test]
7411 fn test_apply_update_set_list_index_replaces_existing() {
7412 let mut item = HashMap::new();
7419 item.insert(
7420 "items".to_string(),
7421 json!({"L": [
7422 {"M": {"sku": {"S": "OLD-A"}}},
7423 {"M": {"sku": {"S": "OLD-B"}}},
7424 ]}),
7425 );
7426
7427 let names = cond_names(&[("#items", "items")]);
7428 let mut values = HashMap::new();
7429 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
7430
7431 apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
7432
7433 let items_list = item
7434 .get("items")
7435 .and_then(|v| v.get("L"))
7436 .and_then(|v| v.as_array())
7437 .expect("items should still be a list");
7438 assert_eq!(items_list.len(), 2, "list length should be unchanged");
7439 let sku0 = items_list[0]
7440 .get("M")
7441 .and_then(|m| m.get("sku"))
7442 .and_then(|s| s.get("S"))
7443 .and_then(|s| s.as_str());
7444 assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
7445 let sku1 = items_list[1]
7446 .get("M")
7447 .and_then(|m| m.get("sku"))
7448 .and_then(|s| s.get("S"))
7449 .and_then(|s| s.as_str());
7450 assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
7451
7452 assert!(!item.contains_key("items[0]"));
7453 assert!(!item.contains_key("#items[0]"));
7454 }
7455
7456 #[test]
7457 fn test_apply_update_set_list_index_second_slot() {
7458 let mut item = HashMap::new();
7459 item.insert(
7460 "items".to_string(),
7461 json!({"L": [
7462 {"M": {"sku": {"S": "A"}}},
7463 {"M": {"sku": {"S": "B"}}},
7464 {"M": {"sku": {"S": "C"}}},
7465 ]}),
7466 );
7467
7468 let names = cond_names(&[("#items", "items")]);
7469 let mut values = HashMap::new();
7470 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
7471
7472 apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
7473
7474 let items_list = item
7475 .get("items")
7476 .and_then(|v| v.get("L"))
7477 .and_then(|v| v.as_array())
7478 .unwrap();
7479 let skus: Vec<&str> = items_list
7480 .iter()
7481 .map(|v| {
7482 v.get("M")
7483 .and_then(|m| m.get("sku"))
7484 .and_then(|s| s.get("S"))
7485 .and_then(|s| s.as_str())
7486 .unwrap()
7487 })
7488 .collect();
7489 assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
7490 }
7491
7492 #[test]
7493 fn test_apply_update_set_list_index_without_name_ref() {
7494 let mut item = HashMap::new();
7497 item.insert(
7498 "tags".to_string(),
7499 json!({"L": [{"S": "red"}, {"S": "blue"}]}),
7500 );
7501
7502 let names: HashMap<String, String> = HashMap::new();
7503 let mut values = HashMap::new();
7504 values.insert(":t".to_string(), json!({"S": "green"}));
7505
7506 apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
7507
7508 let tags = item
7509 .get("tags")
7510 .and_then(|v| v.get("L"))
7511 .and_then(|v| v.as_array())
7512 .unwrap();
7513 assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
7514 assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
7515 }
7516
7517 #[test]
7518 fn test_unrecognized_expression_returns_false() {
7519 let item = cond_item(&[("x", "1")]);
7522 let names: HashMap<String, String> = HashMap::new();
7523 let values: HashMap<String, Value> = HashMap::new();
7524
7525 assert!(
7526 !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
7527 "unrecognized expression must return false"
7528 );
7529 }
7530
7531 #[test]
7532 fn test_set_list_index_out_of_range_returns_error() {
7533 let mut item = HashMap::new();
7536 item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
7537
7538 let names: HashMap<String, String> = HashMap::new();
7539 let mut values = HashMap::new();
7540 values.insert(":v".to_string(), json!({"S": "z"}));
7541
7542 let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
7543 assert!(
7544 result.is_err(),
7545 "out-of-range list index must return an error"
7546 );
7547
7548 let list = item
7550 .get("items")
7551 .and_then(|v| v.get("L"))
7552 .and_then(|v| v.as_array())
7553 .unwrap();
7554 assert_eq!(list.len(), 2);
7555 }
7556
7557 #[test]
7558 fn test_set_list_index_on_non_list_returns_error() {
7559 let mut item = HashMap::new();
7562 item.insert("name".to_string(), json!({"S": "hello"}));
7563
7564 let names: HashMap<String, String> = HashMap::new();
7565 let mut values = HashMap::new();
7566 values.insert(":v".to_string(), json!({"S": "z"}));
7567
7568 let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
7569 assert!(
7570 result.is_err(),
7571 "list index on non-list attribute must return an error"
7572 );
7573 }
7574
7575 #[test]
7576 fn test_unrecognized_update_action_returns_error() {
7577 let mut item = HashMap::new();
7578 item.insert("name".to_string(), json!({"S": "hello"}));
7579
7580 let names: HashMap<String, String> = HashMap::new();
7581 let mut values = HashMap::new();
7582 values.insert(":bar".to_string(), json!({"S": "baz"}));
7583
7584 let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
7585 assert!(
7586 result.is_err(),
7587 "unrecognized UpdateExpression action must return an error"
7588 );
7589 let err_msg = format!("{}", result.unwrap_err());
7590 assert!(
7591 err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
7592 "error should mention Invalid UpdateExpression, got: {err_msg}"
7593 );
7594 }
7595
7596 #[test]
7599 fn test_size_string() {
7600 let mut item = HashMap::new();
7601 item.insert("name".to_string(), json!({"S": "hello"}));
7602 let names = HashMap::new();
7603 let mut values = HashMap::new();
7604 values.insert(":limit".to_string(), json!({"N": "5"}));
7605
7606 assert!(evaluate_single_filter_condition(
7607 "size(name) = :limit",
7608 &item,
7609 &names,
7610 &values,
7611 ));
7612 values.insert(":limit".to_string(), json!({"N": "4"}));
7613 assert!(evaluate_single_filter_condition(
7614 "size(name) > :limit",
7615 &item,
7616 &names,
7617 &values,
7618 ));
7619 }
7620
7621 #[test]
7622 fn test_size_list() {
7623 let mut item = HashMap::new();
7624 item.insert(
7625 "items".to_string(),
7626 json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
7627 );
7628 let names = HashMap::new();
7629 let mut values = HashMap::new();
7630 values.insert(":limit".to_string(), json!({"N": "3"}));
7631
7632 assert!(evaluate_single_filter_condition(
7633 "size(items) = :limit",
7634 &item,
7635 &names,
7636 &values,
7637 ));
7638 }
7639
7640 #[test]
7641 fn test_size_map() {
7642 let mut item = HashMap::new();
7643 item.insert(
7644 "data".to_string(),
7645 json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
7646 );
7647 let names = HashMap::new();
7648 let mut values = HashMap::new();
7649 values.insert(":limit".to_string(), json!({"N": "2"}));
7650
7651 assert!(evaluate_single_filter_condition(
7652 "size(data) = :limit",
7653 &item,
7654 &names,
7655 &values,
7656 ));
7657 }
7658
7659 #[test]
7660 fn test_size_set() {
7661 let mut item = HashMap::new();
7662 item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
7663 let names = HashMap::new();
7664 let mut values = HashMap::new();
7665 values.insert(":limit".to_string(), json!({"N": "3"}));
7666
7667 assert!(evaluate_single_filter_condition(
7668 "size(tags) > :limit",
7669 &item,
7670 &names,
7671 &values,
7672 ));
7673 }
7674
7675 #[test]
7678 fn test_attribute_type_string() {
7679 let mut item = HashMap::new();
7680 item.insert("name".to_string(), json!({"S": "hello"}));
7681 let names = HashMap::new();
7682 let mut values = HashMap::new();
7683 values.insert(":t".to_string(), json!({"S": "S"}));
7684
7685 assert!(evaluate_single_filter_condition(
7686 "attribute_type(name, :t)",
7687 &item,
7688 &names,
7689 &values,
7690 ));
7691
7692 values.insert(":t".to_string(), json!({"S": "N"}));
7693 assert!(!evaluate_single_filter_condition(
7694 "attribute_type(name, :t)",
7695 &item,
7696 &names,
7697 &values,
7698 ));
7699 }
7700
7701 #[test]
7702 fn test_attribute_type_number() {
7703 let mut item = HashMap::new();
7704 item.insert("age".to_string(), json!({"N": "42"}));
7705 let names = HashMap::new();
7706 let mut values = HashMap::new();
7707 values.insert(":t".to_string(), json!({"S": "N"}));
7708
7709 assert!(evaluate_single_filter_condition(
7710 "attribute_type(age, :t)",
7711 &item,
7712 &names,
7713 &values,
7714 ));
7715 }
7716
7717 #[test]
7718 fn test_attribute_type_list() {
7719 let mut item = HashMap::new();
7720 item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
7721 let names = HashMap::new();
7722 let mut values = HashMap::new();
7723 values.insert(":t".to_string(), json!({"S": "L"}));
7724
7725 assert!(evaluate_single_filter_condition(
7726 "attribute_type(items, :t)",
7727 &item,
7728 &names,
7729 &values,
7730 ));
7731 }
7732
7733 #[test]
7734 fn test_attribute_type_map() {
7735 let mut item = HashMap::new();
7736 item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
7737 let names = HashMap::new();
7738 let mut values = HashMap::new();
7739 values.insert(":t".to_string(), json!({"S": "M"}));
7740
7741 assert!(evaluate_single_filter_condition(
7742 "attribute_type(data, :t)",
7743 &item,
7744 &names,
7745 &values,
7746 ));
7747 }
7748
7749 #[test]
7750 fn test_attribute_type_bool() {
7751 let mut item = HashMap::new();
7752 item.insert("active".to_string(), json!({"BOOL": true}));
7753 let names = HashMap::new();
7754 let mut values = HashMap::new();
7755 values.insert(":t".to_string(), json!({"S": "BOOL"}));
7756
7757 assert!(evaluate_single_filter_condition(
7758 "attribute_type(active, :t)",
7759 &item,
7760 &names,
7761 &values,
7762 ));
7763 }
7764
7765 #[test]
7768 fn test_begins_with_rejects_number_type() {
7769 let mut item = HashMap::new();
7770 item.insert("code".to_string(), json!({"N": "12345"}));
7771 let names = HashMap::new();
7772 let mut values = HashMap::new();
7773 values.insert(":prefix".to_string(), json!({"S": "123"}));
7774
7775 assert!(
7776 !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
7777 "begins_with must return false for N-type attributes"
7778 );
7779 }
7780
7781 #[test]
7782 fn test_begins_with_works_on_string_type() {
7783 let mut item = HashMap::new();
7784 item.insert("code".to_string(), json!({"S": "abc123"}));
7785 let names = HashMap::new();
7786 let mut values = HashMap::new();
7787 values.insert(":prefix".to_string(), json!({"S": "abc"}));
7788
7789 assert!(evaluate_single_filter_condition(
7790 "begins_with(code, :prefix)",
7791 &item,
7792 &names,
7793 &values,
7794 ));
7795 }
7796
7797 #[test]
7800 fn test_contains_string_set() {
7801 let mut item = HashMap::new();
7802 item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
7803 let names = HashMap::new();
7804 let mut values = HashMap::new();
7805 values.insert(":val".to_string(), json!({"S": "blue"}));
7806
7807 assert!(evaluate_single_filter_condition(
7808 "contains(tags, :val)",
7809 &item,
7810 &names,
7811 &values,
7812 ));
7813
7814 values.insert(":val".to_string(), json!({"S": "yellow"}));
7815 assert!(!evaluate_single_filter_condition(
7816 "contains(tags, :val)",
7817 &item,
7818 &names,
7819 &values,
7820 ));
7821 }
7822
7823 #[test]
7824 fn test_contains_number_set() {
7825 let mut item = HashMap::new();
7826 item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
7827 let names = HashMap::new();
7828 let mut values = HashMap::new();
7829 values.insert(":val".to_string(), json!({"N": "2"}));
7830
7831 assert!(evaluate_single_filter_condition(
7832 "contains(scores, :val)",
7833 &item,
7834 &names,
7835 &values,
7836 ));
7837 }
7838
7839 #[test]
7842 fn test_set_arithmetic_rejects_string_operand() {
7843 let mut item = HashMap::new();
7844 item.insert("name".to_string(), json!({"S": "hello"}));
7845 let names = HashMap::new();
7846 let mut values = HashMap::new();
7847 values.insert(":val".to_string(), json!({"N": "1"}));
7848
7849 let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
7850 assert!(
7851 result.is_err(),
7852 "arithmetic on S-type attribute must return a ValidationException"
7853 );
7854 }
7855
7856 #[test]
7857 fn test_set_arithmetic_rejects_string_value() {
7858 let mut item = HashMap::new();
7859 item.insert("count".to_string(), json!({"N": "5"}));
7860 let names = HashMap::new();
7861 let mut values = HashMap::new();
7862 values.insert(":val".to_string(), json!({"S": "notanumber"}));
7863
7864 let result =
7865 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7866 assert!(
7867 result.is_err(),
7868 "arithmetic with S-type value must return a ValidationException"
7869 );
7870 }
7871
7872 #[test]
7873 fn test_set_arithmetic_valid_numbers() {
7874 let mut item = HashMap::new();
7875 item.insert("count".to_string(), json!({"N": "10"}));
7876 let names = HashMap::new();
7877 let mut values = HashMap::new();
7878 values.insert(":val".to_string(), json!({"N": "3"}));
7879
7880 let result =
7881 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7882 assert!(result.is_ok());
7883 assert_eq!(item["count"], json!({"N": "13"}));
7884 }
7885
7886 #[test]
7889 fn test_add_binary_set() {
7890 let mut item = HashMap::new();
7891 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
7892 let names = HashMap::new();
7893 let mut values = HashMap::new();
7894 values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
7895
7896 let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
7897 assert!(result.is_ok());
7898 let bs = item["data"]["BS"].as_array().unwrap();
7899 assert_eq!(bs.len(), 3, "should merge sets without duplicates");
7900 assert!(bs.contains(&json!("YQ==")));
7901 assert!(bs.contains(&json!("Yg==")));
7902 assert!(bs.contains(&json!("Yw==")));
7903 }
7904
7905 #[test]
7906 fn test_delete_binary_set() {
7907 let mut item = HashMap::new();
7908 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
7909 let names = HashMap::new();
7910 let mut values = HashMap::new();
7911 values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
7912
7913 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7914 assert!(result.is_ok());
7915 let bs = item["data"]["BS"].as_array().unwrap();
7916 assert_eq!(bs.len(), 2);
7917 assert!(!bs.contains(&json!("Yg==")));
7918 }
7919
7920 #[test]
7921 fn test_delete_binary_set_removes_attr_when_empty() {
7922 let mut item = HashMap::new();
7923 item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
7924 let names = HashMap::new();
7925 let mut values = HashMap::new();
7926 values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
7927
7928 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7929 assert!(result.is_ok());
7930 assert!(
7931 !item.contains_key("data"),
7932 "attribute should be removed when set becomes empty"
7933 );
7934 }
7935}