1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21 ReplicaDescription, SharedDynamoDbState,
22};
23
/// DynamoDB service handler operating on shared table state.
pub struct DynamoDbService {
    /// Shared DynamoDB state; request handlers take its read or write lock.
    state: SharedDynamoDbState,
    /// Optional shared S3 state, attached with `with_s3`.
    s3_state: Option<SharedS3State>,
    /// Optional delivery bus; when present, item changes are forwarded to a
    /// table's `ACTIVE` Kinesis destinations.
    delivery: Option<Arc<DeliveryBus>>,
}
29
30impl DynamoDbService {
31 pub fn new(state: SharedDynamoDbState) -> Self {
32 Self {
33 state,
34 s3_state: None,
35 delivery: None,
36 }
37 }
38
39 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40 self.s3_state = Some(s3_state);
41 self
42 }
43
44 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45 self.delivery = Some(delivery);
46 self
47 }
48
49 fn deliver_to_kinesis_destinations(
51 &self,
52 table: &DynamoTable,
53 event_name: &str,
54 keys: &HashMap<String, AttributeValue>,
55 old_image: Option<&HashMap<String, AttributeValue>>,
56 new_image: Option<&HashMap<String, AttributeValue>>,
57 ) {
58 let delivery = match &self.delivery {
59 Some(d) => d,
60 None => return,
61 };
62
63 let active_destinations: Vec<_> = table
64 .kinesis_destinations
65 .iter()
66 .filter(|d| d.destination_status == "ACTIVE")
67 .collect();
68
69 if active_destinations.is_empty() {
70 return;
71 }
72
73 let mut record = json!({
74 "eventID": uuid::Uuid::new_v4().to_string(),
75 "eventName": event_name,
76 "eventVersion": "1.1",
77 "eventSource": "aws:dynamodb",
78 "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79 "dynamodb": {
80 "Keys": keys,
81 "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83 "StreamViewType": "NEW_AND_OLD_IMAGES",
84 },
85 "eventSourceARN": &table.arn,
86 "tableName": &table.name,
87 });
88
89 if let Some(old) = old_image {
90 record["dynamodb"]["OldImage"] = json!(old);
91 }
92 if let Some(new) = new_image {
93 record["dynamodb"]["NewImage"] = json!(new);
94 }
95
96 let record_str = serde_json::to_string(&record).unwrap_or_default();
97 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98 let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100 for dest in active_destinations {
101 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102 }
103 }
104
105 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106 serde_json::from_slice(&req.body).map_err(|e| {
107 AwsServiceError::aws_error(
108 StatusCode::BAD_REQUEST,
109 "SerializationException",
110 format!("Invalid JSON: {e}"),
111 )
112 })
113 }
114
115 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116 Ok(AwsResponse::ok_json(body))
117 }
118
119 fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122 let body = Self::parse_body(req)?;
123
124 let table_name = body["TableName"]
125 .as_str()
126 .ok_or_else(|| {
127 AwsServiceError::aws_error(
128 StatusCode::BAD_REQUEST,
129 "ValidationException",
130 "TableName is required",
131 )
132 })?
133 .to_string();
134
135 let key_schema = parse_key_schema(&body["KeySchema"])?;
136 let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138 for ks in &key_schema {
140 if !attribute_definitions
141 .iter()
142 .any(|ad| ad.attribute_name == ks.attribute_name)
143 {
144 return Err(AwsServiceError::aws_error(
145 StatusCode::BAD_REQUEST,
146 "ValidationException",
147 format!(
148 "One or more parameter values were invalid: \
149 Some index key attributes are not defined in AttributeDefinitions. \
150 Keys: [{}], AttributeDefinitions: [{}]",
151 ks.attribute_name,
152 attribute_definitions
153 .iter()
154 .map(|ad| ad.attribute_name.as_str())
155 .collect::<Vec<_>>()
156 .join(", ")
157 ),
158 ));
159 }
160 }
161
162 let billing_mode = body["BillingMode"]
163 .as_str()
164 .unwrap_or("PROVISIONED")
165 .to_string();
166
167 let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168 ProvisionedThroughput {
169 read_capacity_units: 0,
170 write_capacity_units: 0,
171 }
172 } else {
173 parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174 };
175
176 let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177 let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178 let tags = parse_tags(&body["Tags"]);
179
180 let (stream_enabled, stream_view_type) =
182 if let Some(stream_spec) = body.get("StreamSpecification") {
183 let enabled = stream_spec
184 .get("StreamEnabled")
185 .and_then(|v| v.as_bool())
186 .unwrap_or(false);
187 let view_type = if enabled {
188 stream_spec
189 .get("StreamViewType")
190 .and_then(|v| v.as_str())
191 .map(|s| s.to_string())
192 } else {
193 None
194 };
195 (enabled, view_type)
196 } else {
197 (false, None)
198 };
199
200 let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202 let enabled = sse_spec
203 .get("Enabled")
204 .and_then(|v| v.as_bool())
205 .unwrap_or(false);
206 if enabled {
207 let sse_type = sse_spec
208 .get("SSEType")
209 .and_then(|v| v.as_str())
210 .unwrap_or("KMS")
211 .to_string();
212 let kms_key = sse_spec
213 .get("KMSMasterKeyId")
214 .and_then(|v| v.as_str())
215 .map(|s| s.to_string());
216 (Some(sse_type), kms_key)
217 } else {
218 (None, None)
219 }
220 } else {
221 (None, None)
222 };
223
224 let mut state = self.state.write();
225
226 if state.tables.contains_key(&table_name) {
227 return Err(AwsServiceError::aws_error(
228 StatusCode::BAD_REQUEST,
229 "ResourceInUseException",
230 format!("Table already exists: {table_name}"),
231 ));
232 }
233
234 let now = Utc::now();
235 let arn = format!(
236 "arn:aws:dynamodb:{}:{}:table/{}",
237 state.region, state.account_id, table_name
238 );
239 let stream_arn = if stream_enabled {
240 Some(format!(
241 "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242 state.region,
243 state.account_id,
244 table_name,
245 now.format("%Y-%m-%dT%H:%M:%S.%3f")
246 ))
247 } else {
248 None
249 };
250
251 let table = DynamoTable {
252 name: table_name.clone(),
253 arn: arn.clone(),
254 key_schema: key_schema.clone(),
255 attribute_definitions: attribute_definitions.clone(),
256 provisioned_throughput: provisioned_throughput.clone(),
257 items: Vec::new(),
258 gsi: gsi.clone(),
259 lsi: lsi.clone(),
260 tags,
261 created_at: now,
262 status: "ACTIVE".to_string(),
263 item_count: 0,
264 size_bytes: 0,
265 billing_mode: billing_mode.clone(),
266 ttl_attribute: None,
267 ttl_enabled: false,
268 resource_policy: None,
269 pitr_enabled: false,
270 kinesis_destinations: Vec::new(),
271 contributor_insights_status: "DISABLED".to_string(),
272 contributor_insights_counters: HashMap::new(),
273 stream_enabled,
274 stream_view_type,
275 stream_arn,
276 stream_records: Arc::new(RwLock::new(Vec::new())),
277 sse_type,
278 sse_kms_key_arn,
279 };
280
281 state.tables.insert(table_name, table);
282
283 let table_desc = build_table_description_json(
284 &arn,
285 &key_schema,
286 &attribute_definitions,
287 &provisioned_throughput,
288 &gsi,
289 &lsi,
290 &billing_mode,
291 now,
292 0,
293 0,
294 "ACTIVE",
295 );
296
297 Self::ok_json(json!({ "TableDescription": table_desc }))
298 }
299
300 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = Self::parse_body(req)?;
302 let table_name = require_str(&body, "TableName")?;
303
304 let mut state = self.state.write();
305 let table = state.tables.remove(table_name).ok_or_else(|| {
306 AwsServiceError::aws_error(
307 StatusCode::BAD_REQUEST,
308 "ResourceNotFoundException",
309 format!("Requested resource not found: Table: {table_name} not found"),
310 )
311 })?;
312
313 let table_desc = build_table_description_json(
314 &table.arn,
315 &table.key_schema,
316 &table.attribute_definitions,
317 &table.provisioned_throughput,
318 &table.gsi,
319 &table.lsi,
320 &table.billing_mode,
321 table.created_at,
322 table.item_count,
323 table.size_bytes,
324 "DELETING",
325 );
326
327 Self::ok_json(json!({ "TableDescription": table_desc }))
328 }
329
330 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331 let body = Self::parse_body(req)?;
332 let table_name = require_str(&body, "TableName")?;
333
334 let state = self.state.read();
335 let table = get_table(&state.tables, table_name)?;
336
337 let table_desc = build_table_description(table);
338
339 Self::ok_json(json!({ "Table": table_desc }))
340 }
341
342 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343 let body = Self::parse_body(req)?;
344
345 validate_optional_string_length(
346 "exclusiveStartTableName",
347 body["ExclusiveStartTableName"].as_str(),
348 3,
349 255,
350 )?;
351 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354 let exclusive_start = body["ExclusiveStartTableName"]
355 .as_str()
356 .map(|s| s.to_string());
357
358 let state = self.state.read();
359 let mut names: Vec<&String> = state.tables.keys().collect();
360 names.sort();
361
362 let start_idx = match &exclusive_start {
363 Some(start) => names
364 .iter()
365 .position(|n| n.as_str() > start.as_str())
366 .unwrap_or(names.len()),
367 None => 0,
368 };
369
370 let page: Vec<&str> = names
371 .iter()
372 .skip(start_idx)
373 .take(limit)
374 .map(|n| n.as_str())
375 .collect();
376
377 let mut result = json!({ "TableNames": page });
378
379 if start_idx + limit < names.len() {
380 if let Some(last) = page.last() {
381 result["LastEvaluatedTableName"] = json!(last);
382 }
383 }
384
385 Self::ok_json(result)
386 }
387
388 fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389 let body = Self::parse_body(req)?;
390 let table_name = require_str(&body, "TableName")?;
391
392 let mut state = self.state.write();
393 let table = state.tables.get_mut(table_name).ok_or_else(|| {
394 AwsServiceError::aws_error(
395 StatusCode::BAD_REQUEST,
396 "ResourceNotFoundException",
397 format!("Requested resource not found: Table: {table_name} not found"),
398 )
399 })?;
400
401 if let Some(pt) = body.get("ProvisionedThroughput") {
402 if let Ok(throughput) = parse_provisioned_throughput(pt) {
403 table.provisioned_throughput = throughput;
404 }
405 }
406
407 if let Some(bm) = body["BillingMode"].as_str() {
408 table.billing_mode = bm.to_string();
409 }
410
411 if let Some(sse_spec) = body.get("SSESpecification") {
413 let enabled = sse_spec
414 .get("Enabled")
415 .and_then(|v| v.as_bool())
416 .unwrap_or(false);
417 if enabled {
418 table.sse_type = Some(
419 sse_spec
420 .get("SSEType")
421 .and_then(|v| v.as_str())
422 .unwrap_or("KMS")
423 .to_string(),
424 );
425 table.sse_kms_key_arn = sse_spec
426 .get("KMSMasterKeyId")
427 .and_then(|v| v.as_str())
428 .map(|s| s.to_string());
429 } else {
430 table.sse_type = None;
431 table.sse_kms_key_arn = None;
432 }
433 }
434
435 let table_desc = build_table_description_json(
436 &table.arn,
437 &table.key_schema,
438 &table.attribute_definitions,
439 &table.provisioned_throughput,
440 &table.gsi,
441 &table.lsi,
442 &table.billing_mode,
443 table.created_at,
444 table.item_count,
445 table.size_bytes,
446 &table.status,
447 );
448
449 Self::ok_json(json!({ "TableDescription": table_desc }))
450 }
451
452 fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
455 let body = Self::parse_body(req)?;
457 let table_name = require_str(&body, "TableName")?;
458 let item = require_object(&body, "Item")?;
459 let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
460 let expr_attr_names = parse_expression_attribute_names(&body);
461 let expr_attr_values = parse_expression_attribute_values(&body);
462 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();
463
464 let (old_item, kinesis_info) = {
467 let mut state = self.state.write();
468 let region = state.region.clone();
469 let table = get_table_mut(&mut state.tables, table_name)?;
470
471 validate_key_in_item(table, &item)?;
472
473 let key = extract_key(table, &item);
474 let existing_idx = table.find_item_index(&key);
475
476 if let Some(ref cond) = condition {
477 let existing = existing_idx.map(|i| &table.items[i]);
478 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
479 }
480
481 let old_item_for_return = if return_values == "ALL_OLD" {
482 existing_idx.map(|i| table.items[i].clone())
483 } else {
484 None
485 };
486
487 let needs_change_capture = table.stream_enabled
489 || table
490 .kinesis_destinations
491 .iter()
492 .any(|d| d.destination_status == "ACTIVE");
493 let old_item_for_stream = if needs_change_capture {
494 existing_idx.map(|i| table.items[i].clone())
495 } else {
496 None
497 };
498
499 let is_modify = existing_idx.is_some();
500
501 if let Some(idx) = existing_idx {
502 table.items[idx] = item.clone();
503 } else {
504 table.items.push(item.clone());
505 }
506
507 table.record_item_access(&item);
508 table.recalculate_stats();
509
510 let event_name = if is_modify { "MODIFY" } else { "INSERT" };
511 let key = extract_key(table, &item);
512
513 if table.stream_enabled {
515 if let Some(record) = crate::streams::generate_stream_record(
516 table,
517 event_name,
518 key.clone(),
519 old_item_for_stream.clone(),
520 Some(item.clone()),
521 ®ion,
522 ) {
523 crate::streams::add_stream_record(table, record);
524 }
525 }
526
527 let kinesis_info = if table
529 .kinesis_destinations
530 .iter()
531 .any(|d| d.destination_status == "ACTIVE")
532 {
533 Some((
534 table.clone(),
535 event_name.to_string(),
536 key,
537 old_item_for_stream,
538 Some(item.clone()),
539 ))
540 } else {
541 None
542 };
543
544 (old_item_for_return, kinesis_info)
545 };
546 if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
550 self.deliver_to_kinesis_destinations(
551 &table,
552 &event_name,
553 &keys,
554 old_image.as_ref(),
555 new_image.as_ref(),
556 );
557 }
558
559 let mut result = json!({});
560 if let Some(old) = old_item {
561 result["Attributes"] = json!(old);
562 }
563
564 Self::ok_json(result)
565 }
566
567 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
568 let body = Self::parse_body(req)?;
570 let table_name = require_str(&body, "TableName")?;
571 let key = require_object(&body, "Key")?;
572
573 let (result, needs_insights) = {
575 let state = self.state.read();
576 let table = get_table(&state.tables, table_name)?;
577 let needs_insights = table.contributor_insights_status == "ENABLED";
578
579 let result = match table.find_item_index(&key) {
580 Some(idx) => {
581 let item = &table.items[idx];
582 let projected = project_item(item, &body);
583 json!({ "Item": projected })
584 }
585 None => json!({}),
586 };
587 (result, needs_insights)
588 };
589 if needs_insights {
593 let mut state = self.state.write();
594 if let Some(table) = state.tables.get_mut(table_name) {
595 table.record_key_access(&key);
596 }
597 }
598
599 Self::ok_json(result)
600 }
601
602 fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
603 let body = Self::parse_body(req)?;
604
605 validate_optional_enum_value(
606 "conditionalOperator",
607 &body["ConditionalOperator"],
608 &["AND", "OR"],
609 )?;
610 validate_optional_enum_value(
611 "returnConsumedCapacity",
612 &body["ReturnConsumedCapacity"],
613 &["INDEXES", "TOTAL", "NONE"],
614 )?;
615 validate_optional_enum_value(
616 "returnValues",
617 &body["ReturnValues"],
618 &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
619 )?;
620 validate_optional_enum_value(
621 "returnItemCollectionMetrics",
622 &body["ReturnItemCollectionMetrics"],
623 &["SIZE", "NONE"],
624 )?;
625 validate_optional_enum_value(
626 "returnValuesOnConditionCheckFailure",
627 &body["ReturnValuesOnConditionCheckFailure"],
628 &["ALL_OLD", "NONE"],
629 )?;
630
631 let table_name = require_str(&body, "TableName")?;
632 let key = require_object(&body, "Key")?;
633
634 let (result, kinesis_info) = {
635 let mut state = self.state.write();
636 let region = state.region.clone();
637 let table = get_table_mut(&mut state.tables, table_name)?;
638
639 let condition = body["ConditionExpression"].as_str();
640 let expr_attr_names = parse_expression_attribute_names(&body);
641 let expr_attr_values = parse_expression_attribute_values(&body);
642
643 let existing_idx = table.find_item_index(&key);
644
645 if let Some(cond) = condition {
646 let existing = existing_idx.map(|i| &table.items[i]);
647 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
648 }
649
650 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
651
652 let mut result = json!({});
653 let mut kinesis_info = None;
654
655 if let Some(idx) = existing_idx {
656 let old_item = table.items[idx].clone();
657 if return_values == "ALL_OLD" {
658 result["Attributes"] = json!(old_item.clone());
659 }
660
661 if table.stream_enabled {
663 if let Some(record) = crate::streams::generate_stream_record(
664 table,
665 "REMOVE",
666 key.clone(),
667 Some(old_item.clone()),
668 None,
669 ®ion,
670 ) {
671 crate::streams::add_stream_record(table, record);
672 }
673 }
674
675 if table
677 .kinesis_destinations
678 .iter()
679 .any(|d| d.destination_status == "ACTIVE")
680 {
681 kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
682 }
683
684 table.items.remove(idx);
685 table.recalculate_stats();
686 }
687
688 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
689 let return_icm = body["ReturnItemCollectionMetrics"]
690 .as_str()
691 .unwrap_or("NONE");
692
693 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
694 result["ConsumedCapacity"] = json!({
695 "TableName": table_name,
696 "CapacityUnits": 1.0,
697 });
698 }
699
700 if return_icm == "SIZE" {
701 result["ItemCollectionMetrics"] = json!({});
702 }
703
704 (result, kinesis_info)
705 };
706
707 if let Some((table, keys, old_image)) = kinesis_info {
709 self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
710 }
711
712 Self::ok_json(result)
713 }
714
715 fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
716 let body = Self::parse_body(req)?;
717 let table_name = require_str(&body, "TableName")?;
718 let key = require_object(&body, "Key")?;
719
720 let mut state = self.state.write();
721 let region = state.region.clone();
722 let table = get_table_mut(&mut state.tables, table_name)?;
723
724 validate_key_attributes_in_key(table, &key)?;
725
726 let condition = body["ConditionExpression"].as_str();
727 let expr_attr_names = parse_expression_attribute_names(&body);
728 let expr_attr_values = parse_expression_attribute_values(&body);
729 let update_expression = body["UpdateExpression"].as_str();
730
731 let existing_idx = table.find_item_index(&key);
732
733 if let Some(cond) = condition {
734 let existing = existing_idx.map(|i| &table.items[i]);
735 evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
736 }
737
738 let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");
739
740 let is_insert = existing_idx.is_none();
741 let idx = match existing_idx {
742 Some(i) => i,
743 None => {
744 let mut new_item = HashMap::new();
745 for (k, v) in &key {
746 new_item.insert(k.clone(), v.clone());
747 }
748 table.items.push(new_item);
749 table.items.len() - 1
750 }
751 };
752
753 let needs_change_capture = table.stream_enabled
755 || table
756 .kinesis_destinations
757 .iter()
758 .any(|d| d.destination_status == "ACTIVE");
759 let old_item_for_stream = if needs_change_capture {
760 Some(table.items[idx].clone())
761 } else {
762 None
763 };
764
765 let old_item = if return_values == "ALL_OLD" {
766 Some(table.items[idx].clone())
767 } else {
768 None
769 };
770
771 if let Some(expr) = update_expression {
772 apply_update_expression(
773 &mut table.items[idx],
774 expr,
775 &expr_attr_names,
776 &expr_attr_values,
777 )?;
778 }
779
780 let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
781 Some(table.items[idx].clone())
782 } else {
783 None
784 };
785
786 let event_name = if is_insert { "INSERT" } else { "MODIFY" };
787 let new_item_for_stream = table.items[idx].clone();
788
789 if table.stream_enabled {
791 if let Some(record) = crate::streams::generate_stream_record(
792 table,
793 event_name,
794 key.clone(),
795 old_item_for_stream.clone(),
796 Some(new_item_for_stream.clone()),
797 ®ion,
798 ) {
799 crate::streams::add_stream_record(table, record);
800 }
801 }
802
803 let kinesis_info = if table
805 .kinesis_destinations
806 .iter()
807 .any(|d| d.destination_status == "ACTIVE")
808 {
809 Some((
810 table.clone(),
811 event_name.to_string(),
812 key.clone(),
813 old_item_for_stream,
814 Some(new_item_for_stream),
815 ))
816 } else {
817 None
818 };
819
820 table.recalculate_stats();
821
822 drop(state);
824
825 if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
827 self.deliver_to_kinesis_destinations(
828 &tbl,
829 &ev,
830 &keys,
831 old_image.as_ref(),
832 new_image.as_ref(),
833 );
834 }
835
836 let mut result = json!({});
837 if let Some(old) = old_item {
838 result["Attributes"] = json!(old);
839 } else if let Some(new) = new_item {
840 result["Attributes"] = json!(new);
841 }
842
843 Self::ok_json(result)
844 }
845
846 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
849 let body = Self::parse_body(req)?;
850 let table_name = require_str(&body, "TableName")?;
851
852 let state = self.state.read();
853 let table = get_table(&state.tables, table_name)?;
854
855 let expr_attr_names = parse_expression_attribute_names(&body);
856 let expr_attr_values = parse_expression_attribute_values(&body);
857
858 let key_condition = body["KeyConditionExpression"].as_str();
859 let filter_expression = body["FilterExpression"].as_str();
860 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
861 let limit = body["Limit"].as_i64().map(|l| l as usize);
862 let index_name = body["IndexName"].as_str();
863 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
864 parse_key_map(&body["ExclusiveStartKey"]);
865
866 let (items_to_scan, hash_key_name, range_key_name): (
867 &[HashMap<String, AttributeValue>],
868 String,
869 Option<String>,
870 ) = if let Some(idx_name) = index_name {
871 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
872 let hk = gsi
873 .key_schema
874 .iter()
875 .find(|k| k.key_type == "HASH")
876 .map(|k| k.attribute_name.clone())
877 .unwrap_or_default();
878 let rk = gsi
879 .key_schema
880 .iter()
881 .find(|k| k.key_type == "RANGE")
882 .map(|k| k.attribute_name.clone());
883 (&table.items, hk, rk)
884 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
885 let hk = lsi
886 .key_schema
887 .iter()
888 .find(|k| k.key_type == "HASH")
889 .map(|k| k.attribute_name.clone())
890 .unwrap_or_default();
891 let rk = lsi
892 .key_schema
893 .iter()
894 .find(|k| k.key_type == "RANGE")
895 .map(|k| k.attribute_name.clone());
896 (&table.items, hk, rk)
897 } else {
898 return Err(AwsServiceError::aws_error(
899 StatusCode::BAD_REQUEST,
900 "ValidationException",
901 format!("The table does not have the specified index: {idx_name}"),
902 ));
903 }
904 } else {
905 (
906 &table.items[..],
907 table.hash_key_name().to_string(),
908 table.range_key_name().map(|s| s.to_string()),
909 )
910 };
911
912 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
913 .iter()
914 .filter(|item| {
915 if let Some(kc) = key_condition {
916 evaluate_key_condition(
917 kc,
918 item,
919 &hash_key_name,
920 range_key_name.as_deref(),
921 &expr_attr_names,
922 &expr_attr_values,
923 )
924 } else {
925 true
926 }
927 })
928 .collect();
929
930 if let Some(ref rk) = range_key_name {
931 matched.sort_by(|a, b| {
932 let av = a.get(rk.as_str());
933 let bv = b.get(rk.as_str());
934 compare_attribute_values(av, bv)
935 });
936 if !scan_forward {
937 matched.reverse();
938 }
939 }
940
941 let table_pk_hash = table.hash_key_name().to_string();
944 let table_pk_range = table.range_key_name().map(|s| s.to_string());
945 let is_gsi_query = index_name.is_some()
946 && (hash_key_name != table_pk_hash
947 || range_key_name.as_deref() != table_pk_range.as_deref());
948
949 if let Some(ref start_key) = exclusive_start_key {
953 if let Some(pos) = matched.iter().position(|item| {
954 let index_match =
955 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
956 if is_gsi_query {
957 index_match
958 && item_matches_key(
959 item,
960 start_key,
961 &table_pk_hash,
962 table_pk_range.as_deref(),
963 )
964 } else {
965 index_match
966 }
967 }) {
968 matched = matched.split_off(pos + 1);
969 }
970 }
971
972 if let Some(filter) = filter_expression {
973 matched.retain(|item| {
974 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
975 });
976 }
977
978 let scanned_count = matched.len();
979
980 let has_more = if let Some(lim) = limit {
981 let more = matched.len() > lim;
982 matched.truncate(lim);
983 more
984 } else {
985 false
986 };
987
988 let last_evaluated_key = if has_more {
992 matched.last().map(|item| {
993 let mut key =
994 extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
995 if is_gsi_query {
996 let table_key =
997 extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
998 key.extend(table_key);
999 }
1000 key
1001 })
1002 } else {
1003 None
1004 };
1005
1006 let insights_enabled = table.contributor_insights_status == "ENABLED";
1008 let pk_name = table.hash_key_name().to_string();
1009 let accessed_keys: Vec<String> = if insights_enabled {
1010 matched
1011 .iter()
1012 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1013 .collect()
1014 } else {
1015 Vec::new()
1016 };
1017
1018 let items: Vec<Value> = matched
1019 .iter()
1020 .map(|item| {
1021 let projected = project_item(item, &body);
1022 json!(projected)
1023 })
1024 .collect();
1025
1026 let mut result = json!({
1027 "Items": items,
1028 "Count": items.len(),
1029 "ScannedCount": scanned_count,
1030 });
1031
1032 if let Some(lek) = last_evaluated_key {
1033 result["LastEvaluatedKey"] = json!(lek);
1034 }
1035
1036 drop(state);
1037
1038 if !accessed_keys.is_empty() {
1039 let mut state = self.state.write();
1040 if let Some(table) = state.tables.get_mut(table_name) {
1041 if table.contributor_insights_status == "ENABLED" {
1044 for key_str in accessed_keys {
1045 *table
1046 .contributor_insights_counters
1047 .entry(key_str)
1048 .or_insert(0) += 1;
1049 }
1050 }
1051 }
1052 }
1053
1054 Self::ok_json(result)
1055 }
1056
1057 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1058 let body = Self::parse_body(req)?;
1059 let table_name = require_str(&body, "TableName")?;
1060
1061 let state = self.state.read();
1062 let table = get_table(&state.tables, table_name)?;
1063
1064 let expr_attr_names = parse_expression_attribute_names(&body);
1065 let expr_attr_values = parse_expression_attribute_values(&body);
1066 let filter_expression = body["FilterExpression"].as_str();
1067 let limit = body["Limit"].as_i64().map(|l| l as usize);
1068 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1069 parse_key_map(&body["ExclusiveStartKey"]);
1070
1071 let hash_key_name = table.hash_key_name().to_string();
1072 let range_key_name = table.range_key_name().map(|s| s.to_string());
1073
1074 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1075
1076 if let Some(ref start_key) = exclusive_start_key {
1078 if let Some(pos) = matched.iter().position(|item| {
1079 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1080 }) {
1081 matched = matched.split_off(pos + 1);
1082 }
1083 }
1084
1085 let scanned_count = matched.len();
1086
1087 if let Some(filter) = filter_expression {
1088 matched.retain(|item| {
1089 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1090 });
1091 }
1092
1093 let has_more = if let Some(lim) = limit {
1094 let more = matched.len() > lim;
1095 matched.truncate(lim);
1096 more
1097 } else {
1098 false
1099 };
1100
1101 let last_evaluated_key = if has_more {
1103 matched
1104 .last()
1105 .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1106 } else {
1107 None
1108 };
1109
1110 let insights_enabled = table.contributor_insights_status == "ENABLED";
1112 let pk_name = table.hash_key_name().to_string();
1113 let accessed_keys: Vec<String> = if insights_enabled {
1114 matched
1115 .iter()
1116 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1117 .collect()
1118 } else {
1119 Vec::new()
1120 };
1121
1122 let items: Vec<Value> = matched
1123 .iter()
1124 .map(|item| {
1125 let projected = project_item(item, &body);
1126 json!(projected)
1127 })
1128 .collect();
1129
1130 let mut result = json!({
1131 "Items": items,
1132 "Count": items.len(),
1133 "ScannedCount": scanned_count,
1134 });
1135
1136 if let Some(lek) = last_evaluated_key {
1137 result["LastEvaluatedKey"] = json!(lek);
1138 }
1139
1140 drop(state);
1141
1142 if !accessed_keys.is_empty() {
1143 let mut state = self.state.write();
1144 if let Some(table) = state.tables.get_mut(table_name) {
1145 if table.contributor_insights_status == "ENABLED" {
1148 for key_str in accessed_keys {
1149 *table
1150 .contributor_insights_counters
1151 .entry(key_str)
1152 .or_insert(0) += 1;
1153 }
1154 }
1155 }
1156 }
1157
1158 Self::ok_json(result)
1159 }
1160
1161 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1164 let body = Self::parse_body(req)?;
1165
1166 validate_optional_enum_value(
1167 "returnConsumedCapacity",
1168 &body["ReturnConsumedCapacity"],
1169 &["INDEXES", "TOTAL", "NONE"],
1170 )?;
1171
1172 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1173
1174 let request_items = body["RequestItems"]
1175 .as_object()
1176 .ok_or_else(|| {
1177 AwsServiceError::aws_error(
1178 StatusCode::BAD_REQUEST,
1179 "ValidationException",
1180 "RequestItems is required",
1181 )
1182 })?
1183 .clone();
1184
1185 let state = self.state.read();
1186 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1187 let mut consumed_capacity: Vec<Value> = Vec::new();
1188
1189 for (table_name, params) in &request_items {
1190 let table = get_table(&state.tables, table_name)?;
1191 let keys = params["Keys"].as_array().ok_or_else(|| {
1192 AwsServiceError::aws_error(
1193 StatusCode::BAD_REQUEST,
1194 "ValidationException",
1195 "Keys is required",
1196 )
1197 })?;
1198
1199 let mut items = Vec::new();
1200 for key_val in keys {
1201 let key: HashMap<String, AttributeValue> =
1202 serde_json::from_value(key_val.clone()).unwrap_or_default();
1203 if let Some(idx) = table.find_item_index(&key) {
1204 items.push(json!(table.items[idx]));
1205 }
1206 }
1207 responses.insert(table_name.clone(), items);
1208
1209 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1210 consumed_capacity.push(json!({
1211 "TableName": table_name,
1212 "CapacityUnits": 1.0,
1213 }));
1214 }
1215 }
1216
1217 let mut result = json!({
1218 "Responses": responses,
1219 "UnprocessedKeys": {},
1220 });
1221
1222 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1223 result["ConsumedCapacity"] = json!(consumed_capacity);
1224 }
1225
1226 Self::ok_json(result)
1227 }
1228
1229 fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1230 let body = Self::parse_body(req)?;
1231
1232 validate_optional_enum_value(
1233 "returnConsumedCapacity",
1234 &body["ReturnConsumedCapacity"],
1235 &["INDEXES", "TOTAL", "NONE"],
1236 )?;
1237 validate_optional_enum_value(
1238 "returnItemCollectionMetrics",
1239 &body["ReturnItemCollectionMetrics"],
1240 &["SIZE", "NONE"],
1241 )?;
1242
1243 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1244 let return_icm = body["ReturnItemCollectionMetrics"]
1245 .as_str()
1246 .unwrap_or("NONE");
1247
1248 let request_items = body["RequestItems"]
1249 .as_object()
1250 .ok_or_else(|| {
1251 AwsServiceError::aws_error(
1252 StatusCode::BAD_REQUEST,
1253 "ValidationException",
1254 "RequestItems is required",
1255 )
1256 })?
1257 .clone();
1258
1259 let mut state = self.state.write();
1260 let mut consumed_capacity: Vec<Value> = Vec::new();
1261 let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();
1262
1263 for (table_name, requests) in &request_items {
1264 let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
1265 AwsServiceError::aws_error(
1266 StatusCode::BAD_REQUEST,
1267 "ResourceNotFoundException",
1268 format!("Requested resource not found: Table: {table_name} not found"),
1269 )
1270 })?;
1271
1272 let reqs = requests.as_array().ok_or_else(|| {
1273 AwsServiceError::aws_error(
1274 StatusCode::BAD_REQUEST,
1275 "ValidationException",
1276 "Request list must be an array",
1277 )
1278 })?;
1279
1280 for request in reqs {
1281 if let Some(put_req) = request.get("PutRequest") {
1282 let item: HashMap<String, AttributeValue> =
1283 serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
1284 let key = extract_key(table, &item);
1285 if let Some(idx) = table.find_item_index(&key) {
1286 table.items[idx] = item;
1287 } else {
1288 table.items.push(item);
1289 }
1290 } else if let Some(del_req) = request.get("DeleteRequest") {
1291 let key: HashMap<String, AttributeValue> =
1292 serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
1293 if let Some(idx) = table.find_item_index(&key) {
1294 table.items.remove(idx);
1295 }
1296 }
1297 }
1298
1299 table.recalculate_stats();
1300
1301 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1302 consumed_capacity.push(json!({
1303 "TableName": table_name,
1304 "CapacityUnits": 1.0,
1305 }));
1306 }
1307
1308 if return_icm == "SIZE" {
1309 item_collection_metrics.insert(table_name.clone(), vec![]);
1310 }
1311 }
1312
1313 let mut result = json!({
1314 "UnprocessedItems": {},
1315 });
1316
1317 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1318 result["ConsumedCapacity"] = json!(consumed_capacity);
1319 }
1320
1321 if return_icm == "SIZE" {
1322 result["ItemCollectionMetrics"] = json!(item_collection_metrics);
1323 }
1324
1325 Self::ok_json(result)
1326 }
1327
1328 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1331 let body = Self::parse_body(req)?;
1332 let resource_arn = require_str(&body, "ResourceArn")?;
1333 validate_required("Tags", &body["Tags"])?;
1334
1335 let mut state = self.state.write();
1336 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1337
1338 fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1339 |f| {
1340 AwsServiceError::aws_error(
1341 StatusCode::BAD_REQUEST,
1342 "ValidationException",
1343 format!("{f} must be a list"),
1344 )
1345 },
1346 )?;
1347
1348 Self::ok_json(json!({}))
1349 }
1350
1351 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1352 let body = Self::parse_body(req)?;
1353 let resource_arn = require_str(&body, "ResourceArn")?;
1354 validate_required("TagKeys", &body["TagKeys"])?;
1355
1356 let mut state = self.state.write();
1357 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1358
1359 fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1360 AwsServiceError::aws_error(
1361 StatusCode::BAD_REQUEST,
1362 "ValidationException",
1363 format!("{f} must be a list"),
1364 )
1365 })?;
1366
1367 Self::ok_json(json!({}))
1368 }
1369
1370 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1371 let body = Self::parse_body(req)?;
1372 let resource_arn = require_str(&body, "ResourceArn")?;
1373
1374 let state = self.state.read();
1375 let table = find_table_by_arn(&state.tables, resource_arn)?;
1376
1377 let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1378
1379 Self::ok_json(json!({ "Tags": tags }))
1380 }
1381
1382 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1385 let body = Self::parse_body(req)?;
1386 validate_optional_enum_value(
1387 "returnConsumedCapacity",
1388 &body["ReturnConsumedCapacity"],
1389 &["INDEXES", "TOTAL", "NONE"],
1390 )?;
1391 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1392 AwsServiceError::aws_error(
1393 StatusCode::BAD_REQUEST,
1394 "ValidationException",
1395 "TransactItems is required",
1396 )
1397 })?;
1398
1399 let state = self.state.read();
1400 let mut responses: Vec<Value> = Vec::new();
1401
1402 for ti in transact_items {
1403 let get = &ti["Get"];
1404 let table_name = get["TableName"].as_str().ok_or_else(|| {
1405 AwsServiceError::aws_error(
1406 StatusCode::BAD_REQUEST,
1407 "ValidationException",
1408 "TableName is required in Get",
1409 )
1410 })?;
1411 let key: HashMap<String, AttributeValue> =
1412 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1413
1414 let table = get_table(&state.tables, table_name)?;
1415 match table.find_item_index(&key) {
1416 Some(idx) => {
1417 responses.push(json!({ "Item": table.items[idx] }));
1418 }
1419 None => {
1420 responses.push(json!({}));
1421 }
1422 }
1423 }
1424
1425 Self::ok_json(json!({ "Responses": responses }))
1426 }
1427
1428 fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1429 let body = Self::parse_body(req)?;
1430 validate_optional_string_length(
1431 "clientRequestToken",
1432 body["ClientRequestToken"].as_str(),
1433 1,
1434 36,
1435 )?;
1436 validate_optional_enum_value(
1437 "returnConsumedCapacity",
1438 &body["ReturnConsumedCapacity"],
1439 &["INDEXES", "TOTAL", "NONE"],
1440 )?;
1441 validate_optional_enum_value(
1442 "returnItemCollectionMetrics",
1443 &body["ReturnItemCollectionMetrics"],
1444 &["SIZE", "NONE"],
1445 )?;
1446 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1447 AwsServiceError::aws_error(
1448 StatusCode::BAD_REQUEST,
1449 "ValidationException",
1450 "TransactItems is required",
1451 )
1452 })?;
1453
1454 let mut state = self.state.write();
1455
1456 let mut cancellation_reasons: Vec<Value> = Vec::new();
1458 let mut any_failed = false;
1459
1460 for ti in transact_items {
1461 if let Some(put) = ti.get("Put") {
1462 let table_name = put["TableName"].as_str().unwrap_or_default();
1463 let item: HashMap<String, AttributeValue> =
1464 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1465 let condition = put["ConditionExpression"].as_str();
1466
1467 if let Some(cond) = condition {
1468 let table = get_table(&state.tables, table_name)?;
1469 let expr_attr_names = parse_expression_attribute_names(put);
1470 let expr_attr_values = parse_expression_attribute_values(put);
1471 let key = extract_key(table, &item);
1472 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1473 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1474 .is_err()
1475 {
1476 cancellation_reasons.push(json!({
1477 "Code": "ConditionalCheckFailed",
1478 "Message": "The conditional request failed"
1479 }));
1480 any_failed = true;
1481 continue;
1482 }
1483 }
1484 cancellation_reasons.push(json!({ "Code": "None" }));
1485 } else if let Some(delete) = ti.get("Delete") {
1486 let table_name = delete["TableName"].as_str().unwrap_or_default();
1487 let key: HashMap<String, AttributeValue> =
1488 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1489 let condition = delete["ConditionExpression"].as_str();
1490
1491 if let Some(cond) = condition {
1492 let table = get_table(&state.tables, table_name)?;
1493 let expr_attr_names = parse_expression_attribute_names(delete);
1494 let expr_attr_values = parse_expression_attribute_values(delete);
1495 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1496 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1497 .is_err()
1498 {
1499 cancellation_reasons.push(json!({
1500 "Code": "ConditionalCheckFailed",
1501 "Message": "The conditional request failed"
1502 }));
1503 any_failed = true;
1504 continue;
1505 }
1506 }
1507 cancellation_reasons.push(json!({ "Code": "None" }));
1508 } else if let Some(update) = ti.get("Update") {
1509 let table_name = update["TableName"].as_str().unwrap_or_default();
1510 let key: HashMap<String, AttributeValue> =
1511 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1512 let condition = update["ConditionExpression"].as_str();
1513
1514 if let Some(cond) = condition {
1515 let table = get_table(&state.tables, table_name)?;
1516 let expr_attr_names = parse_expression_attribute_names(update);
1517 let expr_attr_values = parse_expression_attribute_values(update);
1518 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1519 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
1520 .is_err()
1521 {
1522 cancellation_reasons.push(json!({
1523 "Code": "ConditionalCheckFailed",
1524 "Message": "The conditional request failed"
1525 }));
1526 any_failed = true;
1527 continue;
1528 }
1529 }
1530 cancellation_reasons.push(json!({ "Code": "None" }));
1531 } else if let Some(check) = ti.get("ConditionCheck") {
1532 let table_name = check["TableName"].as_str().unwrap_or_default();
1533 let key: HashMap<String, AttributeValue> =
1534 serde_json::from_value(check["Key"].clone()).unwrap_or_default();
1535 let cond = check["ConditionExpression"].as_str().unwrap_or_default();
1536
1537 let table = get_table(&state.tables, table_name)?;
1538 let expr_attr_names = parse_expression_attribute_names(check);
1539 let expr_attr_values = parse_expression_attribute_values(check);
1540 let existing = table.find_item_index(&key).map(|i| &table.items[i]);
1541 if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
1542 {
1543 cancellation_reasons.push(json!({
1544 "Code": "ConditionalCheckFailed",
1545 "Message": "The conditional request failed"
1546 }));
1547 any_failed = true;
1548 continue;
1549 }
1550 cancellation_reasons.push(json!({ "Code": "None" }));
1551 } else {
1552 cancellation_reasons.push(json!({ "Code": "None" }));
1553 }
1554 }
1555
1556 if any_failed {
1557 let error_body = json!({
1558 "__type": "TransactionCanceledException",
1559 "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
1560 "CancellationReasons": cancellation_reasons
1561 });
1562 return Ok(AwsResponse::json(
1563 StatusCode::BAD_REQUEST,
1564 serde_json::to_vec(&error_body).unwrap(),
1565 ));
1566 }
1567
1568 for ti in transact_items {
1570 if let Some(put) = ti.get("Put") {
1571 let table_name = put["TableName"].as_str().unwrap_or_default();
1572 let item: HashMap<String, AttributeValue> =
1573 serde_json::from_value(put["Item"].clone()).unwrap_or_default();
1574 let table = get_table_mut(&mut state.tables, table_name)?;
1575 let key = extract_key(table, &item);
1576 if let Some(idx) = table.find_item_index(&key) {
1577 table.items[idx] = item;
1578 } else {
1579 table.items.push(item);
1580 }
1581 table.recalculate_stats();
1582 } else if let Some(delete) = ti.get("Delete") {
1583 let table_name = delete["TableName"].as_str().unwrap_or_default();
1584 let key: HashMap<String, AttributeValue> =
1585 serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
1586 let table = get_table_mut(&mut state.tables, table_name)?;
1587 if let Some(idx) = table.find_item_index(&key) {
1588 table.items.remove(idx);
1589 }
1590 table.recalculate_stats();
1591 } else if let Some(update) = ti.get("Update") {
1592 let table_name = update["TableName"].as_str().unwrap_or_default();
1593 let key: HashMap<String, AttributeValue> =
1594 serde_json::from_value(update["Key"].clone()).unwrap_or_default();
1595 let update_expression = update["UpdateExpression"].as_str();
1596 let expr_attr_names = parse_expression_attribute_names(update);
1597 let expr_attr_values = parse_expression_attribute_values(update);
1598
1599 let table = get_table_mut(&mut state.tables, table_name)?;
1600 let idx = match table.find_item_index(&key) {
1601 Some(i) => i,
1602 None => {
1603 let mut new_item = HashMap::new();
1604 for (k, v) in &key {
1605 new_item.insert(k.clone(), v.clone());
1606 }
1607 table.items.push(new_item);
1608 table.items.len() - 1
1609 }
1610 };
1611
1612 if let Some(expr) = update_expression {
1613 apply_update_expression(
1614 &mut table.items[idx],
1615 expr,
1616 &expr_attr_names,
1617 &expr_attr_values,
1618 )?;
1619 }
1620 table.recalculate_stats();
1621 }
1622 }
1624
1625 Self::ok_json(json!({}))
1626 }
1627
1628 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1629 let body = Self::parse_body(req)?;
1630 let statement = require_str(&body, "Statement")?;
1631 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1632
1633 execute_partiql_statement(&self.state, statement, ¶meters)
1634 }
1635
1636 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1637 let body = Self::parse_body(req)?;
1638 validate_optional_enum_value(
1639 "returnConsumedCapacity",
1640 &body["ReturnConsumedCapacity"],
1641 &["INDEXES", "TOTAL", "NONE"],
1642 )?;
1643 let statements = body["Statements"].as_array().ok_or_else(|| {
1644 AwsServiceError::aws_error(
1645 StatusCode::BAD_REQUEST,
1646 "ValidationException",
1647 "Statements is required",
1648 )
1649 })?;
1650
1651 let mut responses: Vec<Value> = Vec::new();
1652 for stmt_obj in statements {
1653 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1654 let parameters = stmt_obj["Parameters"]
1655 .as_array()
1656 .cloned()
1657 .unwrap_or_default();
1658
1659 match execute_partiql_statement(&self.state, statement, ¶meters) {
1660 Ok(resp) => {
1661 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1662 responses.push(resp_body);
1663 }
1664 Err(e) => {
1665 responses.push(json!({
1666 "Error": {
1667 "Code": "ValidationException",
1668 "Message": e.to_string()
1669 }
1670 }));
1671 }
1672 }
1673 }
1674
1675 Self::ok_json(json!({ "Responses": responses }))
1676 }
1677
1678 fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1679 let body = Self::parse_body(req)?;
1680 validate_optional_string_length(
1681 "clientRequestToken",
1682 body["ClientRequestToken"].as_str(),
1683 1,
1684 36,
1685 )?;
1686 validate_optional_enum_value(
1687 "returnConsumedCapacity",
1688 &body["ReturnConsumedCapacity"],
1689 &["INDEXES", "TOTAL", "NONE"],
1690 )?;
1691 let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
1692 AwsServiceError::aws_error(
1693 StatusCode::BAD_REQUEST,
1694 "ValidationException",
1695 "TransactStatements is required",
1696 )
1697 })?;
1698
1699 let mut results: Vec<Result<Value, String>> = Vec::new();
1701 for stmt_obj in transact_statements {
1702 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1703 let parameters = stmt_obj["Parameters"]
1704 .as_array()
1705 .cloned()
1706 .unwrap_or_default();
1707
1708 match execute_partiql_statement(&self.state, statement, ¶meters) {
1709 Ok(resp) => {
1710 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1711 results.push(Ok(resp_body));
1712 }
1713 Err(e) => {
1714 results.push(Err(e.to_string()));
1715 }
1716 }
1717 }
1718
1719 let any_failed = results.iter().any(|r| r.is_err());
1720 if any_failed {
1721 let reasons: Vec<Value> = results
1722 .iter()
1723 .map(|r| match r {
1724 Ok(_) => json!({ "Code": "None" }),
1725 Err(msg) => json!({
1726 "Code": "ValidationException",
1727 "Message": msg
1728 }),
1729 })
1730 .collect();
1731 let error_body = json!({
1732 "__type": "TransactionCanceledException",
1733 "message": "Transaction cancelled due to validation errors",
1734 "CancellationReasons": reasons
1735 });
1736 return Ok(AwsResponse::json(
1737 StatusCode::BAD_REQUEST,
1738 serde_json::to_vec(&error_body).unwrap(),
1739 ));
1740 }
1741
1742 let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
1743 Self::ok_json(json!({ "Responses": responses }))
1744 }
1745
1746 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1749 let body = Self::parse_body(req)?;
1750 let table_name = require_str(&body, "TableName")?;
1751 let spec = &body["TimeToLiveSpecification"];
1752 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1753 AwsServiceError::aws_error(
1754 StatusCode::BAD_REQUEST,
1755 "ValidationException",
1756 "TimeToLiveSpecification.AttributeName is required",
1757 )
1758 })?;
1759 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1760 AwsServiceError::aws_error(
1761 StatusCode::BAD_REQUEST,
1762 "ValidationException",
1763 "TimeToLiveSpecification.Enabled is required",
1764 )
1765 })?;
1766
1767 let mut state = self.state.write();
1768 let table = get_table_mut(&mut state.tables, table_name)?;
1769
1770 if enabled {
1771 table.ttl_attribute = Some(attr_name.to_string());
1772 table.ttl_enabled = true;
1773 } else {
1774 table.ttl_enabled = false;
1775 }
1776
1777 Self::ok_json(json!({
1778 "TimeToLiveSpecification": {
1779 "AttributeName": attr_name,
1780 "Enabled": enabled
1781 }
1782 }))
1783 }
1784
1785 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1786 let body = Self::parse_body(req)?;
1787 let table_name = require_str(&body, "TableName")?;
1788
1789 let state = self.state.read();
1790 let table = get_table(&state.tables, table_name)?;
1791
1792 let status = if table.ttl_enabled {
1793 "ENABLED"
1794 } else {
1795 "DISABLED"
1796 };
1797
1798 let mut desc = json!({
1799 "TimeToLiveDescription": {
1800 "TimeToLiveStatus": status
1801 }
1802 });
1803
1804 if let Some(ref attr) = table.ttl_attribute {
1805 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1806 }
1807
1808 Self::ok_json(desc)
1809 }
1810
1811 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1814 let body = Self::parse_body(req)?;
1815 let resource_arn = require_str(&body, "ResourceArn")?;
1816 let policy = require_str(&body, "Policy")?;
1817
1818 let mut state = self.state.write();
1819 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1820 table.resource_policy = Some(policy.to_string());
1821
1822 let revision_id = uuid::Uuid::new_v4().to_string();
1823 Self::ok_json(json!({ "RevisionId": revision_id }))
1824 }
1825
1826 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1827 let body = Self::parse_body(req)?;
1828 let resource_arn = require_str(&body, "ResourceArn")?;
1829
1830 let state = self.state.read();
1831 let table = find_table_by_arn(&state.tables, resource_arn)?;
1832
1833 match &table.resource_policy {
1834 Some(policy) => {
1835 let revision_id = uuid::Uuid::new_v4().to_string();
1836 Self::ok_json(json!({
1837 "Policy": policy,
1838 "RevisionId": revision_id
1839 }))
1840 }
1841 None => Self::ok_json(json!({ "Policy": null })),
1842 }
1843 }
1844
1845 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1846 let body = Self::parse_body(req)?;
1847 let resource_arn = require_str(&body, "ResourceArn")?;
1848
1849 let mut state = self.state.write();
1850 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1851 table.resource_policy = None;
1852
1853 Self::ok_json(json!({}))
1854 }
1855
1856 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1859 Self::ok_json(json!({
1860 "Endpoints": [{
1861 "Address": "dynamodb.us-east-1.amazonaws.com",
1862 "CachePeriodInMinutes": 1440
1863 }]
1864 }))
1865 }
1866
1867 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1868 Self::ok_json(json!({
1869 "AccountMaxReadCapacityUnits": 80000,
1870 "AccountMaxWriteCapacityUnits": 80000,
1871 "TableMaxReadCapacityUnits": 40000,
1872 "TableMaxWriteCapacityUnits": 40000
1873 }))
1874 }
1875
1876 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1879 let body = Self::parse_body(req)?;
1880 let table_name = require_str(&body, "TableName")?;
1881 let backup_name = require_str(&body, "BackupName")?;
1882
1883 let mut state = self.state.write();
1884 let table = get_table(&state.tables, table_name)?;
1885
1886 let backup_arn = format!(
1887 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1888 state.region,
1889 state.account_id,
1890 table_name,
1891 Utc::now().format("%Y%m%d%H%M%S")
1892 );
1893 let now = Utc::now();
1894
1895 let backup = BackupDescription {
1896 backup_arn: backup_arn.clone(),
1897 backup_name: backup_name.to_string(),
1898 table_name: table_name.to_string(),
1899 table_arn: table.arn.clone(),
1900 backup_status: "AVAILABLE".to_string(),
1901 backup_type: "USER".to_string(),
1902 backup_creation_date: now,
1903 key_schema: table.key_schema.clone(),
1904 attribute_definitions: table.attribute_definitions.clone(),
1905 provisioned_throughput: table.provisioned_throughput.clone(),
1906 billing_mode: table.billing_mode.clone(),
1907 item_count: table.item_count,
1908 size_bytes: table.size_bytes,
1909 items: table.items.clone(),
1910 };
1911
1912 state.backups.insert(backup_arn.clone(), backup);
1913
1914 Self::ok_json(json!({
1915 "BackupDetails": {
1916 "BackupArn": backup_arn,
1917 "BackupName": backup_name,
1918 "BackupStatus": "AVAILABLE",
1919 "BackupType": "USER",
1920 "BackupCreationDateTime": now.timestamp() as f64,
1921 "BackupSizeBytes": 0
1922 }
1923 }))
1924 }
1925
1926 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1927 let body = Self::parse_body(req)?;
1928 let backup_arn = require_str(&body, "BackupArn")?;
1929
1930 let mut state = self.state.write();
1931 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1932 AwsServiceError::aws_error(
1933 StatusCode::BAD_REQUEST,
1934 "BackupNotFoundException",
1935 format!("Backup not found: {backup_arn}"),
1936 )
1937 })?;
1938
1939 Self::ok_json(json!({
1940 "BackupDescription": {
1941 "BackupDetails": {
1942 "BackupArn": backup.backup_arn,
1943 "BackupName": backup.backup_name,
1944 "BackupStatus": "DELETED",
1945 "BackupType": backup.backup_type,
1946 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1947 "BackupSizeBytes": backup.size_bytes
1948 },
1949 "SourceTableDetails": {
1950 "TableName": backup.table_name,
1951 "TableArn": backup.table_arn,
1952 "TableId": uuid::Uuid::new_v4().to_string(),
1953 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1954 "AttributeName": ks.attribute_name,
1955 "KeyType": ks.key_type
1956 })).collect::<Vec<_>>(),
1957 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1958 "ProvisionedThroughput": {
1959 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1960 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1961 },
1962 "ItemCount": backup.item_count,
1963 "BillingMode": backup.billing_mode,
1964 "TableSizeBytes": backup.size_bytes
1965 },
1966 "SourceTableFeatureDetails": {}
1967 }
1968 }))
1969 }
1970
1971 fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1972 let body = Self::parse_body(req)?;
1973 let backup_arn = require_str(&body, "BackupArn")?;
1974
1975 let state = self.state.read();
1976 let backup = state.backups.get(backup_arn).ok_or_else(|| {
1977 AwsServiceError::aws_error(
1978 StatusCode::BAD_REQUEST,
1979 "BackupNotFoundException",
1980 format!("Backup not found: {backup_arn}"),
1981 )
1982 })?;
1983
1984 Self::ok_json(json!({
1985 "BackupDescription": {
1986 "BackupDetails": {
1987 "BackupArn": backup.backup_arn,
1988 "BackupName": backup.backup_name,
1989 "BackupStatus": backup.backup_status,
1990 "BackupType": backup.backup_type,
1991 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1992 "BackupSizeBytes": backup.size_bytes
1993 },
1994 "SourceTableDetails": {
1995 "TableName": backup.table_name,
1996 "TableArn": backup.table_arn,
1997 "TableId": uuid::Uuid::new_v4().to_string(),
1998 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1999 "AttributeName": ks.attribute_name,
2000 "KeyType": ks.key_type
2001 })).collect::<Vec<_>>(),
2002 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
2003 "ProvisionedThroughput": {
2004 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
2005 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
2006 },
2007 "ItemCount": backup.item_count,
2008 "BillingMode": backup.billing_mode,
2009 "TableSizeBytes": backup.size_bytes
2010 },
2011 "SourceTableFeatureDetails": {}
2012 }
2013 }))
2014 }
2015
2016 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2017 let body = Self::parse_body(req)?;
2018 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2019 validate_optional_string_length(
2020 "exclusiveStartBackupArn",
2021 body["ExclusiveStartBackupArn"].as_str(),
2022 37,
2023 1024,
2024 )?;
2025 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2026 validate_optional_enum_value(
2027 "backupType",
2028 &body["BackupType"],
2029 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2030 )?;
2031 let table_name = body["TableName"].as_str();
2032
2033 let state = self.state.read();
2034 let summaries: Vec<Value> = state
2035 .backups
2036 .values()
2037 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2038 .map(|b| {
2039 json!({
2040 "TableName": b.table_name,
2041 "TableArn": b.table_arn,
2042 "BackupArn": b.backup_arn,
2043 "BackupName": b.backup_name,
2044 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2045 "BackupStatus": b.backup_status,
2046 "BackupType": b.backup_type,
2047 "BackupSizeBytes": b.size_bytes
2048 })
2049 })
2050 .collect();
2051
2052 Self::ok_json(json!({
2053 "BackupSummaries": summaries
2054 }))
2055 }
2056
2057 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2058 let body = Self::parse_body(req)?;
2059 let backup_arn = require_str(&body, "BackupArn")?;
2060 let target_table_name = require_str(&body, "TargetTableName")?;
2061
2062 let mut state = self.state.write();
2063 let backup = state.backups.get(backup_arn).ok_or_else(|| {
2064 AwsServiceError::aws_error(
2065 StatusCode::BAD_REQUEST,
2066 "BackupNotFoundException",
2067 format!("Backup not found: {backup_arn}"),
2068 )
2069 })?;
2070
2071 if state.tables.contains_key(target_table_name) {
2072 return Err(AwsServiceError::aws_error(
2073 StatusCode::BAD_REQUEST,
2074 "TableAlreadyExistsException",
2075 format!("Table already exists: {target_table_name}"),
2076 ));
2077 }
2078
2079 let now = Utc::now();
2080 let arn = format!(
2081 "arn:aws:dynamodb:{}:{}:table/{}",
2082 state.region, state.account_id, target_table_name
2083 );
2084
2085 let restored_items = backup.items.clone();
2086 let mut table = DynamoTable {
2087 name: target_table_name.to_string(),
2088 arn: arn.clone(),
2089 key_schema: backup.key_schema.clone(),
2090 attribute_definitions: backup.attribute_definitions.clone(),
2091 provisioned_throughput: backup.provisioned_throughput.clone(),
2092 items: restored_items,
2093 gsi: Vec::new(),
2094 lsi: Vec::new(),
2095 tags: HashMap::new(),
2096 created_at: now,
2097 status: "ACTIVE".to_string(),
2098 item_count: 0,
2099 size_bytes: 0,
2100 billing_mode: backup.billing_mode.clone(),
2101 ttl_attribute: None,
2102 ttl_enabled: false,
2103 resource_policy: None,
2104 pitr_enabled: false,
2105 kinesis_destinations: Vec::new(),
2106 contributor_insights_status: "DISABLED".to_string(),
2107 contributor_insights_counters: HashMap::new(),
2108 stream_enabled: false,
2109 stream_view_type: None,
2110 stream_arn: None,
2111 stream_records: Arc::new(RwLock::new(Vec::new())),
2112 sse_type: None,
2113 sse_kms_key_arn: None,
2114 };
2115 table.recalculate_stats();
2116
2117 let desc = build_table_description(&table);
2118 state.tables.insert(target_table_name.to_string(), table);
2119
2120 Self::ok_json(json!({
2121 "TableDescription": desc
2122 }))
2123 }
2124
2125 fn restore_table_to_point_in_time(
2126 &self,
2127 req: &AwsRequest,
2128 ) -> Result<AwsResponse, AwsServiceError> {
2129 let body = Self::parse_body(req)?;
2130 let target_table_name = require_str(&body, "TargetTableName")?;
2131 let source_table_name = body["SourceTableName"].as_str();
2132 let source_table_arn = body["SourceTableArn"].as_str();
2133
2134 let mut state = self.state.write();
2135
2136 let source = if let Some(name) = source_table_name {
2138 get_table(&state.tables, name)?.clone()
2139 } else if let Some(arn) = source_table_arn {
2140 find_table_by_arn(&state.tables, arn)?.clone()
2141 } else {
2142 return Err(AwsServiceError::aws_error(
2143 StatusCode::BAD_REQUEST,
2144 "ValidationException",
2145 "SourceTableName or SourceTableArn is required",
2146 ));
2147 };
2148
2149 if state.tables.contains_key(target_table_name) {
2150 return Err(AwsServiceError::aws_error(
2151 StatusCode::BAD_REQUEST,
2152 "TableAlreadyExistsException",
2153 format!("Table already exists: {target_table_name}"),
2154 ));
2155 }
2156
2157 let now = Utc::now();
2158 let arn = format!(
2159 "arn:aws:dynamodb:{}:{}:table/{}",
2160 state.region, state.account_id, target_table_name
2161 );
2162
2163 let mut table = DynamoTable {
2164 name: target_table_name.to_string(),
2165 arn: arn.clone(),
2166 key_schema: source.key_schema.clone(),
2167 attribute_definitions: source.attribute_definitions.clone(),
2168 provisioned_throughput: source.provisioned_throughput.clone(),
2169 items: source.items.clone(),
2170 gsi: Vec::new(),
2171 lsi: Vec::new(),
2172 tags: HashMap::new(),
2173 created_at: now,
2174 status: "ACTIVE".to_string(),
2175 item_count: 0,
2176 size_bytes: 0,
2177 billing_mode: source.billing_mode.clone(),
2178 ttl_attribute: None,
2179 ttl_enabled: false,
2180 resource_policy: None,
2181 pitr_enabled: false,
2182 kinesis_destinations: Vec::new(),
2183 contributor_insights_status: "DISABLED".to_string(),
2184 contributor_insights_counters: HashMap::new(),
2185 stream_enabled: false,
2186 stream_view_type: None,
2187 stream_arn: None,
2188 stream_records: Arc::new(RwLock::new(Vec::new())),
2189 sse_type: None,
2190 sse_kms_key_arn: None,
2191 };
2192 table.recalculate_stats();
2193
2194 let desc = build_table_description(&table);
2195 state.tables.insert(target_table_name.to_string(), table);
2196
2197 Self::ok_json(json!({
2198 "TableDescription": desc
2199 }))
2200 }
2201
2202 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2203 let body = Self::parse_body(req)?;
2204 let table_name = require_str(&body, "TableName")?;
2205
2206 let pitr_spec = body["PointInTimeRecoverySpecification"]
2207 .as_object()
2208 .ok_or_else(|| {
2209 AwsServiceError::aws_error(
2210 StatusCode::BAD_REQUEST,
2211 "ValidationException",
2212 "PointInTimeRecoverySpecification is required",
2213 )
2214 })?;
2215 let enabled = pitr_spec
2216 .get("PointInTimeRecoveryEnabled")
2217 .and_then(|v| v.as_bool())
2218 .unwrap_or(false);
2219
2220 let mut state = self.state.write();
2221 let table = get_table_mut(&mut state.tables, table_name)?;
2222 table.pitr_enabled = enabled;
2223
2224 let status = if enabled { "ENABLED" } else { "DISABLED" };
2225 Self::ok_json(json!({
2226 "ContinuousBackupsDescription": {
2227 "ContinuousBackupsStatus": status,
2228 "PointInTimeRecoveryDescription": {
2229 "PointInTimeRecoveryStatus": status,
2230 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2231 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2232 }
2233 }
2234 }))
2235 }
2236
2237 fn describe_continuous_backups(
2238 &self,
2239 req: &AwsRequest,
2240 ) -> Result<AwsResponse, AwsServiceError> {
2241 let body = Self::parse_body(req)?;
2242 let table_name = require_str(&body, "TableName")?;
2243
2244 let state = self.state.read();
2245 let table = get_table(&state.tables, table_name)?;
2246
2247 let status = if table.pitr_enabled {
2248 "ENABLED"
2249 } else {
2250 "DISABLED"
2251 };
2252 Self::ok_json(json!({
2253 "ContinuousBackupsDescription": {
2254 "ContinuousBackupsStatus": status,
2255 "PointInTimeRecoveryDescription": {
2256 "PointInTimeRecoveryStatus": status,
2257 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2258 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2259 }
2260 }
2261 }))
2262 }
2263
2264 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2267 let body = Self::parse_body(req)?;
2268 let global_table_name = require_str(&body, "GlobalTableName")?;
2269 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2270
2271 let replication_group = body["ReplicationGroup"]
2272 .as_array()
2273 .ok_or_else(|| {
2274 AwsServiceError::aws_error(
2275 StatusCode::BAD_REQUEST,
2276 "ValidationException",
2277 "ReplicationGroup is required",
2278 )
2279 })?
2280 .iter()
2281 .filter_map(|r| {
2282 r["RegionName"].as_str().map(|rn| ReplicaDescription {
2283 region_name: rn.to_string(),
2284 replica_status: "ACTIVE".to_string(),
2285 })
2286 })
2287 .collect::<Vec<_>>();
2288
2289 let mut state = self.state.write();
2290
2291 if state.global_tables.contains_key(global_table_name) {
2292 return Err(AwsServiceError::aws_error(
2293 StatusCode::BAD_REQUEST,
2294 "GlobalTableAlreadyExistsException",
2295 format!("Global table already exists: {global_table_name}"),
2296 ));
2297 }
2298
2299 let arn = format!(
2300 "arn:aws:dynamodb::{}:global-table/{}",
2301 state.account_id, global_table_name
2302 );
2303 let now = Utc::now();
2304
2305 let gt = GlobalTableDescription {
2306 global_table_name: global_table_name.to_string(),
2307 global_table_arn: arn.clone(),
2308 global_table_status: "ACTIVE".to_string(),
2309 creation_date: now,
2310 replication_group: replication_group.clone(),
2311 };
2312
2313 state
2314 .global_tables
2315 .insert(global_table_name.to_string(), gt);
2316
2317 Self::ok_json(json!({
2318 "GlobalTableDescription": {
2319 "GlobalTableName": global_table_name,
2320 "GlobalTableArn": arn,
2321 "GlobalTableStatus": "ACTIVE",
2322 "CreationDateTime": now.timestamp() as f64,
2323 "ReplicationGroup": replication_group.iter().map(|r| json!({
2324 "RegionName": r.region_name,
2325 "ReplicaStatus": r.replica_status
2326 })).collect::<Vec<_>>()
2327 }
2328 }))
2329 }
2330
2331 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2332 let body = Self::parse_body(req)?;
2333 let global_table_name = require_str(&body, "GlobalTableName")?;
2334 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2335
2336 let state = self.state.read();
2337 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2338 AwsServiceError::aws_error(
2339 StatusCode::BAD_REQUEST,
2340 "GlobalTableNotFoundException",
2341 format!("Global table not found: {global_table_name}"),
2342 )
2343 })?;
2344
2345 Self::ok_json(json!({
2346 "GlobalTableDescription": {
2347 "GlobalTableName": gt.global_table_name,
2348 "GlobalTableArn": gt.global_table_arn,
2349 "GlobalTableStatus": gt.global_table_status,
2350 "CreationDateTime": gt.creation_date.timestamp() as f64,
2351 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2352 "RegionName": r.region_name,
2353 "ReplicaStatus": r.replica_status
2354 })).collect::<Vec<_>>()
2355 }
2356 }))
2357 }
2358
2359 fn describe_global_table_settings(
2360 &self,
2361 req: &AwsRequest,
2362 ) -> Result<AwsResponse, AwsServiceError> {
2363 let body = Self::parse_body(req)?;
2364 let global_table_name = require_str(&body, "GlobalTableName")?;
2365 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2366
2367 let state = self.state.read();
2368 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2369 AwsServiceError::aws_error(
2370 StatusCode::BAD_REQUEST,
2371 "GlobalTableNotFoundException",
2372 format!("Global table not found: {global_table_name}"),
2373 )
2374 })?;
2375
2376 let replica_settings: Vec<Value> = gt
2377 .replication_group
2378 .iter()
2379 .map(|r| {
2380 json!({
2381 "RegionName": r.region_name,
2382 "ReplicaStatus": r.replica_status,
2383 "ReplicaProvisionedReadCapacityUnits": 0,
2384 "ReplicaProvisionedWriteCapacityUnits": 0
2385 })
2386 })
2387 .collect();
2388
2389 Self::ok_json(json!({
2390 "GlobalTableName": gt.global_table_name,
2391 "ReplicaSettings": replica_settings
2392 }))
2393 }
2394
2395 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2396 let body = Self::parse_body(req)?;
2397 validate_optional_string_length(
2398 "exclusiveStartGlobalTableName",
2399 body["ExclusiveStartGlobalTableName"].as_str(),
2400 3,
2401 255,
2402 )?;
2403 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2404 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2405
2406 let state = self.state.read();
2407 let tables: Vec<Value> = state
2408 .global_tables
2409 .values()
2410 .take(limit)
2411 .map(|gt| {
2412 json!({
2413 "GlobalTableName": gt.global_table_name,
2414 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2415 "RegionName": r.region_name
2416 })).collect::<Vec<_>>()
2417 })
2418 })
2419 .collect();
2420
2421 Self::ok_json(json!({
2422 "GlobalTables": tables
2423 }))
2424 }
2425
2426 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2427 let body = Self::parse_body(req)?;
2428 let global_table_name = require_str(&body, "GlobalTableName")?;
2429 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2430 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2431
2432 let mut state = self.state.write();
2433 let gt = state
2434 .global_tables
2435 .get_mut(global_table_name)
2436 .ok_or_else(|| {
2437 AwsServiceError::aws_error(
2438 StatusCode::BAD_REQUEST,
2439 "GlobalTableNotFoundException",
2440 format!("Global table not found: {global_table_name}"),
2441 )
2442 })?;
2443
2444 if let Some(updates) = body["ReplicaUpdates"].as_array() {
2445 for update in updates {
2446 if let Some(create) = update["Create"].as_object() {
2447 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2448 gt.replication_group.push(ReplicaDescription {
2449 region_name: region.to_string(),
2450 replica_status: "ACTIVE".to_string(),
2451 });
2452 }
2453 }
2454 if let Some(delete) = update["Delete"].as_object() {
2455 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2456 gt.replication_group.retain(|r| r.region_name != region);
2457 }
2458 }
2459 }
2460 }
2461
2462 Self::ok_json(json!({
2463 "GlobalTableDescription": {
2464 "GlobalTableName": gt.global_table_name,
2465 "GlobalTableArn": gt.global_table_arn,
2466 "GlobalTableStatus": gt.global_table_status,
2467 "CreationDateTime": gt.creation_date.timestamp() as f64,
2468 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2469 "RegionName": r.region_name,
2470 "ReplicaStatus": r.replica_status
2471 })).collect::<Vec<_>>()
2472 }
2473 }))
2474 }
2475
2476 fn update_global_table_settings(
2477 &self,
2478 req: &AwsRequest,
2479 ) -> Result<AwsResponse, AwsServiceError> {
2480 let body = Self::parse_body(req)?;
2481 let global_table_name = require_str(&body, "GlobalTableName")?;
2482 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2483 validate_optional_enum_value(
2484 "globalTableBillingMode",
2485 &body["GlobalTableBillingMode"],
2486 &["PROVISIONED", "PAY_PER_REQUEST"],
2487 )?;
2488 validate_optional_range_i64(
2489 "globalTableProvisionedWriteCapacityUnits",
2490 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2491 1,
2492 i64::MAX,
2493 )?;
2494
2495 let state = self.state.read();
2496 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2497 AwsServiceError::aws_error(
2498 StatusCode::BAD_REQUEST,
2499 "GlobalTableNotFoundException",
2500 format!("Global table not found: {global_table_name}"),
2501 )
2502 })?;
2503
2504 let replica_settings: Vec<Value> = gt
2505 .replication_group
2506 .iter()
2507 .map(|r| {
2508 json!({
2509 "RegionName": r.region_name,
2510 "ReplicaStatus": r.replica_status,
2511 "ReplicaProvisionedReadCapacityUnits": 0,
2512 "ReplicaProvisionedWriteCapacityUnits": 0
2513 })
2514 })
2515 .collect();
2516
2517 Self::ok_json(json!({
2518 "GlobalTableName": gt.global_table_name,
2519 "ReplicaSettings": replica_settings
2520 }))
2521 }
2522
2523 fn describe_table_replica_auto_scaling(
2524 &self,
2525 req: &AwsRequest,
2526 ) -> Result<AwsResponse, AwsServiceError> {
2527 let body = Self::parse_body(req)?;
2528 let table_name = require_str(&body, "TableName")?;
2529
2530 let state = self.state.read();
2531 let table = get_table(&state.tables, table_name)?;
2532
2533 Self::ok_json(json!({
2534 "TableAutoScalingDescription": {
2535 "TableName": table.name,
2536 "TableStatus": table.status,
2537 "Replicas": []
2538 }
2539 }))
2540 }
2541
2542 fn update_table_replica_auto_scaling(
2543 &self,
2544 req: &AwsRequest,
2545 ) -> Result<AwsResponse, AwsServiceError> {
2546 let body = Self::parse_body(req)?;
2547 let table_name = require_str(&body, "TableName")?;
2548
2549 let state = self.state.read();
2550 let table = get_table(&state.tables, table_name)?;
2551
2552 Self::ok_json(json!({
2553 "TableAutoScalingDescription": {
2554 "TableName": table.name,
2555 "TableStatus": table.status,
2556 "Replicas": []
2557 }
2558 }))
2559 }
2560
2561 fn enable_kinesis_streaming_destination(
2564 &self,
2565 req: &AwsRequest,
2566 ) -> Result<AwsResponse, AwsServiceError> {
2567 let body = Self::parse_body(req)?;
2568 let table_name = require_str(&body, "TableName")?;
2569 let stream_arn = require_str(&body, "StreamArn")?;
2570 let precision = body["EnableKinesisStreamingConfiguration"]
2571 ["ApproximateCreationDateTimePrecision"]
2572 .as_str()
2573 .unwrap_or("MILLISECOND");
2574
2575 let mut state = self.state.write();
2576 let table = get_table_mut(&mut state.tables, table_name)?;
2577
2578 table.kinesis_destinations.push(KinesisDestination {
2579 stream_arn: stream_arn.to_string(),
2580 destination_status: "ACTIVE".to_string(),
2581 approximate_creation_date_time_precision: precision.to_string(),
2582 });
2583
2584 Self::ok_json(json!({
2585 "TableName": table_name,
2586 "StreamArn": stream_arn,
2587 "DestinationStatus": "ACTIVE",
2588 "EnableKinesisStreamingConfiguration": {
2589 "ApproximateCreationDateTimePrecision": precision
2590 }
2591 }))
2592 }
2593
2594 fn disable_kinesis_streaming_destination(
2595 &self,
2596 req: &AwsRequest,
2597 ) -> Result<AwsResponse, AwsServiceError> {
2598 let body = Self::parse_body(req)?;
2599 let table_name = require_str(&body, "TableName")?;
2600 let stream_arn = require_str(&body, "StreamArn")?;
2601
2602 let mut state = self.state.write();
2603 let table = get_table_mut(&mut state.tables, table_name)?;
2604
2605 if let Some(dest) = table
2606 .kinesis_destinations
2607 .iter_mut()
2608 .find(|d| d.stream_arn == stream_arn)
2609 {
2610 dest.destination_status = "DISABLED".to_string();
2611 }
2612
2613 Self::ok_json(json!({
2614 "TableName": table_name,
2615 "StreamArn": stream_arn,
2616 "DestinationStatus": "DISABLED"
2617 }))
2618 }
2619
2620 fn describe_kinesis_streaming_destination(
2621 &self,
2622 req: &AwsRequest,
2623 ) -> Result<AwsResponse, AwsServiceError> {
2624 let body = Self::parse_body(req)?;
2625 let table_name = require_str(&body, "TableName")?;
2626
2627 let state = self.state.read();
2628 let table = get_table(&state.tables, table_name)?;
2629
2630 let destinations: Vec<Value> = table
2631 .kinesis_destinations
2632 .iter()
2633 .map(|d| {
2634 json!({
2635 "StreamArn": d.stream_arn,
2636 "DestinationStatus": d.destination_status,
2637 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2638 })
2639 })
2640 .collect();
2641
2642 Self::ok_json(json!({
2643 "TableName": table_name,
2644 "KinesisDataStreamDestinations": destinations
2645 }))
2646 }
2647
2648 fn update_kinesis_streaming_destination(
2649 &self,
2650 req: &AwsRequest,
2651 ) -> Result<AwsResponse, AwsServiceError> {
2652 let body = Self::parse_body(req)?;
2653 let table_name = require_str(&body, "TableName")?;
2654 let stream_arn = require_str(&body, "StreamArn")?;
2655 let precision = body["UpdateKinesisStreamingConfiguration"]
2656 ["ApproximateCreationDateTimePrecision"]
2657 .as_str()
2658 .unwrap_or("MILLISECOND");
2659
2660 let mut state = self.state.write();
2661 let table = get_table_mut(&mut state.tables, table_name)?;
2662
2663 if let Some(dest) = table
2664 .kinesis_destinations
2665 .iter_mut()
2666 .find(|d| d.stream_arn == stream_arn)
2667 {
2668 dest.approximate_creation_date_time_precision = precision.to_string();
2669 }
2670
2671 Self::ok_json(json!({
2672 "TableName": table_name,
2673 "StreamArn": stream_arn,
2674 "DestinationStatus": "ACTIVE",
2675 "UpdateKinesisStreamingConfiguration": {
2676 "ApproximateCreationDateTimePrecision": precision
2677 }
2678 }))
2679 }
2680
2681 fn describe_contributor_insights(
2684 &self,
2685 req: &AwsRequest,
2686 ) -> Result<AwsResponse, AwsServiceError> {
2687 let body = Self::parse_body(req)?;
2688 let table_name = require_str(&body, "TableName")?;
2689 let index_name = body["IndexName"].as_str();
2690
2691 let state = self.state.read();
2692 let table = get_table(&state.tables, table_name)?;
2693
2694 let top = table.top_contributors(10);
2695 let contributors: Vec<Value> = top
2696 .iter()
2697 .map(|(key, count)| {
2698 json!({
2699 "Key": key,
2700 "Count": count
2701 })
2702 })
2703 .collect();
2704
2705 let mut result = json!({
2706 "TableName": table_name,
2707 "ContributorInsightsStatus": table.contributor_insights_status,
2708 "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2709 "TopContributors": contributors
2710 });
2711 if let Some(idx) = index_name {
2712 result["IndexName"] = json!(idx);
2713 }
2714
2715 Self::ok_json(result)
2716 }
2717
2718 fn update_contributor_insights(
2719 &self,
2720 req: &AwsRequest,
2721 ) -> Result<AwsResponse, AwsServiceError> {
2722 let body = Self::parse_body(req)?;
2723 let table_name = require_str(&body, "TableName")?;
2724 let action = require_str(&body, "ContributorInsightsAction")?;
2725 let index_name = body["IndexName"].as_str();
2726
2727 let mut state = self.state.write();
2728 let table = get_table_mut(&mut state.tables, table_name)?;
2729
2730 let status = match action {
2731 "ENABLE" => "ENABLED",
2732 "DISABLE" => "DISABLED",
2733 _ => {
2734 return Err(AwsServiceError::aws_error(
2735 StatusCode::BAD_REQUEST,
2736 "ValidationException",
2737 format!("Invalid ContributorInsightsAction: {action}"),
2738 ))
2739 }
2740 };
2741 table.contributor_insights_status = status.to_string();
2742 if status == "DISABLED" {
2743 table.contributor_insights_counters.clear();
2744 }
2745
2746 let mut result = json!({
2747 "TableName": table_name,
2748 "ContributorInsightsStatus": status
2749 });
2750 if let Some(idx) = index_name {
2751 result["IndexName"] = json!(idx);
2752 }
2753
2754 Self::ok_json(result)
2755 }
2756
2757 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2758 let body = Self::parse_body(req)?;
2759 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2760 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2761 let table_name = body["TableName"].as_str();
2762
2763 let state = self.state.read();
2764 let summaries: Vec<Value> = state
2765 .tables
2766 .values()
2767 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2768 .map(|t| {
2769 json!({
2770 "TableName": t.name,
2771 "ContributorInsightsStatus": t.contributor_insights_status
2772 })
2773 })
2774 .collect();
2775
2776 Self::ok_json(json!({
2777 "ContributorInsightsSummaries": summaries
2778 }))
2779 }
2780
2781 fn export_table_to_point_in_time(
2784 &self,
2785 req: &AwsRequest,
2786 ) -> Result<AwsResponse, AwsServiceError> {
2787 let body = Self::parse_body(req)?;
2788 let table_arn = require_str(&body, "TableArn")?;
2789 let s3_bucket = require_str(&body, "S3Bucket")?;
2790 let s3_prefix = body["S3Prefix"].as_str();
2791 let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");
2792
2793 let state = self.state.read();
2794 let table = find_table_by_arn(&state.tables, table_arn)?;
2796 let items = table.items.clone();
2797 let item_count = items.len() as i64;
2798
2799 let now = Utc::now();
2800 let export_arn = format!(
2801 "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
2802 state.region,
2803 state.account_id,
2804 table_arn.rsplit('/').next().unwrap_or("unknown"),
2805 uuid::Uuid::new_v4()
2806 );
2807
2808 drop(state);
2809
2810 let mut json_lines = String::new();
2812 for item in &items {
2813 let item_json = if export_format == "DYNAMODB_JSON" {
2814 json!({ "Item": item })
2815 } else {
2816 json!(item)
2817 };
2818 json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
2819 json_lines.push('\n');
2820 }
2821 let data_size = json_lines.len() as i64;
2822
2823 let s3_key = if let Some(prefix) = s3_prefix {
2825 format!("{prefix}/data/manifest-files.json")
2826 } else {
2827 "data/manifest-files.json".to_string()
2828 };
2829
2830 let mut export_failed = false;
2832 let mut failure_reason = String::new();
2833 if let Some(ref s3_state) = self.s3_state {
2834 let mut s3 = s3_state.write();
2835 if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
2836 let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
2837 let obj = fakecloud_s3::state::S3Object {
2838 key: s3_key.clone(),
2839 data: bytes::Bytes::from(json_lines),
2840 content_type: "application/json".to_string(),
2841 etag,
2842 size: data_size as u64,
2843 last_modified: now,
2844 metadata: HashMap::new(),
2845 storage_class: "STANDARD".to_string(),
2846 tags: HashMap::new(),
2847 acl_grants: Vec::new(),
2848 acl_owner_id: None,
2849 parts_count: None,
2850 part_sizes: None,
2851 sse_algorithm: None,
2852 sse_kms_key_id: None,
2853 bucket_key_enabled: None,
2854 version_id: None,
2855 is_delete_marker: false,
2856 content_encoding: None,
2857 website_redirect_location: None,
2858 restore_ongoing: None,
2859 restore_expiry: None,
2860 checksum_algorithm: None,
2861 checksum_value: None,
2862 lock_mode: None,
2863 lock_retain_until: None,
2864 lock_legal_hold: None,
2865 };
2866 bucket.objects.insert(s3_key, obj);
2867 } else {
2868 export_failed = true;
2869 failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
2870 }
2871 }
2872
2873 let export_status = if export_failed { "FAILED" } else { "COMPLETED" };
2874
2875 let export = ExportDescription {
2876 export_arn: export_arn.clone(),
2877 export_status: export_status.to_string(),
2878 table_arn: table_arn.to_string(),
2879 s3_bucket: s3_bucket.to_string(),
2880 s3_prefix: s3_prefix.map(|s| s.to_string()),
2881 export_format: export_format.to_string(),
2882 start_time: now,
2883 end_time: now,
2884 export_time: now,
2885 item_count,
2886 billed_size_bytes: data_size,
2887 };
2888
2889 let mut state = self.state.write();
2890 state.exports.insert(export_arn.clone(), export);
2891
2892 let mut response = json!({
2893 "ExportDescription": {
2894 "ExportArn": export_arn,
2895 "ExportStatus": export_status,
2896 "TableArn": table_arn,
2897 "S3Bucket": s3_bucket,
2898 "S3Prefix": s3_prefix,
2899 "ExportFormat": export_format,
2900 "StartTime": now.timestamp() as f64,
2901 "EndTime": now.timestamp() as f64,
2902 "ExportTime": now.timestamp() as f64,
2903 "ItemCount": item_count,
2904 "BilledSizeBytes": data_size
2905 }
2906 });
2907 if export_failed {
2908 response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
2909 response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
2910 }
2911 Self::ok_json(response)
2912 }
2913
2914 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2915 let body = Self::parse_body(req)?;
2916 let export_arn = require_str(&body, "ExportArn")?;
2917
2918 let state = self.state.read();
2919 let export = state.exports.get(export_arn).ok_or_else(|| {
2920 AwsServiceError::aws_error(
2921 StatusCode::BAD_REQUEST,
2922 "ExportNotFoundException",
2923 format!("Export not found: {export_arn}"),
2924 )
2925 })?;
2926
2927 Self::ok_json(json!({
2928 "ExportDescription": {
2929 "ExportArn": export.export_arn,
2930 "ExportStatus": export.export_status,
2931 "TableArn": export.table_arn,
2932 "S3Bucket": export.s3_bucket,
2933 "S3Prefix": export.s3_prefix,
2934 "ExportFormat": export.export_format,
2935 "StartTime": export.start_time.timestamp() as f64,
2936 "EndTime": export.end_time.timestamp() as f64,
2937 "ExportTime": export.export_time.timestamp() as f64,
2938 "ItemCount": export.item_count,
2939 "BilledSizeBytes": export.billed_size_bytes
2940 }
2941 }))
2942 }
2943
2944 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2945 let body = Self::parse_body(req)?;
2946 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2947 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2948 let table_arn = body["TableArn"].as_str();
2949
2950 let state = self.state.read();
2951 let summaries: Vec<Value> = state
2952 .exports
2953 .values()
2954 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2955 .map(|e| {
2956 json!({
2957 "ExportArn": e.export_arn,
2958 "ExportStatus": e.export_status,
2959 "TableArn": e.table_arn
2960 })
2961 })
2962 .collect();
2963
2964 Self::ok_json(json!({
2965 "ExportSummaries": summaries
2966 }))
2967 }
2968
2969 fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2970 let body = Self::parse_body(req)?;
2971 let input_format = require_str(&body, "InputFormat")?;
2972 let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
2973 AwsServiceError::aws_error(
2974 StatusCode::BAD_REQUEST,
2975 "ValidationException",
2976 "S3BucketSource is required",
2977 )
2978 })?;
2979 let s3_bucket = s3_source
2980 .get("S3Bucket")
2981 .and_then(|v| v.as_str())
2982 .unwrap_or("");
2983 let s3_key_prefix = s3_source
2984 .get("S3KeyPrefix")
2985 .and_then(|v| v.as_str())
2986 .unwrap_or("");
2987
2988 let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
2989 AwsServiceError::aws_error(
2990 StatusCode::BAD_REQUEST,
2991 "ValidationException",
2992 "TableCreationParameters is required",
2993 )
2994 })?;
2995 let table_name = table_params
2996 .get("TableName")
2997 .and_then(|v| v.as_str())
2998 .ok_or_else(|| {
2999 AwsServiceError::aws_error(
3000 StatusCode::BAD_REQUEST,
3001 "ValidationException",
3002 "TableCreationParameters.TableName is required",
3003 )
3004 })?;
3005
3006 let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
3007 let attribute_definitions = parse_attribute_definitions(
3008 table_params
3009 .get("AttributeDefinitions")
3010 .unwrap_or(&Value::Null),
3011 )?;
3012
3013 let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
3015 let mut processed_size_bytes: i64 = 0;
3016 if let Some(ref s3_state) = self.s3_state {
3017 let s3 = s3_state.read();
3018 let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
3019 AwsServiceError::aws_error(
3020 StatusCode::BAD_REQUEST,
3021 "ImportConflictException",
3022 format!("S3 bucket does not exist: {s3_bucket}"),
3023 )
3024 })?;
3025 let prefix = if s3_key_prefix.is_empty() {
3027 String::new()
3028 } else {
3029 s3_key_prefix.to_string()
3030 };
3031 for (key, obj) in &bucket.objects {
3032 if !prefix.is_empty() && !key.starts_with(&prefix) {
3033 continue;
3034 }
3035 let data = std::str::from_utf8(&obj.data).unwrap_or("");
3036 processed_size_bytes += obj.size as i64;
3037 for line in data.lines() {
3038 let line = line.trim();
3039 if line.is_empty() {
3040 continue;
3041 }
3042 if let Ok(parsed) = serde_json::from_str::<Value>(line) {
3043 let item = if input_format == "DYNAMODB_JSON" {
3045 if let Some(item_obj) = parsed.get("Item") {
3046 item_obj.as_object().cloned().unwrap_or_default()
3047 } else {
3048 parsed.as_object().cloned().unwrap_or_default()
3049 }
3050 } else {
3051 parsed.as_object().cloned().unwrap_or_default()
3052 };
3053 if !item.is_empty() {
3054 imported_items
3055 .push(item.into_iter().collect::<HashMap<String, Value>>());
3056 }
3057 }
3058 }
3059 }
3060 }
3061
3062 let mut state = self.state.write();
3063
3064 if state.tables.contains_key(table_name) {
3065 return Err(AwsServiceError::aws_error(
3066 StatusCode::BAD_REQUEST,
3067 "ResourceInUseException",
3068 format!("Table already exists: {table_name}"),
3069 ));
3070 }
3071
3072 let now = Utc::now();
3073 let table_arn = format!(
3074 "arn:aws:dynamodb:{}:{}:table/{}",
3075 state.region, state.account_id, table_name
3076 );
3077 let import_arn = format!(
3078 "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
3079 state.region,
3080 state.account_id,
3081 table_name,
3082 uuid::Uuid::new_v4()
3083 );
3084
3085 let processed_item_count = imported_items.len() as i64;
3086
3087 let mut table = DynamoTable {
3088 name: table_name.to_string(),
3089 arn: table_arn.clone(),
3090 key_schema,
3091 attribute_definitions,
3092 provisioned_throughput: ProvisionedThroughput {
3093 read_capacity_units: 0,
3094 write_capacity_units: 0,
3095 },
3096 items: imported_items,
3097 gsi: Vec::new(),
3098 lsi: Vec::new(),
3099 tags: HashMap::new(),
3100 created_at: now,
3101 status: "ACTIVE".to_string(),
3102 item_count: 0,
3103 size_bytes: 0,
3104 billing_mode: "PAY_PER_REQUEST".to_string(),
3105 ttl_attribute: None,
3106 ttl_enabled: false,
3107 resource_policy: None,
3108 pitr_enabled: false,
3109 kinesis_destinations: Vec::new(),
3110 contributor_insights_status: "DISABLED".to_string(),
3111 contributor_insights_counters: HashMap::new(),
3112 stream_enabled: false,
3113 stream_view_type: None,
3114 stream_arn: None,
3115 stream_records: Arc::new(RwLock::new(Vec::new())),
3116 sse_type: None,
3117 sse_kms_key_arn: None,
3118 };
3119 table.recalculate_stats();
3120 state.tables.insert(table_name.to_string(), table);
3121
3122 let import_desc = ImportDescription {
3123 import_arn: import_arn.clone(),
3124 import_status: "COMPLETED".to_string(),
3125 table_arn: table_arn.clone(),
3126 table_name: table_name.to_string(),
3127 s3_bucket_source: s3_bucket.to_string(),
3128 input_format: input_format.to_string(),
3129 start_time: now,
3130 end_time: now,
3131 processed_item_count,
3132 processed_size_bytes,
3133 };
3134 state.imports.insert(import_arn.clone(), import_desc);
3135
3136 let table_ref = state.tables.get(table_name).unwrap();
3137 let ks: Vec<Value> = table_ref
3138 .key_schema
3139 .iter()
3140 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
3141 .collect();
3142 let ad: Vec<Value> = table_ref
3143 .attribute_definitions
3144 .iter()
3145 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
3146 .collect();
3147
3148 Self::ok_json(json!({
3149 "ImportTableDescription": {
3150 "ImportArn": import_arn,
3151 "ImportStatus": "COMPLETED",
3152 "TableArn": table_arn,
3153 "TableId": uuid::Uuid::new_v4().to_string(),
3154 "S3BucketSource": {
3155 "S3Bucket": s3_bucket
3156 },
3157 "InputFormat": input_format,
3158 "TableCreationParameters": {
3159 "TableName": table_name,
3160 "KeySchema": ks,
3161 "AttributeDefinitions": ad
3162 },
3163 "StartTime": now.timestamp() as f64,
3164 "EndTime": now.timestamp() as f64,
3165 "ProcessedItemCount": processed_item_count,
3166 "ProcessedSizeBytes": processed_size_bytes
3167 }
3168 }))
3169 }
3170
3171 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3172 let body = Self::parse_body(req)?;
3173 let import_arn = require_str(&body, "ImportArn")?;
3174
3175 let state = self.state.read();
3176 let import = state.imports.get(import_arn).ok_or_else(|| {
3177 AwsServiceError::aws_error(
3178 StatusCode::BAD_REQUEST,
3179 "ImportNotFoundException",
3180 format!("Import not found: {import_arn}"),
3181 )
3182 })?;
3183
3184 Self::ok_json(json!({
3185 "ImportTableDescription": {
3186 "ImportArn": import.import_arn,
3187 "ImportStatus": import.import_status,
3188 "TableArn": import.table_arn,
3189 "S3BucketSource": {
3190 "S3Bucket": import.s3_bucket_source
3191 },
3192 "InputFormat": import.input_format,
3193 "StartTime": import.start_time.timestamp() as f64,
3194 "EndTime": import.end_time.timestamp() as f64,
3195 "ProcessedItemCount": import.processed_item_count,
3196 "ProcessedSizeBytes": import.processed_size_bytes
3197 }
3198 }))
3199 }
3200
3201 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3202 let body = Self::parse_body(req)?;
3203 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3204 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3205 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3206 let table_arn = body["TableArn"].as_str();
3207
3208 let state = self.state.read();
3209 let summaries: Vec<Value> = state
3210 .imports
3211 .values()
3212 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3213 .map(|i| {
3214 json!({
3215 "ImportArn": i.import_arn,
3216 "ImportStatus": i.import_status,
3217 "TableArn": i.table_arn
3218 })
3219 })
3220 .collect();
3221
3222 Self::ok_json(json!({
3223 "ImportSummaryList": summaries
3224 }))
3225 }
3226}
3227
3228#[async_trait]
3229impl AwsService for DynamoDbService {
3230 fn service_name(&self) -> &str {
3231 "dynamodb"
3232 }
3233
3234 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3235 match req.action.as_str() {
3236 "CreateTable" => self.create_table(&req),
3237 "DeleteTable" => self.delete_table(&req),
3238 "DescribeTable" => self.describe_table(&req),
3239 "ListTables" => self.list_tables(&req),
3240 "UpdateTable" => self.update_table(&req),
3241 "PutItem" => self.put_item(&req),
3242 "GetItem" => self.get_item(&req),
3243 "DeleteItem" => self.delete_item(&req),
3244 "UpdateItem" => self.update_item(&req),
3245 "Query" => self.query(&req),
3246 "Scan" => self.scan(&req),
3247 "BatchGetItem" => self.batch_get_item(&req),
3248 "BatchWriteItem" => self.batch_write_item(&req),
3249 "TagResource" => self.tag_resource(&req),
3250 "UntagResource" => self.untag_resource(&req),
3251 "ListTagsOfResource" => self.list_tags_of_resource(&req),
3252 "TransactGetItems" => self.transact_get_items(&req),
3253 "TransactWriteItems" => self.transact_write_items(&req),
3254 "ExecuteStatement" => self.execute_statement(&req),
3255 "BatchExecuteStatement" => self.batch_execute_statement(&req),
3256 "ExecuteTransaction" => self.execute_transaction(&req),
3257 "UpdateTimeToLive" => self.update_time_to_live(&req),
3258 "DescribeTimeToLive" => self.describe_time_to_live(&req),
3259 "PutResourcePolicy" => self.put_resource_policy(&req),
3260 "GetResourcePolicy" => self.get_resource_policy(&req),
3261 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3262 "DescribeEndpoints" => self.describe_endpoints(&req),
3264 "DescribeLimits" => self.describe_limits(&req),
3265 "CreateBackup" => self.create_backup(&req),
3267 "DeleteBackup" => self.delete_backup(&req),
3268 "DescribeBackup" => self.describe_backup(&req),
3269 "ListBackups" => self.list_backups(&req),
3270 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3271 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3272 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3273 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3274 "CreateGlobalTable" => self.create_global_table(&req),
3276 "DescribeGlobalTable" => self.describe_global_table(&req),
3277 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3278 "ListGlobalTables" => self.list_global_tables(&req),
3279 "UpdateGlobalTable" => self.update_global_table(&req),
3280 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3281 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3282 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3283 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3285 "DisableKinesisStreamingDestination" => {
3286 self.disable_kinesis_streaming_destination(&req)
3287 }
3288 "DescribeKinesisStreamingDestination" => {
3289 self.describe_kinesis_streaming_destination(&req)
3290 }
3291 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3292 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3294 "UpdateContributorInsights" => self.update_contributor_insights(&req),
3295 "ListContributorInsights" => self.list_contributor_insights(&req),
3296 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3298 "DescribeExport" => self.describe_export(&req),
3299 "ListExports" => self.list_exports(&req),
3300 "ImportTable" => self.import_table(&req),
3301 "DescribeImport" => self.describe_import(&req),
3302 "ListImports" => self.list_imports(&req),
3303 _ => Err(AwsServiceError::action_not_implemented(
3304 "dynamodb",
3305 &req.action,
3306 )),
3307 }
3308 }
3309
3310 fn supported_actions(&self) -> &[&str] {
3311 &[
3312 "CreateTable",
3313 "DeleteTable",
3314 "DescribeTable",
3315 "ListTables",
3316 "UpdateTable",
3317 "PutItem",
3318 "GetItem",
3319 "DeleteItem",
3320 "UpdateItem",
3321 "Query",
3322 "Scan",
3323 "BatchGetItem",
3324 "BatchWriteItem",
3325 "TagResource",
3326 "UntagResource",
3327 "ListTagsOfResource",
3328 "TransactGetItems",
3329 "TransactWriteItems",
3330 "ExecuteStatement",
3331 "BatchExecuteStatement",
3332 "ExecuteTransaction",
3333 "UpdateTimeToLive",
3334 "DescribeTimeToLive",
3335 "PutResourcePolicy",
3336 "GetResourcePolicy",
3337 "DeleteResourcePolicy",
3338 "DescribeEndpoints",
3339 "DescribeLimits",
3340 "CreateBackup",
3341 "DeleteBackup",
3342 "DescribeBackup",
3343 "ListBackups",
3344 "RestoreTableFromBackup",
3345 "RestoreTableToPointInTime",
3346 "UpdateContinuousBackups",
3347 "DescribeContinuousBackups",
3348 "CreateGlobalTable",
3349 "DescribeGlobalTable",
3350 "DescribeGlobalTableSettings",
3351 "ListGlobalTables",
3352 "UpdateGlobalTable",
3353 "UpdateGlobalTableSettings",
3354 "DescribeTableReplicaAutoScaling",
3355 "UpdateTableReplicaAutoScaling",
3356 "EnableKinesisStreamingDestination",
3357 "DisableKinesisStreamingDestination",
3358 "DescribeKinesisStreamingDestination",
3359 "UpdateKinesisStreamingDestination",
3360 "DescribeContributorInsights",
3361 "UpdateContributorInsights",
3362 "ListContributorInsights",
3363 "ExportTableToPointInTime",
3364 "DescribeExport",
3365 "ListExports",
3366 "ImportTable",
3367 "DescribeImport",
3368 "ListImports",
3369 ]
3370 }
3371}
3372
3373fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3376 body[field].as_str().ok_or_else(|| {
3377 AwsServiceError::aws_error(
3378 StatusCode::BAD_REQUEST,
3379 "ValidationException",
3380 format!("{field} is required"),
3381 )
3382 })
3383}
3384
3385fn require_object(
3386 body: &Value,
3387 field: &str,
3388) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3389 let obj = body[field].as_object().ok_or_else(|| {
3390 AwsServiceError::aws_error(
3391 StatusCode::BAD_REQUEST,
3392 "ValidationException",
3393 format!("{field} is required"),
3394 )
3395 })?;
3396 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3397}
3398
3399fn get_table<'a>(
3400 tables: &'a HashMap<String, DynamoTable>,
3401 name: &str,
3402) -> Result<&'a DynamoTable, AwsServiceError> {
3403 tables.get(name).ok_or_else(|| {
3404 AwsServiceError::aws_error(
3405 StatusCode::BAD_REQUEST,
3406 "ResourceNotFoundException",
3407 format!("Requested resource not found: Table: {name} not found"),
3408 )
3409 })
3410}
3411
3412fn get_table_mut<'a>(
3413 tables: &'a mut HashMap<String, DynamoTable>,
3414 name: &str,
3415) -> Result<&'a mut DynamoTable, AwsServiceError> {
3416 tables.get_mut(name).ok_or_else(|| {
3417 AwsServiceError::aws_error(
3418 StatusCode::BAD_REQUEST,
3419 "ResourceNotFoundException",
3420 format!("Requested resource not found: Table: {name} not found"),
3421 )
3422 })
3423}
3424
3425fn find_table_by_arn<'a>(
3426 tables: &'a HashMap<String, DynamoTable>,
3427 arn: &str,
3428) -> Result<&'a DynamoTable, AwsServiceError> {
3429 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3430 AwsServiceError::aws_error(
3431 StatusCode::BAD_REQUEST,
3432 "ResourceNotFoundException",
3433 format!("Requested resource not found: {arn}"),
3434 )
3435 })
3436}
3437
3438fn find_table_by_arn_mut<'a>(
3439 tables: &'a mut HashMap<String, DynamoTable>,
3440 arn: &str,
3441) -> Result<&'a mut DynamoTable, AwsServiceError> {
3442 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3443 AwsServiceError::aws_error(
3444 StatusCode::BAD_REQUEST,
3445 "ResourceNotFoundException",
3446 format!("Requested resource not found: {arn}"),
3447 )
3448 })
3449}
3450
3451fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3452 let arr = val.as_array().ok_or_else(|| {
3453 AwsServiceError::aws_error(
3454 StatusCode::BAD_REQUEST,
3455 "ValidationException",
3456 "KeySchema is required",
3457 )
3458 })?;
3459 Ok(arr
3460 .iter()
3461 .map(|elem| KeySchemaElement {
3462 attribute_name: elem["AttributeName"]
3463 .as_str()
3464 .unwrap_or_default()
3465 .to_string(),
3466 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3467 })
3468 .collect())
3469}
3470
3471fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3472 let arr = val.as_array().ok_or_else(|| {
3473 AwsServiceError::aws_error(
3474 StatusCode::BAD_REQUEST,
3475 "ValidationException",
3476 "AttributeDefinitions is required",
3477 )
3478 })?;
3479 Ok(arr
3480 .iter()
3481 .map(|elem| AttributeDefinition {
3482 attribute_name: elem["AttributeName"]
3483 .as_str()
3484 .unwrap_or_default()
3485 .to_string(),
3486 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3487 })
3488 .collect())
3489}
3490
3491fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3492 Ok(ProvisionedThroughput {
3493 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3494 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3495 })
3496}
3497
3498fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3499 let Some(arr) = val.as_array() else {
3500 return Vec::new();
3501 };
3502 arr.iter()
3503 .filter_map(|g| {
3504 Some(GlobalSecondaryIndex {
3505 index_name: g["IndexName"].as_str()?.to_string(),
3506 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3507 projection: parse_projection(&g["Projection"]),
3508 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3509 .ok(),
3510 })
3511 })
3512 .collect()
3513}
3514
3515fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3516 let Some(arr) = val.as_array() else {
3517 return Vec::new();
3518 };
3519 arr.iter()
3520 .filter_map(|l| {
3521 Some(LocalSecondaryIndex {
3522 index_name: l["IndexName"].as_str()?.to_string(),
3523 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3524 projection: parse_projection(&l["Projection"]),
3525 })
3526 })
3527 .collect()
3528}
3529
3530fn parse_projection(val: &Value) -> Projection {
3531 Projection {
3532 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3533 non_key_attributes: val["NonKeyAttributes"]
3534 .as_array()
3535 .map(|arr| {
3536 arr.iter()
3537 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3538 .collect()
3539 })
3540 .unwrap_or_default(),
3541 }
3542}
3543
3544fn parse_tags(val: &Value) -> HashMap<String, String> {
3545 let mut tags = HashMap::new();
3546 if let Some(arr) = val.as_array() {
3547 for tag in arr {
3548 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3549 tags.insert(k.to_string(), v.to_string());
3550 }
3551 }
3552 }
3553 tags
3554}
3555
3556fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3557 let mut names = HashMap::new();
3558 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3559 for (k, v) in obj {
3560 if let Some(s) = v.as_str() {
3561 names.insert(k.clone(), s.to_string());
3562 }
3563 }
3564 }
3565 names
3566}
3567
3568fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3569 let mut values = HashMap::new();
3570 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3571 for (k, v) in obj {
3572 values.insert(k.clone(), v.clone());
3573 }
3574 }
3575 values
3576}
3577
/// Resolves a `#placeholder` through `expr_attr_names`.
///
/// Names that do not start with `#`, and placeholders without a mapping, are
/// returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    let substituted = if name.starts_with('#') {
        expr_attr_names.get(name)
    } else {
        None
    };
    match substituted {
        Some(real_name) => real_name.clone(),
        None => name.to_string(),
    }
}
3588
3589fn extract_key(
3590 table: &DynamoTable,
3591 item: &HashMap<String, AttributeValue>,
3592) -> HashMap<String, AttributeValue> {
3593 let mut key = HashMap::new();
3594 let hash_key = table.hash_key_name();
3595 if let Some(v) = item.get(hash_key) {
3596 key.insert(hash_key.to_string(), v.clone());
3597 }
3598 if let Some(range_key) = table.range_key_name() {
3599 if let Some(v) = item.get(range_key) {
3600 key.insert(range_key.to_string(), v.clone());
3601 }
3602 }
3603 key
3604}
3605
3606fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3608 let obj = value.as_object()?;
3609 if obj.is_empty() {
3610 return None;
3611 }
3612 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3613}
3614
3615fn item_matches_key(
3617 item: &HashMap<String, AttributeValue>,
3618 key: &HashMap<String, AttributeValue>,
3619 hash_key_name: &str,
3620 range_key_name: Option<&str>,
3621) -> bool {
3622 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3623 (Some(a), Some(b)) => a == b,
3624 _ => false,
3625 };
3626 if !hash_match {
3627 return false;
3628 }
3629 match range_key_name {
3630 Some(rk) => match (item.get(rk), key.get(rk)) {
3631 (Some(a), Some(b)) => a == b,
3632 (None, None) => true,
3633 _ => false,
3634 },
3635 None => true,
3636 }
3637}
3638
3639fn extract_key_for_schema(
3641 item: &HashMap<String, AttributeValue>,
3642 hash_key_name: &str,
3643 range_key_name: Option<&str>,
3644) -> HashMap<String, AttributeValue> {
3645 let mut key = HashMap::new();
3646 if let Some(v) = item.get(hash_key_name) {
3647 key.insert(hash_key_name.to_string(), v.clone());
3648 }
3649 if let Some(rk) = range_key_name {
3650 if let Some(v) = item.get(rk) {
3651 key.insert(rk.to_string(), v.clone());
3652 }
3653 }
3654 key
3655}
3656
3657fn validate_key_in_item(
3658 table: &DynamoTable,
3659 item: &HashMap<String, AttributeValue>,
3660) -> Result<(), AwsServiceError> {
3661 let hash_key = table.hash_key_name();
3662 if !item.contains_key(hash_key) {
3663 return Err(AwsServiceError::aws_error(
3664 StatusCode::BAD_REQUEST,
3665 "ValidationException",
3666 format!("Missing the key {hash_key} in the item"),
3667 ));
3668 }
3669 if let Some(range_key) = table.range_key_name() {
3670 if !item.contains_key(range_key) {
3671 return Err(AwsServiceError::aws_error(
3672 StatusCode::BAD_REQUEST,
3673 "ValidationException",
3674 format!("Missing the key {range_key} in the item"),
3675 ));
3676 }
3677 }
3678 Ok(())
3679}
3680
3681fn validate_key_attributes_in_key(
3682 table: &DynamoTable,
3683 key: &HashMap<String, AttributeValue>,
3684) -> Result<(), AwsServiceError> {
3685 let hash_key = table.hash_key_name();
3686 if !key.contains_key(hash_key) {
3687 return Err(AwsServiceError::aws_error(
3688 StatusCode::BAD_REQUEST,
3689 "ValidationException",
3690 format!("Missing the key {hash_key} in the item"),
3691 ));
3692 }
3693 Ok(())
3694}
3695
3696fn project_item(
3697 item: &HashMap<String, AttributeValue>,
3698 body: &Value,
3699) -> HashMap<String, AttributeValue> {
3700 let projection = body["ProjectionExpression"].as_str();
3701 match projection {
3702 Some(proj) if !proj.is_empty() => {
3703 let expr_attr_names = parse_expression_attribute_names(body);
3704 let attrs: Vec<String> = proj
3705 .split(',')
3706 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3707 .collect();
3708 let mut result = HashMap::new();
3709 for attr in &attrs {
3710 if let Some(v) = resolve_nested_path(item, attr) {
3711 insert_nested_value(&mut result, attr, v);
3712 }
3713 }
3714 result
3715 }
3716 _ => item.clone(),
3717 }
3718}
3719
3720fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3723 let mut result = String::new();
3725 for (i, segment) in path.split('.').enumerate() {
3726 if i > 0 {
3727 result.push('.');
3728 }
3729 if let Some(bracket_pos) = segment.find('[') {
3731 let key_part = &segment[..bracket_pos];
3732 let index_part = &segment[bracket_pos..];
3733 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3734 result.push_str(index_part);
3735 } else {
3736 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3737 }
3738 }
3739 result
3740}
3741
3742fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3744 let segments = parse_path_segments(path);
3745 if segments.is_empty() {
3746 return None;
3747 }
3748
3749 let first = &segments[0];
3750 let top_key = match first {
3751 PathSegment::Key(k) => k.as_str(),
3752 _ => return None,
3753 };
3754
3755 let mut current = item.get(top_key)?.clone();
3756
3757 for segment in &segments[1..] {
3758 match segment {
3759 PathSegment::Key(k) => {
3760 current = current.get("M")?.get(k)?.clone();
3762 }
3763 PathSegment::Index(idx) => {
3764 current = current.get("L")?.get(*idx)?.clone();
3766 }
3767 }
3768 }
3769
3770 Some(current)
3771}
3772
/// One step of a document path.
#[derive(Debug)]
enum PathSegment {
    /// A map key / attribute name (`a` in `a.b`).
    Key(String),
    /// A zero-based list position (`2` in `a[2]`).
    Index(usize),
}

/// Splits a document path such as `a.b[2].c` into its segments.
///
/// `.` ends the current key, `[` ends the current key and starts an index
/// that runs up to the next `]` (or the end of the input). Empty keys are
/// dropped, and an index that does not parse as `usize` is dropped silently.
fn parse_path_segments(path: &str) -> Vec<PathSegment> {
    let mut segments = Vec::new();
    let mut pending = String::new();
    let mut chars = path.chars();

    while let Some(ch) = chars.next() {
        match ch {
            '.' => {
                if !pending.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut pending)));
                }
            }
            '[' => {
                if !pending.is_empty() {
                    segments.push(PathSegment::Key(std::mem::take(&mut pending)));
                }
                // `take_while` also consumes the closing `]`.
                let digits: String = chars.by_ref().take_while(|&c| c != ']').collect();
                if let Ok(idx) = digits.parse::<usize>() {
                    segments.push(PathSegment::Index(idx));
                }
            }
            other => pending.push(other),
        }
    }

    if !pending.is_empty() {
        segments.push(PathSegment::Key(pending));
    }
    segments
}
3821
3822fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3825 if !path.contains('.') && !path.contains('[') {
3827 result.insert(path.to_string(), value);
3828 return;
3829 }
3830
3831 let segments = parse_path_segments(path);
3832 if segments.is_empty() {
3833 return;
3834 }
3835
3836 let top_key = match &segments[0] {
3837 PathSegment::Key(k) => k.clone(),
3838 _ => return,
3839 };
3840
3841 if segments.len() == 1 {
3842 result.insert(top_key, value);
3843 return;
3844 }
3845
3846 let wrapped = wrap_value_in_path(&segments[1..], value);
3848 let existing = result.remove(&top_key);
3850 let merged = match existing {
3851 Some(existing) => merge_attribute_values(existing, wrapped),
3852 None => wrapped,
3853 };
3854 result.insert(top_key, merged);
3855}
3856
3857fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3859 if segments.is_empty() {
3860 return value;
3861 }
3862 let inner = wrap_value_in_path(&segments[1..], value);
3863 match &segments[0] {
3864 PathSegment::Key(k) => {
3865 json!({"M": {k.clone(): inner}})
3866 }
3867 PathSegment::Index(idx) => {
3868 let mut arr = vec![Value::Null; idx + 1];
3869 arr[*idx] = inner;
3870 json!({"L": arr})
3871 }
3872 }
3873}
3874
3875fn merge_attribute_values(a: Value, b: Value) -> Value {
3877 if let (Some(a_map), Some(b_map)) = (
3878 a.get("M").and_then(|v| v.as_object()),
3879 b.get("M").and_then(|v| v.as_object()),
3880 ) {
3881 let mut merged = a_map.clone();
3882 for (k, v) in b_map {
3883 if let Some(existing) = merged.get(k) {
3884 merged.insert(
3885 k.clone(),
3886 merge_attribute_values(existing.clone(), v.clone()),
3887 );
3888 } else {
3889 merged.insert(k.clone(), v.clone());
3890 }
3891 }
3892 json!({"M": merged})
3893 } else {
3894 b
3895 }
3896}
3897
3898fn evaluate_condition(
3899 condition: &str,
3900 existing: Option<&HashMap<String, AttributeValue>>,
3901 expr_attr_names: &HashMap<String, String>,
3902 expr_attr_values: &HashMap<String, Value>,
3903) -> Result<(), AwsServiceError> {
3904 let empty = HashMap::new();
3909 let item = existing.unwrap_or(&empty);
3910 if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3911 Ok(())
3912 } else {
3913 Err(AwsServiceError::aws_error(
3914 StatusCode::BAD_REQUEST,
3915 "ConditionalCheckFailedException",
3916 "The conditional request failed",
3917 ))
3918 }
3919}
3920
/// Returns the trimmed argument text of a call `func_name(...)` or
/// `func_name (...)` that spans the whole of `expr`.
///
/// Yields `None` when `expr` does not start with that call or does not end
/// with `)`.
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let after_name = expr.strip_prefix(func_name)?;
    let args = match after_name.strip_prefix('(') {
        Some(rest) => rest,
        None => after_name.strip_prefix(" (")?,
    };
    args.strip_suffix(')').map(str::trim)
}
3933
3934fn evaluate_key_condition(
3935 expr: &str,
3936 item: &HashMap<String, AttributeValue>,
3937 hash_key_name: &str,
3938 _range_key_name: Option<&str>,
3939 expr_attr_names: &HashMap<String, String>,
3940 expr_attr_values: &HashMap<String, Value>,
3941) -> bool {
3942 let parts: Vec<&str> = split_on_and(expr);
3943 for part in &parts {
3944 let part = part.trim();
3945 if !evaluate_single_key_condition(
3946 part,
3947 item,
3948 hash_key_name,
3949 expr_attr_names,
3950 expr_attr_values,
3951 ) {
3952 return false;
3953 }
3954 }
3955 true
3956}
3957
/// Splits `expr` at every top-level ` AND ` (case-insensitive, surrounded by
/// single spaces) and returns the pieces in order.
///
/// Text inside parentheses is never split. The ` AND ` that belongs to a
/// top-level `x BETWEEN lo AND hi` is part of that operator and is not
/// treated as a conjunction, so the BETWEEN clause stays in one piece.
///
/// Keywords are matched on bytes, so non-ASCII text in the expression cannot
/// cause a slice at a non-character boundary. Every split position is an
/// ASCII space, which is always a valid boundary.
fn split_on_and(expr: &str) -> Vec<&str> {
    const AND: &[u8] = b" AND ";
    const BETWEEN: &[u8] = b" BETWEEN ";

    /// True when `keyword` occurs (ignoring ASCII case) at byte offset `at`.
    fn has_keyword_at(bytes: &[u8], at: usize, keyword: &[u8]) -> bool {
        bytes
            .get(at..at + keyword.len())
            .map_or(false, |window| window.eq_ignore_ascii_case(keyword))
    }

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    // Set after a top-level BETWEEN until its own AND has been consumed.
    let mut between_and_pending = false;
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            b' ' if depth == 0 => {
                if has_keyword_at(bytes, i, BETWEEN) {
                    between_and_pending = true;
                    // Resume on the keyword's trailing space so that it can
                    // still open the next keyword.
                    i += BETWEEN.len() - 1;
                    continue;
                }
                if has_keyword_at(bytes, i, AND) {
                    if between_and_pending {
                        between_and_pending = false;
                        i += AND.len() - 1;
                    } else {
                        parts.push(&expr[start..i]);
                        start = i + AND.len();
                        i = start;
                    }
                    continue;
                }
            }
            _ => {}
        }
        i += 1;
    }

    parts.push(&expr[start..]);
    parts
}
3983
/// Splits `expr` at every top-level ` OR ` (case-insensitive, surrounded by
/// single spaces) and returns the pieces in order.
///
/// Text inside parentheses is never split. The keyword is matched on bytes,
/// so non-ASCII text in the expression cannot cause a slice at a
/// non-character boundary. Every split position is an ASCII space, which is
/// always a valid boundary.
fn split_on_or(expr: &str) -> Vec<&str> {
    const OR: &[u8] = b" OR ";

    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;

    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            b' ' if depth == 0 => {
                let is_or = bytes
                    .get(i..i + OR.len())
                    .map_or(false, |window| window.eq_ignore_ascii_case(OR));
                if is_or {
                    parts.push(&expr[start..i]);
                    start = i + OR.len();
                    i = start;
                    continue;
                }
            }
            _ => {}
        }
        i += 1;
    }

    parts.push(&expr[start..]);
    parts
}
4009
4010fn evaluate_single_key_condition(
4011 part: &str,
4012 item: &HashMap<String, AttributeValue>,
4013 _hash_key_name: &str,
4014 expr_attr_names: &HashMap<String, String>,
4015 expr_attr_values: &HashMap<String, Value>,
4016) -> bool {
4017 let part = part.trim();
4018
4019 if let Some(rest) = part
4021 .strip_prefix("begins_with(")
4022 .or_else(|| part.strip_prefix("begins_with ("))
4023 {
4024 if let Some(inner) = rest.strip_suffix(')') {
4025 let mut split = inner.splitn(2, ',');
4026 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4027 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4028 let val_ref = val_ref.trim();
4029 let expected = expr_attr_values.get(val_ref);
4030 let actual = item.get(&attr_name);
4031 return match (actual, expected) {
4032 (Some(a), Some(e)) => {
4033 let a_str = a.get("S").and_then(|v| v.as_str());
4034 let e_str = e.get("S").and_then(|v| v.as_str());
4035 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4036 }
4037 _ => false,
4038 };
4039 }
4040 }
4041 return false;
4042 }
4043
4044 if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
4046 let attr_part = part[..between_pos].trim();
4047 let attr_name = resolve_attr_name(attr_part, expr_attr_names);
4048 let range_part = &part[between_pos + 7..];
4049 if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
4050 let lo_ref = range_part[..and_pos].trim();
4051 let hi_ref = range_part[and_pos + 5..].trim();
4052 let lo = expr_attr_values.get(lo_ref);
4053 let hi = expr_attr_values.get(hi_ref);
4054 let actual = item.get(&attr_name);
4055 return match (actual, lo, hi) {
4056 (Some(a), Some(l), Some(h)) => {
4057 compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
4058 && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
4059 }
4060 _ => false,
4061 };
4062 }
4063 }
4064
4065 for op in &["<=", ">=", "<>", "=", "<", ">"] {
4067 if let Some(pos) = part.find(op) {
4068 let left = part[..pos].trim();
4069 let right = part[pos + op.len()..].trim();
4070 let attr_name = resolve_attr_name(left, expr_attr_names);
4071 let expected = expr_attr_values.get(right);
4072 let actual = item.get(&attr_name);
4073
4074 return match *op {
4075 "=" => actual == expected,
4076 "<>" => actual != expected,
4077 "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
4078 ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
4079 "<=" => {
4080 let cmp = compare_attribute_values(actual, expected);
4081 cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
4082 }
4083 ">=" => {
4084 let cmp = compare_attribute_values(actual, expected);
4085 cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
4086 }
4087 _ => false,
4088 };
4089 }
4090 }
4091
4092 false
4093}
4094
4095fn attribute_size(val: &Value) -> Option<usize> {
4100 if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
4101 return Some(s.len());
4102 }
4103 if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
4104 return Some(b.len()); }
4107 if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
4108 return Some(arr.len());
4109 }
4110 if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
4111 return Some(arr.len());
4112 }
4113 if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
4114 return Some(arr.len());
4115 }
4116 if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
4117 return Some(arr.len());
4118 }
4119 if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
4120 return Some(obj.len());
4121 }
4122 if val.get("N").is_some() {
4123 return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
4125 }
4126 if val.get("BOOL").is_some() || val.get("NULL").is_some() {
4127 return Some(1);
4128 }
4129 None
4130}
4131
4132fn evaluate_size_comparison(
4134 part: &str,
4135 item: &HashMap<String, AttributeValue>,
4136 expr_attr_names: &HashMap<String, String>,
4137 expr_attr_values: &HashMap<String, Value>,
4138) -> Option<bool> {
4139 let open = part.find('(')?;
4141 let close = part[open..].find(')')? + open;
4142 let path = part[open + 1..close].trim();
4143 let remainder = part[close + 1..].trim();
4144
4145 let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
4147 ("<=", rest.trim())
4148 } else if let Some(rest) = remainder.strip_prefix(">=") {
4149 (">=", rest.trim())
4150 } else if let Some(rest) = remainder.strip_prefix("<>") {
4151 ("<>", rest.trim())
4152 } else if let Some(rest) = remainder.strip_prefix('<') {
4153 ("<", rest.trim())
4154 } else if let Some(rest) = remainder.strip_prefix('>') {
4155 (">", rest.trim())
4156 } else if let Some(rest) = remainder.strip_prefix('=') {
4157 ("=", rest.trim())
4158 } else {
4159 return None;
4160 };
4161
4162 let attr_name = resolve_attr_name(path, expr_attr_names);
4163 let actual = item.get(&attr_name)?;
4164 let size = attribute_size(actual)? as f64;
4165
4166 let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
4167
4168 Some(match op {
4169 "=" => (size - expected).abs() < f64::EPSILON,
4170 "<>" => (size - expected).abs() >= f64::EPSILON,
4171 "<" => size < expected,
4172 ">" => size > expected,
4173 "<=" => size <= expected,
4174 ">=" => size >= expected,
4175 _ => false,
4176 })
4177}
4178
4179fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4180 match (a, b) {
4181 (None, None) => std::cmp::Ordering::Equal,
4182 (None, Some(_)) => std::cmp::Ordering::Less,
4183 (Some(_), None) => std::cmp::Ordering::Greater,
4184 (Some(a), Some(b)) => {
4185 let a_type = attribute_type_and_value(a);
4186 let b_type = attribute_type_and_value(b);
4187 match (a_type, b_type) {
4188 (Some(("S", a_val)), Some(("S", b_val))) => {
4189 let a_str = a_val.as_str().unwrap_or("");
4190 let b_str = b_val.as_str().unwrap_or("");
4191 a_str.cmp(b_str)
4192 }
4193 (Some(("N", a_val)), Some(("N", b_val))) => {
4194 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4195 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4196 a_num
4197 .partial_cmp(&b_num)
4198 .unwrap_or(std::cmp::Ordering::Equal)
4199 }
4200 (Some(("B", a_val)), Some(("B", b_val))) => {
4201 let a_str = a_val.as_str().unwrap_or("");
4202 let b_str = b_val.as_str().unwrap_or("");
4203 a_str.cmp(b_str)
4204 }
4205 _ => std::cmp::Ordering::Equal,
4206 }
4207 }
4208 }
4209}
4210
4211fn evaluate_filter_expression(
4212 expr: &str,
4213 item: &HashMap<String, AttributeValue>,
4214 expr_attr_names: &HashMap<String, String>,
4215 expr_attr_values: &HashMap<String, Value>,
4216) -> bool {
4217 let trimmed = expr.trim();
4218
4219 let or_parts = split_on_or(trimmed);
4221 if or_parts.len() > 1 {
4222 return or_parts.iter().any(|part| {
4223 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4224 });
4225 }
4226
4227 let and_parts = split_on_and(trimmed);
4229 if and_parts.len() > 1 {
4230 return and_parts.iter().all(|part| {
4231 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4232 });
4233 }
4234
4235 let stripped = strip_outer_parens(trimmed);
4237 if stripped != trimmed {
4238 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4239 }
4240
4241 if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4243 return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4244 }
4245
4246 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4247}
4248
/// Removes one pair of parentheses that encloses the whole (trimmed)
/// expression.
///
/// The pair is removed only when the text between them is balanced on its
/// own, so `(a) AND (b)` is returned unchanged (trimmed) while `((a) OR (b))`
/// loses its outer pair.
fn strip_outer_parens(expr: &str) -> &str {
    let trimmed = expr.trim();
    let Some(inner) = trimmed
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return trimmed;
    };

    let mut depth = 0usize;
    for byte in inner.bytes() {
        match byte {
            b'(' => depth += 1,
            b')' => match depth.checked_sub(1) {
                Some(shallower) => depth = shallower,
                // The leading `(` was closed before the end of the text.
                None => return trimmed,
            },
            _ => {}
        }
    }

    if depth == 0 {
        inner
    } else {
        trimmed
    }
}
4276
4277fn evaluate_single_filter_condition(
4278 part: &str,
4279 item: &HashMap<String, AttributeValue>,
4280 expr_attr_names: &HashMap<String, String>,
4281 expr_attr_values: &HashMap<String, Value>,
4282) -> bool {
4283 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4284 let attr = resolve_attr_name(inner, expr_attr_names);
4285 return item.contains_key(&attr);
4286 }
4287
4288 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4289 let attr = resolve_attr_name(inner, expr_attr_names);
4290 return !item.contains_key(&attr);
4291 }
4292
4293 if let Some(rest) = part
4295 .strip_prefix("begins_with(")
4296 .or_else(|| part.strip_prefix("begins_with ("))
4297 {
4298 if let Some(inner) = rest.strip_suffix(')') {
4299 let mut split = inner.splitn(2, ',');
4300 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4301 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4302 let expected = expr_attr_values.get(val_ref.trim());
4303 let actual = item.get(&attr_name);
4304 return match (actual, expected) {
4305 (Some(a), Some(e)) => {
4306 let a_str = a.get("S").and_then(|v| v.as_str());
4307 let e_str = e.get("S").and_then(|v| v.as_str());
4308 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
4309 }
4310 _ => false,
4311 };
4312 }
4313 }
4314 }
4315
4316 if let Some(rest) = part
4318 .strip_prefix("contains(")
4319 .or_else(|| part.strip_prefix("contains ("))
4320 {
4321 if let Some(inner) = rest.strip_suffix(')') {
4322 let mut split = inner.splitn(2, ',');
4323 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4324 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4325 let expected = expr_attr_values.get(val_ref.trim());
4326 let actual = item.get(&attr_name);
4327 return match (actual, expected) {
4328 (Some(a), Some(e)) => {
4329 if let (Some(a_s), Some(e_s)) = (
4331 a.get("S").and_then(|v| v.as_str()),
4332 e.get("S").and_then(|v| v.as_str()),
4333 ) {
4334 return a_s.contains(e_s);
4335 }
4336 if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
4338 if let Some(val) = e.get("S") {
4339 return set.contains(val);
4340 }
4341 }
4342 if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
4343 if let Some(val) = e.get("N") {
4344 return set.contains(val);
4345 }
4346 }
4347 if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
4348 if let Some(val) = e.get("B") {
4349 return set.contains(val);
4350 }
4351 }
4352 if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
4353 return list.contains(e);
4354 }
4355 false
4356 }
4357 _ => false,
4358 };
4359 }
4360 }
4361 }
4362
4363 if part.starts_with("size(") || part.starts_with("size (") {
4365 if let Some(result) =
4366 evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
4367 {
4368 return result;
4369 }
4370 }
4371
4372 if part.starts_with("attribute_type(") || part.starts_with("attribute_type (") {
4374 if let Some(rest) = part
4375 .strip_prefix("attribute_type(")
4376 .or_else(|| part.strip_prefix("attribute_type ("))
4377 {
4378 if let Some(inner) = rest.strip_suffix(')') {
4379 let mut split = inner.splitn(2, ',');
4380 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4381 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4382 let expected_type = expr_attr_values
4383 .get(val_ref.trim())
4384 .and_then(|v| v.get("S"))
4385 .and_then(|v| v.as_str());
4386 let actual = item.get(&attr_name);
4387 return match (actual, expected_type) {
4388 (Some(val), Some(t)) => match t {
4389 "S" => val.get("S").is_some(),
4390 "N" => val.get("N").is_some(),
4391 "B" => val.get("B").is_some(),
4392 "BOOL" => val.get("BOOL").is_some(),
4393 "NULL" => val.get("NULL").is_some(),
4394 "SS" => val.get("SS").is_some(),
4395 "NS" => val.get("NS").is_some(),
4396 "BS" => val.get("BS").is_some(),
4397 "L" => val.get("L").is_some(),
4398 "M" => val.get("M").is_some(),
4399 _ => false,
4400 },
4401 _ => false,
4402 };
4403 }
4404 }
4405 }
4406 }
4407
4408 if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
4409 let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
4410 let actual = item.get(&attr_name);
4411 return evaluate_in_match(actual, &value_refs, expr_attr_values);
4412 }
4413
4414 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4415}
4416
/// Parses an `<operand> IN (<v1>, <v2>, ...)` expression.
///
/// The ` IN ` keyword is matched case-insensitively and must be surrounded by
/// single spaces. Returns the trimmed left-hand operand together with the
/// trimmed, non-empty entries of the parenthesised list. Returns `None` when
/// the keyword is missing, the operand is empty, the list is not wrapped in
/// `(` ... `)`, or the list has no entries.
fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
    const KEYWORD: &str = " IN ";

    // ASCII upper-casing never changes byte offsets, so a position found in
    // the upper-cased copy can be used to slice the original text.
    let keyword_at = expr.to_ascii_uppercase().find(KEYWORD)?;
    let (lhs, tail) = expr.split_at(keyword_at);
    let lhs = lhs.trim();
    if lhs.is_empty() {
        return None;
    }

    let list = tail[KEYWORD.len()..]
        .trim_start()
        .strip_prefix('(')?
        .strip_suffix(')')?;

    let mut members = Vec::new();
    for entry in list.split(',') {
        let entry = entry.trim();
        if !entry.is_empty() {
            members.push(entry);
        }
    }

    if members.is_empty() {
        None
    } else {
        Some((lhs, members))
    }
}
4443
4444fn evaluate_in_match(
4448 actual: Option<&AttributeValue>,
4449 value_refs: &[&str],
4450 expr_attr_values: &HashMap<String, Value>,
4451) -> bool {
4452 value_refs.iter().any(|v_ref| {
4453 let expected = expr_attr_values.get(*v_ref);
4454 matches!((actual, expected), (Some(a), Some(e)) if a == e)
4455 })
4456}
4457
4458fn apply_update_expression(
4459 item: &mut HashMap<String, AttributeValue>,
4460 expr: &str,
4461 expr_attr_names: &HashMap<String, String>,
4462 expr_attr_values: &HashMap<String, Value>,
4463) -> Result<(), AwsServiceError> {
4464 let clauses = parse_update_clauses(expr);
4465 if clauses.is_empty() && !expr.trim().is_empty() {
4466 return Err(AwsServiceError::aws_error(
4467 StatusCode::BAD_REQUEST,
4468 "ValidationException",
4469 "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
4470 ));
4471 }
4472 for (action, assignments) in &clauses {
4473 match action.to_ascii_uppercase().as_str() {
4474 "SET" => {
4475 for assignment in assignments {
4476 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4477 }
4478 }
4479 "REMOVE" => {
4480 for attr_ref in assignments {
4481 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4482 item.remove(&attr);
4483 }
4484 }
4485 "ADD" => {
4486 for assignment in assignments {
4487 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4488 }
4489 }
4490 "DELETE" => {
4491 for assignment in assignments {
4492 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4493 }
4494 }
4495 other => {
4496 return Err(AwsServiceError::aws_error(
4497 StatusCode::BAD_REQUEST,
4498 "ValidationException",
4499 format!("Invalid UpdateExpression: Invalid action: {}", other),
4500 ));
4501 }
4502 }
4503 }
4504 Ok(())
4505}
4506
/// Splits an update expression into `(ACTION, operands)` clauses.
///
/// Recognised actions are `SET`, `REMOVE`, `ADD` and `DELETE`, matched
/// case-insensitively and returned upper-cased in order of appearance. Each
/// clause's text is split into trimmed operands at commas.
///
/// Two details keep real-world expressions intact:
///
/// * Commas nested inside parentheses do not split operands, so function
///   calls such as `list_append(a, :b)` stay in one piece.
/// * A keyword is only recognised as a standalone word. It is ignored when it
///   is part of an identifier or placeholder, i.e. when preceded by an ASCII
///   letter, digit, `_`, `#`, `:` or `.`, or followed by an ASCII letter,
///   digit or `_` (for example `last_set`, `:add`, `#delete`).
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    const KEYWORDS: [&str; 4] = ["SET", "REMOVE", "ADD", "DELETE"];

    /// Bytes that may appear inside a bare identifier.
    fn is_ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_'
    }

    /// Splits `content` at commas that are not nested inside parentheses.
    fn split_top_level(content: &str) -> Vec<String> {
        let mut parts = Vec::new();
        let mut depth = 0usize;
        let mut start = 0usize;
        for (i, c) in content.char_indices() {
            match c {
                '(' => depth += 1,
                ')' => depth = depth.saturating_sub(1),
                ',' if depth == 0 => {
                    parts.push(content[start..i].trim().to_string());
                    start = i + 1;
                }
                _ => {}
            }
        }
        parts.push(content[start..].trim().to_string());
        parts
    }

    let bytes = expr.as_bytes();
    // ASCII upper-casing preserves byte offsets, so positions found in
    // `upper` index directly into `expr`.
    let upper = expr.to_ascii_uppercase();

    let mut positions: Vec<(usize, &str)> = Vec::new();
    for &kw in &KEYWORDS {
        for (pos, _) in upper.match_indices(kw) {
            let before_ok = pos == 0 || {
                let prev = bytes[pos - 1];
                // `#name`, `:value` and `path.name` tokens are never keywords.
                !(is_ident_byte(prev) || matches!(prev, b'#' | b':' | b'.'))
            };
            let after_ok = bytes
                .get(pos + kw.len())
                .map_or(true, |&next| !is_ident_byte(next));
            if before_ok && after_ok {
                positions.push((pos, kw));
            }
        }
    }
    positions.sort_by_key(|&(pos, _)| pos);

    positions
        .iter()
        .enumerate()
        .map(|(i, &(pos, kw))| {
            let start = pos + kw.len();
            // A clause runs until the next keyword or the end of the input.
            let end = positions.get(i + 1).map_or(expr.len(), |&(next, _)| next);
            (kw.to_string(), split_top_level(expr[start..end].trim()))
        })
        .collect()
}
4544
4545fn apply_set_assignment(
4546 item: &mut HashMap<String, AttributeValue>,
4547 assignment: &str,
4548 expr_attr_names: &HashMap<String, String>,
4549 expr_attr_values: &HashMap<String, Value>,
4550) -> Result<(), AwsServiceError> {
4551 let Some((left, right)) = assignment.split_once('=') else {
4552 return Ok(());
4553 };
4554
4555 let left_trimmed = left.trim();
4556 let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
4560 Some((name, idx)) => (name, Some(idx)),
4561 None => (left_trimmed, None),
4562 };
4563 let attr = resolve_attr_name(attr_ref, expr_attr_names);
4564 let right = right.trim();
4565
4566 if let Some(rest) = right
4568 .strip_prefix("if_not_exists(")
4569 .or_else(|| right.strip_prefix("if_not_exists ("))
4570 {
4571 if let Some(inner) = rest.strip_suffix(')') {
4572 let mut split = inner.splitn(2, ',');
4573 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4574 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4575 if !item.contains_key(&check_name) {
4576 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4577 item.insert(attr, val.clone());
4578 }
4579 }
4580 return Ok(());
4581 }
4582 }
4583 }
4584
4585 if let Some(rest) = right
4587 .strip_prefix("list_append(")
4588 .or_else(|| right.strip_prefix("list_append ("))
4589 {
4590 if let Some(inner) = rest.strip_suffix(')') {
4591 let mut split = inner.splitn(2, ',');
4592 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4593 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4594 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4595
4596 let mut merged = Vec::new();
4597 if let Some(Value::Object(obj)) = &a_val {
4598 if let Some(Value::Array(arr)) = obj.get("L") {
4599 merged.extend(arr.clone());
4600 }
4601 }
4602 if let Some(Value::Object(obj)) = &b_val {
4603 if let Some(Value::Array(arr)) = obj.get("L") {
4604 merged.extend(arr.clone());
4605 }
4606 }
4607
4608 item.insert(attr, json!({"L": merged}));
4609 return Ok(());
4610 }
4611 }
4612 }
4613
4614 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4616 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4617 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4618
4619 let left_num = match extract_number(&left_val) {
4621 Some(n) => n,
4622 None if left_val.is_some() => {
4623 return Err(AwsServiceError::aws_error(
4624 StatusCode::BAD_REQUEST,
4625 "ValidationException",
4626 "An operand in the update expression has an incorrect data type",
4627 ));
4628 }
4629 None => 0.0, };
4631 let right_num = match extract_number(&right_val) {
4632 Some(n) => n,
4633 None => {
4634 return Err(AwsServiceError::aws_error(
4635 StatusCode::BAD_REQUEST,
4636 "ValidationException",
4637 "An operand in the update expression has an incorrect data type",
4638 ));
4639 }
4640 };
4641
4642 let result = if is_add {
4643 left_num + right_num
4644 } else {
4645 left_num - right_num
4646 };
4647
4648 let num_str = if result == result.trunc() {
4649 format!("{}", result as i64)
4650 } else {
4651 format!("{result}")
4652 };
4653
4654 item.insert(attr, json!({"N": num_str}));
4655 return Ok(());
4656 }
4657
4658 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4660 if let Some(v) = val {
4661 match list_index {
4662 Some(idx) => assign_list_index(item, &attr, idx, v)?,
4663 None => {
4664 item.insert(attr, v);
4665 }
4666 }
4667 }
4668
4669 Ok(())
4670}
4671
/// Splits a path of the form `name[N]` into `(name, N)`.
///
/// Only a single trailing index on a plain name is accepted. Returns `None`
/// when the path has no trailing `[...]`, the index is not a valid `usize`,
/// or the name is empty or itself contains `[`, `]` or `.`.
fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
    let body = path.trim().strip_suffix(']')?;
    let (name, digits) = body.rsplit_once('[')?;
    let idx = digits.parse::<usize>().ok()?;

    let is_plain_name =
        !name.is_empty() && !name.contains(|c: char| matches!(c, '[' | ']' | '.'));
    if is_plain_name {
        Some((name, idx))
    } else {
        None
    }
}
4692
4693fn assign_list_index(
4698 item: &mut HashMap<String, AttributeValue>,
4699 attr: &str,
4700 idx: usize,
4701 value: Value,
4702) -> Result<(), AwsServiceError> {
4703 let Some(existing) = item.get_mut(attr) else {
4704 return Err(AwsServiceError::aws_error(
4705 StatusCode::BAD_REQUEST,
4706 "ValidationException",
4707 "The document path provided in the update expression is invalid for update",
4708 ));
4709 };
4710 let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
4711 return Err(AwsServiceError::aws_error(
4712 StatusCode::BAD_REQUEST,
4713 "ValidationException",
4714 "The document path provided in the update expression is invalid for update",
4715 ));
4716 };
4717 if idx < list.len() {
4718 list[idx] = value;
4719 } else if idx == list.len() {
4720 list.push(value);
4721 } else {
4722 return Err(AwsServiceError::aws_error(
4723 StatusCode::BAD_REQUEST,
4724 "ValidationException",
4725 "The document path provided in the update expression is invalid for update",
4726 ));
4727 }
4728 Ok(())
4729}
4730
4731fn resolve_value(
4732 reference: &str,
4733 item: &HashMap<String, AttributeValue>,
4734 expr_attr_names: &HashMap<String, String>,
4735 expr_attr_values: &HashMap<String, Value>,
4736) -> Option<Value> {
4737 let reference = reference.trim();
4738 if reference.starts_with(':') {
4739 expr_attr_values.get(reference).cloned()
4740 } else {
4741 let attr_name = resolve_attr_name(reference, expr_attr_names);
4742 item.get(&attr_name).cloned()
4743 }
4744}
4745
4746fn extract_number(val: &Option<Value>) -> Option<f64> {
4747 val.as_ref()
4748 .and_then(|v| v.get("N"))
4749 .and_then(|n| n.as_str())
4750 .and_then(|s| s.parse().ok())
4751}
4752
/// Splits `expr` at its first `+` or `-` that is outside parentheses and not
/// at position zero.
///
/// Returns `(left, right, is_addition)` with both sides untrimmed, or `None`
/// when no such operator exists.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut nesting = 0i32;
    for (at, ch) in expr.char_indices() {
        if ch == '(' {
            nesting += 1;
            continue;
        }
        if ch == ')' {
            nesting -= 1;
            continue;
        }
        let is_operator = ch == '+' || ch == '-';
        if is_operator && nesting == 0 && at > 0 {
            let (lhs, tail) = expr.split_at(at);
            // `tail` starts with the one-byte operator itself.
            return Some((lhs, &tail[1..], ch == '+'));
        }
    }
    None
}
4770
4771fn apply_add_assignment(
4772 item: &mut HashMap<String, AttributeValue>,
4773 assignment: &str,
4774 expr_attr_names: &HashMap<String, String>,
4775 expr_attr_values: &HashMap<String, Value>,
4776) -> Result<(), AwsServiceError> {
4777 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4778 if parts.len() != 2 {
4779 return Ok(());
4780 }
4781
4782 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4783 let val_ref = parts[1].trim();
4784 let add_val = expr_attr_values.get(val_ref);
4785
4786 if let Some(add_val) = add_val {
4787 if let Some(existing) = item.get(&attr) {
4788 if let (Some(existing_num), Some(add_num)) = (
4789 extract_number(&Some(existing.clone())),
4790 extract_number(&Some(add_val.clone())),
4791 ) {
4792 let result = existing_num + add_num;
4793 let num_str = if result == result.trunc() {
4794 format!("{}", result as i64)
4795 } else {
4796 format!("{result}")
4797 };
4798 item.insert(attr, json!({"N": num_str}));
4799 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4800 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4801 let mut merged: Vec<Value> = existing_set.clone();
4802 for v in add_set {
4803 if !merged.contains(v) {
4804 merged.push(v.clone());
4805 }
4806 }
4807 item.insert(attr, json!({"SS": merged}));
4808 }
4809 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4810 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4811 let mut merged: Vec<Value> = existing_set.clone();
4812 for v in add_set {
4813 if !merged.contains(v) {
4814 merged.push(v.clone());
4815 }
4816 }
4817 item.insert(attr, json!({"NS": merged}));
4818 }
4819 } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
4820 if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
4821 let mut merged: Vec<Value> = existing_set.clone();
4822 for v in add_set {
4823 if !merged.contains(v) {
4824 merged.push(v.clone());
4825 }
4826 }
4827 item.insert(attr, json!({"BS": merged}));
4828 }
4829 }
4830 } else {
4831 item.insert(attr, add_val.clone());
4832 }
4833 }
4834
4835 Ok(())
4836}
4837
4838fn apply_delete_assignment(
4839 item: &mut HashMap<String, AttributeValue>,
4840 assignment: &str,
4841 expr_attr_names: &HashMap<String, String>,
4842 expr_attr_values: &HashMap<String, Value>,
4843) -> Result<(), AwsServiceError> {
4844 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4845 if parts.len() != 2 {
4846 return Ok(());
4847 }
4848
4849 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4850 let val_ref = parts[1].trim();
4851 let del_val = expr_attr_values.get(val_ref);
4852
4853 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4854 if let (Some(existing_set), Some(del_set)) = (
4855 existing.get("SS").and_then(|v| v.as_array()),
4856 del_val.get("SS").and_then(|v| v.as_array()),
4857 ) {
4858 let filtered: Vec<Value> = existing_set
4859 .iter()
4860 .filter(|v| !del_set.contains(v))
4861 .cloned()
4862 .collect();
4863 if filtered.is_empty() {
4864 item.remove(&attr);
4865 } else {
4866 item.insert(attr, json!({"SS": filtered}));
4867 }
4868 } else if let (Some(existing_set), Some(del_set)) = (
4869 existing.get("NS").and_then(|v| v.as_array()),
4870 del_val.get("NS").and_then(|v| v.as_array()),
4871 ) {
4872 let filtered: Vec<Value> = existing_set
4873 .iter()
4874 .filter(|v| !del_set.contains(v))
4875 .cloned()
4876 .collect();
4877 if filtered.is_empty() {
4878 item.remove(&attr);
4879 } else {
4880 item.insert(attr, json!({"NS": filtered}));
4881 }
4882 } else if let (Some(existing_set), Some(del_set)) = (
4883 existing.get("BS").and_then(|v| v.as_array()),
4884 del_val.get("BS").and_then(|v| v.as_array()),
4885 ) {
4886 let filtered: Vec<Value> = existing_set
4887 .iter()
4888 .filter(|v| !del_set.contains(v))
4889 .cloned()
4890 .collect();
4891 if filtered.is_empty() {
4892 item.remove(&attr);
4893 } else {
4894 item.insert(attr, json!({"BS": filtered}));
4895 }
4896 }
4897 }
4898
4899 Ok(())
4900}
4901
4902#[allow(clippy::too_many_arguments)]
4903fn build_table_description_json(
4904 arn: &str,
4905 key_schema: &[KeySchemaElement],
4906 attribute_definitions: &[AttributeDefinition],
4907 provisioned_throughput: &ProvisionedThroughput,
4908 gsi: &[GlobalSecondaryIndex],
4909 lsi: &[LocalSecondaryIndex],
4910 billing_mode: &str,
4911 created_at: chrono::DateTime<chrono::Utc>,
4912 item_count: i64,
4913 size_bytes: i64,
4914 status: &str,
4915) -> Value {
4916 let table_name = arn.rsplit('/').next().unwrap_or("");
4917 let creation_timestamp =
4918 created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4919
4920 let ks: Vec<Value> = key_schema
4921 .iter()
4922 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4923 .collect();
4924
4925 let ad: Vec<Value> = attribute_definitions
4926 .iter()
4927 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4928 .collect();
4929
4930 let mut desc = json!({
4931 "TableName": table_name,
4932 "TableArn": arn,
4933 "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4934 "TableStatus": status,
4935 "KeySchema": ks,
4936 "AttributeDefinitions": ad,
4937 "CreationDateTime": creation_timestamp,
4938 "ItemCount": item_count,
4939 "TableSizeBytes": size_bytes,
4940 "BillingModeSummary": { "BillingMode": billing_mode },
4941 });
4942
4943 if billing_mode != "PAY_PER_REQUEST" {
4944 desc["ProvisionedThroughput"] = json!({
4945 "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4946 "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4947 "NumberOfDecreasesToday": 0,
4948 });
4949 } else {
4950 desc["ProvisionedThroughput"] = json!({
4951 "ReadCapacityUnits": 0,
4952 "WriteCapacityUnits": 0,
4953 "NumberOfDecreasesToday": 0,
4954 });
4955 }
4956
4957 if !gsi.is_empty() {
4958 let gsi_json: Vec<Value> = gsi
4959 .iter()
4960 .map(|g| {
4961 let gks: Vec<Value> = g
4962 .key_schema
4963 .iter()
4964 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4965 .collect();
4966 let mut idx = json!({
4967 "IndexName": g.index_name,
4968 "KeySchema": gks,
4969 "Projection": { "ProjectionType": g.projection.projection_type },
4970 "IndexStatus": "ACTIVE",
4971 "IndexArn": format!("{arn}/index/{}", g.index_name),
4972 "ItemCount": 0,
4973 "IndexSizeBytes": 0,
4974 });
4975 if !g.projection.non_key_attributes.is_empty() {
4976 idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4977 }
4978 if let Some(ref pt) = g.provisioned_throughput {
4979 idx["ProvisionedThroughput"] = json!({
4980 "ReadCapacityUnits": pt.read_capacity_units,
4981 "WriteCapacityUnits": pt.write_capacity_units,
4982 "NumberOfDecreasesToday": 0,
4983 });
4984 }
4985 idx
4986 })
4987 .collect();
4988 desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4989 }
4990
4991 if !lsi.is_empty() {
4992 let lsi_json: Vec<Value> = lsi
4993 .iter()
4994 .map(|l| {
4995 let lks: Vec<Value> = l
4996 .key_schema
4997 .iter()
4998 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4999 .collect();
5000 let mut idx = json!({
5001 "IndexName": l.index_name,
5002 "KeySchema": lks,
5003 "Projection": { "ProjectionType": l.projection.projection_type },
5004 "IndexArn": format!("{arn}/index/{}", l.index_name),
5005 "ItemCount": 0,
5006 "IndexSizeBytes": 0,
5007 });
5008 if !l.projection.non_key_attributes.is_empty() {
5009 idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
5010 }
5011 idx
5012 })
5013 .collect();
5014 desc["LocalSecondaryIndexes"] = json!(lsi_json);
5015 }
5016
5017 desc
5018}
5019
5020fn build_table_description(table: &DynamoTable) -> Value {
5021 let mut desc = build_table_description_json(
5022 &table.arn,
5023 &table.key_schema,
5024 &table.attribute_definitions,
5025 &table.provisioned_throughput,
5026 &table.gsi,
5027 &table.lsi,
5028 &table.billing_mode,
5029 table.created_at,
5030 table.item_count,
5031 table.size_bytes,
5032 &table.status,
5033 );
5034
5035 if table.stream_enabled {
5037 if let Some(ref stream_arn) = table.stream_arn {
5038 desc["LatestStreamArn"] = json!(stream_arn);
5039 desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
5040 }
5041 if let Some(ref view_type) = table.stream_view_type {
5042 desc["StreamSpecification"] = json!({
5043 "StreamEnabled": true,
5044 "StreamViewType": view_type,
5045 });
5046 }
5047 }
5048
5049 if let Some(ref sse_type) = table.sse_type {
5051 let mut sse_desc = json!({
5052 "Status": "ENABLED",
5053 "SSEType": sse_type,
5054 });
5055 if let Some(ref key_arn) = table.sse_kms_key_arn {
5056 sse_desc["KMSMasterKeyArn"] = json!(key_arn);
5057 }
5058 desc["SSEDescription"] = sse_desc;
5059 } else {
5060 desc["SSEDescription"] = json!({
5062 "Status": "ENABLED",
5063 "SSEType": "AES256",
5064 });
5065 }
5066
5067 desc
5068}
5069
5070fn execute_partiql_statement(
5071 state: &SharedDynamoDbState,
5072 statement: &str,
5073 parameters: &[Value],
5074) -> Result<AwsResponse, AwsServiceError> {
5075 let trimmed = statement.trim();
5076 let upper = trimmed.to_ascii_uppercase();
5077
5078 if upper.starts_with("SELECT") {
5079 execute_partiql_select(state, trimmed, parameters)
5080 } else if upper.starts_with("INSERT") {
5081 execute_partiql_insert(state, trimmed, parameters)
5082 } else if upper.starts_with("UPDATE") {
5083 execute_partiql_update(state, trimmed, parameters)
5084 } else if upper.starts_with("DELETE") {
5085 execute_partiql_delete(state, trimmed, parameters)
5086 } else {
5087 Err(AwsServiceError::aws_error(
5088 StatusCode::BAD_REQUEST,
5089 "ValidationException",
5090 format!("Unsupported PartiQL statement: {trimmed}"),
5091 ))
5092 }
5093}
5094
5095fn execute_partiql_select(
5097 state: &SharedDynamoDbState,
5098 statement: &str,
5099 parameters: &[Value],
5100) -> Result<AwsResponse, AwsServiceError> {
5101 let upper = statement.to_ascii_uppercase();
5103 let from_pos = upper.find("FROM").ok_or_else(|| {
5104 AwsServiceError::aws_error(
5105 StatusCode::BAD_REQUEST,
5106 "ValidationException",
5107 "Invalid SELECT statement: missing FROM",
5108 )
5109 })?;
5110
5111 let after_from = statement[from_pos + 4..].trim();
5112 let (table_name, rest) = parse_partiql_table_name(after_from);
5113
5114 let state = state.read();
5115 let table = get_table(&state.tables, &table_name)?;
5116
5117 let rest_upper = rest.trim().to_ascii_uppercase();
5118 if rest_upper.starts_with("WHERE") {
5119 let where_clause = rest.trim()[5..].trim();
5120 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
5121 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
5122 DynamoDbService::ok_json(json!({ "Items": items }))
5123 } else {
5124 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
5126 DynamoDbService::ok_json(json!({ "Items": items }))
5127 }
5128}
5129
5130fn execute_partiql_insert(
5131 state: &SharedDynamoDbState,
5132 statement: &str,
5133 parameters: &[Value],
5134) -> Result<AwsResponse, AwsServiceError> {
5135 let upper = statement.to_ascii_uppercase();
5138 let into_pos = upper.find("INTO").ok_or_else(|| {
5139 AwsServiceError::aws_error(
5140 StatusCode::BAD_REQUEST,
5141 "ValidationException",
5142 "Invalid INSERT statement: missing INTO",
5143 )
5144 })?;
5145
5146 let after_into = statement[into_pos + 4..].trim();
5147 let (table_name, rest) = parse_partiql_table_name(after_into);
5148
5149 let rest_upper = rest.trim().to_ascii_uppercase();
5150 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
5151 AwsServiceError::aws_error(
5152 StatusCode::BAD_REQUEST,
5153 "ValidationException",
5154 "Invalid INSERT statement: missing VALUE",
5155 )
5156 })?;
5157
5158 let value_str = rest.trim()[value_pos + 5..].trim();
5159 let item = parse_partiql_value_object(value_str, parameters)?;
5160
5161 let mut state = state.write();
5162 let table = get_table_mut(&mut state.tables, &table_name)?;
5163 let key = extract_key(table, &item);
5164 if table.find_item_index(&key).is_some() {
5165 return Err(AwsServiceError::aws_error(
5167 StatusCode::BAD_REQUEST,
5168 "DuplicateItemException",
5169 "Duplicate primary key exists in table",
5170 ));
5171 } else {
5172 table.items.push(item);
5173 }
5174 table.recalculate_stats();
5175
5176 DynamoDbService::ok_json(json!({}))
5177}
5178
5179fn execute_partiql_update(
5180 state: &SharedDynamoDbState,
5181 statement: &str,
5182 parameters: &[Value],
5183) -> Result<AwsResponse, AwsServiceError> {
5184 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
5188
5189 let rest_upper = rest.trim().to_ascii_uppercase();
5190 let set_pos = rest_upper.find("SET").ok_or_else(|| {
5191 AwsServiceError::aws_error(
5192 StatusCode::BAD_REQUEST,
5193 "ValidationException",
5194 "Invalid UPDATE statement: missing SET",
5195 )
5196 })?;
5197
5198 let after_set = rest.trim()[set_pos + 3..].trim();
5199
5200 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
5202 let (set_clause, where_clause) = if let Some(wp) = where_pos {
5203 (&after_set[..wp], after_set[wp + 5..].trim())
5204 } else {
5205 (after_set, "")
5206 };
5207
5208 let mut state = state.write();
5209 let table = get_table_mut(&mut state.tables, &table_name)?;
5210
5211 let matched_indices = if !where_clause.is_empty() {
5212 find_partiql_where_indices(table, where_clause, parameters)?
5213 } else {
5214 (0..table.items.len()).collect()
5215 };
5216
5217 let param_offset = count_params_in_str(where_clause);
5219 let assignments: Vec<&str> = set_clause.split(',').collect();
5220 for idx in &matched_indices {
5221 let mut local_offset = param_offset;
5222 for assignment in &assignments {
5223 let assignment = assignment.trim();
5224 if let Some((attr, val_str)) = assignment.split_once('=') {
5225 let attr = attr.trim().trim_matches('"');
5226 let val_str = val_str.trim();
5227 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
5228 if let Some(v) = value {
5229 table.items[*idx].insert(attr.to_string(), v);
5230 }
5231 }
5232 }
5233 }
5234 table.recalculate_stats();
5235
5236 DynamoDbService::ok_json(json!({}))
5237}
5238
5239fn execute_partiql_delete(
5240 state: &SharedDynamoDbState,
5241 statement: &str,
5242 parameters: &[Value],
5243) -> Result<AwsResponse, AwsServiceError> {
5244 let upper = statement.to_ascii_uppercase();
5246 let from_pos = upper.find("FROM").ok_or_else(|| {
5247 AwsServiceError::aws_error(
5248 StatusCode::BAD_REQUEST,
5249 "ValidationException",
5250 "Invalid DELETE statement: missing FROM",
5251 )
5252 })?;
5253
5254 let after_from = statement[from_pos + 4..].trim();
5255 let (table_name, rest) = parse_partiql_table_name(after_from);
5256
5257 let rest_upper = rest.trim().to_ascii_uppercase();
5258 if !rest_upper.starts_with("WHERE") {
5259 return Err(AwsServiceError::aws_error(
5260 StatusCode::BAD_REQUEST,
5261 "ValidationException",
5262 "DELETE requires a WHERE clause",
5263 ));
5264 }
5265 let where_clause = rest.trim()[5..].trim();
5266
5267 let mut state = state.write();
5268 let table = get_table_mut(&mut state.tables, &table_name)?;
5269
5270 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
5271 indices.sort_unstable();
5273 indices.reverse();
5274 for idx in indices {
5275 table.items.remove(idx);
5276 }
5277 table.recalculate_stats();
5278
5279 DynamoDbService::ok_json(json!({}))
5280}
5281
/// Extracts the table name at the start of `s`.
///
/// Returns the name and the untrimmed remainder of the input:
///
/// * A double-quoted name runs to the closing quote; the remainder starts
///   right after that quote.
/// * A name with an opening quote but no closing one runs to the first space
///   and has its quotes stripped.
/// * An unquoted name runs to the first whitespace character.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let input = s.trim();

    let Some(after_quote) = input.strip_prefix('"') else {
        let cut = input.find(char::is_whitespace).unwrap_or(input.len());
        let (name, rest) = input.split_at(cut);
        return (name.to_string(), rest);
    };

    match after_quote.split_once('"') {
        Some((name, rest)) => (name.to_string(), rest),
        None => {
            // Unterminated quote: fall back to splitting at the first space.
            let cut = input.find(' ').unwrap_or(input.len());
            let (name, rest) = input.split_at(cut);
            (name.trim_matches('"').to_string(), rest)
        }
    }
}
5301
5302fn evaluate_partiql_where<'a>(
5305 table: &'a DynamoTable,
5306 where_clause: &str,
5307 parameters: &[Value],
5308) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
5309 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
5310 Ok(indices.iter().map(|i| &table.items[*i]).collect())
5311}
5312
5313fn find_partiql_where_indices(
5314 table: &DynamoTable,
5315 where_clause: &str,
5316 parameters: &[Value],
5317) -> Result<Vec<usize>, AwsServiceError> {
5318 let upper = where_clause.to_uppercase();
5321 let conditions = if upper.contains(" AND ") {
5322 let mut parts = Vec::new();
5324 let mut last = 0;
5325 for (i, _) in upper.match_indices(" AND ") {
5326 parts.push(where_clause[last..i].trim());
5327 last = i + 5;
5328 }
5329 parts.push(where_clause[last..].trim());
5330 parts
5331 } else {
5332 vec![where_clause.trim()]
5333 };
5334
5335 let mut param_idx = 0usize;
5336 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5337
5338 for cond in &conditions {
5339 let cond = cond.trim();
5340 if let Some((left, right)) = cond.split_once('=') {
5341 let attr = left.trim().trim_matches('"').to_string();
5342 let val_str = right.trim();
5343 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5344 if let Some(v) = value {
5345 parsed_conditions.push((attr, v));
5346 }
5347 }
5348 }
5349
5350 let mut indices = Vec::new();
5351 for (i, item) in table.items.iter().enumerate() {
5352 let all_match = parsed_conditions
5353 .iter()
5354 .all(|(attr, expected)| item.get(attr) == Some(expected));
5355 if all_match {
5356 indices.push(i);
5357 }
5358 }
5359
5360 Ok(indices)
5361}
5362
5363fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5368 let s = s.trim();
5369 if s == "?" {
5370 let idx = *param_idx;
5371 *param_idx += 1;
5372 parameters.get(idx).cloned()
5373 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5374 let inner = &s[1..s.len() - 1];
5375 Some(json!({"S": inner}))
5376 } else if let Ok(n) = s.parse::<f64>() {
5377 let num_str = if n == n.trunc() {
5378 format!("{}", n as i64)
5379 } else {
5380 format!("{n}")
5381 };
5382 Some(json!({"N": num_str}))
5383 } else {
5384 None
5385 }
5386}
5387
5388fn parse_partiql_value_object(
5390 s: &str,
5391 parameters: &[Value],
5392) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5393 let s = s.trim();
5394 let inner = s
5395 .strip_prefix('{')
5396 .and_then(|s| s.strip_suffix('}'))
5397 .ok_or_else(|| {
5398 AwsServiceError::aws_error(
5399 StatusCode::BAD_REQUEST,
5400 "ValidationException",
5401 "Invalid VALUE: expected object literal",
5402 )
5403 })?;
5404
5405 let mut item = HashMap::new();
5406 let mut param_idx = 0usize;
5407
5408 for pair in split_partiql_pairs(inner) {
5410 let pair = pair.trim();
5411 if pair.is_empty() {
5412 continue;
5413 }
5414 if let Some((key_part, val_part)) = pair.split_once(':') {
5415 let key = key_part
5416 .trim()
5417 .trim_matches('\'')
5418 .trim_matches('"')
5419 .to_string();
5420 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5421 item.insert(key, val);
5422 }
5423 }
5424 }
5425
5426 Ok(item)
5427}
5428
/// Splits the inside of an object literal at commas that are outside
/// single-quoted strings and outside nested `{` ... `}`.
///
/// The pieces are returned untrimmed; the result always has at least one
/// element.
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut pieces = Vec::new();
    let mut piece_start = 0usize;
    let mut nesting = 0i32;
    let mut quoted = false;

    for (pos, ch) in s.char_indices() {
        if ch == '\'' {
            quoted = !quoted;
            continue;
        }
        if quoted {
            // Everything inside a quoted string is literal text.
            continue;
        }
        match ch {
            '{' => nesting += 1,
            '}' => nesting -= 1,
            ',' if nesting == 0 => {
                pieces.push(&s[piece_start..pos]);
                piece_start = pos + 1;
            }
            _ => {}
        }
    }

    pieces.push(&s[piece_start..]);
    pieces
}
5452
/// Counts the `?` characters in `s`, including any inside quoted strings.
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
5457
5458#[cfg(test)]
5459mod tests {
5460 use super::*;
5461 use serde_json::json;
5462
5463 #[test]
5464 fn test_parse_update_clauses_set() {
5465 let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
5466 assert_eq!(clauses.len(), 1);
5467 assert_eq!(clauses[0].0, "SET");
5468 assert_eq!(clauses[0].1.len(), 2);
5469 }
5470
/// An expression with both `SET` and `REMOVE` yields two clauses, in the
/// order they appear in the input.
#[test]
fn test_parse_update_clauses_set_and_remove() {
    let parsed = parse_update_clauses("SET #a = :val1 REMOVE #b");
    assert_eq!(parsed.len(), 2);

    let first = &parsed[0];
    let second = &parsed[1];
    assert_eq!(first.0, "SET");
    assert_eq!(second.0, "REMOVE");
}
5478
/// A partition-key equality condition matches an item whose `pk` equals the
/// bound `:pk` value.
#[test]
fn test_evaluate_key_condition_simple() {
    let item = HashMap::from([
        ("pk".to_string(), json!({"S": "user1"})),
        ("sk".to_string(), json!({"S": "order1"})),
    ]);
    let expr_names = HashMap::new();
    let expr_values = HashMap::from([(":pk".to_string(), json!({"S": "user1"}))]);

    let matched = evaluate_key_condition(
        "pk = :pk",
        &item,
        "pk",
        Some("sk"),
        &expr_names,
        &expr_values,
    );
    assert!(matched);
}
5497
/// Number (`N`) attribute values `10` and `20` compare as `Less`.
#[test]
fn test_compare_attribute_values_numbers() {
    let smaller = json!({"N": "10"});
    let larger = json!({"N": "20"});

    let ordering = compare_attribute_values(Some(&smaller), Some(&larger));
    assert_eq!(ordering, std::cmp::Ordering::Less);
}
5507
/// String (`S`) attribute values `apple` and `banana` compare as `Less`.
#[test]
fn test_compare_attribute_values_strings() {
    let earlier = json!({"S": "apple"});
    let later = json!({"S": "banana"});

    let ordering = compare_attribute_values(Some(&earlier), Some(&later));
    assert_eq!(ordering, std::cmp::Ordering::Less);
}
5517
/// A top-level `AND` splits the expression into its two operands.
#[test]
fn test_split_on_and() {
    let operands = split_on_and("pk = :pk AND sk > :sk");
    assert_eq!(operands.len(), 2);

    let left = operands[0].trim();
    let right = operands[1].trim();
    assert_eq!(left, "pk = :pk");
    assert_eq!(right, "sk > :sk");
}
5525
/// An `AND` that only occurs inside parentheses is not a split point, so the
/// whole expression comes back as a single part.
#[test]
fn test_split_on_and_respects_parentheses() {
    let expression = "(a = :a AND b = :b) OR c = :c";

    let operands = split_on_and(expression);
    assert_eq!(operands.len(), 1);
    assert_eq!(operands[0].trim(), expression);
}
5534
/// `(x AND y) OR z` matches when only `z` is true, and does not match when
/// none of the three comparisons hold.
#[test]
fn test_evaluate_filter_expression_parenthesized_and_with_or() {
    let filter = "(x = :yes AND y = :yes) OR z = :yes";
    let names = HashMap::new();
    let values = HashMap::from([(":yes".to_string(), json!({"S": "yes"}))]);

    // `x` and `y` are always "no"; only `z` varies between the two cases.
    let item_with_z = |z: &str| {
        HashMap::from([
            ("x".to_string(), json!({"S": "no"})),
            ("y".to_string(), json!({"S": "no"})),
            ("z".to_string(), json!({"S": z})),
        ])
    };

    let only_z_true = item_with_z("yes");
    assert!(
        evaluate_filter_expression(filter, &only_z_true, &names, &values),
        "should match because z = :yes is true"
    );

    let all_false = item_with_z("no");
    assert!(
        !evaluate_filter_expression(filter, &all_false, &names, &values),
        "should not match because nothing is true"
    );
}
5569
/// Projecting `data[0].name` keeps the `name` of the first list element and
/// leaves out its sibling `age` attribute.
#[test]
fn test_project_item_nested_path() {
    let item = HashMap::from([
        ("pk".to_string(), json!({"S": "key1"})),
        (
            "data".to_string(),
            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
        ),
    ]);
    let request_body = json!({ "ProjectionExpression": "data[0].name" });

    let projected = project_item(&item, &request_body);

    // Both assertions look inside the map held by the first list element.
    let first_entry = projected
        .get("data")
        .and_then(|v| v.get("L"))
        .and_then(|v| v.get(0))
        .and_then(|v| v.get("M"));

    let name = first_entry
        .and_then(|m| m.get("name"))
        .and_then(|v| v.get("S"))
        .and_then(|v| v.as_str());
    assert_eq!(name, Some("Alice"));

    let age = first_entry.and_then(|m| m.get("age"));
    assert!(age.is_none(), "age should not be present in projection");
}
5605
/// A dotted path walks through nested map (`M`) attributes down to the leaf
/// value.
#[test]
fn test_resolve_nested_path_map() {
    let item = HashMap::from([(
        "info".to_string(),
        json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
    )]);

    let resolved = resolve_nested_path(&item, "info.address.city");
    assert_eq!(resolved, Some(json!({"S": "NYC"})));
}
5617
/// A path that indexes into a list (`L`) and then into a map (`M`) resolves
/// to the nested attribute value.
#[test]
fn test_resolve_nested_path_list_then_map() {
    let item = HashMap::from([(
        "items".to_string(),
        json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
    )]);

    let resolved = resolve_nested_path(&item, "items[0].sku");
    assert_eq!(resolved, Some(json!({"S": "ABC"})));
}
5629
5630 use crate::state::SharedDynamoDbState;
5633 use parking_lot::RwLock;
5634 use std::sync::Arc;
5635
5636 fn make_service() -> DynamoDbService {
5637 let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
5638 "123456789012",
5639 "us-east-1",
5640 )));
5641 DynamoDbService::new(state)
5642 }
5643
5644 fn make_request(action: &str, body: Value) -> AwsRequest {
5645 AwsRequest {
5646 service: "dynamodb".to_string(),
5647 action: action.to_string(),
5648 region: "us-east-1".to_string(),
5649 account_id: "123456789012".to_string(),
5650 request_id: "test-id".to_string(),
5651 headers: http::HeaderMap::new(),
5652 query_params: HashMap::new(),
5653 body: serde_json::to_vec(&body).unwrap().into(),
5654 path_segments: vec![],
5655 raw_path: "/".to_string(),
5656 raw_query: String::new(),
5657 method: http::Method::POST,
5658 is_query_protocol: false,
5659 access_key_id: None,
5660 }
5661 }
5662
5663 fn create_test_table(svc: &DynamoDbService) {
5664 let req = make_request(
5665 "CreateTable",
5666 json!({
5667 "TableName": "test-table",
5668 "KeySchema": [
5669 { "AttributeName": "pk", "KeyType": "HASH" }
5670 ],
5671 "AttributeDefinitions": [
5672 { "AttributeName": "pk", "AttributeType": "S" }
5673 ],
5674 "BillingMode": "PAY_PER_REQUEST"
5675 }),
5676 );
5677 svc.create_table(&req).unwrap();
5678 }
5679
/// `DeleteItem` with `ReturnValues: ALL_OLD` returns the removed item's
/// attributes, and a following `GetItem` no longer finds the item.
#[test]
fn delete_item_return_values_all_old() {
    let svc = make_service();
    create_test_table(&svc);

    // Seed the item that will be deleted.
    let put = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "key1" },
                "name": { "S": "Alice" },
                "age": { "N": "30" }
            }
        }),
    );
    svc.put_item(&put).unwrap();

    // Delete it and ask for the previous attributes.
    let delete = make_request(
        "DeleteItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "key1" } },
            "ReturnValues": "ALL_OLD"
        }),
    );
    let deleted: Value =
        serde_json::from_slice(&svc.delete_item(&delete).unwrap().body).unwrap();

    let old = &deleted["Attributes"];
    assert_eq!(old["pk"]["S"].as_str().unwrap(), "key1");
    assert_eq!(old["name"]["S"].as_str().unwrap(), "Alice");
    assert_eq!(old["age"]["N"].as_str().unwrap(), "30");

    // The item must be gone afterwards.
    let get = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "key1" } }
        }),
    );
    let fetched: Value = serde_json::from_slice(&svc.get_item(&get).unwrap().body).unwrap();
    assert!(fetched.get("Item").is_none(), "item should be deleted");
}
5729
/// `TransactGetItems` returns one response entry per requested key: the
/// stored item for an existing key and an entry without `Item` for a missing
/// one.
#[test]
fn transact_get_items_returns_existing_and_missing() {
    let svc = make_service();
    create_test_table(&svc);

    let seed = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "exists" },
                "val": { "S": "hello" }
            }
        }),
    );
    svc.put_item(&seed).unwrap();

    let transaction = make_request(
        "TransactGetItems",
        json!({
            "TransactItems": [
                { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
                { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
            ]
        }),
    );
    let result: Value =
        serde_json::from_slice(&svc.transact_get_items(&transaction).unwrap().body).unwrap();

    let entries = result["Responses"].as_array().unwrap();
    assert_eq!(entries.len(), 2);

    let found = &entries[0];
    let absent = &entries[1];
    assert_eq!(found["Item"]["pk"]["S"].as_str().unwrap(), "exists");
    assert!(absent.get("Item").is_none());
}
5764
/// A `TransactWriteItems` call containing a `Put` and a `Delete` applies
/// both: the new item becomes readable and the deleted one disappears.
#[test]
fn transact_write_items_put_and_delete() {
    let svc = make_service();
    create_test_table(&svc);

    // Reads one item from `test-table` by its `pk` and returns the parsed
    // `GetItem` response body.
    let fetch = |pk: &str| -> Value {
        let get = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": pk } }
            }),
        );
        let response = svc.get_item(&get).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    // Seed the item that the transaction will delete.
    let seed = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "to-delete" },
                "val": { "S": "bye" }
            }
        }),
    );
    svc.put_item(&seed).unwrap();

    let transaction = make_request(
        "TransactWriteItems",
        json!({
            "TransactItems": [
                {
                    "Put": {
                        "TableName": "test-table",
                        "Item": {
                            "pk": { "S": "new-item" },
                            "val": { "S": "hi" }
                        }
                    }
                },
                {
                    "Delete": {
                        "TableName": "test-table",
                        "Key": { "pk": { "S": "to-delete" } }
                    }
                }
            ]
        }),
    );
    let outcome = svc.transact_write_items(&transaction).unwrap();
    assert_eq!(outcome.status, StatusCode::OK);

    let inserted = fetch("new-item");
    assert_eq!(inserted["Item"]["val"]["S"].as_str().unwrap(), "hi");

    let removed = fetch("to-delete");
    assert!(removed.get("Item").is_none());
}
5833
/// A failing `ConditionCheck` makes `TransactWriteItems` answer with a 400
/// `TransactionCanceledException` body that carries `CancellationReasons`.
#[test]
fn transact_write_items_condition_check_failure() {
    let svc = make_service();
    create_test_table(&svc);

    // `attribute_exists(pk)` cannot hold for a key that was never written.
    let transaction = make_request(
        "TransactWriteItems",
        json!({
            "TransactItems": [
                {
                    "ConditionCheck": {
                        "TableName": "test-table",
                        "Key": { "pk": { "S": "nonexistent" } },
                        "ConditionExpression": "attribute_exists(pk)"
                    }
                }
            ]
        }),
    );

    let outcome = svc.transact_write_items(&transaction).unwrap();
    assert_eq!(outcome.status, StatusCode::BAD_REQUEST);

    let error: Value = serde_json::from_slice(&outcome.body).unwrap();
    let error_type = error["__type"].as_str().unwrap();
    assert_eq!(error_type, "TransactionCanceledException");
    assert!(error["CancellationReasons"].as_array().is_some());
}
5864
/// Enabling TTL echoes the specification and is reported as `ENABLED` by
/// `DescribeTimeToLive`; disabling it afterwards is reported as `DISABLED`.
#[test]
fn update_and_describe_time_to_live() {
    let svc = make_service();
    create_test_table(&svc);

    // Returns the `TimeToLiveDescription` object currently reported for the table.
    let describe_ttl = || -> Value {
        let describe = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
        let response = svc.describe_time_to_live(&describe).unwrap();
        let parsed: Value = serde_json::from_slice(&response.body).unwrap();
        parsed["TimeToLiveDescription"].clone()
    };

    // Enable TTL on attribute `ttl`.
    let enable = make_request(
        "UpdateTimeToLive",
        json!({
            "TableName": "test-table",
            "TimeToLiveSpecification": {
                "AttributeName": "ttl",
                "Enabled": true
            }
        }),
    );
    let enabled: Value =
        serde_json::from_slice(&svc.update_time_to_live(&enable).unwrap().body).unwrap();
    let echoed = &enabled["TimeToLiveSpecification"];
    assert_eq!(echoed["AttributeName"].as_str().unwrap(), "ttl");
    assert!(echoed["Enabled"].as_bool().unwrap());

    let while_enabled = describe_ttl();
    assert_eq!(
        while_enabled["TimeToLiveStatus"].as_str().unwrap(),
        "ENABLED"
    );
    assert_eq!(while_enabled["AttributeName"].as_str().unwrap(), "ttl");

    // Disable TTL again.
    let disable = make_request(
        "UpdateTimeToLive",
        json!({
            "TableName": "test-table",
            "TimeToLiveSpecification": {
                "AttributeName": "ttl",
                "Enabled": false
            }
        }),
    );
    svc.update_time_to_live(&disable).unwrap();

    let while_disabled = describe_ttl();
    assert_eq!(
        while_disabled["TimeToLiveStatus"].as_str().unwrap(),
        "DISABLED"
    );
}
5933
/// Puts, reads back, deletes and re-reads a resource policy on the test
/// table's ARN.
#[test]
fn resource_policy_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // The read guard is a temporary, so the lock is released at the end of
    // this statement.
    let table_arn = svc
        .state
        .read()
        .tables
        .get("test-table")
        .unwrap()
        .arn
        .clone();

    // Returns the parsed `GetResourcePolicy` response body for the table.
    let get_policy = || -> Value {
        let get = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
        let response = svc.get_resource_policy(&get).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
    let put = make_request(
        "PutResourcePolicy",
        json!({
            "ResourceArn": table_arn,
            "Policy": policy_doc
        }),
    );
    let stored: Value =
        serde_json::from_slice(&svc.put_resource_policy(&put).unwrap().body).unwrap();
    assert!(stored["RevisionId"].as_str().is_some());

    let before_delete = get_policy();
    assert_eq!(before_delete["Policy"].as_str().unwrap(), policy_doc);

    let delete = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
    svc.delete_resource_policy(&delete).unwrap();

    let after_delete = get_policy();
    assert!(after_delete["Policy"].is_null());
}
5973
/// `DescribeEndpoints` reports a cache period of 1440 minutes for its first
/// endpoint.
#[test]
fn describe_endpoints() {
    let svc = make_service();

    let describe = make_request("DescribeEndpoints", json!({}));
    let endpoints: Value =
        serde_json::from_slice(&svc.describe_endpoints(&describe).unwrap().body).unwrap();

    assert_eq!(endpoints["Endpoints"][0]["CachePeriodInMinutes"], 1440);
}
5982
/// `DescribeLimits` reports `TableMaxReadCapacityUnits` as 40000.
#[test]
fn describe_limits() {
    let svc = make_service();

    let describe = make_request("DescribeLimits", json!({}));
    let limits: Value =
        serde_json::from_slice(&svc.describe_limits(&describe).unwrap().body).unwrap();

    assert_eq!(limits["TableMaxReadCapacityUnits"], 40000);
}
5991
/// Walks a backup through create, describe, list, restore and delete, and
/// checks that the backup list is empty at the end.
#[test]
fn backup_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // Number of entries currently returned by `ListBackups`.
    let backup_count = || -> usize {
        let list = make_request("ListBackups", json!({}));
        let response = svc.list_backups(&list).unwrap();
        let parsed: Value = serde_json::from_slice(&response.body).unwrap();
        parsed["BackupSummaries"].as_array().unwrap().len()
    };

    // Create
    let create = make_request(
        "CreateBackup",
        json!({ "TableName": "test-table", "BackupName": "my-backup" }),
    );
    let created: Value =
        serde_json::from_slice(&svc.create_backup(&create).unwrap().body).unwrap();
    let backup_arn = created["BackupDetails"]["BackupArn"]
        .as_str()
        .unwrap()
        .to_string();
    assert_eq!(created["BackupDetails"]["BackupStatus"], "AVAILABLE");

    // Describe
    let describe = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
    let described: Value =
        serde_json::from_slice(&svc.describe_backup(&describe).unwrap().body).unwrap();
    assert_eq!(
        described["BackupDescription"]["BackupDetails"]["BackupName"],
        "my-backup"
    );

    // List
    assert_eq!(backup_count(), 1);

    // Restore into a new table and confirm the table is usable.
    let restore = make_request(
        "RestoreTableFromBackup",
        json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
    );
    svc.restore_table_from_backup(&restore).unwrap();

    let describe_table = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
    let restored: Value =
        serde_json::from_slice(&svc.describe_table(&describe_table).unwrap().body).unwrap();
    assert_eq!(restored["Table"]["TableStatus"], "ACTIVE");

    // Delete
    let delete = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
    svc.delete_backup(&delete).unwrap();

    assert_eq!(backup_count(), 0);
}
6048
/// Point-in-time recovery is reported as `DISABLED` on a new table and as
/// `ENABLED` after `UpdateContinuousBackups` turns it on.
#[test]
fn continuous_backups() {
    let svc = make_service();
    create_test_table(&svc);

    // Returns the `PointInTimeRecoveryStatus` value currently reported for the table.
    let pitr_status = || -> Value {
        let describe = make_request(
            "DescribeContinuousBackups",
            json!({ "TableName": "test-table" }),
        );
        let response = svc.describe_continuous_backups(&describe).unwrap();
        let parsed: Value = serde_json::from_slice(&response.body).unwrap();
        parsed["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
            ["PointInTimeRecoveryStatus"]
            .clone()
    };

    assert_eq!(pitr_status(), "DISABLED");

    let enable = make_request(
        "UpdateContinuousBackups",
        json!({
            "TableName": "test-table",
            "PointInTimeRecoverySpecification": {
                "PointInTimeRecoveryEnabled": true
            }
        }),
    );
    svc.update_continuous_backups(&enable).unwrap();

    assert_eq!(pitr_status(), "ENABLED");
}
6092
/// `RestoreTableToPointInTime` creates the target table, which
/// `DescribeTable` then reports as `ACTIVE`.
#[test]
fn restore_table_to_point_in_time() {
    let svc = make_service();
    create_test_table(&svc);

    let restore = make_request(
        "RestoreTableToPointInTime",
        json!({
            "SourceTableName": "test-table",
            "TargetTableName": "pitr-restored"
        }),
    );
    svc.restore_table_to_point_in_time(&restore).unwrap();

    let describe = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
    let restored: Value =
        serde_json::from_slice(&svc.describe_table(&describe).unwrap().body).unwrap();
    assert_eq!(restored["Table"]["TableStatus"], "ACTIVE");
}
6112
/// Creates a two-region global table, describes and lists it, adds a third
/// replica, and exercises the global table settings calls.
#[test]
fn global_table_lifecycle() {
    // Number of replicas listed under `GlobalTableDescription.ReplicationGroup`.
    fn replica_count(body: &Value) -> usize {
        body["GlobalTableDescription"]["ReplicationGroup"]
            .as_array()
            .unwrap()
            .len()
    }

    let svc = make_service();

    // Create
    let create = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": "my-global",
            "ReplicationGroup": [
                { "RegionName": "us-east-1" },
                { "RegionName": "eu-west-1" }
            ]
        }),
    );
    let created: Value =
        serde_json::from_slice(&svc.create_global_table(&create).unwrap().body).unwrap();
    assert_eq!(
        created["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    // Describe
    let describe = make_request(
        "DescribeGlobalTable",
        json!({ "GlobalTableName": "my-global" }),
    );
    let described: Value =
        serde_json::from_slice(&svc.describe_global_table(&describe).unwrap().body).unwrap();
    assert_eq!(replica_count(&described), 2);

    // List
    let list = make_request("ListGlobalTables", json!({}));
    let listed: Value =
        serde_json::from_slice(&svc.list_global_tables(&list).unwrap().body).unwrap();
    assert_eq!(listed["GlobalTables"].as_array().unwrap().len(), 1);

    // Update: add a third replica.
    let add_replica = make_request(
        "UpdateGlobalTable",
        json!({
            "GlobalTableName": "my-global",
            "ReplicaUpdates": [
                { "Create": { "RegionName": "ap-southeast-1" } }
            ]
        }),
    );
    let updated: Value =
        serde_json::from_slice(&svc.update_global_table(&add_replica).unwrap().body).unwrap();
    assert_eq!(replica_count(&updated), 3);

    // Describe settings
    let describe_settings = make_request(
        "DescribeGlobalTableSettings",
        json!({ "GlobalTableName": "my-global" }),
    );
    let settings: Value = serde_json::from_slice(
        &svc.describe_global_table_settings(&describe_settings)
            .unwrap()
            .body,
    )
    .unwrap();
    assert_eq!(settings["ReplicaSettings"].as_array().unwrap().len(), 3);

    // Update settings (only needs to succeed).
    let update_settings = make_request(
        "UpdateGlobalTableSettings",
        json!({ "GlobalTableName": "my-global" }),
    );
    svc.update_global_table_settings(&update_settings).unwrap();
}
6192
/// `DescribeTableReplicaAutoScaling` echoes the table name, and
/// `UpdateTableReplicaAutoScaling` succeeds for the same table.
#[test]
fn table_replica_auto_scaling() {
    let svc = make_service();
    create_test_table(&svc);

    let describe = make_request(
        "DescribeTableReplicaAutoScaling",
        json!({ "TableName": "test-table" }),
    );
    let described: Value = serde_json::from_slice(
        &svc.describe_table_replica_auto_scaling(&describe)
            .unwrap()
            .body,
    )
    .unwrap();
    assert_eq!(
        described["TableAutoScalingDescription"]["TableName"],
        "test-table"
    );

    let update = make_request(
        "UpdateTableReplicaAutoScaling",
        json!({ "TableName": "test-table" }),
    );
    svc.update_table_replica_auto_scaling(&update).unwrap();
}
6215
/// Enables, describes, updates and finally disables a Kinesis streaming
/// destination on the test table.
#[test]
fn kinesis_streaming_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // Every call in this test targets the same stream.
    let stream_arn = "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream";

    // Enable
    let enable = make_request(
        "EnableKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn
        }),
    );
    let enabled: Value = serde_json::from_slice(
        &svc.enable_kinesis_streaming_destination(&enable)
            .unwrap()
            .body,
    )
    .unwrap();
    assert_eq!(enabled["DestinationStatus"], "ACTIVE");

    // Describe
    let describe = make_request(
        "DescribeKinesisStreamingDestination",
        json!({ "TableName": "test-table" }),
    );
    let described: Value = serde_json::from_slice(
        &svc.describe_kinesis_streaming_destination(&describe)
            .unwrap()
            .body,
    )
    .unwrap();
    let destinations = described["KinesisDataStreamDestinations"]
        .as_array()
        .unwrap();
    assert_eq!(destinations.len(), 1);

    // Update
    let update = make_request(
        "UpdateKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn,
            "UpdateKinesisStreamingConfiguration": {
                "ApproximateCreationDateTimePrecision": "MICROSECOND"
            }
        }),
    );
    svc.update_kinesis_streaming_destination(&update).unwrap();

    // Disable
    let disable = make_request(
        "DisableKinesisStreamingDestination",
        json!({
            "TableName": "test-table",
            "StreamArn": stream_arn
        }),
    );
    let disabled: Value = serde_json::from_slice(
        &svc.disable_kinesis_streaming_destination(&disable)
            .unwrap()
            .body,
    )
    .unwrap();
    assert_eq!(disabled["DestinationStatus"], "DISABLED");
}
6273
/// Contributor insights start out `DISABLED`, report `ENABLED` once switched
/// on, and then show up as one entry in `ListContributorInsights`.
#[test]
fn contributor_insights_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    // Describe (disabled on a fresh table)
    let describe = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let initial: Value =
        serde_json::from_slice(&svc.describe_contributor_insights(&describe).unwrap().body)
            .unwrap();
    assert_eq!(initial["ContributorInsightsStatus"], "DISABLED");

    // Enable
    let enable = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    let enabled: Value =
        serde_json::from_slice(&svc.update_contributor_insights(&enable).unwrap().body).unwrap();
    assert_eq!(enabled["ContributorInsightsStatus"], "ENABLED");

    // List
    let list = make_request("ListContributorInsights", json!({}));
    let listed: Value =
        serde_json::from_slice(&svc.list_contributor_insights(&list).unwrap().body).unwrap();
    let summaries = listed["ContributorInsightsSummaries"].as_array().unwrap();
    assert_eq!(summaries.len(), 1);
}
6312
/// Exports the test table to S3, then checks the export through
/// `DescribeExport` and `ListExports`.
#[test]
fn export_lifecycle() {
    let svc = make_service();
    create_test_table(&svc);

    let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table";

    // Export
    let export = make_request(
        "ExportTableToPointInTime",
        json!({
            "TableArn": table_arn,
            "S3Bucket": "my-bucket"
        }),
    );
    let exported: Value =
        serde_json::from_slice(&svc.export_table_to_point_in_time(&export).unwrap().body)
            .unwrap();
    let description = &exported["ExportDescription"];
    let export_arn = description["ExportArn"].as_str().unwrap().to_string();
    assert_eq!(description["ExportStatus"], "COMPLETED");

    // Describe
    let describe = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
    let described: Value =
        serde_json::from_slice(&svc.describe_export(&describe).unwrap().body).unwrap();
    assert_eq!(described["ExportDescription"]["S3Bucket"], "my-bucket");

    // List
    let list = make_request("ListExports", json!({}));
    let listed: Value = serde_json::from_slice(&svc.list_exports(&list).unwrap().body).unwrap();
    assert_eq!(listed["ExportSummaries"].as_array().unwrap().len(), 1);
}
6348
/// `ImportTable` completes, is visible through `DescribeImport` and
/// `ListImports`, and leaves behind an `ACTIVE` table.
#[test]
fn import_lifecycle() {
    let svc = make_service();

    // Import
    let import = make_request(
        "ImportTable",
        json!({
            "InputFormat": "DYNAMODB_JSON",
            "S3BucketSource": { "S3Bucket": "import-bucket" },
            "TableCreationParameters": {
                "TableName": "imported-table",
                "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
                "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
            }
        }),
    );
    let imported: Value =
        serde_json::from_slice(&svc.import_table(&import).unwrap().body).unwrap();
    let description = &imported["ImportTableDescription"];
    let import_arn = description["ImportArn"].as_str().unwrap().to_string();
    assert_eq!(description["ImportStatus"], "COMPLETED");

    // Describe
    let describe = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
    let described: Value =
        serde_json::from_slice(&svc.describe_import(&describe).unwrap().body).unwrap();
    assert_eq!(
        described["ImportTableDescription"]["ImportStatus"],
        "COMPLETED"
    );

    // List
    let list = make_request("ListImports", json!({}));
    let listed: Value = serde_json::from_slice(&svc.list_imports(&list).unwrap().body).unwrap();
    assert_eq!(listed["ImportSummaryList"].as_array().unwrap().len(), 1);

    // The import must have created the table.
    let describe_table = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
    let table: Value =
        serde_json::from_slice(&svc.describe_table(&describe_table).unwrap().body).unwrap();
    assert_eq!(table["Table"]["TableStatus"], "ACTIVE");
}
6391
/// A backup captures the table's items: after the source table is emptied,
/// restoring the backup into a new table brings all three items back.
#[test]
fn backup_restore_preserves_items() {
    let svc = make_service();
    create_test_table(&svc);

    // Scans the named table and returns the parsed response body.
    let scan_table = |table: &str| -> Value {
        let scan = make_request("Scan", json!({ "TableName": table }));
        let response = svc.scan(&scan).unwrap();
        serde_json::from_slice(&response.body).unwrap()
    };

    // Write three items.
    for n in 1..=3 {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": format!("key{n}") },
                    "data": { "S": format!("value{n}") }
                }
            }),
        );
        svc.put_item(&put).unwrap();
    }

    // Take the backup while the items exist.
    let create = make_request(
        "CreateBackup",
        json!({
            "TableName": "test-table",
            "BackupName": "my-backup"
        }),
    );
    let created: Value =
        serde_json::from_slice(&svc.create_backup(&create).unwrap().body).unwrap();
    let backup_arn = created["BackupDetails"]["BackupArn"]
        .as_str()
        .unwrap()
        .to_string();

    // Remove every item from the source table.
    for n in 1..=3 {
        let delete = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": format!("key{n}") } }
            }),
        );
        svc.delete_item(&delete).unwrap();
    }

    // The source table is now empty.
    let emptied = scan_table("test-table");
    assert_eq!(emptied["Count"], 0);

    // Restore the backup into a separate table.
    let restore = make_request(
        "RestoreTableFromBackup",
        json!({
            "BackupArn": backup_arn,
            "TargetTableName": "restored-table"
        }),
    );
    svc.restore_table_from_backup(&restore).unwrap();

    // The restored table holds the three original items.
    let restored = scan_table("restored-table");
    assert_eq!(restored["Count"], 3);
    assert_eq!(restored["Items"].as_array().unwrap().len(), 3);
}
6462
/// After a global table is created over `test-table`, an item written to the
/// table can be read back with the same attributes.
#[test]
fn global_table_replicates_writes() {
    let svc = make_service();
    create_test_table(&svc);

    // Register the existing table as a two-region global table.
    let create = make_request(
        "CreateGlobalTable",
        json!({
            "GlobalTableName": "test-table",
            "ReplicationGroup": [
                { "RegionName": "us-east-1" },
                { "RegionName": "eu-west-1" }
            ]
        }),
    );
    let created: Value =
        serde_json::from_slice(&svc.create_global_table(&create).unwrap().body).unwrap();
    assert_eq!(
        created["GlobalTableDescription"]["GlobalTableStatus"],
        "ACTIVE"
    );

    // Write an item.
    let put = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "replicated-key" },
                "data": { "S": "replicated-value" }
            }
        }),
    );
    svc.put_item(&put).unwrap();

    // Read it back.
    let get = make_request(
        "GetItem",
        json!({
            "TableName": "test-table",
            "Key": { "pk": { "S": "replicated-key" } }
        }),
    );
    let fetched: Value = serde_json::from_slice(&svc.get_item(&get).unwrap().body).unwrap();
    let stored = &fetched["Item"];
    assert_eq!(stored["pk"]["S"], "replicated-key");
    assert_eq!(stored["data"]["S"], "replicated-value");
}
6512
/// With contributor insights enabled, writes and reads produce a non-empty
/// `TopContributors` list and a non-empty rule list.
#[test]
fn contributor_insights_tracks_access() {
    let svc = make_service();
    create_test_table(&svc);

    // Enable insights before generating any traffic.
    let enable = make_request(
        "UpdateContributorInsights",
        json!({
            "TableName": "test-table",
            "ContributorInsightsAction": "ENABLE"
        }),
    );
    svc.update_contributor_insights(&enable).unwrap();

    // Write to two partition keys with differing frequency.
    for pk in ["alpha", "beta", "alpha", "alpha", "beta"] {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": pk },
                    "data": { "S": "value" }
                }
            }),
        );
        svc.put_item(&put).unwrap();
    }

    // Add three reads of `alpha`.
    for _ in 0..3 {
        let get = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "alpha" } }
            }),
        );
        svc.get_item(&get).unwrap();
    }

    let describe = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let report: Value =
        serde_json::from_slice(&svc.describe_contributor_insights(&describe).unwrap().body)
            .unwrap();
    assert_eq!(report["ContributorInsightsStatus"], "ENABLED");

    let contributors = report["TopContributors"].as_array().unwrap();
    assert!(
        !contributors.is_empty(),
        "TopContributors should not be empty"
    );

    // The first contributor must carry a positive access count.
    let leading_count = contributors[0]["Count"].as_u64().unwrap();
    assert!(leading_count > 0);

    let rule_list = report["ContributorInsightsRuleList"].as_array().unwrap();
    assert!(!rule_list.is_empty());
}
6579
/// While contributor insights are left disabled, a write does not produce
/// any `TopContributors` entries.
#[test]
fn contributor_insights_not_tracked_when_disabled() {
    let svc = make_service();
    create_test_table(&svc);

    // Write an item without ever enabling insights.
    let put = make_request(
        "PutItem",
        json!({
            "TableName": "test-table",
            "Item": {
                "pk": { "S": "key1" },
                "data": { "S": "value" }
            }
        }),
    );
    svc.put_item(&put).unwrap();

    let describe = make_request(
        "DescribeContributorInsights",
        json!({ "TableName": "test-table" }),
    );
    let report: Value =
        serde_json::from_slice(&svc.describe_contributor_insights(&describe).unwrap().body)
            .unwrap();
    assert_eq!(report["ContributorInsightsStatus"], "DISABLED");

    let tracked = report["TopContributors"].as_array().unwrap();
    assert!(tracked.is_empty());
}
6610
/// A scan populates `TopContributors` while insights are enabled; after
/// insights are disabled and the table is scanned again, the list is empty.
#[test]
fn contributor_insights_disabled_table_no_counters_after_scan() {
    let svc = make_service();
    create_test_table(&svc);

    // Switches contributor insights on or off via the given action string.
    let set_insights = |action: &str| {
        let update = make_request(
            "UpdateContributorInsights",
            json!({
                "TableName": "test-table",
                "ContributorInsightsAction": action
            }),
        );
        svc.update_contributor_insights(&update).unwrap();
    };

    // Scans the whole table, discarding the result.
    let scan_all = || {
        let scan = make_request("Scan", json!({ "TableName": "test-table" }));
        svc.scan(&scan).unwrap();
    };

    // Reports whether `TopContributors` is currently empty.
    let no_contributors = || -> bool {
        let describe = make_request(
            "DescribeContributorInsights",
            json!({ "TableName": "test-table" }),
        );
        let response = svc.describe_contributor_insights(&describe).unwrap();
        let parsed: Value = serde_json::from_slice(&response.body).unwrap();
        parsed["TopContributors"].as_array().unwrap().is_empty()
    };

    // Seed two items.
    for pk in ["alpha", "beta"] {
        let put = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": { "pk": { "S": pk } }
            }),
        );
        svc.put_item(&put).unwrap();
    }

    // Enabled: scanning produces counters.
    set_insights("ENABLE");
    scan_all();
    assert!(
        !no_contributors(),
        "counters should be non-empty while enabled"
    );

    // Disabled: a further scan leaves no counters behind.
    set_insights("DISABLE");
    scan_all();
    assert!(
        no_contributors(),
        "counters should be empty after disabling insights"
    );
}
6682
6683 #[test]
6684 fn scan_pagination_with_limit() {
6685 let svc = make_service();
6686 create_test_table(&svc);
6687
6688 for i in 0..5 {
6690 let req = make_request(
6691 "PutItem",
6692 json!({
6693 "TableName": "test-table",
6694 "Item": {
6695 "pk": { "S": format!("item{i}") },
6696 "data": { "S": format!("value{i}") }
6697 }
6698 }),
6699 );
6700 svc.put_item(&req).unwrap();
6701 }
6702
6703 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6705 let resp = svc.scan(&req).unwrap();
6706 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6707 assert_eq!(body["Count"], 2);
6708 assert!(
6709 body["LastEvaluatedKey"].is_object(),
6710 "should have LastEvaluatedKey when limit < total items"
6711 );
6712 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6713
6714 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6716 let mut lek = body["LastEvaluatedKey"].clone();
6717
6718 while lek.is_object() {
6719 let req = make_request(
6720 "Scan",
6721 json!({
6722 "TableName": "test-table",
6723 "Limit": 2,
6724 "ExclusiveStartKey": lek
6725 }),
6726 );
6727 let resp = svc.scan(&req).unwrap();
6728 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6729 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6730 lek = body["LastEvaluatedKey"].clone();
6731 }
6732
6733 assert_eq!(
6734 all_items.len(),
6735 5,
6736 "should retrieve all 5 items via pagination"
6737 );
6738 }
6739
6740 #[test]
6741 fn scan_no_pagination_when_all_fit() {
6742 let svc = make_service();
6743 create_test_table(&svc);
6744
6745 for i in 0..3 {
6746 let req = make_request(
6747 "PutItem",
6748 json!({
6749 "TableName": "test-table",
6750 "Item": {
6751 "pk": { "S": format!("item{i}") }
6752 }
6753 }),
6754 );
6755 svc.put_item(&req).unwrap();
6756 }
6757
6758 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6760 let resp = svc.scan(&req).unwrap();
6761 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6762 assert_eq!(body["Count"], 3);
6763 assert!(
6764 body["LastEvaluatedKey"].is_null(),
6765 "should not have LastEvaluatedKey when all items fit"
6766 );
6767
6768 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6770 let resp = svc.scan(&req).unwrap();
6771 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6772 assert_eq!(body["Count"], 3);
6773 assert!(body["LastEvaluatedKey"].is_null());
6774 }
6775
6776 fn create_composite_table(svc: &DynamoDbService) {
6777 let req = make_request(
6778 "CreateTable",
6779 json!({
6780 "TableName": "composite-table",
6781 "KeySchema": [
6782 { "AttributeName": "pk", "KeyType": "HASH" },
6783 { "AttributeName": "sk", "KeyType": "RANGE" }
6784 ],
6785 "AttributeDefinitions": [
6786 { "AttributeName": "pk", "AttributeType": "S" },
6787 { "AttributeName": "sk", "AttributeType": "S" }
6788 ],
6789 "BillingMode": "PAY_PER_REQUEST"
6790 }),
6791 );
6792 svc.create_table(&req).unwrap();
6793 }
6794
6795 #[test]
6796 fn query_pagination_with_composite_key() {
6797 let svc = make_service();
6798 create_composite_table(&svc);
6799
6800 for i in 0..5 {
6802 let req = make_request(
6803 "PutItem",
6804 json!({
6805 "TableName": "composite-table",
6806 "Item": {
6807 "pk": { "S": "user1" },
6808 "sk": { "S": format!("item{i:03}") },
6809 "data": { "S": format!("value{i}") }
6810 }
6811 }),
6812 );
6813 svc.put_item(&req).unwrap();
6814 }
6815
6816 let req = make_request(
6818 "Query",
6819 json!({
6820 "TableName": "composite-table",
6821 "KeyConditionExpression": "pk = :pk",
6822 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6823 "Limit": 2
6824 }),
6825 );
6826 let resp = svc.query(&req).unwrap();
6827 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6828 assert_eq!(body["Count"], 2);
6829 assert!(body["LastEvaluatedKey"].is_object());
6830 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6831 assert!(body["LastEvaluatedKey"]["sk"].is_object());
6832
6833 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6835 let mut lek = body["LastEvaluatedKey"].clone();
6836
6837 while lek.is_object() {
6838 let req = make_request(
6839 "Query",
6840 json!({
6841 "TableName": "composite-table",
6842 "KeyConditionExpression": "pk = :pk",
6843 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6844 "Limit": 2,
6845 "ExclusiveStartKey": lek
6846 }),
6847 );
6848 let resp = svc.query(&req).unwrap();
6849 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6850 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6851 lek = body["LastEvaluatedKey"].clone();
6852 }
6853
6854 assert_eq!(
6855 all_items.len(),
6856 5,
6857 "should retrieve all 5 items via pagination"
6858 );
6859
6860 let sks: Vec<String> = all_items
6862 .iter()
6863 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6864 .collect();
6865 let mut sorted = sks.clone();
6866 sorted.sort();
6867 assert_eq!(sks, sorted, "items should be sorted by sort key");
6868 }
6869
6870 #[test]
6871 fn query_no_pagination_when_all_fit() {
6872 let svc = make_service();
6873 create_composite_table(&svc);
6874
6875 for i in 0..2 {
6876 let req = make_request(
6877 "PutItem",
6878 json!({
6879 "TableName": "composite-table",
6880 "Item": {
6881 "pk": { "S": "user1" },
6882 "sk": { "S": format!("item{i}") }
6883 }
6884 }),
6885 );
6886 svc.put_item(&req).unwrap();
6887 }
6888
6889 let req = make_request(
6890 "Query",
6891 json!({
6892 "TableName": "composite-table",
6893 "KeyConditionExpression": "pk = :pk",
6894 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6895 "Limit": 10
6896 }),
6897 );
6898 let resp = svc.query(&req).unwrap();
6899 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6900 assert_eq!(body["Count"], 2);
6901 assert!(
6902 body["LastEvaluatedKey"].is_null(),
6903 "should not have LastEvaluatedKey when all items fit"
6904 );
6905 }
6906
6907 fn create_gsi_table(svc: &DynamoDbService) {
6908 let req = make_request(
6909 "CreateTable",
6910 json!({
6911 "TableName": "gsi-table",
6912 "KeySchema": [
6913 { "AttributeName": "pk", "KeyType": "HASH" }
6914 ],
6915 "AttributeDefinitions": [
6916 { "AttributeName": "pk", "AttributeType": "S" },
6917 { "AttributeName": "gsi_pk", "AttributeType": "S" },
6918 { "AttributeName": "gsi_sk", "AttributeType": "S" }
6919 ],
6920 "BillingMode": "PAY_PER_REQUEST",
6921 "GlobalSecondaryIndexes": [
6922 {
6923 "IndexName": "gsi-index",
6924 "KeySchema": [
6925 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6926 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6927 ],
6928 "Projection": { "ProjectionType": "ALL" }
6929 }
6930 ]
6931 }),
6932 );
6933 svc.create_table(&req).unwrap();
6934 }
6935
6936 #[test]
6937 fn gsi_query_last_evaluated_key_includes_table_pk() {
6938 let svc = make_service();
6939 create_gsi_table(&svc);
6940
6941 for i in 0..3 {
6943 let req = make_request(
6944 "PutItem",
6945 json!({
6946 "TableName": "gsi-table",
6947 "Item": {
6948 "pk": { "S": format!("item{i}") },
6949 "gsi_pk": { "S": "shared" },
6950 "gsi_sk": { "S": "sort" }
6951 }
6952 }),
6953 );
6954 svc.put_item(&req).unwrap();
6955 }
6956
6957 let req = make_request(
6959 "Query",
6960 json!({
6961 "TableName": "gsi-table",
6962 "IndexName": "gsi-index",
6963 "KeyConditionExpression": "gsi_pk = :v",
6964 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6965 "Limit": 1
6966 }),
6967 );
6968 let resp = svc.query(&req).unwrap();
6969 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6970 assert_eq!(body["Count"], 1);
6971 let lek = &body["LastEvaluatedKey"];
6972 assert!(lek.is_object(), "should have LastEvaluatedKey");
6973 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6975 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6976 assert!(
6978 lek["pk"].is_object(),
6979 "LEK must contain table PK for GSI queries"
6980 );
6981 }
6982
6983 #[test]
6984 fn gsi_query_pagination_returns_all_items() {
6985 let svc = make_service();
6986 create_gsi_table(&svc);
6987
6988 for i in 0..4 {
6990 let req = make_request(
6991 "PutItem",
6992 json!({
6993 "TableName": "gsi-table",
6994 "Item": {
6995 "pk": { "S": format!("item{i:03}") },
6996 "gsi_pk": { "S": "shared" },
6997 "gsi_sk": { "S": "sort" }
6998 }
6999 }),
7000 );
7001 svc.put_item(&req).unwrap();
7002 }
7003
7004 let mut all_pks = Vec::new();
7006 let mut lek: Option<Value> = None;
7007
7008 loop {
7009 let mut query = json!({
7010 "TableName": "gsi-table",
7011 "IndexName": "gsi-index",
7012 "KeyConditionExpression": "gsi_pk = :v",
7013 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
7014 "Limit": 2
7015 });
7016 if let Some(ref start_key) = lek {
7017 query["ExclusiveStartKey"] = start_key.clone();
7018 }
7019
7020 let req = make_request("Query", query);
7021 let resp = svc.query(&req).unwrap();
7022 let body: Value = serde_json::from_slice(&resp.body).unwrap();
7023
7024 for item in body["Items"].as_array().unwrap() {
7025 let pk = item["pk"]["S"].as_str().unwrap().to_string();
7026 all_pks.push(pk);
7027 }
7028
7029 if body["LastEvaluatedKey"].is_object() {
7030 lek = Some(body["LastEvaluatedKey"].clone());
7031 } else {
7032 break;
7033 }
7034 }
7035
7036 all_pks.sort();
7037 assert_eq!(
7038 all_pks,
7039 vec!["item000", "item001", "item002", "item003"],
7040 "pagination should return all items without duplicates"
7041 );
7042 }
7043
7044 fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
7045 pairs
7046 .iter()
7047 .map(|(k, v)| (k.to_string(), json!({"S": v})))
7048 .collect()
7049 }
7050
7051 fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
7052 pairs
7053 .iter()
7054 .map(|(k, v)| (k.to_string(), v.to_string()))
7055 .collect()
7056 }
7057
7058 fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
7059 pairs
7060 .iter()
7061 .map(|(k, v)| (k.to_string(), json!({"S": v})))
7062 .collect()
7063 }
7064
7065 #[test]
7066 fn test_evaluate_condition_bare_not_equal() {
7067 let item = cond_item(&[("state", "active")]);
7068 let names = cond_names(&[("#s", "state")]);
7069 let values = cond_values(&[(":c", "complete")]);
7070
7071 assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
7072
7073 let item2 = cond_item(&[("state", "complete")]);
7074 assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
7075 }
7076
7077 #[test]
7078 fn test_evaluate_condition_parenthesized_not_equal() {
7079 let item = cond_item(&[("state", "active")]);
7080 let names = cond_names(&[("#s", "state")]);
7081 let values = cond_values(&[(":c", "complete")]);
7082
7083 assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
7084 }
7085
7086 #[test]
7087 fn test_evaluate_condition_parenthesized_equal_mismatch() {
7088 let item = cond_item(&[("state", "active")]);
7089 let names = cond_names(&[("#s", "state")]);
7090 let values = cond_values(&[(":c", "complete")]);
7091
7092 assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
7093 }
7094
7095 #[test]
7096 fn test_evaluate_condition_compound_and() {
7097 let item = cond_item(&[("state", "active")]);
7098 let names = cond_names(&[("#s", "state")]);
7099 let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
7100
7101 assert!(
7103 evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
7104 );
7105 }
7106
7107 #[test]
7108 fn test_evaluate_condition_compound_and_mismatch() {
7109 let item = cond_item(&[("state", "inactive")]);
7110 let names = cond_names(&[("#s", "state")]);
7111 let values = cond_values(&[(":a", "active"), (":b", "active")]);
7112
7113 assert!(
7115 evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
7116 );
7117 }
7118
7119 #[test]
7120 fn test_evaluate_condition_compound_or() {
7121 let item = cond_item(&[("state", "running")]);
7122 let names = cond_names(&[("#s", "state")]);
7123 let values = cond_values(&[(":a", "active"), (":b", "idle")]);
7124
7125 assert!(
7127 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
7128 );
7129
7130 let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
7132 assert!(
7133 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
7134 );
7135 }
7136
7137 #[test]
7138 fn test_evaluate_condition_not_operator() {
7139 let item = cond_item(&[("state", "active")]);
7140 let names = cond_names(&[("#s", "state")]);
7141 let values = cond_values(&[(":c", "complete")]);
7142
7143 assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
7145
7146 assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
7148
7149 assert!(
7151 evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
7152 );
7153
7154 assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
7156 }
7157
7158 #[test]
7159 fn test_evaluate_condition_begins_with() {
7160 let item = cond_item(&[("name", "fakecloud-dynamodb")]);
7163 let names = cond_names(&[("#n", "name")]);
7164 let values = cond_values(&[(":p", "fakecloud")]);
7165
7166 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
7167
7168 let values2 = cond_values(&[(":p", "realcloud")]);
7169 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
7170 }
7171
7172 #[test]
7173 fn test_evaluate_condition_contains() {
7174 let item = cond_item(&[("tags", "alpha,beta,gamma")]);
7175 let names = cond_names(&[("#t", "tags")]);
7176 let values = cond_values(&[(":v", "beta")]);
7177
7178 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
7179
7180 let values2 = cond_values(&[(":v", "delta")]);
7181 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
7182 }
7183
7184 #[test]
7185 fn test_evaluate_condition_no_existing_item() {
7186 let names = cond_names(&[("#s", "state")]);
7189 let values = cond_values(&[(":v", "active")]);
7190
7191 assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
7192 assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
7193 assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
7195 assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
7197 }
7198
7199 #[test]
7200 fn test_evaluate_filter_not_operator() {
7201 let item = cond_item(&[("status", "pending")]);
7202 let names = cond_names(&[("#s", "status")]);
7203 let values = cond_values(&[(":v", "pending")]);
7204
7205 assert!(!evaluate_filter_expression(
7206 "NOT (#s = :v)",
7207 &item,
7208 &names,
7209 &values
7210 ));
7211 assert!(evaluate_filter_expression(
7212 "NOT (#s <> :v)",
7213 &item,
7214 &names,
7215 &values
7216 ));
7217 }
7218
7219 #[test]
7220 fn test_evaluate_filter_expression_in_match() {
7221 let item = cond_item(&[("state", "active")]);
7227 let names = cond_names(&[("#s", "state")]);
7228 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7229
7230 assert!(
7231 evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7232 "state=active should match IN (active, pending)"
7233 );
7234 }
7235
7236 #[test]
7237 fn test_evaluate_filter_expression_in_no_match() {
7238 let item = cond_item(&[("state", "complete")]);
7239 let names = cond_names(&[("#s", "state")]);
7240 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7241
7242 assert!(
7243 !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
7244 "state=complete should not match IN (active, pending)"
7245 );
7246 }
7247
7248 #[test]
7249 fn test_evaluate_filter_expression_in_no_spaces() {
7250 let item = cond_item(&[("status", "shipped")]);
7254 let names = cond_names(&[("#s", "status")]);
7255 let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
7256
7257 assert!(
7258 evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
7259 "no-space IN list should still parse"
7260 );
7261 }
7262
7263 #[test]
7264 fn test_evaluate_filter_expression_in_missing_attribute() {
7265 let item: HashMap<String, AttributeValue> = HashMap::new();
7268 let names = cond_names(&[("#s", "state")]);
7269 let values = cond_values(&[(":a", "active")]);
7270
7271 assert!(
7272 !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
7273 "missing attribute should not match any IN list"
7274 );
7275 }
7276
7277 #[test]
7278 fn test_evaluate_filter_expression_compound_in_and_eq() {
7279 let item = cond_item(&[("state", "active"), ("priority", "high")]);
7285 let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
7286 let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
7287
7288 assert!(
7289 evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
7290 "(active IN (active, pending)) AND (high = high) should match"
7291 );
7292
7293 let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
7294 assert!(
7295 !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
7296 "(complete IN (active, pending)) AND (high = high) should not match"
7297 );
7298 }
7299
7300 #[test]
7301 fn test_evaluate_condition_attribute_exists_with_space() {
7302 let item = cond_item(&[("store_id", "s-1")]);
7310 let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
7311 let values = cond_values(&[(":0", "tab-A")]);
7312
7313 assert!(
7316 evaluate_condition(
7317 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7318 Some(&item),
7319 &names,
7320 &values,
7321 )
7322 .is_ok(),
7323 "claim-lease compound on free item should succeed"
7324 );
7325
7326 assert!(
7328 evaluate_condition(
7329 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7330 None,
7331 &names,
7332 &values,
7333 )
7334 .is_err(),
7335 "claim-lease compound on missing item must fail attribute_exists branch"
7336 );
7337
7338 let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
7341 assert!(
7342 evaluate_condition(
7343 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7344 Some(&held),
7345 &names,
7346 &values,
7347 )
7348 .is_err(),
7349 "claim-lease compound on item held by another tab must fail"
7350 );
7351
7352 let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
7355 assert!(
7356 evaluate_condition(
7357 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
7358 Some(&self_held),
7359 &names,
7360 &values,
7361 )
7362 .is_ok(),
7363 "same-tab re-claim must succeed"
7364 );
7365 }
7366
7367 #[test]
7368 fn test_evaluate_condition_in_match() {
7369 let item = cond_item(&[("state", "active")]);
7372 let names = cond_names(&[("#s", "state")]);
7373 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7374
7375 assert!(
7376 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
7377 "IN should succeed when actual value is in the list"
7378 );
7379 }
7380
7381 #[test]
7382 fn test_evaluate_condition_in_no_match() {
7383 let item = cond_item(&[("state", "complete")]);
7387 let names = cond_names(&[("#s", "state")]);
7388 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
7389
7390 assert!(
7391 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
7392 "IN should fail when actual value is not in the list"
7393 );
7394 }
7395
7396 #[test]
7397 fn test_apply_update_set_list_index_replaces_existing() {
7398 let mut item = HashMap::new();
7405 item.insert(
7406 "items".to_string(),
7407 json!({"L": [
7408 {"M": {"sku": {"S": "OLD-A"}}},
7409 {"M": {"sku": {"S": "OLD-B"}}},
7410 ]}),
7411 );
7412
7413 let names = cond_names(&[("#items", "items")]);
7414 let mut values = HashMap::new();
7415 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
7416
7417 apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
7418
7419 let items_list = item
7420 .get("items")
7421 .and_then(|v| v.get("L"))
7422 .and_then(|v| v.as_array())
7423 .expect("items should still be a list");
7424 assert_eq!(items_list.len(), 2, "list length should be unchanged");
7425 let sku0 = items_list[0]
7426 .get("M")
7427 .and_then(|m| m.get("sku"))
7428 .and_then(|s| s.get("S"))
7429 .and_then(|s| s.as_str());
7430 assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
7431 let sku1 = items_list[1]
7432 .get("M")
7433 .and_then(|m| m.get("sku"))
7434 .and_then(|s| s.get("S"))
7435 .and_then(|s| s.as_str());
7436 assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
7437
7438 assert!(!item.contains_key("items[0]"));
7439 assert!(!item.contains_key("#items[0]"));
7440 }
7441
7442 #[test]
7443 fn test_apply_update_set_list_index_second_slot() {
7444 let mut item = HashMap::new();
7445 item.insert(
7446 "items".to_string(),
7447 json!({"L": [
7448 {"M": {"sku": {"S": "A"}}},
7449 {"M": {"sku": {"S": "B"}}},
7450 {"M": {"sku": {"S": "C"}}},
7451 ]}),
7452 );
7453
7454 let names = cond_names(&[("#items", "items")]);
7455 let mut values = HashMap::new();
7456 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
7457
7458 apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
7459
7460 let items_list = item
7461 .get("items")
7462 .and_then(|v| v.get("L"))
7463 .and_then(|v| v.as_array())
7464 .unwrap();
7465 let skus: Vec<&str> = items_list
7466 .iter()
7467 .map(|v| {
7468 v.get("M")
7469 .and_then(|m| m.get("sku"))
7470 .and_then(|s| s.get("S"))
7471 .and_then(|s| s.as_str())
7472 .unwrap()
7473 })
7474 .collect();
7475 assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
7476 }
7477
7478 #[test]
7479 fn test_apply_update_set_list_index_without_name_ref() {
7480 let mut item = HashMap::new();
7483 item.insert(
7484 "tags".to_string(),
7485 json!({"L": [{"S": "red"}, {"S": "blue"}]}),
7486 );
7487
7488 let names: HashMap<String, String> = HashMap::new();
7489 let mut values = HashMap::new();
7490 values.insert(":t".to_string(), json!({"S": "green"}));
7491
7492 apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
7493
7494 let tags = item
7495 .get("tags")
7496 .and_then(|v| v.get("L"))
7497 .and_then(|v| v.as_array())
7498 .unwrap();
7499 assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
7500 assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
7501 }
7502
7503 #[test]
7504 fn test_unrecognized_expression_returns_false() {
7505 let item = cond_item(&[("x", "1")]);
7508 let names: HashMap<String, String> = HashMap::new();
7509 let values: HashMap<String, Value> = HashMap::new();
7510
7511 assert!(
7512 !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
7513 "unrecognized expression must return false"
7514 );
7515 }
7516
7517 #[test]
7518 fn test_set_list_index_out_of_range_returns_error() {
7519 let mut item = HashMap::new();
7522 item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
7523
7524 let names: HashMap<String, String> = HashMap::new();
7525 let mut values = HashMap::new();
7526 values.insert(":v".to_string(), json!({"S": "z"}));
7527
7528 let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
7529 assert!(
7530 result.is_err(),
7531 "out-of-range list index must return an error"
7532 );
7533
7534 let list = item
7536 .get("items")
7537 .and_then(|v| v.get("L"))
7538 .and_then(|v| v.as_array())
7539 .unwrap();
7540 assert_eq!(list.len(), 2);
7541 }
7542
7543 #[test]
7544 fn test_set_list_index_on_non_list_returns_error() {
7545 let mut item = HashMap::new();
7548 item.insert("name".to_string(), json!({"S": "hello"}));
7549
7550 let names: HashMap<String, String> = HashMap::new();
7551 let mut values = HashMap::new();
7552 values.insert(":v".to_string(), json!({"S": "z"}));
7553
7554 let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
7555 assert!(
7556 result.is_err(),
7557 "list index on non-list attribute must return an error"
7558 );
7559 }
7560
7561 #[test]
7562 fn test_unrecognized_update_action_returns_error() {
7563 let mut item = HashMap::new();
7564 item.insert("name".to_string(), json!({"S": "hello"}));
7565
7566 let names: HashMap<String, String> = HashMap::new();
7567 let mut values = HashMap::new();
7568 values.insert(":bar".to_string(), json!({"S": "baz"}));
7569
7570 let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
7571 assert!(
7572 result.is_err(),
7573 "unrecognized UpdateExpression action must return an error"
7574 );
7575 let err_msg = format!("{}", result.unwrap_err());
7576 assert!(
7577 err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
7578 "error should mention Invalid UpdateExpression, got: {err_msg}"
7579 );
7580 }
7581
7582 #[test]
7585 fn test_size_string() {
7586 let mut item = HashMap::new();
7587 item.insert("name".to_string(), json!({"S": "hello"}));
7588 let names = HashMap::new();
7589 let mut values = HashMap::new();
7590 values.insert(":limit".to_string(), json!({"N": "5"}));
7591
7592 assert!(evaluate_single_filter_condition(
7593 "size(name) = :limit",
7594 &item,
7595 &names,
7596 &values,
7597 ));
7598 values.insert(":limit".to_string(), json!({"N": "4"}));
7599 assert!(evaluate_single_filter_condition(
7600 "size(name) > :limit",
7601 &item,
7602 &names,
7603 &values,
7604 ));
7605 }
7606
7607 #[test]
7608 fn test_size_list() {
7609 let mut item = HashMap::new();
7610 item.insert(
7611 "items".to_string(),
7612 json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
7613 );
7614 let names = HashMap::new();
7615 let mut values = HashMap::new();
7616 values.insert(":limit".to_string(), json!({"N": "3"}));
7617
7618 assert!(evaluate_single_filter_condition(
7619 "size(items) = :limit",
7620 &item,
7621 &names,
7622 &values,
7623 ));
7624 }
7625
7626 #[test]
7627 fn test_size_map() {
7628 let mut item = HashMap::new();
7629 item.insert(
7630 "data".to_string(),
7631 json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
7632 );
7633 let names = HashMap::new();
7634 let mut values = HashMap::new();
7635 values.insert(":limit".to_string(), json!({"N": "2"}));
7636
7637 assert!(evaluate_single_filter_condition(
7638 "size(data) = :limit",
7639 &item,
7640 &names,
7641 &values,
7642 ));
7643 }
7644
7645 #[test]
7646 fn test_size_set() {
7647 let mut item = HashMap::new();
7648 item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
7649 let names = HashMap::new();
7650 let mut values = HashMap::new();
7651 values.insert(":limit".to_string(), json!({"N": "3"}));
7652
7653 assert!(evaluate_single_filter_condition(
7654 "size(tags) > :limit",
7655 &item,
7656 &names,
7657 &values,
7658 ));
7659 }
7660
7661 #[test]
7664 fn test_attribute_type_string() {
7665 let mut item = HashMap::new();
7666 item.insert("name".to_string(), json!({"S": "hello"}));
7667 let names = HashMap::new();
7668 let mut values = HashMap::new();
7669 values.insert(":t".to_string(), json!({"S": "S"}));
7670
7671 assert!(evaluate_single_filter_condition(
7672 "attribute_type(name, :t)",
7673 &item,
7674 &names,
7675 &values,
7676 ));
7677
7678 values.insert(":t".to_string(), json!({"S": "N"}));
7679 assert!(!evaluate_single_filter_condition(
7680 "attribute_type(name, :t)",
7681 &item,
7682 &names,
7683 &values,
7684 ));
7685 }
7686
7687 #[test]
7688 fn test_attribute_type_number() {
7689 let mut item = HashMap::new();
7690 item.insert("age".to_string(), json!({"N": "42"}));
7691 let names = HashMap::new();
7692 let mut values = HashMap::new();
7693 values.insert(":t".to_string(), json!({"S": "N"}));
7694
7695 assert!(evaluate_single_filter_condition(
7696 "attribute_type(age, :t)",
7697 &item,
7698 &names,
7699 &values,
7700 ));
7701 }
7702
7703 #[test]
7704 fn test_attribute_type_list() {
7705 let mut item = HashMap::new();
7706 item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
7707 let names = HashMap::new();
7708 let mut values = HashMap::new();
7709 values.insert(":t".to_string(), json!({"S": "L"}));
7710
7711 assert!(evaluate_single_filter_condition(
7712 "attribute_type(items, :t)",
7713 &item,
7714 &names,
7715 &values,
7716 ));
7717 }
7718
7719 #[test]
7720 fn test_attribute_type_map() {
7721 let mut item = HashMap::new();
7722 item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
7723 let names = HashMap::new();
7724 let mut values = HashMap::new();
7725 values.insert(":t".to_string(), json!({"S": "M"}));
7726
7727 assert!(evaluate_single_filter_condition(
7728 "attribute_type(data, :t)",
7729 &item,
7730 &names,
7731 &values,
7732 ));
7733 }
7734
7735 #[test]
7736 fn test_attribute_type_bool() {
7737 let mut item = HashMap::new();
7738 item.insert("active".to_string(), json!({"BOOL": true}));
7739 let names = HashMap::new();
7740 let mut values = HashMap::new();
7741 values.insert(":t".to_string(), json!({"S": "BOOL"}));
7742
7743 assert!(evaluate_single_filter_condition(
7744 "attribute_type(active, :t)",
7745 &item,
7746 &names,
7747 &values,
7748 ));
7749 }
7750
7751 #[test]
7754 fn test_begins_with_rejects_number_type() {
7755 let mut item = HashMap::new();
7756 item.insert("code".to_string(), json!({"N": "12345"}));
7757 let names = HashMap::new();
7758 let mut values = HashMap::new();
7759 values.insert(":prefix".to_string(), json!({"S": "123"}));
7760
7761 assert!(
7762 !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
7763 "begins_with must return false for N-type attributes"
7764 );
7765 }
7766
7767 #[test]
7768 fn test_begins_with_works_on_string_type() {
7769 let mut item = HashMap::new();
7770 item.insert("code".to_string(), json!({"S": "abc123"}));
7771 let names = HashMap::new();
7772 let mut values = HashMap::new();
7773 values.insert(":prefix".to_string(), json!({"S": "abc"}));
7774
7775 assert!(evaluate_single_filter_condition(
7776 "begins_with(code, :prefix)",
7777 &item,
7778 &names,
7779 &values,
7780 ));
7781 }
7782
7783 #[test]
7786 fn test_contains_string_set() {
7787 let mut item = HashMap::new();
7788 item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
7789 let names = HashMap::new();
7790 let mut values = HashMap::new();
7791 values.insert(":val".to_string(), json!({"S": "blue"}));
7792
7793 assert!(evaluate_single_filter_condition(
7794 "contains(tags, :val)",
7795 &item,
7796 &names,
7797 &values,
7798 ));
7799
7800 values.insert(":val".to_string(), json!({"S": "yellow"}));
7801 assert!(!evaluate_single_filter_condition(
7802 "contains(tags, :val)",
7803 &item,
7804 &names,
7805 &values,
7806 ));
7807 }
7808
7809 #[test]
7810 fn test_contains_number_set() {
7811 let mut item = HashMap::new();
7812 item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
7813 let names = HashMap::new();
7814 let mut values = HashMap::new();
7815 values.insert(":val".to_string(), json!({"N": "2"}));
7816
7817 assert!(evaluate_single_filter_condition(
7818 "contains(scores, :val)",
7819 &item,
7820 &names,
7821 &values,
7822 ));
7823 }
7824
7825 #[test]
7828 fn test_set_arithmetic_rejects_string_operand() {
7829 let mut item = HashMap::new();
7830 item.insert("name".to_string(), json!({"S": "hello"}));
7831 let names = HashMap::new();
7832 let mut values = HashMap::new();
7833 values.insert(":val".to_string(), json!({"N": "1"}));
7834
7835 let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
7836 assert!(
7837 result.is_err(),
7838 "arithmetic on S-type attribute must return a ValidationException"
7839 );
7840 }
7841
7842 #[test]
7843 fn test_set_arithmetic_rejects_string_value() {
7844 let mut item = HashMap::new();
7845 item.insert("count".to_string(), json!({"N": "5"}));
7846 let names = HashMap::new();
7847 let mut values = HashMap::new();
7848 values.insert(":val".to_string(), json!({"S": "notanumber"}));
7849
7850 let result =
7851 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7852 assert!(
7853 result.is_err(),
7854 "arithmetic with S-type value must return a ValidationException"
7855 );
7856 }
7857
7858 #[test]
7859 fn test_set_arithmetic_valid_numbers() {
7860 let mut item = HashMap::new();
7861 item.insert("count".to_string(), json!({"N": "10"}));
7862 let names = HashMap::new();
7863 let mut values = HashMap::new();
7864 values.insert(":val".to_string(), json!({"N": "3"}));
7865
7866 let result =
7867 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
7868 assert!(result.is_ok());
7869 assert_eq!(item["count"], json!({"N": "13"}));
7870 }
7871
7872 #[test]
7875 fn test_add_binary_set() {
7876 let mut item = HashMap::new();
7877 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
7878 let names = HashMap::new();
7879 let mut values = HashMap::new();
7880 values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
7881
7882 let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
7883 assert!(result.is_ok());
7884 let bs = item["data"]["BS"].as_array().unwrap();
7885 assert_eq!(bs.len(), 3, "should merge sets without duplicates");
7886 assert!(bs.contains(&json!("YQ==")));
7887 assert!(bs.contains(&json!("Yg==")));
7888 assert!(bs.contains(&json!("Yw==")));
7889 }
7890
7891 #[test]
7892 fn test_delete_binary_set() {
7893 let mut item = HashMap::new();
7894 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
7895 let names = HashMap::new();
7896 let mut values = HashMap::new();
7897 values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
7898
7899 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7900 assert!(result.is_ok());
7901 let bs = item["data"]["BS"].as_array().unwrap();
7902 assert_eq!(bs.len(), 2);
7903 assert!(!bs.contains(&json!("Yg==")));
7904 }
7905
7906 #[test]
7907 fn test_delete_binary_set_removes_attr_when_empty() {
7908 let mut item = HashMap::new();
7909 item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
7910 let names = HashMap::new();
7911 let mut values = HashMap::new();
7912 values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
7913
7914 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
7915 assert!(result.is_ok());
7916 assert!(
7917 !item.contains_key("data"),
7918 "attribute should be removed when set becomes empty"
7919 );
7920 }
7921}