1use std::collections::HashMap;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use base64::Engine;
6use chrono::Utc;
7use http::StatusCode;
8use parking_lot::RwLock;
9use serde_json::{json, Value};
10
11use fakecloud_core::delivery::DeliveryBus;
12use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
13use fakecloud_core::validation::*;
14
15use fakecloud_s3::state::SharedS3State;
16
17use crate::state::{
18 attribute_type_and_value, AttributeDefinition, AttributeValue, BackupDescription, DynamoTable,
19 ExportDescription, GlobalSecondaryIndex, GlobalTableDescription, ImportDescription,
20 KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection, ProvisionedThroughput,
21 ReplicaDescription, SharedDynamoDbState,
22};
23
/// In-memory emulation of the DynamoDB API surface.
///
/// Wraps the shared table state plus two optional integrations configured
/// through the builder methods below: an S3 state handle and a delivery bus
/// used to forward change records to Kinesis destinations.
pub struct DynamoDbService {
    /// Shared, lock-protected DynamoDB table state.
    state: SharedDynamoDbState,
    /// Optional S3 backend, set via `with_s3`. NOTE(review): not referenced by
    /// the methods visible in this chunk — presumably consumed by the
    /// export/import handlers; confirm against the rest of the file.
    s3_state: Option<SharedS3State>,
    /// Optional delivery bus for Kinesis Data Streams destinations, set via
    /// `with_delivery`. When `None`, Kinesis delivery is a no-op.
    delivery: Option<Arc<DeliveryBus>>,
}
29
30impl DynamoDbService {
31 pub fn new(state: SharedDynamoDbState) -> Self {
32 Self {
33 state,
34 s3_state: None,
35 delivery: None,
36 }
37 }
38
39 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
40 self.s3_state = Some(s3_state);
41 self
42 }
43
44 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
45 self.delivery = Some(delivery);
46 self
47 }
48
49 fn deliver_to_kinesis_destinations(
51 &self,
52 table: &DynamoTable,
53 event_name: &str,
54 keys: &HashMap<String, AttributeValue>,
55 old_image: Option<&HashMap<String, AttributeValue>>,
56 new_image: Option<&HashMap<String, AttributeValue>>,
57 ) {
58 let delivery = match &self.delivery {
59 Some(d) => d,
60 None => return,
61 };
62
63 let active_destinations: Vec<_> = table
64 .kinesis_destinations
65 .iter()
66 .filter(|d| d.destination_status == "ACTIVE")
67 .collect();
68
69 if active_destinations.is_empty() {
70 return;
71 }
72
73 let mut record = json!({
74 "eventID": uuid::Uuid::new_v4().to_string(),
75 "eventName": event_name,
76 "eventVersion": "1.1",
77 "eventSource": "aws:dynamodb",
78 "awsRegion": table.arn.split(':').nth(3).unwrap_or("us-east-1"),
79 "dynamodb": {
80 "Keys": keys,
81 "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
82 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
83 "StreamViewType": "NEW_AND_OLD_IMAGES",
84 },
85 "eventSourceARN": &table.arn,
86 "tableName": &table.name,
87 });
88
89 if let Some(old) = old_image {
90 record["dynamodb"]["OldImage"] = json!(old);
91 }
92 if let Some(new) = new_image {
93 record["dynamodb"]["NewImage"] = json!(new);
94 }
95
96 let record_str = serde_json::to_string(&record).unwrap_or_default();
97 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
98 let partition_key = serde_json::to_string(keys).unwrap_or_default();
99
100 for dest in active_destinations {
101 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
102 }
103 }
104
105 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
106 serde_json::from_slice(&req.body).map_err(|e| {
107 AwsServiceError::aws_error(
108 StatusCode::BAD_REQUEST,
109 "SerializationException",
110 format!("Invalid JSON: {e}"),
111 )
112 })
113 }
114
115 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
116 Ok(AwsResponse::ok_json(body))
117 }
118
119 fn create_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
122 let body = Self::parse_body(req)?;
123
124 let table_name = body["TableName"]
125 .as_str()
126 .ok_or_else(|| {
127 AwsServiceError::aws_error(
128 StatusCode::BAD_REQUEST,
129 "ValidationException",
130 "TableName is required",
131 )
132 })?
133 .to_string();
134
135 let key_schema = parse_key_schema(&body["KeySchema"])?;
136 let attribute_definitions = parse_attribute_definitions(&body["AttributeDefinitions"])?;
137
138 for ks in &key_schema {
140 if !attribute_definitions
141 .iter()
142 .any(|ad| ad.attribute_name == ks.attribute_name)
143 {
144 return Err(AwsServiceError::aws_error(
145 StatusCode::BAD_REQUEST,
146 "ValidationException",
147 format!(
148 "One or more parameter values were invalid: \
149 Some index key attributes are not defined in AttributeDefinitions. \
150 Keys: [{}], AttributeDefinitions: [{}]",
151 ks.attribute_name,
152 attribute_definitions
153 .iter()
154 .map(|ad| ad.attribute_name.as_str())
155 .collect::<Vec<_>>()
156 .join(", ")
157 ),
158 ));
159 }
160 }
161
162 let billing_mode = body["BillingMode"]
163 .as_str()
164 .unwrap_or("PROVISIONED")
165 .to_string();
166
167 let provisioned_throughput = if billing_mode == "PAY_PER_REQUEST" {
168 ProvisionedThroughput {
169 read_capacity_units: 0,
170 write_capacity_units: 0,
171 }
172 } else {
173 parse_provisioned_throughput(&body["ProvisionedThroughput"])?
174 };
175
176 let gsi = parse_gsi(&body["GlobalSecondaryIndexes"]);
177 let lsi = parse_lsi(&body["LocalSecondaryIndexes"]);
178 let tags = parse_tags(&body["Tags"]);
179
180 let (stream_enabled, stream_view_type) =
182 if let Some(stream_spec) = body.get("StreamSpecification") {
183 let enabled = stream_spec
184 .get("StreamEnabled")
185 .and_then(|v| v.as_bool())
186 .unwrap_or(false);
187 let view_type = if enabled {
188 stream_spec
189 .get("StreamViewType")
190 .and_then(|v| v.as_str())
191 .map(|s| s.to_string())
192 } else {
193 None
194 };
195 (enabled, view_type)
196 } else {
197 (false, None)
198 };
199
200 let (sse_type, sse_kms_key_arn) = if let Some(sse_spec) = body.get("SSESpecification") {
202 let enabled = sse_spec
203 .get("Enabled")
204 .and_then(|v| v.as_bool())
205 .unwrap_or(false);
206 if enabled {
207 let sse_type = sse_spec
208 .get("SSEType")
209 .and_then(|v| v.as_str())
210 .unwrap_or("KMS")
211 .to_string();
212 let kms_key = sse_spec
213 .get("KMSMasterKeyId")
214 .and_then(|v| v.as_str())
215 .map(|s| s.to_string());
216 (Some(sse_type), kms_key)
217 } else {
218 (None, None)
219 }
220 } else {
221 (None, None)
222 };
223
224 let mut state = self.state.write();
225
226 if state.tables.contains_key(&table_name) {
227 return Err(AwsServiceError::aws_error(
228 StatusCode::BAD_REQUEST,
229 "ResourceInUseException",
230 format!("Table already exists: {table_name}"),
231 ));
232 }
233
234 let now = Utc::now();
235 let arn = format!(
236 "arn:aws:dynamodb:{}:{}:table/{}",
237 state.region, state.account_id, table_name
238 );
239 let stream_arn = if stream_enabled {
240 Some(format!(
241 "arn:aws:dynamodb:{}:{}:table/{}/stream/{}",
242 state.region,
243 state.account_id,
244 table_name,
245 now.format("%Y-%m-%dT%H:%M:%S.%3f")
246 ))
247 } else {
248 None
249 };
250
251 let table = DynamoTable {
252 name: table_name.clone(),
253 arn: arn.clone(),
254 key_schema: key_schema.clone(),
255 attribute_definitions: attribute_definitions.clone(),
256 provisioned_throughput: provisioned_throughput.clone(),
257 items: Vec::new(),
258 gsi: gsi.clone(),
259 lsi: lsi.clone(),
260 tags,
261 created_at: now,
262 status: "ACTIVE".to_string(),
263 item_count: 0,
264 size_bytes: 0,
265 billing_mode: billing_mode.clone(),
266 ttl_attribute: None,
267 ttl_enabled: false,
268 resource_policy: None,
269 pitr_enabled: false,
270 kinesis_destinations: Vec::new(),
271 contributor_insights_status: "DISABLED".to_string(),
272 contributor_insights_counters: HashMap::new(),
273 stream_enabled,
274 stream_view_type,
275 stream_arn,
276 stream_records: Arc::new(RwLock::new(Vec::new())),
277 sse_type,
278 sse_kms_key_arn,
279 };
280
281 state.tables.insert(table_name, table);
282
283 let table_desc = build_table_description_json(
284 &arn,
285 &key_schema,
286 &attribute_definitions,
287 &provisioned_throughput,
288 &gsi,
289 &lsi,
290 &billing_mode,
291 now,
292 0,
293 0,
294 "ACTIVE",
295 );
296
297 Self::ok_json(json!({ "TableDescription": table_desc }))
298 }
299
300 fn delete_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
301 let body = Self::parse_body(req)?;
302 let table_name = require_str(&body, "TableName")?;
303
304 let mut state = self.state.write();
305 let table = state.tables.remove(table_name).ok_or_else(|| {
306 AwsServiceError::aws_error(
307 StatusCode::BAD_REQUEST,
308 "ResourceNotFoundException",
309 format!("Requested resource not found: Table: {table_name} not found"),
310 )
311 })?;
312
313 let table_desc = build_table_description_json(
314 &table.arn,
315 &table.key_schema,
316 &table.attribute_definitions,
317 &table.provisioned_throughput,
318 &table.gsi,
319 &table.lsi,
320 &table.billing_mode,
321 table.created_at,
322 table.item_count,
323 table.size_bytes,
324 "DELETING",
325 );
326
327 Self::ok_json(json!({ "TableDescription": table_desc }))
328 }
329
330 fn describe_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
331 let body = Self::parse_body(req)?;
332 let table_name = require_str(&body, "TableName")?;
333
334 let state = self.state.read();
335 let table = get_table(&state.tables, table_name)?;
336
337 let table_desc = build_table_description(table);
338
339 Self::ok_json(json!({ "Table": table_desc }))
340 }
341
342 fn list_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
343 let body = Self::parse_body(req)?;
344
345 validate_optional_string_length(
346 "exclusiveStartTableName",
347 body["ExclusiveStartTableName"].as_str(),
348 3,
349 255,
350 )?;
351 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
352
353 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
354 let exclusive_start = body["ExclusiveStartTableName"]
355 .as_str()
356 .map(|s| s.to_string());
357
358 let state = self.state.read();
359 let mut names: Vec<&String> = state.tables.keys().collect();
360 names.sort();
361
362 let start_idx = match &exclusive_start {
363 Some(start) => names
364 .iter()
365 .position(|n| n.as_str() > start.as_str())
366 .unwrap_or(names.len()),
367 None => 0,
368 };
369
370 let page: Vec<&str> = names
371 .iter()
372 .skip(start_idx)
373 .take(limit)
374 .map(|n| n.as_str())
375 .collect();
376
377 let mut result = json!({ "TableNames": page });
378
379 if start_idx + limit < names.len() {
380 if let Some(last) = page.last() {
381 result["LastEvaluatedTableName"] = json!(last);
382 }
383 }
384
385 Self::ok_json(result)
386 }
387
388 fn update_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
389 let body = Self::parse_body(req)?;
390 let table_name = require_str(&body, "TableName")?;
391
392 let mut state = self.state.write();
393 let table = state.tables.get_mut(table_name).ok_or_else(|| {
394 AwsServiceError::aws_error(
395 StatusCode::BAD_REQUEST,
396 "ResourceNotFoundException",
397 format!("Requested resource not found: Table: {table_name} not found"),
398 )
399 })?;
400
401 if let Some(pt) = body.get("ProvisionedThroughput") {
402 if let Ok(throughput) = parse_provisioned_throughput(pt) {
403 table.provisioned_throughput = throughput;
404 }
405 }
406
407 if let Some(bm) = body["BillingMode"].as_str() {
408 table.billing_mode = bm.to_string();
409 }
410
411 if let Some(sse_spec) = body.get("SSESpecification") {
413 let enabled = sse_spec
414 .get("Enabled")
415 .and_then(|v| v.as_bool())
416 .unwrap_or(false);
417 if enabled {
418 table.sse_type = Some(
419 sse_spec
420 .get("SSEType")
421 .and_then(|v| v.as_str())
422 .unwrap_or("KMS")
423 .to_string(),
424 );
425 table.sse_kms_key_arn = sse_spec
426 .get("KMSMasterKeyId")
427 .and_then(|v| v.as_str())
428 .map(|s| s.to_string());
429 } else {
430 table.sse_type = None;
431 table.sse_kms_key_arn = None;
432 }
433 }
434
435 let table_desc = build_table_description_json(
436 &table.arn,
437 &table.key_schema,
438 &table.attribute_definitions,
439 &table.provisioned_throughput,
440 &table.gsi,
441 &table.lsi,
442 &table.billing_mode,
443 table.created_at,
444 table.item_count,
445 table.size_bytes,
446 &table.status,
447 );
448
449 Self::ok_json(json!({ "TableDescription": table_desc }))
450 }
451
    /// Handles `PutItem`: inserts or fully replaces an item.
    ///
    /// Honors `ConditionExpression` (the write is rejected when it fails) and
    /// `ReturnValues=ALL_OLD`. Emits a stream record while holding the write
    /// lock; Kinesis delivery happens AFTER the lock is released (the table
    /// is cloned into `kinesis_info` for that purpose).
    fn put_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;
        let item = require_object(&body, "Item")?;
        let condition = body["ConditionExpression"].as_str().map(|s| s.to_string());
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);
        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE").to_string();

        // Everything inside this block runs under the state write lock; the
        // tuple carries out only what is needed after the lock is dropped.
        let (old_item, kinesis_info) = {
            let mut state = self.state.write();
            let table = get_table_mut(&mut state.tables, table_name)?;

            validate_key_in_item(table, &item)?;

            let key = extract_key(table, &item);
            let existing_idx = table.find_item_index(&key);

            // Condition is evaluated against the PRE-write item (or None).
            if let Some(ref cond) = condition {
                let existing = existing_idx.map(|i| &table.items[i]);
                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
            }

            // Snapshot the old item for the response before overwriting.
            let old_item_for_return = if return_values == "ALL_OLD" {
                existing_idx.map(|i| table.items[i].clone())
            } else {
                None
            };

            // Only clone the old image when a stream or an ACTIVE Kinesis
            // destination will actually consume it.
            let needs_change_capture = table.stream_enabled
                || table
                    .kinesis_destinations
                    .iter()
                    .any(|d| d.destination_status == "ACTIVE");
            let old_item_for_stream = if needs_change_capture {
                existing_idx.map(|i| table.items[i].clone())
            } else {
                None
            };

            let is_modify = existing_idx.is_some();

            // Replace in place or append — PutItem is a full-item overwrite.
            if let Some(idx) = existing_idx {
                table.items[idx] = item.clone();
            } else {
                table.items.push(item.clone());
            }

            table.record_item_access(&item);
            table.recalculate_stats();

            let event_name = if is_modify { "MODIFY" } else { "INSERT" };
            // Recomputed after the write; identical to the earlier `key`
            // since `item` has not changed.
            let key = extract_key(table, &item);

            if table.stream_enabled {
                if let Some(record) = crate::streams::generate_stream_record(
                    table,
                    event_name,
                    key.clone(),
                    old_item_for_stream.clone(),
                    Some(item.clone()),
                ) {
                    crate::streams::add_stream_record(table, record);
                }
            }

            // Clone the whole table so delivery can run lock-free below.
            let kinesis_info = if table
                .kinesis_destinations
                .iter()
                .any(|d| d.destination_status == "ACTIVE")
            {
                Some((
                    table.clone(),
                    event_name.to_string(),
                    key,
                    old_item_for_stream,
                    Some(item.clone()),
                ))
            } else {
                None
            };

            (old_item_for_return, kinesis_info)
        };
        // Deliver outside the lock to avoid holding it across the bus call.
        if let Some((table, event_name, keys, old_image, new_image)) = kinesis_info {
            self.deliver_to_kinesis_destinations(
                &table,
                &event_name,
                &keys,
                old_image.as_ref(),
                new_image.as_ref(),
            );
        }

        let mut result = json!({});
        if let Some(old) = old_item {
            result["Attributes"] = json!(old);
        }

        Self::ok_json(result)
    }
564
565 fn get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
566 let body = Self::parse_body(req)?;
568 let table_name = require_str(&body, "TableName")?;
569 let key = require_object(&body, "Key")?;
570
571 let (result, needs_insights) = {
573 let state = self.state.read();
574 let table = get_table(&state.tables, table_name)?;
575 let needs_insights = table.contributor_insights_status == "ENABLED";
576
577 let result = match table.find_item_index(&key) {
578 Some(idx) => {
579 let item = &table.items[idx];
580 let projected = project_item(item, &body);
581 json!({ "Item": projected })
582 }
583 None => json!({}),
584 };
585 (result, needs_insights)
586 };
587 if needs_insights {
591 let mut state = self.state.write();
592 if let Some(table) = state.tables.get_mut(table_name) {
593 table.record_key_access(&key);
594 }
595 }
596
597 Self::ok_json(result)
598 }
599
    /// Handles `DeleteItem`.
    ///
    /// Validates enum-valued parameters up front, evaluates any
    /// `ConditionExpression` against the existing item, captures the old
    /// image for streams/Kinesis BEFORE removing it, and honors
    /// `ReturnValues=ALL_OLD`, `ReturnConsumedCapacity` and
    /// `ReturnItemCollectionMetrics`. Kinesis delivery runs after the state
    /// lock is released.
    fn delete_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        validate_optional_enum_value(
            "conditionalOperator",
            &body["ConditionalOperator"],
            &["AND", "OR"],
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValues",
            &body["ReturnValues"],
            &["NONE", "ALL_OLD", "UPDATED_OLD", "ALL_NEW", "UPDATED_NEW"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnValuesOnConditionCheckFailure",
            &body["ReturnValuesOnConditionCheckFailure"],
            &["ALL_OLD", "NONE"],
        )?;

        let table_name = require_str(&body, "TableName")?;
        let key = require_object(&body, "Key")?;

        // Everything in this block runs under the state write lock.
        let (result, kinesis_info) = {
            let mut state = self.state.write();
            let table = get_table_mut(&mut state.tables, table_name)?;

            let condition = body["ConditionExpression"].as_str();
            let expr_attr_names = parse_expression_attribute_names(&body);
            let expr_attr_values = parse_expression_attribute_values(&body);

            let existing_idx = table.find_item_index(&key);

            // Condition is evaluated against the item as it currently exists
            // (or None); failure aborts the delete.
            if let Some(cond) = condition {
                let existing = existing_idx.map(|i| &table.items[i]);
                evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
            }

            let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");

            let mut result = json!({});
            let mut kinesis_info = None;

            // Deleting a nonexistent item is a silent success (no Attributes,
            // no stream record) — same as DynamoDB.
            if let Some(idx) = existing_idx {
                // Snapshot the old image before it is removed below.
                let old_item = table.items[idx].clone();
                if return_values == "ALL_OLD" {
                    result["Attributes"] = json!(old_item.clone());
                }

                if table.stream_enabled {
                    if let Some(record) = crate::streams::generate_stream_record(
                        table,
                        "REMOVE",
                        key.clone(),
                        Some(old_item.clone()),
                        None,
                    ) {
                        crate::streams::add_stream_record(table, record);
                    }
                }

                // Clone the table so Kinesis delivery can run lock-free later.
                if table
                    .kinesis_destinations
                    .iter()
                    .any(|d| d.destination_status == "ACTIVE")
                {
                    kinesis_info = Some((table.clone(), key.clone(), Some(old_item)));
                }

                table.items.remove(idx);
                table.recalculate_stats();
            }

            let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
            let return_icm = body["ReturnItemCollectionMetrics"]
                .as_str()
                .unwrap_or("NONE");

            // Fixed capacity of 1.0 — this emulation does not meter real RCU/WCU.
            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
                result["ConsumedCapacity"] = json!({
                    "TableName": table_name,
                    "CapacityUnits": 1.0,
                });
            }

            if return_icm == "SIZE" {
                result["ItemCollectionMetrics"] = json!({});
            }

            (result, kinesis_info)
        };

        // Deliver outside the lock to avoid holding it across the bus call.
        if let Some((table, keys, old_image)) = kinesis_info {
            self.deliver_to_kinesis_destinations(&table, "REMOVE", &keys, old_image.as_ref(), None);
        }

        Self::ok_json(result)
    }
710
    /// Handles `UpdateItem`.
    ///
    /// Evaluates `ConditionExpression` against the existing item, upserts a
    /// key-only item when none exists, applies `UpdateExpression`, emits a
    /// stream record, and honors `ReturnValues` (ALL_OLD / ALL_NEW /
    /// UPDATED_NEW). The state lock is explicitly dropped before Kinesis
    /// delivery.
    fn update_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_name = require_str(&body, "TableName")?;
        let key = require_object(&body, "Key")?;

        let mut state = self.state.write();
        let table = get_table_mut(&mut state.tables, table_name)?;

        validate_key_attributes_in_key(table, &key)?;

        let condition = body["ConditionExpression"].as_str();
        let expr_attr_names = parse_expression_attribute_names(&body);
        let expr_attr_values = parse_expression_attribute_values(&body);
        let update_expression = body["UpdateExpression"].as_str();

        let existing_idx = table.find_item_index(&key);

        // Condition is evaluated against the PRE-update item (or None).
        if let Some(cond) = condition {
            let existing = existing_idx.map(|i| &table.items[i]);
            evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)?;
        }

        let return_values = body["ReturnValues"].as_str().unwrap_or("NONE");

        // Upsert semantics: when no item matches the key, seed a new item
        // containing only the key attributes and update that.
        let is_insert = existing_idx.is_none();
        let idx = match existing_idx {
            Some(i) => i,
            None => {
                let mut new_item = HashMap::new();
                for (k, v) in &key {
                    new_item.insert(k.clone(), v.clone());
                }
                table.items.push(new_item);
                table.items.len() - 1
            }
        };

        // Only clone the old image when a stream or an ACTIVE Kinesis
        // destination will actually consume it.
        let needs_change_capture = table.stream_enabled
            || table
                .kinesis_destinations
                .iter()
                .any(|d| d.destination_status == "ACTIVE");
        let old_item_for_stream = if needs_change_capture {
            Some(table.items[idx].clone())
        } else {
            None
        };

        // NOTE(review): on the insert path this snapshots the freshly seeded
        // key-only item, so ReturnValues=ALL_OLD returns those key attributes;
        // real DynamoDB omits Attributes when the item did not exist — confirm
        // whether this deviation is intended.
        let old_item = if return_values == "ALL_OLD" {
            Some(table.items[idx].clone())
        } else {
            None
        };

        if let Some(expr) = update_expression {
            apply_update_expression(
                &mut table.items[idx],
                expr,
                &expr_attr_names,
                &expr_attr_values,
            )?;
        }

        // UPDATED_NEW is approximated with the full post-update item.
        let new_item = if return_values == "ALL_NEW" || return_values == "UPDATED_NEW" {
            Some(table.items[idx].clone())
        } else {
            None
        };

        let event_name = if is_insert { "INSERT" } else { "MODIFY" };
        let new_item_for_stream = table.items[idx].clone();

        if table.stream_enabled {
            if let Some(record) = crate::streams::generate_stream_record(
                table,
                event_name,
                key.clone(),
                old_item_for_stream.clone(),
                Some(new_item_for_stream.clone()),
            ) {
                crate::streams::add_stream_record(table, record);
            }
        }

        // Clone the whole table so delivery can run after the lock is dropped.
        let kinesis_info = if table
            .kinesis_destinations
            .iter()
            .any(|d| d.destination_status == "ACTIVE")
        {
            Some((
                table.clone(),
                event_name.to_string(),
                key.clone(),
                old_item_for_stream,
                Some(new_item_for_stream),
            ))
        } else {
            None
        };

        table.recalculate_stats();

        // Release the write lock before touching the delivery bus.
        drop(state);

        if let Some((tbl, ev, keys, old_image, new_image)) = kinesis_info {
            self.deliver_to_kinesis_destinations(
                &tbl,
                &ev,
                &keys,
                old_image.as_ref(),
                new_image.as_ref(),
            );
        }

        // ALL_OLD takes precedence; otherwise ALL_NEW / UPDATED_NEW.
        let mut result = json!({});
        if let Some(old) = old_item {
            result["Attributes"] = json!(old);
        } else if let Some(new) = new_item {
            result["Attributes"] = json!(new);
        }

        Self::ok_json(result)
    }
839
840 fn query(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
843 let body = Self::parse_body(req)?;
844 let table_name = require_str(&body, "TableName")?;
845
846 let state = self.state.read();
847 let table = get_table(&state.tables, table_name)?;
848
849 let expr_attr_names = parse_expression_attribute_names(&body);
850 let expr_attr_values = parse_expression_attribute_values(&body);
851
852 let key_condition = body["KeyConditionExpression"].as_str();
853 let filter_expression = body["FilterExpression"].as_str();
854 let scan_forward = body["ScanIndexForward"].as_bool().unwrap_or(true);
855 let limit = body["Limit"].as_i64().map(|l| l as usize);
856 let index_name = body["IndexName"].as_str();
857 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
858 parse_key_map(&body["ExclusiveStartKey"]);
859
860 let (items_to_scan, hash_key_name, range_key_name): (
861 &[HashMap<String, AttributeValue>],
862 String,
863 Option<String>,
864 ) = if let Some(idx_name) = index_name {
865 if let Some(gsi) = table.gsi.iter().find(|g| g.index_name == idx_name) {
866 let hk = gsi
867 .key_schema
868 .iter()
869 .find(|k| k.key_type == "HASH")
870 .map(|k| k.attribute_name.clone())
871 .unwrap_or_default();
872 let rk = gsi
873 .key_schema
874 .iter()
875 .find(|k| k.key_type == "RANGE")
876 .map(|k| k.attribute_name.clone());
877 (&table.items, hk, rk)
878 } else if let Some(lsi) = table.lsi.iter().find(|l| l.index_name == idx_name) {
879 let hk = lsi
880 .key_schema
881 .iter()
882 .find(|k| k.key_type == "HASH")
883 .map(|k| k.attribute_name.clone())
884 .unwrap_or_default();
885 let rk = lsi
886 .key_schema
887 .iter()
888 .find(|k| k.key_type == "RANGE")
889 .map(|k| k.attribute_name.clone());
890 (&table.items, hk, rk)
891 } else {
892 return Err(AwsServiceError::aws_error(
893 StatusCode::BAD_REQUEST,
894 "ValidationException",
895 format!("The table does not have the specified index: {idx_name}"),
896 ));
897 }
898 } else {
899 (
900 &table.items[..],
901 table.hash_key_name().to_string(),
902 table.range_key_name().map(|s| s.to_string()),
903 )
904 };
905
906 let mut matched: Vec<&HashMap<String, AttributeValue>> = items_to_scan
907 .iter()
908 .filter(|item| {
909 if let Some(kc) = key_condition {
910 evaluate_key_condition(
911 kc,
912 item,
913 &hash_key_name,
914 range_key_name.as_deref(),
915 &expr_attr_names,
916 &expr_attr_values,
917 )
918 } else {
919 true
920 }
921 })
922 .collect();
923
924 if let Some(ref rk) = range_key_name {
925 matched.sort_by(|a, b| {
926 let av = a.get(rk.as_str());
927 let bv = b.get(rk.as_str());
928 compare_attribute_values(av, bv)
929 });
930 if !scan_forward {
931 matched.reverse();
932 }
933 }
934
935 let table_pk_hash = table.hash_key_name().to_string();
938 let table_pk_range = table.range_key_name().map(|s| s.to_string());
939 let is_gsi_query = index_name.is_some()
940 && (hash_key_name != table_pk_hash
941 || range_key_name.as_deref() != table_pk_range.as_deref());
942
943 if let Some(ref start_key) = exclusive_start_key {
947 if let Some(pos) = matched.iter().position(|item| {
948 let index_match =
949 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref());
950 if is_gsi_query {
951 index_match
952 && item_matches_key(
953 item,
954 start_key,
955 &table_pk_hash,
956 table_pk_range.as_deref(),
957 )
958 } else {
959 index_match
960 }
961 }) {
962 matched = matched.split_off(pos + 1);
963 }
964 }
965
966 if let Some(filter) = filter_expression {
967 matched.retain(|item| {
968 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
969 });
970 }
971
972 let scanned_count = matched.len();
973
974 let has_more = if let Some(lim) = limit {
975 let more = matched.len() > lim;
976 matched.truncate(lim);
977 more
978 } else {
979 false
980 };
981
982 let last_evaluated_key = if has_more {
986 matched.last().map(|item| {
987 let mut key =
988 extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref());
989 if is_gsi_query {
990 let table_key =
991 extract_key_for_schema(item, &table_pk_hash, table_pk_range.as_deref());
992 key.extend(table_key);
993 }
994 key
995 })
996 } else {
997 None
998 };
999
1000 let insights_enabled = table.contributor_insights_status == "ENABLED";
1002 let pk_name = table.hash_key_name().to_string();
1003 let accessed_keys: Vec<String> = if insights_enabled {
1004 matched
1005 .iter()
1006 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1007 .collect()
1008 } else {
1009 Vec::new()
1010 };
1011
1012 let items: Vec<Value> = matched
1013 .iter()
1014 .map(|item| {
1015 let projected = project_item(item, &body);
1016 json!(projected)
1017 })
1018 .collect();
1019
1020 let mut result = json!({
1021 "Items": items,
1022 "Count": items.len(),
1023 "ScannedCount": scanned_count,
1024 });
1025
1026 if let Some(lek) = last_evaluated_key {
1027 result["LastEvaluatedKey"] = json!(lek);
1028 }
1029
1030 drop(state);
1031
1032 if !accessed_keys.is_empty() {
1033 let mut state = self.state.write();
1034 if let Some(table) = state.tables.get_mut(table_name) {
1035 if table.contributor_insights_status == "ENABLED" {
1038 for key_str in accessed_keys {
1039 *table
1040 .contributor_insights_counters
1041 .entry(key_str)
1042 .or_insert(0) += 1;
1043 }
1044 }
1045 }
1046 }
1047
1048 Self::ok_json(result)
1049 }
1050
1051 fn scan(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1052 let body = Self::parse_body(req)?;
1053 let table_name = require_str(&body, "TableName")?;
1054
1055 let state = self.state.read();
1056 let table = get_table(&state.tables, table_name)?;
1057
1058 let expr_attr_names = parse_expression_attribute_names(&body);
1059 let expr_attr_values = parse_expression_attribute_values(&body);
1060 let filter_expression = body["FilterExpression"].as_str();
1061 let limit = body["Limit"].as_i64().map(|l| l as usize);
1062 let exclusive_start_key: Option<HashMap<String, AttributeValue>> =
1063 parse_key_map(&body["ExclusiveStartKey"]);
1064
1065 let hash_key_name = table.hash_key_name().to_string();
1066 let range_key_name = table.range_key_name().map(|s| s.to_string());
1067
1068 let mut matched: Vec<&HashMap<String, AttributeValue>> = table.items.iter().collect();
1069
1070 if let Some(ref start_key) = exclusive_start_key {
1072 if let Some(pos) = matched.iter().position(|item| {
1073 item_matches_key(item, start_key, &hash_key_name, range_key_name.as_deref())
1074 }) {
1075 matched = matched.split_off(pos + 1);
1076 }
1077 }
1078
1079 let scanned_count = matched.len();
1080
1081 if let Some(filter) = filter_expression {
1082 matched.retain(|item| {
1083 evaluate_filter_expression(filter, item, &expr_attr_names, &expr_attr_values)
1084 });
1085 }
1086
1087 let has_more = if let Some(lim) = limit {
1088 let more = matched.len() > lim;
1089 matched.truncate(lim);
1090 more
1091 } else {
1092 false
1093 };
1094
1095 let last_evaluated_key = if has_more {
1097 matched
1098 .last()
1099 .map(|item| extract_key_for_schema(item, &hash_key_name, range_key_name.as_deref()))
1100 } else {
1101 None
1102 };
1103
1104 let insights_enabled = table.contributor_insights_status == "ENABLED";
1106 let pk_name = table.hash_key_name().to_string();
1107 let accessed_keys: Vec<String> = if insights_enabled {
1108 matched
1109 .iter()
1110 .filter_map(|item| item.get(&pk_name).map(|v| v.to_string()))
1111 .collect()
1112 } else {
1113 Vec::new()
1114 };
1115
1116 let items: Vec<Value> = matched
1117 .iter()
1118 .map(|item| {
1119 let projected = project_item(item, &body);
1120 json!(projected)
1121 })
1122 .collect();
1123
1124 let mut result = json!({
1125 "Items": items,
1126 "Count": items.len(),
1127 "ScannedCount": scanned_count,
1128 });
1129
1130 if let Some(lek) = last_evaluated_key {
1131 result["LastEvaluatedKey"] = json!(lek);
1132 }
1133
1134 drop(state);
1135
1136 if !accessed_keys.is_empty() {
1137 let mut state = self.state.write();
1138 if let Some(table) = state.tables.get_mut(table_name) {
1139 if table.contributor_insights_status == "ENABLED" {
1142 for key_str in accessed_keys {
1143 *table
1144 .contributor_insights_counters
1145 .entry(key_str)
1146 .or_insert(0) += 1;
1147 }
1148 }
1149 }
1150 }
1151
1152 Self::ok_json(result)
1153 }
1154
1155 fn batch_get_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1158 let body = Self::parse_body(req)?;
1159
1160 validate_optional_enum_value(
1161 "returnConsumedCapacity",
1162 &body["ReturnConsumedCapacity"],
1163 &["INDEXES", "TOTAL", "NONE"],
1164 )?;
1165
1166 let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
1167
1168 let request_items = body["RequestItems"]
1169 .as_object()
1170 .ok_or_else(|| {
1171 AwsServiceError::aws_error(
1172 StatusCode::BAD_REQUEST,
1173 "ValidationException",
1174 "RequestItems is required",
1175 )
1176 })?
1177 .clone();
1178
1179 let state = self.state.read();
1180 let mut responses: HashMap<String, Vec<Value>> = HashMap::new();
1181 let mut consumed_capacity: Vec<Value> = Vec::new();
1182
1183 for (table_name, params) in &request_items {
1184 let table = get_table(&state.tables, table_name)?;
1185 let keys = params["Keys"].as_array().ok_or_else(|| {
1186 AwsServiceError::aws_error(
1187 StatusCode::BAD_REQUEST,
1188 "ValidationException",
1189 "Keys is required",
1190 )
1191 })?;
1192
1193 let mut items = Vec::new();
1194 for key_val in keys {
1195 let key: HashMap<String, AttributeValue> =
1196 serde_json::from_value(key_val.clone()).unwrap_or_default();
1197 if let Some(idx) = table.find_item_index(&key) {
1198 items.push(json!(table.items[idx]));
1199 }
1200 }
1201 responses.insert(table_name.clone(), items);
1202
1203 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1204 consumed_capacity.push(json!({
1205 "TableName": table_name,
1206 "CapacityUnits": 1.0,
1207 }));
1208 }
1209 }
1210
1211 let mut result = json!({
1212 "Responses": responses,
1213 "UnprocessedKeys": {},
1214 });
1215
1216 if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
1217 result["ConsumedCapacity"] = json!(consumed_capacity);
1218 }
1219
1220 Self::ok_json(result)
1221 }
1222
    /// BatchWriteItem: applies a mix of PutRequest/DeleteRequest entries per
    /// table under a single write lock. Writes are applied unconditionally and
    /// never deferred, so `UnprocessedItems` is always returned empty.
    fn batch_write_item(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;

        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;

        // Both reporting toggles default to NONE when omitted, as in AWS.
        let return_consumed = body["ReturnConsumedCapacity"].as_str().unwrap_or("NONE");
        let return_icm = body["ReturnItemCollectionMetrics"]
            .as_str()
            .unwrap_or("NONE");

        let request_items = body["RequestItems"]
            .as_object()
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "RequestItems is required",
                )
            })?
            .clone();

        let mut state = self.state.write();
        let mut consumed_capacity: Vec<Value> = Vec::new();
        let mut item_collection_metrics: HashMap<String, Vec<Value>> = HashMap::new();

        for (table_name, requests) in &request_items {
            // Unknown table aborts the whole batch mid-way; earlier tables'
            // writes have already been applied at that point.
            let table = state.tables.get_mut(table_name.as_str()).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ResourceNotFoundException",
                    format!("Requested resource not found: Table: {table_name} not found"),
                )
            })?;

            let reqs = requests.as_array().ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "Request list must be an array",
                )
            })?;

            for request in reqs {
                if let Some(put_req) = request.get("PutRequest") {
                    // Put: replace the item with a matching key, else append.
                    // Malformed items deserialize to an empty map (no error).
                    let item: HashMap<String, AttributeValue> =
                        serde_json::from_value(put_req["Item"].clone()).unwrap_or_default();
                    let key = extract_key(table, &item);
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items[idx] = item;
                    } else {
                        table.items.push(item);
                    }
                } else if let Some(del_req) = request.get("DeleteRequest") {
                    // Delete: removing a non-existent key is a silent no-op.
                    let key: HashMap<String, AttributeValue> =
                        serde_json::from_value(del_req["Key"].clone()).unwrap_or_default();
                    if let Some(idx) = table.find_item_index(&key) {
                        table.items.remove(idx);
                    }
                }
            }

            // Refresh item_count / size_bytes after this table's writes.
            table.recalculate_stats();

            if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
                consumed_capacity.push(json!({
                    "TableName": table_name,
                    "CapacityUnits": 1.0,
                }));
            }

            if return_icm == "SIZE" {
                // Metrics are not tracked; an empty list per table is reported.
                item_collection_metrics.insert(table_name.clone(), vec![]);
            }
        }

        let mut result = json!({
            "UnprocessedItems": {},
        });

        if return_consumed == "TOTAL" || return_consumed == "INDEXES" {
            result["ConsumedCapacity"] = json!(consumed_capacity);
        }

        if return_icm == "SIZE" {
            result["ItemCollectionMetrics"] = json!(item_collection_metrics);
        }

        Self::ok_json(result)
    }
1321
1322 fn tag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1325 let body = Self::parse_body(req)?;
1326 let resource_arn = require_str(&body, "ResourceArn")?;
1327 validate_required("Tags", &body["Tags"])?;
1328
1329 let mut state = self.state.write();
1330 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1331
1332 fakecloud_core::tags::apply_tags(&mut table.tags, &body, "Tags", "Key", "Value").map_err(
1333 |f| {
1334 AwsServiceError::aws_error(
1335 StatusCode::BAD_REQUEST,
1336 "ValidationException",
1337 format!("{f} must be a list"),
1338 )
1339 },
1340 )?;
1341
1342 Self::ok_json(json!({}))
1343 }
1344
1345 fn untag_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1346 let body = Self::parse_body(req)?;
1347 let resource_arn = require_str(&body, "ResourceArn")?;
1348 validate_required("TagKeys", &body["TagKeys"])?;
1349
1350 let mut state = self.state.write();
1351 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1352
1353 fakecloud_core::tags::remove_tags(&mut table.tags, &body, "TagKeys").map_err(|f| {
1354 AwsServiceError::aws_error(
1355 StatusCode::BAD_REQUEST,
1356 "ValidationException",
1357 format!("{f} must be a list"),
1358 )
1359 })?;
1360
1361 Self::ok_json(json!({}))
1362 }
1363
1364 fn list_tags_of_resource(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1365 let body = Self::parse_body(req)?;
1366 let resource_arn = require_str(&body, "ResourceArn")?;
1367
1368 let state = self.state.read();
1369 let table = find_table_by_arn(&state.tables, resource_arn)?;
1370
1371 let tags = fakecloud_core::tags::tags_to_json(&table.tags, "Key", "Value");
1372
1373 Self::ok_json(json!({ "Tags": tags }))
1374 }
1375
1376 fn transact_get_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1379 let body = Self::parse_body(req)?;
1380 validate_optional_enum_value(
1381 "returnConsumedCapacity",
1382 &body["ReturnConsumedCapacity"],
1383 &["INDEXES", "TOTAL", "NONE"],
1384 )?;
1385 let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
1386 AwsServiceError::aws_error(
1387 StatusCode::BAD_REQUEST,
1388 "ValidationException",
1389 "TransactItems is required",
1390 )
1391 })?;
1392
1393 let state = self.state.read();
1394 let mut responses: Vec<Value> = Vec::new();
1395
1396 for ti in transact_items {
1397 let get = &ti["Get"];
1398 let table_name = get["TableName"].as_str().ok_or_else(|| {
1399 AwsServiceError::aws_error(
1400 StatusCode::BAD_REQUEST,
1401 "ValidationException",
1402 "TableName is required in Get",
1403 )
1404 })?;
1405 let key: HashMap<String, AttributeValue> =
1406 serde_json::from_value(get["Key"].clone()).unwrap_or_default();
1407
1408 let table = get_table(&state.tables, table_name)?;
1409 match table.find_item_index(&key) {
1410 Some(idx) => {
1411 responses.push(json!({ "Item": table.items[idx] }));
1412 }
1413 None => {
1414 responses.push(json!({}));
1415 }
1416 }
1417 }
1418
1419 Self::ok_json(json!({ "Responses": responses }))
1420 }
1421
    /// TransactWriteItems: two-phase emulation of the transactional write API.
    ///
    /// Phase 1 evaluates every ConditionExpression and records a per-operation
    /// cancellation reason; if any check fails, the call returns a
    /// TransactionCanceledException and nothing is written. Phase 2 then
    /// applies all Put/Delete/Update operations. The single write lock held
    /// across both phases makes check-then-apply atomic w.r.t. other requests.
    fn transact_write_items(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        validate_optional_string_length(
            "clientRequestToken",
            body["ClientRequestToken"].as_str(),
            1,
            36,
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        validate_optional_enum_value(
            "returnItemCollectionMetrics",
            &body["ReturnItemCollectionMetrics"],
            &["SIZE", "NONE"],
        )?;
        let transact_items = body["TransactItems"].as_array().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TransactItems is required",
            )
        })?;

        let mut state = self.state.write();

        let mut cancellation_reasons: Vec<Value> = Vec::new();
        let mut any_failed = false;

        // --- Phase 1: condition checks only; no mutations happen here. ---
        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let condition = put["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(put);
                    let expr_attr_values = parse_expression_attribute_values(put);
                    // The condition is evaluated against the existing item with
                    // the same key (None when the item does not exist yet).
                    let key = extract_key(table, &item);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let condition = delete["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(delete);
                    let expr_attr_values = parse_expression_attribute_values(delete);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let condition = update["ConditionExpression"].as_str();

                if let Some(cond) = condition {
                    let table = get_table(&state.tables, table_name)?;
                    let expr_attr_names = parse_expression_attribute_names(update);
                    let expr_attr_values = parse_expression_attribute_values(update);
                    let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                    if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values)
                        .is_err()
                    {
                        cancellation_reasons.push(json!({
                            "Code": "ConditionalCheckFailed",
                            "Message": "The conditional request failed"
                        }));
                        any_failed = true;
                        continue;
                    }
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else if let Some(check) = ti.get("ConditionCheck") {
                // ConditionCheck is check-only: it never writes in phase 2.
                let table_name = check["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(check["Key"].clone()).unwrap_or_default();
                let cond = check["ConditionExpression"].as_str().unwrap_or_default();

                let table = get_table(&state.tables, table_name)?;
                let expr_attr_names = parse_expression_attribute_names(check);
                let expr_attr_values = parse_expression_attribute_values(check);
                let existing = table.find_item_index(&key).map(|i| &table.items[i]);
                if evaluate_condition(cond, existing, &expr_attr_names, &expr_attr_values).is_err()
                {
                    cancellation_reasons.push(json!({
                        "Code": "ConditionalCheckFailed",
                        "Message": "The conditional request failed"
                    }));
                    any_failed = true;
                    continue;
                }
                cancellation_reasons.push(json!({ "Code": "None" }));
            } else {
                // Unknown operation kinds are tolerated as no-ops.
                cancellation_reasons.push(json!({ "Code": "None" }));
            }
        }

        if any_failed {
            // AWS reports the cancellation as a 400 body rather than an Err.
            let error_body = json!({
                "__type": "TransactionCanceledException",
                "message": "Transaction cancelled, please refer cancellation reasons for specific reasons [ConditionalCheckFailed]",
                "CancellationReasons": cancellation_reasons
            });
            return Ok(AwsResponse::json(
                StatusCode::BAD_REQUEST,
                serde_json::to_vec(&error_body).unwrap(),
            ));
        }

        // --- Phase 2: all checks passed; apply the writes. ---
        for ti in transact_items {
            if let Some(put) = ti.get("Put") {
                let table_name = put["TableName"].as_str().unwrap_or_default();
                let item: HashMap<String, AttributeValue> =
                    serde_json::from_value(put["Item"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                let key = extract_key(table, &item);
                if let Some(idx) = table.find_item_index(&key) {
                    table.items[idx] = item;
                } else {
                    table.items.push(item);
                }
                table.recalculate_stats();
            } else if let Some(delete) = ti.get("Delete") {
                let table_name = delete["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(delete["Key"].clone()).unwrap_or_default();
                let table = get_table_mut(&mut state.tables, table_name)?;
                if let Some(idx) = table.find_item_index(&key) {
                    table.items.remove(idx);
                }
                table.recalculate_stats();
            } else if let Some(update) = ti.get("Update") {
                let table_name = update["TableName"].as_str().unwrap_or_default();
                let key: HashMap<String, AttributeValue> =
                    serde_json::from_value(update["Key"].clone()).unwrap_or_default();
                let update_expression = update["UpdateExpression"].as_str();
                let expr_attr_names = parse_expression_attribute_names(update);
                let expr_attr_values = parse_expression_attribute_values(update);

                let table = get_table_mut(&mut state.tables, table_name)?;
                // Upsert semantics: a missing item is created from its key
                // attributes before the update expression is applied.
                let idx = match table.find_item_index(&key) {
                    Some(i) => i,
                    None => {
                        let mut new_item = HashMap::new();
                        for (k, v) in &key {
                            new_item.insert(k.clone(), v.clone());
                        }
                        table.items.push(new_item);
                        table.items.len() - 1
                    }
                };

                if let Some(expr) = update_expression {
                    // NOTE(review): an invalid UpdateExpression errors here,
                    // after earlier phase-2 writes have already been applied —
                    // confirm this partial-apply behavior is acceptable.
                    apply_update_expression(
                        &mut table.items[idx],
                        expr,
                        &expr_attr_names,
                        &expr_attr_values,
                    )?;
                }
                table.recalculate_stats();
            }
        }

        Self::ok_json(json!({}))
    }
1621
1622 fn execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1623 let body = Self::parse_body(req)?;
1624 let statement = require_str(&body, "Statement")?;
1625 let parameters = body["Parameters"].as_array().cloned().unwrap_or_default();
1626
1627 execute_partiql_statement(&self.state, statement, ¶meters)
1628 }
1629
1630 fn batch_execute_statement(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1631 let body = Self::parse_body(req)?;
1632 validate_optional_enum_value(
1633 "returnConsumedCapacity",
1634 &body["ReturnConsumedCapacity"],
1635 &["INDEXES", "TOTAL", "NONE"],
1636 )?;
1637 let statements = body["Statements"].as_array().ok_or_else(|| {
1638 AwsServiceError::aws_error(
1639 StatusCode::BAD_REQUEST,
1640 "ValidationException",
1641 "Statements is required",
1642 )
1643 })?;
1644
1645 let mut responses: Vec<Value> = Vec::new();
1646 for stmt_obj in statements {
1647 let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
1648 let parameters = stmt_obj["Parameters"]
1649 .as_array()
1650 .cloned()
1651 .unwrap_or_default();
1652
1653 match execute_partiql_statement(&self.state, statement, ¶meters) {
1654 Ok(resp) => {
1655 let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
1656 responses.push(resp_body);
1657 }
1658 Err(e) => {
1659 responses.push(json!({
1660 "Error": {
1661 "Code": "ValidationException",
1662 "Message": e.to_string()
1663 }
1664 }));
1665 }
1666 }
1667 }
1668
1669 Self::ok_json(json!({ "Responses": responses }))
1670 }
1671
    /// ExecuteTransaction: run a list of PartiQL statements and report a
    /// TransactionCanceledException if any of them fail.
    ///
    /// NOTE(review): statements are executed eagerly as they are scanned; if a
    /// later statement fails, side effects of earlier statements (if any) are
    /// NOT rolled back even though the response says the transaction was
    /// cancelled — confirm this emulation gap is acceptable.
    fn execute_transaction(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        validate_optional_string_length(
            "clientRequestToken",
            body["ClientRequestToken"].as_str(),
            1,
            36,
        )?;
        validate_optional_enum_value(
            "returnConsumedCapacity",
            &body["ReturnConsumedCapacity"],
            &["INDEXES", "TOTAL", "NONE"],
        )?;
        let transact_statements = body["TransactStatements"].as_array().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TransactStatements is required",
            )
        })?;

        // Per-statement outcome: Ok(parsed response body) or Err(message).
        let mut results: Vec<Result<Value, String>> = Vec::new();
        for stmt_obj in transact_statements {
            let statement = stmt_obj["Statement"].as_str().unwrap_or_default();
            let parameters = stmt_obj["Parameters"]
                .as_array()
                .cloned()
                .unwrap_or_default();

            match execute_partiql_statement(&self.state, statement, &parameters) {
                Ok(resp) => {
                    let resp_body: Value = serde_json::from_slice(&resp.body).unwrap_or_default();
                    results.push(Ok(resp_body));
                }
                Err(e) => {
                    results.push(Err(e.to_string()));
                }
            }
        }

        let any_failed = results.iter().any(|r| r.is_err());
        if any_failed {
            // Mirror AWS's shape: one reason per statement, "None" for the
            // statements that succeeded.
            let reasons: Vec<Value> = results
                .iter()
                .map(|r| match r {
                    Ok(_) => json!({ "Code": "None" }),
                    Err(msg) => json!({
                        "Code": "ValidationException",
                        "Message": msg
                    }),
                })
                .collect();
            let error_body = json!({
                "__type": "TransactionCanceledException",
                "message": "Transaction cancelled due to validation errors",
                "CancellationReasons": reasons
            });
            return Ok(AwsResponse::json(
                StatusCode::BAD_REQUEST,
                serde_json::to_vec(&error_body).unwrap(),
            ));
        }

        let responses: Vec<Value> = results.into_iter().filter_map(|r| r.ok()).collect();
        Self::ok_json(json!({ "Responses": responses }))
    }
1739
1740 fn update_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1743 let body = Self::parse_body(req)?;
1744 let table_name = require_str(&body, "TableName")?;
1745 let spec = &body["TimeToLiveSpecification"];
1746 let attr_name = spec["AttributeName"].as_str().ok_or_else(|| {
1747 AwsServiceError::aws_error(
1748 StatusCode::BAD_REQUEST,
1749 "ValidationException",
1750 "TimeToLiveSpecification.AttributeName is required",
1751 )
1752 })?;
1753 let enabled = spec["Enabled"].as_bool().ok_or_else(|| {
1754 AwsServiceError::aws_error(
1755 StatusCode::BAD_REQUEST,
1756 "ValidationException",
1757 "TimeToLiveSpecification.Enabled is required",
1758 )
1759 })?;
1760
1761 let mut state = self.state.write();
1762 let table = get_table_mut(&mut state.tables, table_name)?;
1763
1764 if enabled {
1765 table.ttl_attribute = Some(attr_name.to_string());
1766 table.ttl_enabled = true;
1767 } else {
1768 table.ttl_enabled = false;
1769 }
1770
1771 Self::ok_json(json!({
1772 "TimeToLiveSpecification": {
1773 "AttributeName": attr_name,
1774 "Enabled": enabled
1775 }
1776 }))
1777 }
1778
1779 fn describe_time_to_live(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1780 let body = Self::parse_body(req)?;
1781 let table_name = require_str(&body, "TableName")?;
1782
1783 let state = self.state.read();
1784 let table = get_table(&state.tables, table_name)?;
1785
1786 let status = if table.ttl_enabled {
1787 "ENABLED"
1788 } else {
1789 "DISABLED"
1790 };
1791
1792 let mut desc = json!({
1793 "TimeToLiveDescription": {
1794 "TimeToLiveStatus": status
1795 }
1796 });
1797
1798 if let Some(ref attr) = table.ttl_attribute {
1799 desc["TimeToLiveDescription"]["AttributeName"] = json!(attr);
1800 }
1801
1802 Self::ok_json(desc)
1803 }
1804
1805 fn put_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1808 let body = Self::parse_body(req)?;
1809 let resource_arn = require_str(&body, "ResourceArn")?;
1810 let policy = require_str(&body, "Policy")?;
1811
1812 let mut state = self.state.write();
1813 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1814 table.resource_policy = Some(policy.to_string());
1815
1816 let revision_id = uuid::Uuid::new_v4().to_string();
1817 Self::ok_json(json!({ "RevisionId": revision_id }))
1818 }
1819
1820 fn get_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1821 let body = Self::parse_body(req)?;
1822 let resource_arn = require_str(&body, "ResourceArn")?;
1823
1824 let state = self.state.read();
1825 let table = find_table_by_arn(&state.tables, resource_arn)?;
1826
1827 match &table.resource_policy {
1828 Some(policy) => {
1829 let revision_id = uuid::Uuid::new_v4().to_string();
1830 Self::ok_json(json!({
1831 "Policy": policy,
1832 "RevisionId": revision_id
1833 }))
1834 }
1835 None => Self::ok_json(json!({ "Policy": null })),
1836 }
1837 }
1838
1839 fn delete_resource_policy(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1840 let body = Self::parse_body(req)?;
1841 let resource_arn = require_str(&body, "ResourceArn")?;
1842
1843 let mut state = self.state.write();
1844 let table = find_table_by_arn_mut(&mut state.tables, resource_arn)?;
1845 table.resource_policy = None;
1846
1847 Self::ok_json(json!({}))
1848 }
1849
1850 fn describe_endpoints(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1853 Self::ok_json(json!({
1854 "Endpoints": [{
1855 "Address": "dynamodb.us-east-1.amazonaws.com",
1856 "CachePeriodInMinutes": 1440
1857 }]
1858 }))
1859 }
1860
1861 fn describe_limits(&self, _req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1862 Self::ok_json(json!({
1863 "AccountMaxReadCapacityUnits": 80000,
1864 "AccountMaxWriteCapacityUnits": 80000,
1865 "TableMaxReadCapacityUnits": 40000,
1866 "TableMaxWriteCapacityUnits": 40000
1867 }))
1868 }
1869
1870 fn create_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1873 let body = Self::parse_body(req)?;
1874 let table_name = require_str(&body, "TableName")?;
1875 let backup_name = require_str(&body, "BackupName")?;
1876
1877 let mut state = self.state.write();
1878 let table = get_table(&state.tables, table_name)?;
1879
1880 let backup_arn = format!(
1881 "arn:aws:dynamodb:{}:{}:table/{}/backup/{}",
1882 state.region,
1883 state.account_id,
1884 table_name,
1885 Utc::now().format("%Y%m%d%H%M%S")
1886 );
1887 let now = Utc::now();
1888
1889 let backup = BackupDescription {
1890 backup_arn: backup_arn.clone(),
1891 backup_name: backup_name.to_string(),
1892 table_name: table_name.to_string(),
1893 table_arn: table.arn.clone(),
1894 backup_status: "AVAILABLE".to_string(),
1895 backup_type: "USER".to_string(),
1896 backup_creation_date: now,
1897 key_schema: table.key_schema.clone(),
1898 attribute_definitions: table.attribute_definitions.clone(),
1899 provisioned_throughput: table.provisioned_throughput.clone(),
1900 billing_mode: table.billing_mode.clone(),
1901 item_count: table.item_count,
1902 size_bytes: table.size_bytes,
1903 items: table.items.clone(),
1904 };
1905
1906 state.backups.insert(backup_arn.clone(), backup);
1907
1908 Self::ok_json(json!({
1909 "BackupDetails": {
1910 "BackupArn": backup_arn,
1911 "BackupName": backup_name,
1912 "BackupStatus": "AVAILABLE",
1913 "BackupType": "USER",
1914 "BackupCreationDateTime": now.timestamp() as f64,
1915 "BackupSizeBytes": 0
1916 }
1917 }))
1918 }
1919
1920 fn delete_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
1921 let body = Self::parse_body(req)?;
1922 let backup_arn = require_str(&body, "BackupArn")?;
1923
1924 let mut state = self.state.write();
1925 let backup = state.backups.remove(backup_arn).ok_or_else(|| {
1926 AwsServiceError::aws_error(
1927 StatusCode::BAD_REQUEST,
1928 "BackupNotFoundException",
1929 format!("Backup not found: {backup_arn}"),
1930 )
1931 })?;
1932
1933 Self::ok_json(json!({
1934 "BackupDescription": {
1935 "BackupDetails": {
1936 "BackupArn": backup.backup_arn,
1937 "BackupName": backup.backup_name,
1938 "BackupStatus": "DELETED",
1939 "BackupType": backup.backup_type,
1940 "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1941 "BackupSizeBytes": backup.size_bytes
1942 },
1943 "SourceTableDetails": {
1944 "TableName": backup.table_name,
1945 "TableArn": backup.table_arn,
1946 "TableId": uuid::Uuid::new_v4().to_string(),
1947 "KeySchema": backup.key_schema.iter().map(|ks| json!({
1948 "AttributeName": ks.attribute_name,
1949 "KeyType": ks.key_type
1950 })).collect::<Vec<_>>(),
1951 "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
1952 "ProvisionedThroughput": {
1953 "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
1954 "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
1955 },
1956 "ItemCount": backup.item_count,
1957 "BillingMode": backup.billing_mode,
1958 "TableSizeBytes": backup.size_bytes
1959 },
1960 "SourceTableFeatureDetails": {}
1961 }
1962 }))
1963 }
1964
    /// DescribeBackup: render the stored backup description. Same response
    /// shape as DeleteBackup, but the stored status is reported as-is.
    fn describe_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let backup_arn = require_str(&body, "BackupArn")?;

        let state = self.state.read();
        let backup = state.backups.get(backup_arn).ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "BackupNotFoundException",
                format!("Backup not found: {backup_arn}"),
            )
        })?;

        Self::ok_json(json!({
            "BackupDescription": {
                "BackupDetails": {
                    "BackupArn": backup.backup_arn,
                    "BackupName": backup.backup_name,
                    "BackupStatus": backup.backup_status,
                    "BackupType": backup.backup_type,
                    "BackupCreationDateTime": backup.backup_creation_date.timestamp() as f64,
                    "BackupSizeBytes": backup.size_bytes
                },
                "SourceTableDetails": {
                    "TableName": backup.table_name,
                    "TableArn": backup.table_arn,
                    // NOTE(review): TableId is a fresh random UUID on every
                    // call rather than a stable identifier — confirm intended.
                    "TableId": uuid::Uuid::new_v4().to_string(),
                    "KeySchema": backup.key_schema.iter().map(|ks| json!({
                        "AttributeName": ks.attribute_name,
                        "KeyType": ks.key_type
                    })).collect::<Vec<_>>(),
                    // The backup's creation date stands in for the source
                    // table's creation date.
                    "TableCreationDateTime": backup.backup_creation_date.timestamp() as f64,
                    "ProvisionedThroughput": {
                        "ReadCapacityUnits": backup.provisioned_throughput.read_capacity_units,
                        "WriteCapacityUnits": backup.provisioned_throughput.write_capacity_units
                    },
                    "ItemCount": backup.item_count,
                    "BillingMode": backup.billing_mode,
                    "TableSizeBytes": backup.size_bytes
                },
                "SourceTableFeatureDetails": {}
            }
        }))
    }
2009
2010 fn list_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2011 let body = Self::parse_body(req)?;
2012 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2013 validate_optional_string_length(
2014 "exclusiveStartBackupArn",
2015 body["ExclusiveStartBackupArn"].as_str(),
2016 37,
2017 1024,
2018 )?;
2019 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, 100)?;
2020 validate_optional_enum_value(
2021 "backupType",
2022 &body["BackupType"],
2023 &["USER", "SYSTEM", "AWS_BACKUP", "ALL"],
2024 )?;
2025 let table_name = body["TableName"].as_str();
2026
2027 let state = self.state.read();
2028 let summaries: Vec<Value> = state
2029 .backups
2030 .values()
2031 .filter(|b| table_name.is_none() || table_name == Some(b.table_name.as_str()))
2032 .map(|b| {
2033 json!({
2034 "TableName": b.table_name,
2035 "TableArn": b.table_arn,
2036 "BackupArn": b.backup_arn,
2037 "BackupName": b.backup_name,
2038 "BackupCreationDateTime": b.backup_creation_date.timestamp() as f64,
2039 "BackupStatus": b.backup_status,
2040 "BackupType": b.backup_type,
2041 "BackupSizeBytes": b.size_bytes
2042 })
2043 })
2044 .collect();
2045
2046 Self::ok_json(json!({
2047 "BackupSummaries": summaries
2048 }))
2049 }
2050
2051 fn restore_table_from_backup(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2052 let body = Self::parse_body(req)?;
2053 let backup_arn = require_str(&body, "BackupArn")?;
2054 let target_table_name = require_str(&body, "TargetTableName")?;
2055
2056 let mut state = self.state.write();
2057 let backup = state.backups.get(backup_arn).ok_or_else(|| {
2058 AwsServiceError::aws_error(
2059 StatusCode::BAD_REQUEST,
2060 "BackupNotFoundException",
2061 format!("Backup not found: {backup_arn}"),
2062 )
2063 })?;
2064
2065 if state.tables.contains_key(target_table_name) {
2066 return Err(AwsServiceError::aws_error(
2067 StatusCode::BAD_REQUEST,
2068 "TableAlreadyExistsException",
2069 format!("Table already exists: {target_table_name}"),
2070 ));
2071 }
2072
2073 let now = Utc::now();
2074 let arn = format!(
2075 "arn:aws:dynamodb:{}:{}:table/{}",
2076 state.region, state.account_id, target_table_name
2077 );
2078
2079 let restored_items = backup.items.clone();
2080 let mut table = DynamoTable {
2081 name: target_table_name.to_string(),
2082 arn: arn.clone(),
2083 key_schema: backup.key_schema.clone(),
2084 attribute_definitions: backup.attribute_definitions.clone(),
2085 provisioned_throughput: backup.provisioned_throughput.clone(),
2086 items: restored_items,
2087 gsi: Vec::new(),
2088 lsi: Vec::new(),
2089 tags: HashMap::new(),
2090 created_at: now,
2091 status: "ACTIVE".to_string(),
2092 item_count: 0,
2093 size_bytes: 0,
2094 billing_mode: backup.billing_mode.clone(),
2095 ttl_attribute: None,
2096 ttl_enabled: false,
2097 resource_policy: None,
2098 pitr_enabled: false,
2099 kinesis_destinations: Vec::new(),
2100 contributor_insights_status: "DISABLED".to_string(),
2101 contributor_insights_counters: HashMap::new(),
2102 stream_enabled: false,
2103 stream_view_type: None,
2104 stream_arn: None,
2105 stream_records: Arc::new(RwLock::new(Vec::new())),
2106 sse_type: None,
2107 sse_kms_key_arn: None,
2108 };
2109 table.recalculate_stats();
2110
2111 let desc = build_table_description(&table);
2112 state.tables.insert(target_table_name.to_string(), table);
2113
2114 Self::ok_json(json!({
2115 "TableDescription": desc
2116 }))
2117 }
2118
2119 fn restore_table_to_point_in_time(
2120 &self,
2121 req: &AwsRequest,
2122 ) -> Result<AwsResponse, AwsServiceError> {
2123 let body = Self::parse_body(req)?;
2124 let target_table_name = require_str(&body, "TargetTableName")?;
2125 let source_table_name = body["SourceTableName"].as_str();
2126 let source_table_arn = body["SourceTableArn"].as_str();
2127
2128 let mut state = self.state.write();
2129
2130 let source = if let Some(name) = source_table_name {
2132 get_table(&state.tables, name)?.clone()
2133 } else if let Some(arn) = source_table_arn {
2134 find_table_by_arn(&state.tables, arn)?.clone()
2135 } else {
2136 return Err(AwsServiceError::aws_error(
2137 StatusCode::BAD_REQUEST,
2138 "ValidationException",
2139 "SourceTableName or SourceTableArn is required",
2140 ));
2141 };
2142
2143 if state.tables.contains_key(target_table_name) {
2144 return Err(AwsServiceError::aws_error(
2145 StatusCode::BAD_REQUEST,
2146 "TableAlreadyExistsException",
2147 format!("Table already exists: {target_table_name}"),
2148 ));
2149 }
2150
2151 let now = Utc::now();
2152 let arn = format!(
2153 "arn:aws:dynamodb:{}:{}:table/{}",
2154 state.region, state.account_id, target_table_name
2155 );
2156
2157 let mut table = DynamoTable {
2158 name: target_table_name.to_string(),
2159 arn: arn.clone(),
2160 key_schema: source.key_schema.clone(),
2161 attribute_definitions: source.attribute_definitions.clone(),
2162 provisioned_throughput: source.provisioned_throughput.clone(),
2163 items: source.items.clone(),
2164 gsi: Vec::new(),
2165 lsi: Vec::new(),
2166 tags: HashMap::new(),
2167 created_at: now,
2168 status: "ACTIVE".to_string(),
2169 item_count: 0,
2170 size_bytes: 0,
2171 billing_mode: source.billing_mode.clone(),
2172 ttl_attribute: None,
2173 ttl_enabled: false,
2174 resource_policy: None,
2175 pitr_enabled: false,
2176 kinesis_destinations: Vec::new(),
2177 contributor_insights_status: "DISABLED".to_string(),
2178 contributor_insights_counters: HashMap::new(),
2179 stream_enabled: false,
2180 stream_view_type: None,
2181 stream_arn: None,
2182 stream_records: Arc::new(RwLock::new(Vec::new())),
2183 sse_type: None,
2184 sse_kms_key_arn: None,
2185 };
2186 table.recalculate_stats();
2187
2188 let desc = build_table_description(&table);
2189 state.tables.insert(target_table_name.to_string(), table);
2190
2191 Self::ok_json(json!({
2192 "TableDescription": desc
2193 }))
2194 }
2195
2196 fn update_continuous_backups(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2197 let body = Self::parse_body(req)?;
2198 let table_name = require_str(&body, "TableName")?;
2199
2200 let pitr_spec = body["PointInTimeRecoverySpecification"]
2201 .as_object()
2202 .ok_or_else(|| {
2203 AwsServiceError::aws_error(
2204 StatusCode::BAD_REQUEST,
2205 "ValidationException",
2206 "PointInTimeRecoverySpecification is required",
2207 )
2208 })?;
2209 let enabled = pitr_spec
2210 .get("PointInTimeRecoveryEnabled")
2211 .and_then(|v| v.as_bool())
2212 .unwrap_or(false);
2213
2214 let mut state = self.state.write();
2215 let table = get_table_mut(&mut state.tables, table_name)?;
2216 table.pitr_enabled = enabled;
2217
2218 let status = if enabled { "ENABLED" } else { "DISABLED" };
2219 Self::ok_json(json!({
2220 "ContinuousBackupsDescription": {
2221 "ContinuousBackupsStatus": status,
2222 "PointInTimeRecoveryDescription": {
2223 "PointInTimeRecoveryStatus": status,
2224 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2225 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2226 }
2227 }
2228 }))
2229 }
2230
2231 fn describe_continuous_backups(
2232 &self,
2233 req: &AwsRequest,
2234 ) -> Result<AwsResponse, AwsServiceError> {
2235 let body = Self::parse_body(req)?;
2236 let table_name = require_str(&body, "TableName")?;
2237
2238 let state = self.state.read();
2239 let table = get_table(&state.tables, table_name)?;
2240
2241 let status = if table.pitr_enabled {
2242 "ENABLED"
2243 } else {
2244 "DISABLED"
2245 };
2246 Self::ok_json(json!({
2247 "ContinuousBackupsDescription": {
2248 "ContinuousBackupsStatus": status,
2249 "PointInTimeRecoveryDescription": {
2250 "PointInTimeRecoveryStatus": status,
2251 "EarliestRestorableDateTime": Utc::now().timestamp() as f64,
2252 "LatestRestorableDateTime": Utc::now().timestamp() as f64
2253 }
2254 }
2255 }))
2256 }
2257
2258 fn create_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2261 let body = Self::parse_body(req)?;
2262 let global_table_name = require_str(&body, "GlobalTableName")?;
2263 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2264
2265 let replication_group = body["ReplicationGroup"]
2266 .as_array()
2267 .ok_or_else(|| {
2268 AwsServiceError::aws_error(
2269 StatusCode::BAD_REQUEST,
2270 "ValidationException",
2271 "ReplicationGroup is required",
2272 )
2273 })?
2274 .iter()
2275 .filter_map(|r| {
2276 r["RegionName"].as_str().map(|rn| ReplicaDescription {
2277 region_name: rn.to_string(),
2278 replica_status: "ACTIVE".to_string(),
2279 })
2280 })
2281 .collect::<Vec<_>>();
2282
2283 let mut state = self.state.write();
2284
2285 if state.global_tables.contains_key(global_table_name) {
2286 return Err(AwsServiceError::aws_error(
2287 StatusCode::BAD_REQUEST,
2288 "GlobalTableAlreadyExistsException",
2289 format!("Global table already exists: {global_table_name}"),
2290 ));
2291 }
2292
2293 let arn = format!(
2294 "arn:aws:dynamodb::{}:global-table/{}",
2295 state.account_id, global_table_name
2296 );
2297 let now = Utc::now();
2298
2299 let gt = GlobalTableDescription {
2300 global_table_name: global_table_name.to_string(),
2301 global_table_arn: arn.clone(),
2302 global_table_status: "ACTIVE".to_string(),
2303 creation_date: now,
2304 replication_group: replication_group.clone(),
2305 };
2306
2307 state
2308 .global_tables
2309 .insert(global_table_name.to_string(), gt);
2310
2311 Self::ok_json(json!({
2312 "GlobalTableDescription": {
2313 "GlobalTableName": global_table_name,
2314 "GlobalTableArn": arn,
2315 "GlobalTableStatus": "ACTIVE",
2316 "CreationDateTime": now.timestamp() as f64,
2317 "ReplicationGroup": replication_group.iter().map(|r| json!({
2318 "RegionName": r.region_name,
2319 "ReplicaStatus": r.replica_status
2320 })).collect::<Vec<_>>()
2321 }
2322 }))
2323 }
2324
2325 fn describe_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2326 let body = Self::parse_body(req)?;
2327 let global_table_name = require_str(&body, "GlobalTableName")?;
2328 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2329
2330 let state = self.state.read();
2331 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2332 AwsServiceError::aws_error(
2333 StatusCode::BAD_REQUEST,
2334 "GlobalTableNotFoundException",
2335 format!("Global table not found: {global_table_name}"),
2336 )
2337 })?;
2338
2339 Self::ok_json(json!({
2340 "GlobalTableDescription": {
2341 "GlobalTableName": gt.global_table_name,
2342 "GlobalTableArn": gt.global_table_arn,
2343 "GlobalTableStatus": gt.global_table_status,
2344 "CreationDateTime": gt.creation_date.timestamp() as f64,
2345 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2346 "RegionName": r.region_name,
2347 "ReplicaStatus": r.replica_status
2348 })).collect::<Vec<_>>()
2349 }
2350 }))
2351 }
2352
2353 fn describe_global_table_settings(
2354 &self,
2355 req: &AwsRequest,
2356 ) -> Result<AwsResponse, AwsServiceError> {
2357 let body = Self::parse_body(req)?;
2358 let global_table_name = require_str(&body, "GlobalTableName")?;
2359 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2360
2361 let state = self.state.read();
2362 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2363 AwsServiceError::aws_error(
2364 StatusCode::BAD_REQUEST,
2365 "GlobalTableNotFoundException",
2366 format!("Global table not found: {global_table_name}"),
2367 )
2368 })?;
2369
2370 let replica_settings: Vec<Value> = gt
2371 .replication_group
2372 .iter()
2373 .map(|r| {
2374 json!({
2375 "RegionName": r.region_name,
2376 "ReplicaStatus": r.replica_status,
2377 "ReplicaProvisionedReadCapacityUnits": 0,
2378 "ReplicaProvisionedWriteCapacityUnits": 0
2379 })
2380 })
2381 .collect();
2382
2383 Self::ok_json(json!({
2384 "GlobalTableName": gt.global_table_name,
2385 "ReplicaSettings": replica_settings
2386 }))
2387 }
2388
2389 fn list_global_tables(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2390 let body = Self::parse_body(req)?;
2391 validate_optional_string_length(
2392 "exclusiveStartGlobalTableName",
2393 body["ExclusiveStartGlobalTableName"].as_str(),
2394 3,
2395 255,
2396 )?;
2397 validate_optional_range_i64("limit", body["Limit"].as_i64(), 1, i64::MAX)?;
2398 let limit = body["Limit"].as_i64().unwrap_or(100) as usize;
2399
2400 let state = self.state.read();
2401 let tables: Vec<Value> = state
2402 .global_tables
2403 .values()
2404 .take(limit)
2405 .map(|gt| {
2406 json!({
2407 "GlobalTableName": gt.global_table_name,
2408 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2409 "RegionName": r.region_name
2410 })).collect::<Vec<_>>()
2411 })
2412 })
2413 .collect();
2414
2415 Self::ok_json(json!({
2416 "GlobalTables": tables
2417 }))
2418 }
2419
2420 fn update_global_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2421 let body = Self::parse_body(req)?;
2422 let global_table_name = require_str(&body, "GlobalTableName")?;
2423 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2424 validate_required("replicaUpdates", &body["ReplicaUpdates"])?;
2425
2426 let mut state = self.state.write();
2427 let gt = state
2428 .global_tables
2429 .get_mut(global_table_name)
2430 .ok_or_else(|| {
2431 AwsServiceError::aws_error(
2432 StatusCode::BAD_REQUEST,
2433 "GlobalTableNotFoundException",
2434 format!("Global table not found: {global_table_name}"),
2435 )
2436 })?;
2437
2438 if let Some(updates) = body["ReplicaUpdates"].as_array() {
2439 for update in updates {
2440 if let Some(create) = update["Create"].as_object() {
2441 if let Some(region) = create.get("RegionName").and_then(|v| v.as_str()) {
2442 gt.replication_group.push(ReplicaDescription {
2443 region_name: region.to_string(),
2444 replica_status: "ACTIVE".to_string(),
2445 });
2446 }
2447 }
2448 if let Some(delete) = update["Delete"].as_object() {
2449 if let Some(region) = delete.get("RegionName").and_then(|v| v.as_str()) {
2450 gt.replication_group.retain(|r| r.region_name != region);
2451 }
2452 }
2453 }
2454 }
2455
2456 Self::ok_json(json!({
2457 "GlobalTableDescription": {
2458 "GlobalTableName": gt.global_table_name,
2459 "GlobalTableArn": gt.global_table_arn,
2460 "GlobalTableStatus": gt.global_table_status,
2461 "CreationDateTime": gt.creation_date.timestamp() as f64,
2462 "ReplicationGroup": gt.replication_group.iter().map(|r| json!({
2463 "RegionName": r.region_name,
2464 "ReplicaStatus": r.replica_status
2465 })).collect::<Vec<_>>()
2466 }
2467 }))
2468 }
2469
2470 fn update_global_table_settings(
2471 &self,
2472 req: &AwsRequest,
2473 ) -> Result<AwsResponse, AwsServiceError> {
2474 let body = Self::parse_body(req)?;
2475 let global_table_name = require_str(&body, "GlobalTableName")?;
2476 validate_string_length("globalTableName", global_table_name, 3, 255)?;
2477 validate_optional_enum_value(
2478 "globalTableBillingMode",
2479 &body["GlobalTableBillingMode"],
2480 &["PROVISIONED", "PAY_PER_REQUEST"],
2481 )?;
2482 validate_optional_range_i64(
2483 "globalTableProvisionedWriteCapacityUnits",
2484 body["GlobalTableProvisionedWriteCapacityUnits"].as_i64(),
2485 1,
2486 i64::MAX,
2487 )?;
2488
2489 let state = self.state.read();
2490 let gt = state.global_tables.get(global_table_name).ok_or_else(|| {
2491 AwsServiceError::aws_error(
2492 StatusCode::BAD_REQUEST,
2493 "GlobalTableNotFoundException",
2494 format!("Global table not found: {global_table_name}"),
2495 )
2496 })?;
2497
2498 let replica_settings: Vec<Value> = gt
2499 .replication_group
2500 .iter()
2501 .map(|r| {
2502 json!({
2503 "RegionName": r.region_name,
2504 "ReplicaStatus": r.replica_status,
2505 "ReplicaProvisionedReadCapacityUnits": 0,
2506 "ReplicaProvisionedWriteCapacityUnits": 0
2507 })
2508 })
2509 .collect();
2510
2511 Self::ok_json(json!({
2512 "GlobalTableName": gt.global_table_name,
2513 "ReplicaSettings": replica_settings
2514 }))
2515 }
2516
2517 fn describe_table_replica_auto_scaling(
2518 &self,
2519 req: &AwsRequest,
2520 ) -> Result<AwsResponse, AwsServiceError> {
2521 let body = Self::parse_body(req)?;
2522 let table_name = require_str(&body, "TableName")?;
2523
2524 let state = self.state.read();
2525 let table = get_table(&state.tables, table_name)?;
2526
2527 Self::ok_json(json!({
2528 "TableAutoScalingDescription": {
2529 "TableName": table.name,
2530 "TableStatus": table.status,
2531 "Replicas": []
2532 }
2533 }))
2534 }
2535
2536 fn update_table_replica_auto_scaling(
2537 &self,
2538 req: &AwsRequest,
2539 ) -> Result<AwsResponse, AwsServiceError> {
2540 let body = Self::parse_body(req)?;
2541 let table_name = require_str(&body, "TableName")?;
2542
2543 let state = self.state.read();
2544 let table = get_table(&state.tables, table_name)?;
2545
2546 Self::ok_json(json!({
2547 "TableAutoScalingDescription": {
2548 "TableName": table.name,
2549 "TableStatus": table.status,
2550 "Replicas": []
2551 }
2552 }))
2553 }
2554
2555 fn enable_kinesis_streaming_destination(
2558 &self,
2559 req: &AwsRequest,
2560 ) -> Result<AwsResponse, AwsServiceError> {
2561 let body = Self::parse_body(req)?;
2562 let table_name = require_str(&body, "TableName")?;
2563 let stream_arn = require_str(&body, "StreamArn")?;
2564 let precision = body["EnableKinesisStreamingConfiguration"]
2565 ["ApproximateCreationDateTimePrecision"]
2566 .as_str()
2567 .unwrap_or("MILLISECOND");
2568
2569 let mut state = self.state.write();
2570 let table = get_table_mut(&mut state.tables, table_name)?;
2571
2572 table.kinesis_destinations.push(KinesisDestination {
2573 stream_arn: stream_arn.to_string(),
2574 destination_status: "ACTIVE".to_string(),
2575 approximate_creation_date_time_precision: precision.to_string(),
2576 });
2577
2578 Self::ok_json(json!({
2579 "TableName": table_name,
2580 "StreamArn": stream_arn,
2581 "DestinationStatus": "ACTIVE",
2582 "EnableKinesisStreamingConfiguration": {
2583 "ApproximateCreationDateTimePrecision": precision
2584 }
2585 }))
2586 }
2587
2588 fn disable_kinesis_streaming_destination(
2589 &self,
2590 req: &AwsRequest,
2591 ) -> Result<AwsResponse, AwsServiceError> {
2592 let body = Self::parse_body(req)?;
2593 let table_name = require_str(&body, "TableName")?;
2594 let stream_arn = require_str(&body, "StreamArn")?;
2595
2596 let mut state = self.state.write();
2597 let table = get_table_mut(&mut state.tables, table_name)?;
2598
2599 if let Some(dest) = table
2600 .kinesis_destinations
2601 .iter_mut()
2602 .find(|d| d.stream_arn == stream_arn)
2603 {
2604 dest.destination_status = "DISABLED".to_string();
2605 }
2606
2607 Self::ok_json(json!({
2608 "TableName": table_name,
2609 "StreamArn": stream_arn,
2610 "DestinationStatus": "DISABLED"
2611 }))
2612 }
2613
2614 fn describe_kinesis_streaming_destination(
2615 &self,
2616 req: &AwsRequest,
2617 ) -> Result<AwsResponse, AwsServiceError> {
2618 let body = Self::parse_body(req)?;
2619 let table_name = require_str(&body, "TableName")?;
2620
2621 let state = self.state.read();
2622 let table = get_table(&state.tables, table_name)?;
2623
2624 let destinations: Vec<Value> = table
2625 .kinesis_destinations
2626 .iter()
2627 .map(|d| {
2628 json!({
2629 "StreamArn": d.stream_arn,
2630 "DestinationStatus": d.destination_status,
2631 "ApproximateCreationDateTimePrecision": d.approximate_creation_date_time_precision
2632 })
2633 })
2634 .collect();
2635
2636 Self::ok_json(json!({
2637 "TableName": table_name,
2638 "KinesisDataStreamDestinations": destinations
2639 }))
2640 }
2641
2642 fn update_kinesis_streaming_destination(
2643 &self,
2644 req: &AwsRequest,
2645 ) -> Result<AwsResponse, AwsServiceError> {
2646 let body = Self::parse_body(req)?;
2647 let table_name = require_str(&body, "TableName")?;
2648 let stream_arn = require_str(&body, "StreamArn")?;
2649 let precision = body["UpdateKinesisStreamingConfiguration"]
2650 ["ApproximateCreationDateTimePrecision"]
2651 .as_str()
2652 .unwrap_or("MILLISECOND");
2653
2654 let mut state = self.state.write();
2655 let table = get_table_mut(&mut state.tables, table_name)?;
2656
2657 if let Some(dest) = table
2658 .kinesis_destinations
2659 .iter_mut()
2660 .find(|d| d.stream_arn == stream_arn)
2661 {
2662 dest.approximate_creation_date_time_precision = precision.to_string();
2663 }
2664
2665 Self::ok_json(json!({
2666 "TableName": table_name,
2667 "StreamArn": stream_arn,
2668 "DestinationStatus": "ACTIVE",
2669 "UpdateKinesisStreamingConfiguration": {
2670 "ApproximateCreationDateTimePrecision": precision
2671 }
2672 }))
2673 }
2674
2675 fn describe_contributor_insights(
2678 &self,
2679 req: &AwsRequest,
2680 ) -> Result<AwsResponse, AwsServiceError> {
2681 let body = Self::parse_body(req)?;
2682 let table_name = require_str(&body, "TableName")?;
2683 let index_name = body["IndexName"].as_str();
2684
2685 let state = self.state.read();
2686 let table = get_table(&state.tables, table_name)?;
2687
2688 let top = table.top_contributors(10);
2689 let contributors: Vec<Value> = top
2690 .iter()
2691 .map(|(key, count)| {
2692 json!({
2693 "Key": key,
2694 "Count": count
2695 })
2696 })
2697 .collect();
2698
2699 let mut result = json!({
2700 "TableName": table_name,
2701 "ContributorInsightsStatus": table.contributor_insights_status,
2702 "ContributorInsightsRuleList": ["DynamoDBContributorInsights"],
2703 "TopContributors": contributors
2704 });
2705 if let Some(idx) = index_name {
2706 result["IndexName"] = json!(idx);
2707 }
2708
2709 Self::ok_json(result)
2710 }
2711
2712 fn update_contributor_insights(
2713 &self,
2714 req: &AwsRequest,
2715 ) -> Result<AwsResponse, AwsServiceError> {
2716 let body = Self::parse_body(req)?;
2717 let table_name = require_str(&body, "TableName")?;
2718 let action = require_str(&body, "ContributorInsightsAction")?;
2719 let index_name = body["IndexName"].as_str();
2720
2721 let mut state = self.state.write();
2722 let table = get_table_mut(&mut state.tables, table_name)?;
2723
2724 let status = match action {
2725 "ENABLE" => "ENABLED",
2726 "DISABLE" => "DISABLED",
2727 _ => {
2728 return Err(AwsServiceError::aws_error(
2729 StatusCode::BAD_REQUEST,
2730 "ValidationException",
2731 format!("Invalid ContributorInsightsAction: {action}"),
2732 ))
2733 }
2734 };
2735 table.contributor_insights_status = status.to_string();
2736 if status == "DISABLED" {
2737 table.contributor_insights_counters.clear();
2738 }
2739
2740 let mut result = json!({
2741 "TableName": table_name,
2742 "ContributorInsightsStatus": status
2743 });
2744 if let Some(idx) = index_name {
2745 result["IndexName"] = json!(idx);
2746 }
2747
2748 Self::ok_json(result)
2749 }
2750
2751 fn list_contributor_insights(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2752 let body = Self::parse_body(req)?;
2753 validate_optional_string_length("tableName", body["TableName"].as_str(), 1, 1024)?;
2754 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 0, 100)?;
2755 let table_name = body["TableName"].as_str();
2756
2757 let state = self.state.read();
2758 let summaries: Vec<Value> = state
2759 .tables
2760 .values()
2761 .filter(|t| table_name.is_none() || table_name == Some(t.name.as_str()))
2762 .map(|t| {
2763 json!({
2764 "TableName": t.name,
2765 "ContributorInsightsStatus": t.contributor_insights_status
2766 })
2767 })
2768 .collect();
2769
2770 Self::ok_json(json!({
2771 "ContributorInsightsSummaries": summaries
2772 }))
2773 }
2774
    /// Handles the ExportTableToPointInTime action.
    ///
    /// Snapshots the table's current items, serialises them as
    /// newline-delimited JSON, and — when an S3 backend was wired via
    /// `with_s3` — writes the result into the requested bucket under
    /// `<prefix>/data/manifest-files.json`. The export is recorded in
    /// `state.exports` and described in the response.
    ///
    /// The export is marked FAILED (FailureCode S3NoSuchBucket) only when an
    /// S3 backend exists but the named bucket does not. NOTE(review): with no
    /// S3 backend wired at all, the export is still reported COMPLETED even
    /// though no data is written anywhere — confirm this best-effort
    /// behaviour is intended.
    fn export_table_to_point_in_time(
        &self,
        req: &AwsRequest,
    ) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let table_arn = require_str(&body, "TableArn")?;
        let s3_bucket = require_str(&body, "S3Bucket")?;
        let s3_prefix = body["S3Prefix"].as_str();
        let export_format = body["ExportFormat"].as_str().unwrap_or("DYNAMODB_JSON");

        // Snapshot the items under the read lock; the clone lets us release
        // the DynamoDB lock before serialising and touching S3 state.
        let state = self.state.read();
        let table = find_table_by_arn(&state.tables, table_arn)?;
        let items = table.items.clone();
        let item_count = items.len() as i64;

        let now = Utc::now();
        // The export ARN embeds the table name (last ARN path segment) plus a
        // fresh UUID.
        let export_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}/export/{}",
            state.region,
            state.account_id,
            table_arn.rsplit('/').next().unwrap_or("unknown"),
            uuid::Uuid::new_v4()
        );

        // Release the DynamoDB state lock before acquiring the S3 lock below,
        // so the two locks are never held at the same time.
        drop(state);

        // Serialise every item as one JSON object per line. DYNAMODB_JSON
        // wraps each item in {"Item": ...}; any other format emits it bare.
        let mut json_lines = String::new();
        for item in &items {
            let item_json = if export_format == "DYNAMODB_JSON" {
                json!({ "Item": item })
            } else {
                json!(item)
            };
            json_lines.push_str(&serde_json::to_string(&item_json).unwrap_or_default());
            json_lines.push('\n');
        }
        let data_size = json_lines.len() as i64;

        let s3_key = if let Some(prefix) = s3_prefix {
            format!("{prefix}/data/manifest-files.json")
        } else {
            "data/manifest-files.json".to_string()
        };

        // Write the serialised data into the S3 fake, if one is attached.
        let mut export_failed = false;
        let mut failure_reason = String::new();
        if let Some(ref s3_state) = self.s3_state {
            let mut s3 = s3_state.write();
            if let Some(bucket) = s3.buckets.get_mut(s3_bucket) {
                let etag = uuid::Uuid::new_v4().to_string().replace('-', "");
                // Plain STANDARD-class object with all optional S3 features
                // (versioning, SSE, locks, checksums, ...) left unset.
                let obj = fakecloud_s3::state::S3Object {
                    key: s3_key.clone(),
                    data: bytes::Bytes::from(json_lines),
                    content_type: "application/json".to_string(),
                    etag,
                    size: data_size as u64,
                    last_modified: now,
                    metadata: HashMap::new(),
                    storage_class: "STANDARD".to_string(),
                    tags: HashMap::new(),
                    acl_grants: Vec::new(),
                    acl_owner_id: None,
                    parts_count: None,
                    part_sizes: None,
                    sse_algorithm: None,
                    sse_kms_key_id: None,
                    bucket_key_enabled: None,
                    version_id: None,
                    is_delete_marker: false,
                    content_encoding: None,
                    website_redirect_location: None,
                    restore_ongoing: None,
                    restore_expiry: None,
                    checksum_algorithm: None,
                    checksum_value: None,
                    lock_mode: None,
                    lock_retain_until: None,
                    lock_legal_hold: None,
                };
                bucket.objects.insert(s3_key, obj);
            } else {
                export_failed = true;
                failure_reason = format!("S3 bucket does not exist: {s3_bucket}");
            }
        }

        let export_status = if export_failed { "FAILED" } else { "COMPLETED" };

        let export = ExportDescription {
            export_arn: export_arn.clone(),
            export_status: export_status.to_string(),
            table_arn: table_arn.to_string(),
            s3_bucket: s3_bucket.to_string(),
            s3_prefix: s3_prefix.map(|s| s.to_string()),
            export_format: export_format.to_string(),
            // The fake completes instantly, so start/end/export times match.
            start_time: now,
            end_time: now,
            export_time: now,
            item_count,
            billed_size_bytes: data_size,
        };

        // Record the export (even a failed one) so Describe/ListExports can
        // find it later.
        let mut state = self.state.write();
        state.exports.insert(export_arn.clone(), export);

        let mut response = json!({
            "ExportDescription": {
                "ExportArn": export_arn,
                "ExportStatus": export_status,
                "TableArn": table_arn,
                "S3Bucket": s3_bucket,
                "S3Prefix": s3_prefix,
                "ExportFormat": export_format,
                "StartTime": now.timestamp() as f64,
                "EndTime": now.timestamp() as f64,
                "ExportTime": now.timestamp() as f64,
                "ItemCount": item_count,
                "BilledSizeBytes": data_size
            }
        });
        if export_failed {
            response["ExportDescription"]["FailureCode"] = json!("S3NoSuchBucket");
            response["ExportDescription"]["FailureMessage"] = json!(failure_reason);
        }
        Self::ok_json(response)
    }
2907
2908 fn describe_export(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2909 let body = Self::parse_body(req)?;
2910 let export_arn = require_str(&body, "ExportArn")?;
2911
2912 let state = self.state.read();
2913 let export = state.exports.get(export_arn).ok_or_else(|| {
2914 AwsServiceError::aws_error(
2915 StatusCode::BAD_REQUEST,
2916 "ExportNotFoundException",
2917 format!("Export not found: {export_arn}"),
2918 )
2919 })?;
2920
2921 Self::ok_json(json!({
2922 "ExportDescription": {
2923 "ExportArn": export.export_arn,
2924 "ExportStatus": export.export_status,
2925 "TableArn": export.table_arn,
2926 "S3Bucket": export.s3_bucket,
2927 "S3Prefix": export.s3_prefix,
2928 "ExportFormat": export.export_format,
2929 "StartTime": export.start_time.timestamp() as f64,
2930 "EndTime": export.end_time.timestamp() as f64,
2931 "ExportTime": export.export_time.timestamp() as f64,
2932 "ItemCount": export.item_count,
2933 "BilledSizeBytes": export.billed_size_bytes
2934 }
2935 }))
2936 }
2937
2938 fn list_exports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
2939 let body = Self::parse_body(req)?;
2940 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
2941 validate_optional_range_i64("maxResults", body["MaxResults"].as_i64(), 1, 25)?;
2942 let table_arn = body["TableArn"].as_str();
2943
2944 let state = self.state.read();
2945 let summaries: Vec<Value> = state
2946 .exports
2947 .values()
2948 .filter(|e| table_arn.is_none() || table_arn == Some(e.table_arn.as_str()))
2949 .map(|e| {
2950 json!({
2951 "ExportArn": e.export_arn,
2952 "ExportStatus": e.export_status,
2953 "TableArn": e.table_arn
2954 })
2955 })
2956 .collect();
2957
2958 Self::ok_json(json!({
2959 "ExportSummaries": summaries
2960 }))
2961 }
2962
    /// Handles the ImportTable action.
    ///
    /// Reads newline-delimited JSON objects from every object in the source
    /// S3 bucket (optionally filtered by key prefix), creates a new table
    /// from `TableCreationParameters`, fills it with the parsed items, and
    /// records a COMPLETED `ImportDescription`.
    ///
    /// Error paths, in evaluation order: missing S3BucketSource /
    /// TableCreationParameters / TableName → ValidationException; unknown
    /// bucket (when an S3 backend is wired) → ImportConflictException;
    /// pre-existing target table → ResourceInUseException.
    /// NOTE(review): with no S3 backend wired, the import silently proceeds
    /// with zero items and still reports COMPLETED — confirm intended.
    fn import_table(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
        let body = Self::parse_body(req)?;
        let input_format = require_str(&body, "InputFormat")?;
        let s3_source = body["S3BucketSource"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "S3BucketSource is required",
            )
        })?;
        let s3_bucket = s3_source
            .get("S3Bucket")
            .and_then(|v| v.as_str())
            .unwrap_or("");
        let s3_key_prefix = s3_source
            .get("S3KeyPrefix")
            .and_then(|v| v.as_str())
            .unwrap_or("");

        let table_params = body["TableCreationParameters"].as_object().ok_or_else(|| {
            AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ValidationException",
                "TableCreationParameters is required",
            )
        })?;
        let table_name = table_params
            .get("TableName")
            .and_then(|v| v.as_str())
            .ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ValidationException",
                    "TableCreationParameters.TableName is required",
                )
            })?;

        // Schema validation happens before any S3 reads so a malformed
        // request fails fast.
        let key_schema = parse_key_schema(table_params.get("KeySchema").unwrap_or(&Value::Null))?;
        let attribute_definitions = parse_attribute_definitions(
            table_params
                .get("AttributeDefinitions")
                .unwrap_or(&Value::Null),
        )?;

        // Pull items out of the S3 fake, if one is attached. Each object is
        // treated as newline-delimited JSON; blank and unparseable lines are
        // skipped silently.
        let mut imported_items: Vec<HashMap<String, Value>> = Vec::new();
        let mut processed_size_bytes: i64 = 0;
        if let Some(ref s3_state) = self.s3_state {
            let s3 = s3_state.read();
            let bucket = s3.buckets.get(s3_bucket).ok_or_else(|| {
                AwsServiceError::aws_error(
                    StatusCode::BAD_REQUEST,
                    "ImportConflictException",
                    format!("S3 bucket does not exist: {s3_bucket}"),
                )
            })?;
            let prefix = if s3_key_prefix.is_empty() {
                String::new()
            } else {
                s3_key_prefix.to_string()
            };
            for (key, obj) in &bucket.objects {
                // An empty prefix matches every object.
                if !prefix.is_empty() && !key.starts_with(&prefix) {
                    continue;
                }
                let data = std::str::from_utf8(&obj.data).unwrap_or("");
                processed_size_bytes += obj.size as i64;
                for line in data.lines() {
                    let line = line.trim();
                    if line.is_empty() {
                        continue;
                    }
                    if let Ok(parsed) = serde_json::from_str::<Value>(line) {
                        // DYNAMODB_JSON lines may wrap the attributes in an
                        // "Item" envelope; other formats are taken as-is.
                        let item = if input_format == "DYNAMODB_JSON" {
                            if let Some(item_obj) = parsed.get("Item") {
                                item_obj.as_object().cloned().unwrap_or_default()
                            } else {
                                parsed.as_object().cloned().unwrap_or_default()
                            }
                        } else {
                            parsed.as_object().cloned().unwrap_or_default()
                        };
                        if !item.is_empty() {
                            imported_items
                                .push(item.into_iter().collect::<HashMap<String, Value>>());
                        }
                    }
                }
            }
        }

        let mut state = self.state.write();

        if state.tables.contains_key(table_name) {
            return Err(AwsServiceError::aws_error(
                StatusCode::BAD_REQUEST,
                "ResourceInUseException",
                format!("Table already exists: {table_name}"),
            ));
        }

        let now = Utc::now();
        let table_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}",
            state.region, state.account_id, table_name
        );
        let import_arn = format!(
            "arn:aws:dynamodb:{}:{}:table/{}/import/{}",
            state.region,
            state.account_id,
            table_name,
            uuid::Uuid::new_v4()
        );

        let processed_item_count = imported_items.len() as i64;

        // Imported tables are created ACTIVE with PAY_PER_REQUEST billing
        // and every optional feature (TTL, PITR, streams, SSE, ...) off.
        let mut table = DynamoTable {
            name: table_name.to_string(),
            arn: table_arn.clone(),
            key_schema,
            attribute_definitions,
            provisioned_throughput: ProvisionedThroughput {
                read_capacity_units: 0,
                write_capacity_units: 0,
            },
            items: imported_items,
            gsi: Vec::new(),
            lsi: Vec::new(),
            tags: HashMap::new(),
            created_at: now,
            status: "ACTIVE".to_string(),
            item_count: 0,
            size_bytes: 0,
            billing_mode: "PAY_PER_REQUEST".to_string(),
            ttl_attribute: None,
            ttl_enabled: false,
            resource_policy: None,
            pitr_enabled: false,
            kinesis_destinations: Vec::new(),
            contributor_insights_status: "DISABLED".to_string(),
            contributor_insights_counters: HashMap::new(),
            stream_enabled: false,
            stream_view_type: None,
            stream_arn: None,
            stream_records: Arc::new(RwLock::new(Vec::new())),
            sse_type: None,
            sse_kms_key_arn: None,
        };
        // Derive item_count / size_bytes from the freshly imported items.
        table.recalculate_stats();
        state.tables.insert(table_name.to_string(), table);

        let import_desc = ImportDescription {
            import_arn: import_arn.clone(),
            import_status: "COMPLETED".to_string(),
            table_arn: table_arn.clone(),
            table_name: table_name.to_string(),
            s3_bucket_source: s3_bucket.to_string(),
            input_format: input_format.to_string(),
            start_time: now,
            end_time: now,
            processed_item_count,
            processed_size_bytes,
        };
        state.imports.insert(import_arn.clone(), import_desc);

        // Echo the effective key schema / attribute definitions back from
        // the stored table.
        let table_ref = state.tables.get(table_name).unwrap();
        let ks: Vec<Value> = table_ref
            .key_schema
            .iter()
            .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
            .collect();
        let ad: Vec<Value> = table_ref
            .attribute_definitions
            .iter()
            .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
            .collect();

        Self::ok_json(json!({
            "ImportTableDescription": {
                "ImportArn": import_arn,
                "ImportStatus": "COMPLETED",
                "TableArn": table_arn,
                "TableId": uuid::Uuid::new_v4().to_string(),
                "S3BucketSource": {
                    "S3Bucket": s3_bucket
                },
                "InputFormat": input_format,
                "TableCreationParameters": {
                    "TableName": table_name,
                    "KeySchema": ks,
                    "AttributeDefinitions": ad
                },
                "StartTime": now.timestamp() as f64,
                "EndTime": now.timestamp() as f64,
                "ProcessedItemCount": processed_item_count,
                "ProcessedSizeBytes": processed_size_bytes
            }
        }))
    }
3164
3165 fn describe_import(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3166 let body = Self::parse_body(req)?;
3167 let import_arn = require_str(&body, "ImportArn")?;
3168
3169 let state = self.state.read();
3170 let import = state.imports.get(import_arn).ok_or_else(|| {
3171 AwsServiceError::aws_error(
3172 StatusCode::BAD_REQUEST,
3173 "ImportNotFoundException",
3174 format!("Import not found: {import_arn}"),
3175 )
3176 })?;
3177
3178 Self::ok_json(json!({
3179 "ImportTableDescription": {
3180 "ImportArn": import.import_arn,
3181 "ImportStatus": import.import_status,
3182 "TableArn": import.table_arn,
3183 "S3BucketSource": {
3184 "S3Bucket": import.s3_bucket_source
3185 },
3186 "InputFormat": import.input_format,
3187 "StartTime": import.start_time.timestamp() as f64,
3188 "EndTime": import.end_time.timestamp() as f64,
3189 "ProcessedItemCount": import.processed_item_count,
3190 "ProcessedSizeBytes": import.processed_size_bytes
3191 }
3192 }))
3193 }
3194
3195 fn list_imports(&self, req: &AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3196 let body = Self::parse_body(req)?;
3197 validate_optional_string_length("tableArn", body["TableArn"].as_str(), 1, 1024)?;
3198 validate_optional_string_length("nextToken", body["NextToken"].as_str(), 112, 1024)?;
3199 validate_optional_range_i64("pageSize", body["PageSize"].as_i64(), 1, 25)?;
3200 let table_arn = body["TableArn"].as_str();
3201
3202 let state = self.state.read();
3203 let summaries: Vec<Value> = state
3204 .imports
3205 .values()
3206 .filter(|i| table_arn.is_none() || table_arn == Some(i.table_arn.as_str()))
3207 .map(|i| {
3208 json!({
3209 "ImportArn": i.import_arn,
3210 "ImportStatus": i.import_status,
3211 "TableArn": i.table_arn
3212 })
3213 })
3214 .collect();
3215
3216 Self::ok_json(json!({
3217 "ImportSummaryList": summaries
3218 }))
3219 }
3220}
3221
3222#[async_trait]
3223impl AwsService for DynamoDbService {
    /// Identifier this fake registers under for request routing ("dynamodb").
    fn service_name(&self) -> &str {
        "dynamodb"
    }
3227
3228 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
3229 match req.action.as_str() {
3230 "CreateTable" => self.create_table(&req),
3231 "DeleteTable" => self.delete_table(&req),
3232 "DescribeTable" => self.describe_table(&req),
3233 "ListTables" => self.list_tables(&req),
3234 "UpdateTable" => self.update_table(&req),
3235 "PutItem" => self.put_item(&req),
3236 "GetItem" => self.get_item(&req),
3237 "DeleteItem" => self.delete_item(&req),
3238 "UpdateItem" => self.update_item(&req),
3239 "Query" => self.query(&req),
3240 "Scan" => self.scan(&req),
3241 "BatchGetItem" => self.batch_get_item(&req),
3242 "BatchWriteItem" => self.batch_write_item(&req),
3243 "TagResource" => self.tag_resource(&req),
3244 "UntagResource" => self.untag_resource(&req),
3245 "ListTagsOfResource" => self.list_tags_of_resource(&req),
3246 "TransactGetItems" => self.transact_get_items(&req),
3247 "TransactWriteItems" => self.transact_write_items(&req),
3248 "ExecuteStatement" => self.execute_statement(&req),
3249 "BatchExecuteStatement" => self.batch_execute_statement(&req),
3250 "ExecuteTransaction" => self.execute_transaction(&req),
3251 "UpdateTimeToLive" => self.update_time_to_live(&req),
3252 "DescribeTimeToLive" => self.describe_time_to_live(&req),
3253 "PutResourcePolicy" => self.put_resource_policy(&req),
3254 "GetResourcePolicy" => self.get_resource_policy(&req),
3255 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
3256 "DescribeEndpoints" => self.describe_endpoints(&req),
3258 "DescribeLimits" => self.describe_limits(&req),
3259 "CreateBackup" => self.create_backup(&req),
3261 "DeleteBackup" => self.delete_backup(&req),
3262 "DescribeBackup" => self.describe_backup(&req),
3263 "ListBackups" => self.list_backups(&req),
3264 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
3265 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
3266 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
3267 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
3268 "CreateGlobalTable" => self.create_global_table(&req),
3270 "DescribeGlobalTable" => self.describe_global_table(&req),
3271 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
3272 "ListGlobalTables" => self.list_global_tables(&req),
3273 "UpdateGlobalTable" => self.update_global_table(&req),
3274 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
3275 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
3276 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
3277 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
3279 "DisableKinesisStreamingDestination" => {
3280 self.disable_kinesis_streaming_destination(&req)
3281 }
3282 "DescribeKinesisStreamingDestination" => {
3283 self.describe_kinesis_streaming_destination(&req)
3284 }
3285 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
3286 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
3288 "UpdateContributorInsights" => self.update_contributor_insights(&req),
3289 "ListContributorInsights" => self.list_contributor_insights(&req),
3290 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
3292 "DescribeExport" => self.describe_export(&req),
3293 "ListExports" => self.list_exports(&req),
3294 "ImportTable" => self.import_table(&req),
3295 "DescribeImport" => self.describe_import(&req),
3296 "ListImports" => self.list_imports(&req),
3297 _ => Err(AwsServiceError::action_not_implemented(
3298 "dynamodb",
3299 &req.action,
3300 )),
3301 }
3302 }
3303
3304 fn supported_actions(&self) -> &[&str] {
3305 &[
3306 "CreateTable",
3307 "DeleteTable",
3308 "DescribeTable",
3309 "ListTables",
3310 "UpdateTable",
3311 "PutItem",
3312 "GetItem",
3313 "DeleteItem",
3314 "UpdateItem",
3315 "Query",
3316 "Scan",
3317 "BatchGetItem",
3318 "BatchWriteItem",
3319 "TagResource",
3320 "UntagResource",
3321 "ListTagsOfResource",
3322 "TransactGetItems",
3323 "TransactWriteItems",
3324 "ExecuteStatement",
3325 "BatchExecuteStatement",
3326 "ExecuteTransaction",
3327 "UpdateTimeToLive",
3328 "DescribeTimeToLive",
3329 "PutResourcePolicy",
3330 "GetResourcePolicy",
3331 "DeleteResourcePolicy",
3332 "DescribeEndpoints",
3333 "DescribeLimits",
3334 "CreateBackup",
3335 "DeleteBackup",
3336 "DescribeBackup",
3337 "ListBackups",
3338 "RestoreTableFromBackup",
3339 "RestoreTableToPointInTime",
3340 "UpdateContinuousBackups",
3341 "DescribeContinuousBackups",
3342 "CreateGlobalTable",
3343 "DescribeGlobalTable",
3344 "DescribeGlobalTableSettings",
3345 "ListGlobalTables",
3346 "UpdateGlobalTable",
3347 "UpdateGlobalTableSettings",
3348 "DescribeTableReplicaAutoScaling",
3349 "UpdateTableReplicaAutoScaling",
3350 "EnableKinesisStreamingDestination",
3351 "DisableKinesisStreamingDestination",
3352 "DescribeKinesisStreamingDestination",
3353 "UpdateKinesisStreamingDestination",
3354 "DescribeContributorInsights",
3355 "UpdateContributorInsights",
3356 "ListContributorInsights",
3357 "ExportTableToPointInTime",
3358 "DescribeExport",
3359 "ListExports",
3360 "ImportTable",
3361 "DescribeImport",
3362 "ListImports",
3363 ]
3364 }
3365}
3366
3367fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
3370 body[field].as_str().ok_or_else(|| {
3371 AwsServiceError::aws_error(
3372 StatusCode::BAD_REQUEST,
3373 "ValidationException",
3374 format!("{field} is required"),
3375 )
3376 })
3377}
3378
3379fn require_object(
3380 body: &Value,
3381 field: &str,
3382) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
3383 let obj = body[field].as_object().ok_or_else(|| {
3384 AwsServiceError::aws_error(
3385 StatusCode::BAD_REQUEST,
3386 "ValidationException",
3387 format!("{field} is required"),
3388 )
3389 })?;
3390 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3391}
3392
3393fn get_table<'a>(
3394 tables: &'a HashMap<String, DynamoTable>,
3395 name: &str,
3396) -> Result<&'a DynamoTable, AwsServiceError> {
3397 tables.get(name).ok_or_else(|| {
3398 AwsServiceError::aws_error(
3399 StatusCode::BAD_REQUEST,
3400 "ResourceNotFoundException",
3401 format!("Requested resource not found: Table: {name} not found"),
3402 )
3403 })
3404}
3405
3406fn get_table_mut<'a>(
3407 tables: &'a mut HashMap<String, DynamoTable>,
3408 name: &str,
3409) -> Result<&'a mut DynamoTable, AwsServiceError> {
3410 tables.get_mut(name).ok_or_else(|| {
3411 AwsServiceError::aws_error(
3412 StatusCode::BAD_REQUEST,
3413 "ResourceNotFoundException",
3414 format!("Requested resource not found: Table: {name} not found"),
3415 )
3416 })
3417}
3418
3419fn find_table_by_arn<'a>(
3420 tables: &'a HashMap<String, DynamoTable>,
3421 arn: &str,
3422) -> Result<&'a DynamoTable, AwsServiceError> {
3423 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
3424 AwsServiceError::aws_error(
3425 StatusCode::BAD_REQUEST,
3426 "ResourceNotFoundException",
3427 format!("Requested resource not found: {arn}"),
3428 )
3429 })
3430}
3431
3432fn find_table_by_arn_mut<'a>(
3433 tables: &'a mut HashMap<String, DynamoTable>,
3434 arn: &str,
3435) -> Result<&'a mut DynamoTable, AwsServiceError> {
3436 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
3437 AwsServiceError::aws_error(
3438 StatusCode::BAD_REQUEST,
3439 "ResourceNotFoundException",
3440 format!("Requested resource not found: {arn}"),
3441 )
3442 })
3443}
3444
3445fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
3446 let arr = val.as_array().ok_or_else(|| {
3447 AwsServiceError::aws_error(
3448 StatusCode::BAD_REQUEST,
3449 "ValidationException",
3450 "KeySchema is required",
3451 )
3452 })?;
3453 Ok(arr
3454 .iter()
3455 .map(|elem| KeySchemaElement {
3456 attribute_name: elem["AttributeName"]
3457 .as_str()
3458 .unwrap_or_default()
3459 .to_string(),
3460 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
3461 })
3462 .collect())
3463}
3464
3465fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
3466 let arr = val.as_array().ok_or_else(|| {
3467 AwsServiceError::aws_error(
3468 StatusCode::BAD_REQUEST,
3469 "ValidationException",
3470 "AttributeDefinitions is required",
3471 )
3472 })?;
3473 Ok(arr
3474 .iter()
3475 .map(|elem| AttributeDefinition {
3476 attribute_name: elem["AttributeName"]
3477 .as_str()
3478 .unwrap_or_default()
3479 .to_string(),
3480 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
3481 })
3482 .collect())
3483}
3484
3485fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
3486 Ok(ProvisionedThroughput {
3487 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
3488 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
3489 })
3490}
3491
3492fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
3493 let Some(arr) = val.as_array() else {
3494 return Vec::new();
3495 };
3496 arr.iter()
3497 .filter_map(|g| {
3498 Some(GlobalSecondaryIndex {
3499 index_name: g["IndexName"].as_str()?.to_string(),
3500 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
3501 projection: parse_projection(&g["Projection"]),
3502 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
3503 .ok(),
3504 })
3505 })
3506 .collect()
3507}
3508
3509fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
3510 let Some(arr) = val.as_array() else {
3511 return Vec::new();
3512 };
3513 arr.iter()
3514 .filter_map(|l| {
3515 Some(LocalSecondaryIndex {
3516 index_name: l["IndexName"].as_str()?.to_string(),
3517 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
3518 projection: parse_projection(&l["Projection"]),
3519 })
3520 })
3521 .collect()
3522}
3523
3524fn parse_projection(val: &Value) -> Projection {
3525 Projection {
3526 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
3527 non_key_attributes: val["NonKeyAttributes"]
3528 .as_array()
3529 .map(|arr| {
3530 arr.iter()
3531 .filter_map(|v| v.as_str().map(|s| s.to_string()))
3532 .collect()
3533 })
3534 .unwrap_or_default(),
3535 }
3536}
3537
3538fn parse_tags(val: &Value) -> HashMap<String, String> {
3539 let mut tags = HashMap::new();
3540 if let Some(arr) = val.as_array() {
3541 for tag in arr {
3542 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
3543 tags.insert(k.to_string(), v.to_string());
3544 }
3545 }
3546 }
3547 tags
3548}
3549
3550fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
3551 let mut names = HashMap::new();
3552 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
3553 for (k, v) in obj {
3554 if let Some(s) = v.as_str() {
3555 names.insert(k.clone(), s.to_string());
3556 }
3557 }
3558 }
3559 names
3560}
3561
3562fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
3563 let mut values = HashMap::new();
3564 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
3565 for (k, v) in obj {
3566 values.insert(k.clone(), v.clone());
3567 }
3568 }
3569 values
3570}
3571
/// Resolve a `#placeholder` through `ExpressionAttributeNames`.
///
/// Names not starting with `#`, and placeholders with no mapping, are
/// returned unchanged.
fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
    if !name.starts_with('#') {
        return name.to_string();
    }
    match expr_attr_names.get(name) {
        Some(resolved) => resolved.clone(),
        None => name.to_string(),
    }
}
3582
3583fn extract_key(
3584 table: &DynamoTable,
3585 item: &HashMap<String, AttributeValue>,
3586) -> HashMap<String, AttributeValue> {
3587 let mut key = HashMap::new();
3588 let hash_key = table.hash_key_name();
3589 if let Some(v) = item.get(hash_key) {
3590 key.insert(hash_key.to_string(), v.clone());
3591 }
3592 if let Some(range_key) = table.range_key_name() {
3593 if let Some(v) = item.get(range_key) {
3594 key.insert(range_key.to_string(), v.clone());
3595 }
3596 }
3597 key
3598}
3599
3600fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
3602 let obj = value.as_object()?;
3603 if obj.is_empty() {
3604 return None;
3605 }
3606 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
3607}
3608
3609fn item_matches_key(
3611 item: &HashMap<String, AttributeValue>,
3612 key: &HashMap<String, AttributeValue>,
3613 hash_key_name: &str,
3614 range_key_name: Option<&str>,
3615) -> bool {
3616 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
3617 (Some(a), Some(b)) => a == b,
3618 _ => false,
3619 };
3620 if !hash_match {
3621 return false;
3622 }
3623 match range_key_name {
3624 Some(rk) => match (item.get(rk), key.get(rk)) {
3625 (Some(a), Some(b)) => a == b,
3626 (None, None) => true,
3627 _ => false,
3628 },
3629 None => true,
3630 }
3631}
3632
3633fn extract_key_for_schema(
3635 item: &HashMap<String, AttributeValue>,
3636 hash_key_name: &str,
3637 range_key_name: Option<&str>,
3638) -> HashMap<String, AttributeValue> {
3639 let mut key = HashMap::new();
3640 if let Some(v) = item.get(hash_key_name) {
3641 key.insert(hash_key_name.to_string(), v.clone());
3642 }
3643 if let Some(rk) = range_key_name {
3644 if let Some(v) = item.get(rk) {
3645 key.insert(rk.to_string(), v.clone());
3646 }
3647 }
3648 key
3649}
3650
3651fn validate_key_in_item(
3652 table: &DynamoTable,
3653 item: &HashMap<String, AttributeValue>,
3654) -> Result<(), AwsServiceError> {
3655 let hash_key = table.hash_key_name();
3656 if !item.contains_key(hash_key) {
3657 return Err(AwsServiceError::aws_error(
3658 StatusCode::BAD_REQUEST,
3659 "ValidationException",
3660 format!("Missing the key {hash_key} in the item"),
3661 ));
3662 }
3663 if let Some(range_key) = table.range_key_name() {
3664 if !item.contains_key(range_key) {
3665 return Err(AwsServiceError::aws_error(
3666 StatusCode::BAD_REQUEST,
3667 "ValidationException",
3668 format!("Missing the key {range_key} in the item"),
3669 ));
3670 }
3671 }
3672 Ok(())
3673}
3674
3675fn validate_key_attributes_in_key(
3676 table: &DynamoTable,
3677 key: &HashMap<String, AttributeValue>,
3678) -> Result<(), AwsServiceError> {
3679 let hash_key = table.hash_key_name();
3680 if !key.contains_key(hash_key) {
3681 return Err(AwsServiceError::aws_error(
3682 StatusCode::BAD_REQUEST,
3683 "ValidationException",
3684 format!("Missing the key {hash_key} in the item"),
3685 ));
3686 }
3687 Ok(())
3688}
3689
3690fn project_item(
3691 item: &HashMap<String, AttributeValue>,
3692 body: &Value,
3693) -> HashMap<String, AttributeValue> {
3694 let projection = body["ProjectionExpression"].as_str();
3695 match projection {
3696 Some(proj) if !proj.is_empty() => {
3697 let expr_attr_names = parse_expression_attribute_names(body);
3698 let attrs: Vec<String> = proj
3699 .split(',')
3700 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
3701 .collect();
3702 let mut result = HashMap::new();
3703 for attr in &attrs {
3704 if let Some(v) = resolve_nested_path(item, attr) {
3705 insert_nested_value(&mut result, attr, v);
3706 }
3707 }
3708 result
3709 }
3710 _ => item.clone(),
3711 }
3712}
3713
3714fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
3717 let mut result = String::new();
3719 for (i, segment) in path.split('.').enumerate() {
3720 if i > 0 {
3721 result.push('.');
3722 }
3723 if let Some(bracket_pos) = segment.find('[') {
3725 let key_part = &segment[..bracket_pos];
3726 let index_part = &segment[bracket_pos..];
3727 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
3728 result.push_str(index_part);
3729 } else {
3730 result.push_str(&resolve_attr_name(segment, expr_attr_names));
3731 }
3732 }
3733 result
3734}
3735
3736fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
3738 let segments = parse_path_segments(path);
3739 if segments.is_empty() {
3740 return None;
3741 }
3742
3743 let first = &segments[0];
3744 let top_key = match first {
3745 PathSegment::Key(k) => k.as_str(),
3746 _ => return None,
3747 };
3748
3749 let mut current = item.get(top_key)?.clone();
3750
3751 for segment in &segments[1..] {
3752 match segment {
3753 PathSegment::Key(k) => {
3754 current = current.get("M")?.get(k)?.clone();
3756 }
3757 PathSegment::Index(idx) => {
3758 current = current.get("L")?.get(*idx)?.clone();
3760 }
3761 }
3762 }
3763
3764 Some(current)
3765}
3766
/// One component of a DynamoDB document path: `a.b[2]` parses to
/// `[Key("a"), Key("b"), Index(2)]`.
#[derive(Debug)]
enum PathSegment {
    /// Map member access by attribute name.
    Key(String),
    /// List element access by zero-based index.
    Index(usize),
}
3772
3773fn parse_path_segments(path: &str) -> Vec<PathSegment> {
3775 let mut segments = Vec::new();
3776 let mut current = String::new();
3777
3778 let chars: Vec<char> = path.chars().collect();
3779 let mut i = 0;
3780 while i < chars.len() {
3781 match chars[i] {
3782 '.' => {
3783 if !current.is_empty() {
3784 segments.push(PathSegment::Key(current.clone()));
3785 current.clear();
3786 }
3787 }
3788 '[' => {
3789 if !current.is_empty() {
3790 segments.push(PathSegment::Key(current.clone()));
3791 current.clear();
3792 }
3793 i += 1;
3794 let mut num = String::new();
3795 while i < chars.len() && chars[i] != ']' {
3796 num.push(chars[i]);
3797 i += 1;
3798 }
3799 if let Ok(idx) = num.parse::<usize>() {
3800 segments.push(PathSegment::Index(idx));
3801 }
3802 }
3804 c => {
3805 current.push(c);
3806 }
3807 }
3808 i += 1;
3809 }
3810 if !current.is_empty() {
3811 segments.push(PathSegment::Key(current));
3812 }
3813 segments
3814}
3815
3816fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
3819 if !path.contains('.') && !path.contains('[') {
3821 result.insert(path.to_string(), value);
3822 return;
3823 }
3824
3825 let segments = parse_path_segments(path);
3826 if segments.is_empty() {
3827 return;
3828 }
3829
3830 let top_key = match &segments[0] {
3831 PathSegment::Key(k) => k.clone(),
3832 _ => return,
3833 };
3834
3835 if segments.len() == 1 {
3836 result.insert(top_key, value);
3837 return;
3838 }
3839
3840 let wrapped = wrap_value_in_path(&segments[1..], value);
3842 let existing = result.remove(&top_key);
3844 let merged = match existing {
3845 Some(existing) => merge_attribute_values(existing, wrapped),
3846 None => wrapped,
3847 };
3848 result.insert(top_key, merged);
3849}
3850
3851fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
3853 if segments.is_empty() {
3854 return value;
3855 }
3856 let inner = wrap_value_in_path(&segments[1..], value);
3857 match &segments[0] {
3858 PathSegment::Key(k) => {
3859 json!({"M": {k.clone(): inner}})
3860 }
3861 PathSegment::Index(idx) => {
3862 let mut arr = vec![Value::Null; idx + 1];
3863 arr[*idx] = inner;
3864 json!({"L": arr})
3865 }
3866 }
3867}
3868
3869fn merge_attribute_values(a: Value, b: Value) -> Value {
3871 if let (Some(a_map), Some(b_map)) = (
3872 a.get("M").and_then(|v| v.as_object()),
3873 b.get("M").and_then(|v| v.as_object()),
3874 ) {
3875 let mut merged = a_map.clone();
3876 for (k, v) in b_map {
3877 if let Some(existing) = merged.get(k) {
3878 merged.insert(
3879 k.clone(),
3880 merge_attribute_values(existing.clone(), v.clone()),
3881 );
3882 } else {
3883 merged.insert(k.clone(), v.clone());
3884 }
3885 }
3886 json!({"M": merged})
3887 } else {
3888 b
3889 }
3890}
3891
3892fn evaluate_condition(
3893 condition: &str,
3894 existing: Option<&HashMap<String, AttributeValue>>,
3895 expr_attr_names: &HashMap<String, String>,
3896 expr_attr_values: &HashMap<String, Value>,
3897) -> Result<(), AwsServiceError> {
3898 let empty = HashMap::new();
3903 let item = existing.unwrap_or(&empty);
3904 if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
3905 Ok(())
3906 } else {
3907 Err(AwsServiceError::aws_error(
3908 StatusCode::BAD_REQUEST,
3909 "ConditionalCheckFailedException",
3910 "The conditional request failed",
3911 ))
3912 }
3913}
3914
/// Parse `func_name(arg)` and return the trimmed argument text, or `None`
/// when `expr` is not a call to `func_name`.
///
/// Whitespace between the function name and the parenthesis is accepted,
/// matching how `begins_with (` / `contains (` are handled elsewhere in
/// this module; previously `attribute_exists (x)` failed to parse here and
/// fell through to the permissive fallback (always true).
fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
    let rest = expr.strip_prefix(func_name)?;
    let inner = rest.trim_start().strip_prefix('(')?.strip_suffix(')')?;
    Some(inner.trim())
}
3924
3925fn evaluate_key_condition(
3926 expr: &str,
3927 item: &HashMap<String, AttributeValue>,
3928 hash_key_name: &str,
3929 _range_key_name: Option<&str>,
3930 expr_attr_names: &HashMap<String, String>,
3931 expr_attr_values: &HashMap<String, Value>,
3932) -> bool {
3933 let parts: Vec<&str> = split_on_and(expr);
3934 for part in &parts {
3935 let part = part.trim();
3936 if !evaluate_single_key_condition(
3937 part,
3938 item,
3939 hash_key_name,
3940 expr_attr_names,
3941 expr_attr_values,
3942 ) {
3943 return false;
3944 }
3945 }
3946 true
3947}
3948
/// Split an expression on top-level (depth-0) case-insensitive ` AND `
/// separators, leaving parenthesized groups intact.
///
/// The keyword comparison is done on raw bytes: the previous
/// `expr[i..i + 5]` string slice panicked when `i` fell inside a multi-byte
/// UTF-8 character (e.g. a non-ASCII attribute name), since `i` advances
/// one byte at a time. Split positions are always at ASCII space bytes, so
/// the returned `&str` slices remain on valid char boundaries.
fn split_on_and(expr: &str) -> Vec<&str> {
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ => {
                if depth == 0
                    && i + 5 <= bytes.len()
                    && bytes[i..i + 5].eq_ignore_ascii_case(b" AND ")
                {
                    parts.push(&expr[start..i]);
                    start = i + 5;
                    i = start;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
3974
/// Split an expression on top-level (depth-0) case-insensitive ` OR `
/// separators, leaving parenthesized groups intact.
///
/// Mirrors `split_on_and`; the keyword comparison is done on raw bytes
/// because the previous `expr[i..i + 4]` string slice panicked when `i`
/// fell inside a multi-byte UTF-8 character. Split positions are always at
/// ASCII space bytes, so the returned slices stay on char boundaries.
fn split_on_or(expr: &str) -> Vec<&str> {
    let bytes = expr.as_bytes();
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0usize;
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'(' => depth += 1,
            b')' => depth = depth.saturating_sub(1),
            _ => {
                if depth == 0
                    && i + 4 <= bytes.len()
                    && bytes[i..i + 4].eq_ignore_ascii_case(b" OR ")
                {
                    parts.push(&expr[start..i]);
                    start = i + 4;
                    i = start;
                    continue;
                }
            }
        }
        i += 1;
    }
    parts.push(&expr[start..]);
    parts
}
4000
/// Evaluate one atomic comparison (no AND/OR/NOT) against a single item.
///
/// Recognized forms, tried in order:
///   1. `begins_with(attr, :val)` — string prefix test
///   2. `attr BETWEEN :lo AND :hi` — inclusive on both ends
///   3. binary comparators `<=`, `>=`, `<>`, `=`, `<`, `>`
/// A condition matching none of these evaluates to `true` (permissive mock
/// behavior).
fn evaluate_single_key_condition(
    part: &str,
    item: &HashMap<String, AttributeValue>,
    _hash_key_name: &str,
    expr_attr_names: &HashMap<String, String>,
    expr_attr_values: &HashMap<String, Value>,
) -> bool {
    let part = part.trim();

    // begins_with(attr, :val): both operands are compared via their scalar
    // string form (S, or the string rendering of N). Malformed calls are
    // treated as not matching.
    if let Some(rest) = part
        .strip_prefix("begins_with(")
        .or_else(|| part.strip_prefix("begins_with ("))
    {
        if let Some(inner) = rest.strip_suffix(')') {
            let mut split = inner.splitn(2, ',');
            if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
                let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
                let val_ref = val_ref.trim();
                let expected = expr_attr_values.get(val_ref);
                let actual = item.get(&attr_name);
                return match (actual, expected) {
                    (Some(a), Some(e)) => {
                        let a_str = extract_string_value(a);
                        let e_str = extract_string_value(e);
                        matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
                    }
                    _ => false,
                };
            }
        }
        return false;
    }

    // attr BETWEEN :lo AND :hi. Byte offsets found in the uppercased copy
    // are valid in `part` because to_ascii_uppercase preserves byte layout.
    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
        let attr_part = part[..between_pos].trim();
        let attr_name = resolve_attr_name(attr_part, expr_attr_names);
        // Skip the 7 bytes of "BETWEEN" itself.
        let range_part = &part[between_pos + 7..];
        if let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") {
            let lo_ref = range_part[..and_pos].trim();
            let hi_ref = range_part[and_pos + 5..].trim();
            let lo = expr_attr_values.get(lo_ref);
            let hi = expr_attr_values.get(hi_ref);
            let actual = item.get(&attr_name);
            return match (actual, lo, hi) {
                // Inclusive range: lo <= actual <= hi.
                (Some(a), Some(l), Some(h)) => {
                    compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
                        && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
                }
                _ => false,
            };
        }
    }

    // Binary comparators. Order matters: the two-character operators are
    // tried before their one-character prefixes ("<=" before "<", "<>"
    // before "="), so `find` cannot split an operator in half.
    for op in &["<=", ">=", "<>", "=", "<", ">"] {
        if let Some(pos) = part.find(op) {
            let left = part[..pos].trim();
            let right = part[pos + op.len()..].trim();
            let attr_name = resolve_attr_name(left, expr_attr_names);
            let expected = expr_attr_values.get(right);
            let actual = item.get(&attr_name);

            return match *op {
                "=" => actual == expected,
                "<>" => actual != expected,
                "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
                ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
                "<=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
                }
                ">=" => {
                    let cmp = compare_attribute_values(actual, expected);
                    cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
                }
                _ => true,
            };
        }
    }

    // No recognized operator: treat the condition as satisfied.
    true
}
4085
4086fn extract_string_value(val: &Value) -> Option<String> {
4087 val.get("S")
4088 .and_then(|v| v.as_str())
4089 .map(|s| s.to_string())
4090 .or_else(|| val.get("N").and_then(|v| v.as_str()).map(|n| n.to_string()))
4091}
4092
4093fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
4094 match (a, b) {
4095 (None, None) => std::cmp::Ordering::Equal,
4096 (None, Some(_)) => std::cmp::Ordering::Less,
4097 (Some(_), None) => std::cmp::Ordering::Greater,
4098 (Some(a), Some(b)) => {
4099 let a_type = attribute_type_and_value(a);
4100 let b_type = attribute_type_and_value(b);
4101 match (a_type, b_type) {
4102 (Some(("S", a_val)), Some(("S", b_val))) => {
4103 let a_str = a_val.as_str().unwrap_or("");
4104 let b_str = b_val.as_str().unwrap_or("");
4105 a_str.cmp(b_str)
4106 }
4107 (Some(("N", a_val)), Some(("N", b_val))) => {
4108 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4109 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
4110 a_num
4111 .partial_cmp(&b_num)
4112 .unwrap_or(std::cmp::Ordering::Equal)
4113 }
4114 (Some(("B", a_val)), Some(("B", b_val))) => {
4115 let a_str = a_val.as_str().unwrap_or("");
4116 let b_str = b_val.as_str().unwrap_or("");
4117 a_str.cmp(b_str)
4118 }
4119 _ => std::cmp::Ordering::Equal,
4120 }
4121 }
4122 }
4123}
4124
4125fn evaluate_filter_expression(
4126 expr: &str,
4127 item: &HashMap<String, AttributeValue>,
4128 expr_attr_names: &HashMap<String, String>,
4129 expr_attr_values: &HashMap<String, Value>,
4130) -> bool {
4131 let trimmed = expr.trim();
4132
4133 let or_parts = split_on_or(trimmed);
4135 if or_parts.len() > 1 {
4136 return or_parts.iter().any(|part| {
4137 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4138 });
4139 }
4140
4141 let and_parts = split_on_and(trimmed);
4143 if and_parts.len() > 1 {
4144 return and_parts.iter().all(|part| {
4145 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
4146 });
4147 }
4148
4149 let stripped = strip_outer_parens(trimmed);
4151 if stripped != trimmed {
4152 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
4153 }
4154
4155 if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
4157 return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
4158 }
4159
4160 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
4161}
4162
/// Remove one pair of outer parentheses when — and only when — they wrap
/// the entire (trimmed) expression; otherwise return the trimmed input.
fn strip_outer_parens(expr: &str) -> &str {
    let trimmed = expr.trim();
    let Some(inner) = trimmed
        .strip_prefix('(')
        .and_then(|rest| rest.strip_suffix(')'))
    else {
        return trimmed;
    };
    // The parens wrap the whole expression only if, scanning the inner
    // text, the nesting depth never dips below zero and ends balanced.
    let mut depth: i32 = 0;
    for b in inner.bytes() {
        match b {
            b'(' => depth += 1,
            b')' => {
                depth -= 1;
                if depth < 0 {
                    // This ')' closes the leading '(' early, e.g. "(a) AND (b)".
                    return trimmed;
                }
            }
            _ => {}
        }
    }
    if depth == 0 {
        inner
    } else {
        trimmed
    }
}
4190
4191fn evaluate_single_filter_condition(
4192 part: &str,
4193 item: &HashMap<String, AttributeValue>,
4194 expr_attr_names: &HashMap<String, String>,
4195 expr_attr_values: &HashMap<String, Value>,
4196) -> bool {
4197 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
4198 let attr = resolve_attr_name(inner, expr_attr_names);
4199 return item.contains_key(&attr);
4200 }
4201
4202 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
4203 let attr = resolve_attr_name(inner, expr_attr_names);
4204 return !item.contains_key(&attr);
4205 }
4206
4207 if let Some(rest) = part
4208 .strip_prefix("begins_with(")
4209 .or_else(|| part.strip_prefix("begins_with ("))
4210 {
4211 if let Some(inner) = rest.strip_suffix(')') {
4212 let mut split = inner.splitn(2, ',');
4213 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4214 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4215 let expected = expr_attr_values.get(val_ref.trim());
4216 let actual = item.get(&attr_name);
4217 return match (actual, expected) {
4218 (Some(a), Some(e)) => {
4219 let a_str = extract_string_value(a);
4220 let e_str = extract_string_value(e);
4221 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(&e))
4222 }
4223 _ => false,
4224 };
4225 }
4226 }
4227 }
4228
4229 if let Some(rest) = part
4230 .strip_prefix("contains(")
4231 .or_else(|| part.strip_prefix("contains ("))
4232 {
4233 if let Some(inner) = rest.strip_suffix(')') {
4234 let mut split = inner.splitn(2, ',');
4235 if let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) {
4236 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4237 let expected = expr_attr_values.get(val_ref.trim());
4238 let actual = item.get(&attr_name);
4239 return match (actual, expected) {
4240 (Some(a), Some(e)) => {
4241 let a_str = extract_string_value(a);
4242 let e_str = extract_string_value(e);
4243 matches!((a_str, e_str), (Some(a), Some(e)) if a.contains(&e))
4244 }
4245 _ => false,
4246 };
4247 }
4248 }
4249 }
4250
4251 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
4252}
4253
4254fn apply_update_expression(
4255 item: &mut HashMap<String, AttributeValue>,
4256 expr: &str,
4257 expr_attr_names: &HashMap<String, String>,
4258 expr_attr_values: &HashMap<String, Value>,
4259) -> Result<(), AwsServiceError> {
4260 let clauses = parse_update_clauses(expr);
4261 for (action, assignments) in &clauses {
4262 match action.to_ascii_uppercase().as_str() {
4263 "SET" => {
4264 for assignment in assignments {
4265 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4266 }
4267 }
4268 "REMOVE" => {
4269 for attr_ref in assignments {
4270 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
4271 item.remove(&attr);
4272 }
4273 }
4274 "ADD" => {
4275 for assignment in assignments {
4276 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4277 }
4278 }
4279 "DELETE" => {
4280 for assignment in assignments {
4281 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
4282 }
4283 }
4284 _ => {}
4285 }
4286 }
4287 Ok(())
4288}
4289
/// Split an `UpdateExpression` into `(keyword, comma-separated assignments)`
/// clauses for SET / REMOVE / ADD / DELETE, in source order.
///
/// A keyword only starts a clause at a word boundary. The boundary test
/// treats `_`, `#`, and `:` as word bytes in addition to alphanumerics:
/// previously `SET user_set = :v` falsely detected a second `SET` clause
/// inside the attribute name `user_set` and mangled the assignment.
fn parse_update_clauses(expr: &str) -> Vec<(String, Vec<String>)> {
    // Bytes that can appear inside an identifier or #name/:value placeholder.
    fn is_word_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_' || b == b'#' || b == b':'
    }

    let mut clauses: Vec<(String, Vec<String>)> = Vec::new();
    let upper = expr.to_ascii_uppercase();
    let keywords = ["SET", "REMOVE", "ADD", "DELETE"];
    let mut positions: Vec<(usize, &str)> = Vec::new();

    // Record every word-boundary occurrence of each keyword.
    for kw in &keywords {
        let mut search_from = 0;
        while let Some(pos) = upper[search_from..].find(kw) {
            let abs_pos = search_from + pos;
            let before_ok = abs_pos == 0 || !is_word_byte(expr.as_bytes()[abs_pos - 1]);
            let after_pos = abs_pos + kw.len();
            let after_ok = after_pos >= expr.len() || !is_word_byte(expr.as_bytes()[after_pos]);
            if before_ok && after_ok {
                positions.push((abs_pos, *kw));
            }
            search_from = abs_pos + kw.len();
        }
    }

    positions.sort_by_key(|(pos, _)| *pos);

    // Each clause's content runs from just past its keyword to the next
    // keyword (or the end of the expression).
    for (i, &(pos, kw)) in positions.iter().enumerate() {
        let start = pos + kw.len();
        let end = positions.get(i + 1).map_or(expr.len(), |&(next, _)| next);
        let content = expr[start..end].trim();
        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
        clauses.push((kw.to_string(), assignments));
    }

    clauses
}
4327
4328fn apply_set_assignment(
4329 item: &mut HashMap<String, AttributeValue>,
4330 assignment: &str,
4331 expr_attr_names: &HashMap<String, String>,
4332 expr_attr_values: &HashMap<String, Value>,
4333) -> Result<(), AwsServiceError> {
4334 let Some((left, right)) = assignment.split_once('=') else {
4335 return Ok(());
4336 };
4337
4338 let attr = resolve_attr_name(left.trim(), expr_attr_names);
4339 let right = right.trim();
4340
4341 if let Some(rest) = right
4343 .strip_prefix("if_not_exists(")
4344 .or_else(|| right.strip_prefix("if_not_exists ("))
4345 {
4346 if let Some(inner) = rest.strip_suffix(')') {
4347 let mut split = inner.splitn(2, ',');
4348 if let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) {
4349 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
4350 if !item.contains_key(&check_name) {
4351 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
4352 item.insert(attr, val.clone());
4353 }
4354 }
4355 return Ok(());
4356 }
4357 }
4358 }
4359
4360 if let Some(rest) = right
4362 .strip_prefix("list_append(")
4363 .or_else(|| right.strip_prefix("list_append ("))
4364 {
4365 if let Some(inner) = rest.strip_suffix(')') {
4366 let mut split = inner.splitn(2, ',');
4367 if let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) {
4368 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
4369 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
4370
4371 let mut merged = Vec::new();
4372 if let Some(Value::Object(obj)) = &a_val {
4373 if let Some(Value::Array(arr)) = obj.get("L") {
4374 merged.extend(arr.clone());
4375 }
4376 }
4377 if let Some(Value::Object(obj)) = &b_val {
4378 if let Some(Value::Array(arr)) = obj.get("L") {
4379 merged.extend(arr.clone());
4380 }
4381 }
4382
4383 item.insert(attr, json!({"L": merged}));
4384 return Ok(());
4385 }
4386 }
4387 }
4388
4389 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
4391 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
4392 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
4393
4394 let left_num = extract_number(&left_val).unwrap_or(0.0);
4395 let right_num = extract_number(&right_val).unwrap_or(0.0);
4396
4397 let result = if is_add {
4398 left_num + right_num
4399 } else {
4400 left_num - right_num
4401 };
4402
4403 let num_str = if result == result.trunc() {
4404 format!("{}", result as i64)
4405 } else {
4406 format!("{result}")
4407 };
4408
4409 item.insert(attr, json!({"N": num_str}));
4410 return Ok(());
4411 }
4412
4413 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
4415 if let Some(v) = val {
4416 item.insert(attr, v);
4417 }
4418
4419 Ok(())
4420}
4421
4422fn resolve_value(
4423 reference: &str,
4424 item: &HashMap<String, AttributeValue>,
4425 expr_attr_names: &HashMap<String, String>,
4426 expr_attr_values: &HashMap<String, Value>,
4427) -> Option<Value> {
4428 let reference = reference.trim();
4429 if reference.starts_with(':') {
4430 expr_attr_values.get(reference).cloned()
4431 } else {
4432 let attr_name = resolve_attr_name(reference, expr_attr_names);
4433 item.get(&attr_name).cloned()
4434 }
4435}
4436
4437fn extract_number(val: &Option<Value>) -> Option<f64> {
4438 val.as_ref()
4439 .and_then(|v| v.get("N"))
4440 .and_then(|n| n.as_str())
4441 .and_then(|s| s.parse().ok())
4442}
4443
/// Finds the first top-level (outside parentheses) `+` or `-` in `expr`
/// and splits around it, returning `(lhs, rhs, is_addition)`.
///
/// The operator may not be the first character, so a leading sign is never
/// treated as a binary operator.
fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
    let mut depth = 0i32;
    expr.char_indices().find_map(|(i, c)| match c {
        '(' => {
            depth += 1;
            None
        }
        ')' => {
            depth -= 1;
            None
        }
        '+' | '-' if depth == 0 && i > 0 => Some((&expr[..i], &expr[i + 1..], c == '+')),
        _ => None,
    })
}
4461
4462fn apply_add_assignment(
4463 item: &mut HashMap<String, AttributeValue>,
4464 assignment: &str,
4465 expr_attr_names: &HashMap<String, String>,
4466 expr_attr_values: &HashMap<String, Value>,
4467) -> Result<(), AwsServiceError> {
4468 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4469 if parts.len() != 2 {
4470 return Ok(());
4471 }
4472
4473 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4474 let val_ref = parts[1].trim();
4475 let add_val = expr_attr_values.get(val_ref);
4476
4477 if let Some(add_val) = add_val {
4478 if let Some(existing) = item.get(&attr) {
4479 if let (Some(existing_num), Some(add_num)) = (
4480 extract_number(&Some(existing.clone())),
4481 extract_number(&Some(add_val.clone())),
4482 ) {
4483 let result = existing_num + add_num;
4484 let num_str = if result == result.trunc() {
4485 format!("{}", result as i64)
4486 } else {
4487 format!("{result}")
4488 };
4489 item.insert(attr, json!({"N": num_str}));
4490 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
4491 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
4492 let mut merged: Vec<Value> = existing_set.clone();
4493 for v in add_set {
4494 if !merged.contains(v) {
4495 merged.push(v.clone());
4496 }
4497 }
4498 item.insert(attr, json!({"SS": merged}));
4499 }
4500 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
4501 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
4502 let mut merged: Vec<Value> = existing_set.clone();
4503 for v in add_set {
4504 if !merged.contains(v) {
4505 merged.push(v.clone());
4506 }
4507 }
4508 item.insert(attr, json!({"NS": merged}));
4509 }
4510 }
4511 } else {
4512 item.insert(attr, add_val.clone());
4513 }
4514 }
4515
4516 Ok(())
4517}
4518
4519fn apply_delete_assignment(
4520 item: &mut HashMap<String, AttributeValue>,
4521 assignment: &str,
4522 expr_attr_names: &HashMap<String, String>,
4523 expr_attr_values: &HashMap<String, Value>,
4524) -> Result<(), AwsServiceError> {
4525 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
4526 if parts.len() != 2 {
4527 return Ok(());
4528 }
4529
4530 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
4531 let val_ref = parts[1].trim();
4532 let del_val = expr_attr_values.get(val_ref);
4533
4534 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
4535 if let (Some(existing_set), Some(del_set)) = (
4536 existing.get("SS").and_then(|v| v.as_array()),
4537 del_val.get("SS").and_then(|v| v.as_array()),
4538 ) {
4539 let filtered: Vec<Value> = existing_set
4540 .iter()
4541 .filter(|v| !del_set.contains(v))
4542 .cloned()
4543 .collect();
4544 if filtered.is_empty() {
4545 item.remove(&attr);
4546 } else {
4547 item.insert(attr, json!({"SS": filtered}));
4548 }
4549 } else if let (Some(existing_set), Some(del_set)) = (
4550 existing.get("NS").and_then(|v| v.as_array()),
4551 del_val.get("NS").and_then(|v| v.as_array()),
4552 ) {
4553 let filtered: Vec<Value> = existing_set
4554 .iter()
4555 .filter(|v| !del_set.contains(v))
4556 .cloned()
4557 .collect();
4558 if filtered.is_empty() {
4559 item.remove(&attr);
4560 } else {
4561 item.insert(attr, json!({"NS": filtered}));
4562 }
4563 }
4564 }
4565
4566 Ok(())
4567}
4568
4569#[allow(clippy::too_many_arguments)]
4570fn build_table_description_json(
4571 arn: &str,
4572 key_schema: &[KeySchemaElement],
4573 attribute_definitions: &[AttributeDefinition],
4574 provisioned_throughput: &ProvisionedThroughput,
4575 gsi: &[GlobalSecondaryIndex],
4576 lsi: &[LocalSecondaryIndex],
4577 billing_mode: &str,
4578 created_at: chrono::DateTime<chrono::Utc>,
4579 item_count: i64,
4580 size_bytes: i64,
4581 status: &str,
4582) -> Value {
4583 let table_name = arn.rsplit('/').next().unwrap_or("");
4584 let creation_timestamp =
4585 created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
4586
4587 let ks: Vec<Value> = key_schema
4588 .iter()
4589 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4590 .collect();
4591
4592 let ad: Vec<Value> = attribute_definitions
4593 .iter()
4594 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
4595 .collect();
4596
4597 let mut desc = json!({
4598 "TableName": table_name,
4599 "TableArn": arn,
4600 "TableId": uuid::Uuid::new_v4().to_string().replace('-', ""),
4601 "TableStatus": status,
4602 "KeySchema": ks,
4603 "AttributeDefinitions": ad,
4604 "CreationDateTime": creation_timestamp,
4605 "ItemCount": item_count,
4606 "TableSizeBytes": size_bytes,
4607 "BillingModeSummary": { "BillingMode": billing_mode },
4608 });
4609
4610 if billing_mode != "PAY_PER_REQUEST" {
4611 desc["ProvisionedThroughput"] = json!({
4612 "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
4613 "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
4614 "NumberOfDecreasesToday": 0,
4615 });
4616 } else {
4617 desc["ProvisionedThroughput"] = json!({
4618 "ReadCapacityUnits": 0,
4619 "WriteCapacityUnits": 0,
4620 "NumberOfDecreasesToday": 0,
4621 });
4622 }
4623
4624 if !gsi.is_empty() {
4625 let gsi_json: Vec<Value> = gsi
4626 .iter()
4627 .map(|g| {
4628 let gks: Vec<Value> = g
4629 .key_schema
4630 .iter()
4631 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4632 .collect();
4633 let mut idx = json!({
4634 "IndexName": g.index_name,
4635 "KeySchema": gks,
4636 "Projection": { "ProjectionType": g.projection.projection_type },
4637 "IndexStatus": "ACTIVE",
4638 "IndexArn": format!("{arn}/index/{}", g.index_name),
4639 "ItemCount": 0,
4640 "IndexSizeBytes": 0,
4641 });
4642 if !g.projection.non_key_attributes.is_empty() {
4643 idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
4644 }
4645 if let Some(ref pt) = g.provisioned_throughput {
4646 idx["ProvisionedThroughput"] = json!({
4647 "ReadCapacityUnits": pt.read_capacity_units,
4648 "WriteCapacityUnits": pt.write_capacity_units,
4649 "NumberOfDecreasesToday": 0,
4650 });
4651 }
4652 idx
4653 })
4654 .collect();
4655 desc["GlobalSecondaryIndexes"] = json!(gsi_json);
4656 }
4657
4658 if !lsi.is_empty() {
4659 let lsi_json: Vec<Value> = lsi
4660 .iter()
4661 .map(|l| {
4662 let lks: Vec<Value> = l
4663 .key_schema
4664 .iter()
4665 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
4666 .collect();
4667 let mut idx = json!({
4668 "IndexName": l.index_name,
4669 "KeySchema": lks,
4670 "Projection": { "ProjectionType": l.projection.projection_type },
4671 "IndexArn": format!("{arn}/index/{}", l.index_name),
4672 "ItemCount": 0,
4673 "IndexSizeBytes": 0,
4674 });
4675 if !l.projection.non_key_attributes.is_empty() {
4676 idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
4677 }
4678 idx
4679 })
4680 .collect();
4681 desc["LocalSecondaryIndexes"] = json!(lsi_json);
4682 }
4683
4684 desc
4685}
4686
4687fn build_table_description(table: &DynamoTable) -> Value {
4688 let mut desc = build_table_description_json(
4689 &table.arn,
4690 &table.key_schema,
4691 &table.attribute_definitions,
4692 &table.provisioned_throughput,
4693 &table.gsi,
4694 &table.lsi,
4695 &table.billing_mode,
4696 table.created_at,
4697 table.item_count,
4698 table.size_bytes,
4699 &table.status,
4700 );
4701
4702 if table.stream_enabled {
4704 if let Some(ref stream_arn) = table.stream_arn {
4705 desc["LatestStreamArn"] = json!(stream_arn);
4706 desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
4707 }
4708 if let Some(ref view_type) = table.stream_view_type {
4709 desc["StreamSpecification"] = json!({
4710 "StreamEnabled": true,
4711 "StreamViewType": view_type,
4712 });
4713 }
4714 }
4715
4716 if let Some(ref sse_type) = table.sse_type {
4718 let mut sse_desc = json!({
4719 "Status": "ENABLED",
4720 "SSEType": sse_type,
4721 });
4722 if let Some(ref key_arn) = table.sse_kms_key_arn {
4723 sse_desc["KMSMasterKeyArn"] = json!(key_arn);
4724 }
4725 desc["SSEDescription"] = sse_desc;
4726 } else {
4727 desc["SSEDescription"] = json!({
4729 "Status": "ENABLED",
4730 "SSEType": "AES256",
4731 });
4732 }
4733
4734 desc
4735}
4736
4737fn execute_partiql_statement(
4738 state: &SharedDynamoDbState,
4739 statement: &str,
4740 parameters: &[Value],
4741) -> Result<AwsResponse, AwsServiceError> {
4742 let trimmed = statement.trim();
4743 let upper = trimmed.to_ascii_uppercase();
4744
4745 if upper.starts_with("SELECT") {
4746 execute_partiql_select(state, trimmed, parameters)
4747 } else if upper.starts_with("INSERT") {
4748 execute_partiql_insert(state, trimmed, parameters)
4749 } else if upper.starts_with("UPDATE") {
4750 execute_partiql_update(state, trimmed, parameters)
4751 } else if upper.starts_with("DELETE") {
4752 execute_partiql_delete(state, trimmed, parameters)
4753 } else {
4754 Err(AwsServiceError::aws_error(
4755 StatusCode::BAD_REQUEST,
4756 "ValidationException",
4757 format!("Unsupported PartiQL statement: {trimmed}"),
4758 ))
4759 }
4760}
4761
4762fn execute_partiql_select(
4764 state: &SharedDynamoDbState,
4765 statement: &str,
4766 parameters: &[Value],
4767) -> Result<AwsResponse, AwsServiceError> {
4768 let upper = statement.to_ascii_uppercase();
4770 let from_pos = upper.find("FROM").ok_or_else(|| {
4771 AwsServiceError::aws_error(
4772 StatusCode::BAD_REQUEST,
4773 "ValidationException",
4774 "Invalid SELECT statement: missing FROM",
4775 )
4776 })?;
4777
4778 let after_from = statement[from_pos + 4..].trim();
4779 let (table_name, rest) = parse_partiql_table_name(after_from);
4780
4781 let state = state.read();
4782 let table = get_table(&state.tables, &table_name)?;
4783
4784 let rest_upper = rest.trim().to_ascii_uppercase();
4785 if rest_upper.starts_with("WHERE") {
4786 let where_clause = rest.trim()[5..].trim();
4787 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
4788 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
4789 DynamoDbService::ok_json(json!({ "Items": items }))
4790 } else {
4791 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
4793 DynamoDbService::ok_json(json!({ "Items": items }))
4794 }
4795}
4796
4797fn execute_partiql_insert(
4798 state: &SharedDynamoDbState,
4799 statement: &str,
4800 parameters: &[Value],
4801) -> Result<AwsResponse, AwsServiceError> {
4802 let upper = statement.to_ascii_uppercase();
4805 let into_pos = upper.find("INTO").ok_or_else(|| {
4806 AwsServiceError::aws_error(
4807 StatusCode::BAD_REQUEST,
4808 "ValidationException",
4809 "Invalid INSERT statement: missing INTO",
4810 )
4811 })?;
4812
4813 let after_into = statement[into_pos + 4..].trim();
4814 let (table_name, rest) = parse_partiql_table_name(after_into);
4815
4816 let rest_upper = rest.trim().to_ascii_uppercase();
4817 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
4818 AwsServiceError::aws_error(
4819 StatusCode::BAD_REQUEST,
4820 "ValidationException",
4821 "Invalid INSERT statement: missing VALUE",
4822 )
4823 })?;
4824
4825 let value_str = rest.trim()[value_pos + 5..].trim();
4826 let item = parse_partiql_value_object(value_str, parameters)?;
4827
4828 let mut state = state.write();
4829 let table = get_table_mut(&mut state.tables, &table_name)?;
4830 let key = extract_key(table, &item);
4831 if table.find_item_index(&key).is_some() {
4832 return Err(AwsServiceError::aws_error(
4834 StatusCode::BAD_REQUEST,
4835 "DuplicateItemException",
4836 "Duplicate primary key exists in table",
4837 ));
4838 } else {
4839 table.items.push(item);
4840 }
4841 table.recalculate_stats();
4842
4843 DynamoDbService::ok_json(json!({}))
4844}
4845
4846fn execute_partiql_update(
4847 state: &SharedDynamoDbState,
4848 statement: &str,
4849 parameters: &[Value],
4850) -> Result<AwsResponse, AwsServiceError> {
4851 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
4855
4856 let rest_upper = rest.trim().to_ascii_uppercase();
4857 let set_pos = rest_upper.find("SET").ok_or_else(|| {
4858 AwsServiceError::aws_error(
4859 StatusCode::BAD_REQUEST,
4860 "ValidationException",
4861 "Invalid UPDATE statement: missing SET",
4862 )
4863 })?;
4864
4865 let after_set = rest.trim()[set_pos + 3..].trim();
4866
4867 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
4869 let (set_clause, where_clause) = if let Some(wp) = where_pos {
4870 (&after_set[..wp], after_set[wp + 5..].trim())
4871 } else {
4872 (after_set, "")
4873 };
4874
4875 let mut state = state.write();
4876 let table = get_table_mut(&mut state.tables, &table_name)?;
4877
4878 let matched_indices = if !where_clause.is_empty() {
4879 find_partiql_where_indices(table, where_clause, parameters)?
4880 } else {
4881 (0..table.items.len()).collect()
4882 };
4883
4884 let param_offset = count_params_in_str(where_clause);
4886 let assignments: Vec<&str> = set_clause.split(',').collect();
4887 for idx in &matched_indices {
4888 let mut local_offset = param_offset;
4889 for assignment in &assignments {
4890 let assignment = assignment.trim();
4891 if let Some((attr, val_str)) = assignment.split_once('=') {
4892 let attr = attr.trim().trim_matches('"');
4893 let val_str = val_str.trim();
4894 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
4895 if let Some(v) = value {
4896 table.items[*idx].insert(attr.to_string(), v);
4897 }
4898 }
4899 }
4900 }
4901 table.recalculate_stats();
4902
4903 DynamoDbService::ok_json(json!({}))
4904}
4905
4906fn execute_partiql_delete(
4907 state: &SharedDynamoDbState,
4908 statement: &str,
4909 parameters: &[Value],
4910) -> Result<AwsResponse, AwsServiceError> {
4911 let upper = statement.to_ascii_uppercase();
4913 let from_pos = upper.find("FROM").ok_or_else(|| {
4914 AwsServiceError::aws_error(
4915 StatusCode::BAD_REQUEST,
4916 "ValidationException",
4917 "Invalid DELETE statement: missing FROM",
4918 )
4919 })?;
4920
4921 let after_from = statement[from_pos + 4..].trim();
4922 let (table_name, rest) = parse_partiql_table_name(after_from);
4923
4924 let rest_upper = rest.trim().to_ascii_uppercase();
4925 if !rest_upper.starts_with("WHERE") {
4926 return Err(AwsServiceError::aws_error(
4927 StatusCode::BAD_REQUEST,
4928 "ValidationException",
4929 "DELETE requires a WHERE clause",
4930 ));
4931 }
4932 let where_clause = rest.trim()[5..].trim();
4933
4934 let mut state = state.write();
4935 let table = get_table_mut(&mut state.tables, &table_name)?;
4936
4937 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
4938 indices.sort_unstable();
4940 indices.reverse();
4941 for idx in indices {
4942 table.items.remove(idx);
4943 }
4944 table.recalculate_stats();
4945
4946 DynamoDbService::ok_json(json!({}))
4947}
4948
/// Splits a PartiQL table reference off the front of `s`.
///
/// A double-quoted name is read up to the closing quote; otherwise the
/// name runs to the first whitespace character. Returns the name and the
/// unconsumed remainder of the input.
fn parse_partiql_table_name(s: &str) -> (String, &str) {
    let s = s.trim();
    match s.strip_prefix('"') {
        Some(stripped) => match stripped.find('"') {
            Some(end) => (stripped[..end].to_string(), &stripped[end + 1..]),
            // Unterminated quote: fall back to splitting at the first space.
            None => {
                let end = s.find(' ').unwrap_or(s.len());
                (s[..end].trim_matches('"').to_string(), &s[end..])
            }
        },
        None => {
            let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
            (s[..end].to_string(), &s[end..])
        }
    }
}
4968
4969fn evaluate_partiql_where<'a>(
4972 table: &'a DynamoTable,
4973 where_clause: &str,
4974 parameters: &[Value],
4975) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
4976 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
4977 Ok(indices.iter().map(|i| &table.items[*i]).collect())
4978}
4979
4980fn find_partiql_where_indices(
4981 table: &DynamoTable,
4982 where_clause: &str,
4983 parameters: &[Value],
4984) -> Result<Vec<usize>, AwsServiceError> {
4985 let upper = where_clause.to_uppercase();
4988 let conditions = if upper.contains(" AND ") {
4989 let mut parts = Vec::new();
4991 let mut last = 0;
4992 for (i, _) in upper.match_indices(" AND ") {
4993 parts.push(where_clause[last..i].trim());
4994 last = i + 5;
4995 }
4996 parts.push(where_clause[last..].trim());
4997 parts
4998 } else {
4999 vec![where_clause.trim()]
5000 };
5001
5002 let mut param_idx = 0usize;
5003 let mut parsed_conditions: Vec<(String, Value)> = Vec::new();
5004
5005 for cond in &conditions {
5006 let cond = cond.trim();
5007 if let Some((left, right)) = cond.split_once('=') {
5008 let attr = left.trim().trim_matches('"').to_string();
5009 let val_str = right.trim();
5010 let value = parse_partiql_literal(val_str, parameters, &mut param_idx);
5011 if let Some(v) = value {
5012 parsed_conditions.push((attr, v));
5013 }
5014 }
5015 }
5016
5017 let mut indices = Vec::new();
5018 for (i, item) in table.items.iter().enumerate() {
5019 let all_match = parsed_conditions
5020 .iter()
5021 .all(|(attr, expected)| item.get(attr) == Some(expected));
5022 if all_match {
5023 indices.push(i);
5024 }
5025 }
5026
5027 Ok(indices)
5028}
5029
5030fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
5035 let s = s.trim();
5036 if s == "?" {
5037 let idx = *param_idx;
5038 *param_idx += 1;
5039 parameters.get(idx).cloned()
5040 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
5041 let inner = &s[1..s.len() - 1];
5042 Some(json!({"S": inner}))
5043 } else if let Ok(n) = s.parse::<f64>() {
5044 let num_str = if n == n.trunc() {
5045 format!("{}", n as i64)
5046 } else {
5047 format!("{n}")
5048 };
5049 Some(json!({"N": num_str}))
5050 } else {
5051 None
5052 }
5053}
5054
5055fn parse_partiql_value_object(
5057 s: &str,
5058 parameters: &[Value],
5059) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
5060 let s = s.trim();
5061 let inner = s
5062 .strip_prefix('{')
5063 .and_then(|s| s.strip_suffix('}'))
5064 .ok_or_else(|| {
5065 AwsServiceError::aws_error(
5066 StatusCode::BAD_REQUEST,
5067 "ValidationException",
5068 "Invalid VALUE: expected object literal",
5069 )
5070 })?;
5071
5072 let mut item = HashMap::new();
5073 let mut param_idx = 0usize;
5074
5075 for pair in split_partiql_pairs(inner) {
5077 let pair = pair.trim();
5078 if pair.is_empty() {
5079 continue;
5080 }
5081 if let Some((key_part, val_part)) = pair.split_once(':') {
5082 let key = key_part
5083 .trim()
5084 .trim_matches('\'')
5085 .trim_matches('"')
5086 .to_string();
5087 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
5088 item.insert(key, val);
5089 }
5090 }
5091 }
5092
5093 Ok(item)
5094}
5095
/// Splits the interior of a PartiQL object literal on top-level commas,
/// ignoring commas inside single-quoted strings or nested `{...}` objects.
/// Always returns at least one element (possibly empty).
fn split_partiql_pairs(s: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut start = 0;
    let mut depth = 0i32;
    let mut in_quote = false;

    for (i, c) in s.char_indices() {
        match c {
            '\'' => in_quote = !in_quote,
            '{' if !in_quote => depth += 1,
            '}' if !in_quote => depth -= 1,
            ',' if !in_quote && depth == 0 => {
                parts.push(&s[start..i]);
                start = i + 1;
            }
            _ => {}
        }
    }
    parts.push(&s[start..]);
    parts
}
5119
/// Counts the `?` positional-parameter placeholders in a clause fragment.
fn count_params_in_str(s: &str) -> usize {
    s.matches('?').count()
}
5124
5125#[cfg(test)]
5126mod tests {
5127 use super::*;
5128 use serde_json::json;
5129
    // A comma-separated SET list is one clause with one entry per comma.
    #[test]
    fn test_parse_update_clauses_set() {
        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
        assert_eq!(clauses.len(), 1);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[0].1.len(), 2);
    }

    // Keywords split the expression into separate clauses.
    #[test]
    fn test_parse_update_clauses_set_and_remove() {
        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
        assert_eq!(clauses.len(), 2);
        assert_eq!(clauses[0].0, "SET");
        assert_eq!(clauses[1].0, "REMOVE");
    }

    // A partition-key-only equality condition matches a two-key item.
    #[test]
    fn test_evaluate_key_condition_simple() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "user1"}));
        item.insert("sk".to_string(), json!({"S": "order1"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));

        assert!(evaluate_key_condition(
            "pk = :pk",
            &item,
            "pk",
            Some("sk"),
            &HashMap::new(),
            &expr_values,
        ));
    }

    // "N" values compare numerically, not lexicographically.
    #[test]
    fn test_compare_attribute_values_numbers() {
        let a = json!({"N": "10"});
        let b = json!({"N": "20"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    // "S" values compare lexicographically.
    #[test]
    fn test_compare_attribute_values_strings() {
        let a = json!({"S": "apple"});
        let b = json!({"S": "banana"});
        assert_eq!(
            compare_attribute_values(Some(&a), Some(&b)),
            std::cmp::Ordering::Less
        );
    }

    // Top-level AND splits into individual conditions.
    #[test]
    fn test_split_on_and() {
        let parts = split_on_and("pk = :pk AND sk > :sk");
        assert_eq!(parts.len(), 2);
        assert_eq!(parts[0].trim(), "pk = :pk");
        assert_eq!(parts[1].trim(), "sk > :sk");
    }

    // An AND inside parentheses must not split the expression.
    #[test]
    fn test_split_on_and_respects_parentheses() {
        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
        assert_eq!(parts.len(), 1);
        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
    }
5201
    // OR short-circuits a false parenthesized AND group; the same
    // expression is false once no branch holds.
    #[test]
    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
        let mut item = HashMap::new();
        item.insert("x".to_string(), json!({"S": "no"}));
        item.insert("y".to_string(), json!({"S": "no"}));
        item.insert("z".to_string(), json!({"S": "yes"}));

        let mut expr_values = HashMap::new();
        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));

        let result = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item,
            &HashMap::new(),
            &expr_values,
        );
        assert!(result, "should match because z = :yes is true");

        let mut item2 = HashMap::new();
        item2.insert("x".to_string(), json!({"S": "no"}));
        item2.insert("y".to_string(), json!({"S": "no"}));
        item2.insert("z".to_string(), json!({"S": "no"}));

        let result2 = evaluate_filter_expression(
            "(x = :yes AND y = :yes) OR z = :yes",
            &item2,
            &HashMap::new(),
            &expr_values,
        );
        assert!(!result2, "should not match because nothing is true");
    }

    // Projecting "data[0].name" keeps the addressed leaf and drops the
    // sibling "age" attribute from the projected copy.
    #[test]
    fn test_project_item_nested_path() {
        let mut item = HashMap::new();
        item.insert("pk".to_string(), json!({"S": "key1"}));
        item.insert(
            "data".to_string(),
            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
        );

        let body = json!({
            "ProjectionExpression": "data[0].name"
        });

        let projected = project_item(&item, &body);
        let name = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("name"))
            .and_then(|v| v.get("S"))
            .and_then(|v| v.as_str());
        assert_eq!(name, Some("Alice"));

        let age = projected
            .get("data")
            .and_then(|v| v.get("L"))
            .and_then(|v| v.get(0))
            .and_then(|v| v.get("M"))
            .and_then(|v| v.get("age"));
        assert!(age.is_none(), "age should not be present in projection");
    }

    // Dotted paths descend through nested "M" (map) values.
    #[test]
    fn test_resolve_nested_path_map() {
        let mut item = HashMap::new();
        item.insert(
            "info".to_string(),
            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
        );

        let result = resolve_nested_path(&item, "info.address.city");
        assert_eq!(result, Some(json!({"S": "NYC"})));
    }

    // "[i]" indexes into an "L" (list) value before descending into "M".
    #[test]
    fn test_resolve_nested_path_list_then_map() {
        let mut item = HashMap::new();
        item.insert(
            "items".to_string(),
            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
        );

        let result = resolve_nested_path(&item, "items[0].sku");
        assert_eq!(result, Some(json!({"S": "ABC"})));
    }
5296
5297 use crate::state::SharedDynamoDbState;
5300 use parking_lot::RwLock;
5301 use std::sync::Arc;
5302
    /// Builds a service over a fresh in-memory state (no S3, no delivery).
    fn make_service() -> DynamoDbService {
        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
            "123456789012",
            "us-east-1",
        )));
        DynamoDbService::new(state)
    }

    /// Wraps a JSON body in a minimal `AwsRequest` for the given action.
    fn make_request(action: &str, body: Value) -> AwsRequest {
        AwsRequest {
            service: "dynamodb".to_string(),
            action: action.to_string(),
            region: "us-east-1".to_string(),
            account_id: "123456789012".to_string(),
            request_id: "test-id".to_string(),
            headers: http::HeaderMap::new(),
            query_params: HashMap::new(),
            body: serde_json::to_vec(&body).unwrap().into(),
            path_segments: vec![],
            raw_path: "/".to_string(),
            raw_query: String::new(),
            method: http::Method::POST,
            is_query_protocol: false,
            access_key_id: None,
        }
    }

    /// Creates "test-table" with a single string HASH key "pk".
    fn create_test_table(svc: &DynamoDbService) {
        let req = make_request(
            "CreateTable",
            json!({
                "TableName": "test-table",
                "KeySchema": [
                    { "AttributeName": "pk", "KeyType": "HASH" }
                ],
                "AttributeDefinitions": [
                    { "AttributeName": "pk", "AttributeType": "S" }
                ],
                "BillingMode": "PAY_PER_REQUEST"
            }),
        );
        svc.create_table(&req).unwrap();
    }
5346
    // DeleteItem with ReturnValues=ALL_OLD returns the deleted item's
    // attributes and actually removes the item.
    #[test]
    fn delete_item_return_values_all_old() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "key1" },
                    "name": { "S": "Alice" },
                    "age": { "N": "30" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "DeleteItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } },
                "ReturnValues": "ALL_OLD"
            }),
        );
        let resp = svc.delete_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();

        let attrs = &body["Attributes"];
        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");

        // The item must be gone after the delete.
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "key1" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none(), "item should be deleted");
    }

    // TransactGetItems preserves request order and returns an empty
    // response entry (no "Item") for missing keys.
    #[test]
    fn transact_get_items_returns_existing_and_missing() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "exists" },
                    "val": { "S": "hello" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactGetItems",
            json!({
                "TransactItems": [
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
                ]
            }),
        );
        let resp = svc.transact_get_items(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        let responses = body["Responses"].as_array().unwrap();
        assert_eq!(responses.len(), 2);
        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
        assert!(responses[1].get("Item").is_none());
    }
5431
    // A single TransactWriteItems can mix Put and Delete operations; both
    // take effect atomically from the caller's perspective.
    #[test]
    fn transact_write_items_put_and_delete() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "PutItem",
            json!({
                "TableName": "test-table",
                "Item": {
                    "pk": { "S": "to-delete" },
                    "val": { "S": "bye" }
                }
            }),
        );
        svc.put_item(&req).unwrap();

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "Put": {
                            "TableName": "test-table",
                            "Item": {
                                "pk": { "S": "new-item" },
                                "val": { "S": "hi" }
                            }
                        }
                    },
                    {
                        "Delete": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "to-delete" } }
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::OK);

        // The Put must be visible...
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "new-item" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");

        // ...and the Delete must have removed its target.
        let req = make_request(
            "GetItem",
            json!({
                "TableName": "test-table",
                "Key": { "pk": { "S": "to-delete" } }
            }),
        );
        let resp = svc.get_item(&req).unwrap();
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert!(body.get("Item").is_none());
    }

    // A failed ConditionCheck cancels the transaction with
    // TransactionCanceledException and per-item CancellationReasons.
    #[test]
    fn transact_write_items_condition_check_failure() {
        let svc = make_service();
        create_test_table(&svc);

        let req = make_request(
            "TransactWriteItems",
            json!({
                "TransactItems": [
                    {
                        "ConditionCheck": {
                            "TableName": "test-table",
                            "Key": { "pk": { "S": "nonexistent" } },
                            "ConditionExpression": "attribute_exists(pk)"
                        }
                    }
                ]
            }),
        );
        let resp = svc.transact_write_items(&req).unwrap();
        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
        let body: Value = serde_json::from_slice(&resp.body).unwrap();
        assert_eq!(
            body["__type"].as_str().unwrap(),
            "TransactionCanceledException"
        );
        assert!(body["CancellationReasons"].as_array().is_some());
    }
5531
5532 #[test]
5533 fn update_and_describe_time_to_live() {
5534 let svc = make_service();
5535 create_test_table(&svc);
5536
5537 let req = make_request(
5539 "UpdateTimeToLive",
5540 json!({
5541 "TableName": "test-table",
5542 "TimeToLiveSpecification": {
5543 "AttributeName": "ttl",
5544 "Enabled": true
5545 }
5546 }),
5547 );
5548 let resp = svc.update_time_to_live(&req).unwrap();
5549 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5550 assert_eq!(
5551 body["TimeToLiveSpecification"]["AttributeName"]
5552 .as_str()
5553 .unwrap(),
5554 "ttl"
5555 );
5556 assert!(body["TimeToLiveSpecification"]["Enabled"]
5557 .as_bool()
5558 .unwrap());
5559
5560 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5562 let resp = svc.describe_time_to_live(&req).unwrap();
5563 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5564 assert_eq!(
5565 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5566 .as_str()
5567 .unwrap(),
5568 "ENABLED"
5569 );
5570 assert_eq!(
5571 body["TimeToLiveDescription"]["AttributeName"]
5572 .as_str()
5573 .unwrap(),
5574 "ttl"
5575 );
5576
5577 let req = make_request(
5579 "UpdateTimeToLive",
5580 json!({
5581 "TableName": "test-table",
5582 "TimeToLiveSpecification": {
5583 "AttributeName": "ttl",
5584 "Enabled": false
5585 }
5586 }),
5587 );
5588 svc.update_time_to_live(&req).unwrap();
5589
5590 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
5591 let resp = svc.describe_time_to_live(&req).unwrap();
5592 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5593 assert_eq!(
5594 body["TimeToLiveDescription"]["TimeToLiveStatus"]
5595 .as_str()
5596 .unwrap(),
5597 "DISABLED"
5598 );
5599 }
5600
5601 #[test]
5602 fn resource_policy_lifecycle() {
5603 let svc = make_service();
5604 create_test_table(&svc);
5605
5606 let table_arn = {
5607 let state = svc.state.read();
5608 state.tables.get("test-table").unwrap().arn.clone()
5609 };
5610
5611 let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
5613 let req = make_request(
5614 "PutResourcePolicy",
5615 json!({
5616 "ResourceArn": table_arn,
5617 "Policy": policy_doc
5618 }),
5619 );
5620 let resp = svc.put_resource_policy(&req).unwrap();
5621 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5622 assert!(body["RevisionId"].as_str().is_some());
5623
5624 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5626 let resp = svc.get_resource_policy(&req).unwrap();
5627 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5628 assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
5629
5630 let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
5632 svc.delete_resource_policy(&req).unwrap();
5633
5634 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
5636 let resp = svc.get_resource_policy(&req).unwrap();
5637 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5638 assert!(body["Policy"].is_null());
5639 }
5640
5641 #[test]
5642 fn describe_endpoints() {
5643 let svc = make_service();
5644 let req = make_request("DescribeEndpoints", json!({}));
5645 let resp = svc.describe_endpoints(&req).unwrap();
5646 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5647 assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
5648 }
5649
5650 #[test]
5651 fn describe_limits() {
5652 let svc = make_service();
5653 let req = make_request("DescribeLimits", json!({}));
5654 let resp = svc.describe_limits(&req).unwrap();
5655 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5656 assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
5657 }
5658
5659 #[test]
5660 fn backup_lifecycle() {
5661 let svc = make_service();
5662 create_test_table(&svc);
5663
5664 let req = make_request(
5666 "CreateBackup",
5667 json!({ "TableName": "test-table", "BackupName": "my-backup" }),
5668 );
5669 let resp = svc.create_backup(&req).unwrap();
5670 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5671 let backup_arn = body["BackupDetails"]["BackupArn"]
5672 .as_str()
5673 .unwrap()
5674 .to_string();
5675 assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
5676
5677 let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
5679 let resp = svc.describe_backup(&req).unwrap();
5680 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5681 assert_eq!(
5682 body["BackupDescription"]["BackupDetails"]["BackupName"],
5683 "my-backup"
5684 );
5685
5686 let req = make_request("ListBackups", json!({}));
5688 let resp = svc.list_backups(&req).unwrap();
5689 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5690 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
5691
5692 let req = make_request(
5694 "RestoreTableFromBackup",
5695 json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
5696 );
5697 svc.restore_table_from_backup(&req).unwrap();
5698
5699 let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
5701 let resp = svc.describe_table(&req).unwrap();
5702 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5703 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5704
5705 let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
5707 svc.delete_backup(&req).unwrap();
5708
5709 let req = make_request("ListBackups", json!({}));
5711 let resp = svc.list_backups(&req).unwrap();
5712 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5713 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
5714 }
5715
5716 #[test]
5717 fn continuous_backups() {
5718 let svc = make_service();
5719 create_test_table(&svc);
5720
5721 let req = make_request(
5723 "DescribeContinuousBackups",
5724 json!({ "TableName": "test-table" }),
5725 );
5726 let resp = svc.describe_continuous_backups(&req).unwrap();
5727 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5728 assert_eq!(
5729 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5730 ["PointInTimeRecoveryStatus"],
5731 "DISABLED"
5732 );
5733
5734 let req = make_request(
5736 "UpdateContinuousBackups",
5737 json!({
5738 "TableName": "test-table",
5739 "PointInTimeRecoverySpecification": {
5740 "PointInTimeRecoveryEnabled": true
5741 }
5742 }),
5743 );
5744 svc.update_continuous_backups(&req).unwrap();
5745
5746 let req = make_request(
5748 "DescribeContinuousBackups",
5749 json!({ "TableName": "test-table" }),
5750 );
5751 let resp = svc.describe_continuous_backups(&req).unwrap();
5752 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5753 assert_eq!(
5754 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
5755 ["PointInTimeRecoveryStatus"],
5756 "ENABLED"
5757 );
5758 }
5759
5760 #[test]
5761 fn restore_table_to_point_in_time() {
5762 let svc = make_service();
5763 create_test_table(&svc);
5764
5765 let req = make_request(
5766 "RestoreTableToPointInTime",
5767 json!({
5768 "SourceTableName": "test-table",
5769 "TargetTableName": "pitr-restored"
5770 }),
5771 );
5772 svc.restore_table_to_point_in_time(&req).unwrap();
5773
5774 let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
5775 let resp = svc.describe_table(&req).unwrap();
5776 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5777 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
5778 }
5779
5780 #[test]
5781 fn global_table_lifecycle() {
5782 let svc = make_service();
5783
5784 let req = make_request(
5786 "CreateGlobalTable",
5787 json!({
5788 "GlobalTableName": "my-global",
5789 "ReplicationGroup": [
5790 { "RegionName": "us-east-1" },
5791 { "RegionName": "eu-west-1" }
5792 ]
5793 }),
5794 );
5795 let resp = svc.create_global_table(&req).unwrap();
5796 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5797 assert_eq!(
5798 body["GlobalTableDescription"]["GlobalTableStatus"],
5799 "ACTIVE"
5800 );
5801
5802 let req = make_request(
5804 "DescribeGlobalTable",
5805 json!({ "GlobalTableName": "my-global" }),
5806 );
5807 let resp = svc.describe_global_table(&req).unwrap();
5808 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5809 assert_eq!(
5810 body["GlobalTableDescription"]["ReplicationGroup"]
5811 .as_array()
5812 .unwrap()
5813 .len(),
5814 2
5815 );
5816
5817 let req = make_request("ListGlobalTables", json!({}));
5819 let resp = svc.list_global_tables(&req).unwrap();
5820 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5821 assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
5822
5823 let req = make_request(
5825 "UpdateGlobalTable",
5826 json!({
5827 "GlobalTableName": "my-global",
5828 "ReplicaUpdates": [
5829 { "Create": { "RegionName": "ap-southeast-1" } }
5830 ]
5831 }),
5832 );
5833 let resp = svc.update_global_table(&req).unwrap();
5834 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5835 assert_eq!(
5836 body["GlobalTableDescription"]["ReplicationGroup"]
5837 .as_array()
5838 .unwrap()
5839 .len(),
5840 3
5841 );
5842
5843 let req = make_request(
5845 "DescribeGlobalTableSettings",
5846 json!({ "GlobalTableName": "my-global" }),
5847 );
5848 let resp = svc.describe_global_table_settings(&req).unwrap();
5849 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5850 assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
5851
5852 let req = make_request(
5854 "UpdateGlobalTableSettings",
5855 json!({ "GlobalTableName": "my-global" }),
5856 );
5857 svc.update_global_table_settings(&req).unwrap();
5858 }
5859
5860 #[test]
5861 fn table_replica_auto_scaling() {
5862 let svc = make_service();
5863 create_test_table(&svc);
5864
5865 let req = make_request(
5866 "DescribeTableReplicaAutoScaling",
5867 json!({ "TableName": "test-table" }),
5868 );
5869 let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
5870 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5871 assert_eq!(
5872 body["TableAutoScalingDescription"]["TableName"],
5873 "test-table"
5874 );
5875
5876 let req = make_request(
5877 "UpdateTableReplicaAutoScaling",
5878 json!({ "TableName": "test-table" }),
5879 );
5880 svc.update_table_replica_auto_scaling(&req).unwrap();
5881 }
5882
5883 #[test]
5884 fn kinesis_streaming_lifecycle() {
5885 let svc = make_service();
5886 create_test_table(&svc);
5887
5888 let req = make_request(
5890 "EnableKinesisStreamingDestination",
5891 json!({
5892 "TableName": "test-table",
5893 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5894 }),
5895 );
5896 let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
5897 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5898 assert_eq!(body["DestinationStatus"], "ACTIVE");
5899
5900 let req = make_request(
5902 "DescribeKinesisStreamingDestination",
5903 json!({ "TableName": "test-table" }),
5904 );
5905 let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
5906 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5907 assert_eq!(
5908 body["KinesisDataStreamDestinations"]
5909 .as_array()
5910 .unwrap()
5911 .len(),
5912 1
5913 );
5914
5915 let req = make_request(
5917 "UpdateKinesisStreamingDestination",
5918 json!({
5919 "TableName": "test-table",
5920 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
5921 "UpdateKinesisStreamingConfiguration": {
5922 "ApproximateCreationDateTimePrecision": "MICROSECOND"
5923 }
5924 }),
5925 );
5926 svc.update_kinesis_streaming_destination(&req).unwrap();
5927
5928 let req = make_request(
5930 "DisableKinesisStreamingDestination",
5931 json!({
5932 "TableName": "test-table",
5933 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
5934 }),
5935 );
5936 let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
5937 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5938 assert_eq!(body["DestinationStatus"], "DISABLED");
5939 }
5940
5941 #[test]
5942 fn contributor_insights_lifecycle() {
5943 let svc = make_service();
5944 create_test_table(&svc);
5945
5946 let req = make_request(
5948 "DescribeContributorInsights",
5949 json!({ "TableName": "test-table" }),
5950 );
5951 let resp = svc.describe_contributor_insights(&req).unwrap();
5952 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5953 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
5954
5955 let req = make_request(
5957 "UpdateContributorInsights",
5958 json!({
5959 "TableName": "test-table",
5960 "ContributorInsightsAction": "ENABLE"
5961 }),
5962 );
5963 let resp = svc.update_contributor_insights(&req).unwrap();
5964 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5965 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
5966
5967 let req = make_request("ListContributorInsights", json!({}));
5969 let resp = svc.list_contributor_insights(&req).unwrap();
5970 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5971 assert_eq!(
5972 body["ContributorInsightsSummaries"]
5973 .as_array()
5974 .unwrap()
5975 .len(),
5976 1
5977 );
5978 }
5979
5980 #[test]
5981 fn export_lifecycle() {
5982 let svc = make_service();
5983 create_test_table(&svc);
5984
5985 let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
5986
5987 let req = make_request(
5989 "ExportTableToPointInTime",
5990 json!({
5991 "TableArn": table_arn,
5992 "S3Bucket": "my-bucket"
5993 }),
5994 );
5995 let resp = svc.export_table_to_point_in_time(&req).unwrap();
5996 let body: Value = serde_json::from_slice(&resp.body).unwrap();
5997 let export_arn = body["ExportDescription"]["ExportArn"]
5998 .as_str()
5999 .unwrap()
6000 .to_string();
6001 assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
6002
6003 let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
6005 let resp = svc.describe_export(&req).unwrap();
6006 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6007 assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
6008
6009 let req = make_request("ListExports", json!({}));
6011 let resp = svc.list_exports(&req).unwrap();
6012 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6013 assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
6014 }
6015
6016 #[test]
6017 fn import_lifecycle() {
6018 let svc = make_service();
6019
6020 let req = make_request(
6021 "ImportTable",
6022 json!({
6023 "InputFormat": "DYNAMODB_JSON",
6024 "S3BucketSource": { "S3Bucket": "import-bucket" },
6025 "TableCreationParameters": {
6026 "TableName": "imported-table",
6027 "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
6028 "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
6029 }
6030 }),
6031 );
6032 let resp = svc.import_table(&req).unwrap();
6033 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6034 let import_arn = body["ImportTableDescription"]["ImportArn"]
6035 .as_str()
6036 .unwrap()
6037 .to_string();
6038 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6039
6040 let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
6042 let resp = svc.describe_import(&req).unwrap();
6043 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6044 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
6045
6046 let req = make_request("ListImports", json!({}));
6048 let resp = svc.list_imports(&req).unwrap();
6049 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6050 assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
6051
6052 let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
6054 let resp = svc.describe_table(&req).unwrap();
6055 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6056 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
6057 }
6058
6059 #[test]
6060 fn backup_restore_preserves_items() {
6061 let svc = make_service();
6062 create_test_table(&svc);
6063
6064 for i in 1..=3 {
6066 let req = make_request(
6067 "PutItem",
6068 json!({
6069 "TableName": "test-table",
6070 "Item": {
6071 "pk": { "S": format!("key{i}") },
6072 "data": { "S": format!("value{i}") }
6073 }
6074 }),
6075 );
6076 svc.put_item(&req).unwrap();
6077 }
6078
6079 let req = make_request(
6081 "CreateBackup",
6082 json!({
6083 "TableName": "test-table",
6084 "BackupName": "my-backup"
6085 }),
6086 );
6087 let resp = svc.create_backup(&req).unwrap();
6088 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6089 let backup_arn = body["BackupDetails"]["BackupArn"]
6090 .as_str()
6091 .unwrap()
6092 .to_string();
6093
6094 for i in 1..=3 {
6096 let req = make_request(
6097 "DeleteItem",
6098 json!({
6099 "TableName": "test-table",
6100 "Key": { "pk": { "S": format!("key{i}") } }
6101 }),
6102 );
6103 svc.delete_item(&req).unwrap();
6104 }
6105
6106 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6108 let resp = svc.scan(&req).unwrap();
6109 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6110 assert_eq!(body["Count"], 0);
6111
6112 let req = make_request(
6114 "RestoreTableFromBackup",
6115 json!({
6116 "BackupArn": backup_arn,
6117 "TargetTableName": "restored-table"
6118 }),
6119 );
6120 svc.restore_table_from_backup(&req).unwrap();
6121
6122 let req = make_request("Scan", json!({ "TableName": "restored-table" }));
6124 let resp = svc.scan(&req).unwrap();
6125 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6126 assert_eq!(body["Count"], 3);
6127 assert_eq!(body["Items"].as_array().unwrap().len(), 3);
6128 }
6129
6130 #[test]
6131 fn global_table_replicates_writes() {
6132 let svc = make_service();
6133 create_test_table(&svc);
6134
6135 let req = make_request(
6137 "CreateGlobalTable",
6138 json!({
6139 "GlobalTableName": "test-table",
6140 "ReplicationGroup": [
6141 { "RegionName": "us-east-1" },
6142 { "RegionName": "eu-west-1" }
6143 ]
6144 }),
6145 );
6146 let resp = svc.create_global_table(&req).unwrap();
6147 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6148 assert_eq!(
6149 body["GlobalTableDescription"]["GlobalTableStatus"],
6150 "ACTIVE"
6151 );
6152
6153 let req = make_request(
6155 "PutItem",
6156 json!({
6157 "TableName": "test-table",
6158 "Item": {
6159 "pk": { "S": "replicated-key" },
6160 "data": { "S": "replicated-value" }
6161 }
6162 }),
6163 );
6164 svc.put_item(&req).unwrap();
6165
6166 let req = make_request(
6168 "GetItem",
6169 json!({
6170 "TableName": "test-table",
6171 "Key": { "pk": { "S": "replicated-key" } }
6172 }),
6173 );
6174 let resp = svc.get_item(&req).unwrap();
6175 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6176 assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
6177 assert_eq!(body["Item"]["data"]["S"], "replicated-value");
6178 }
6179
6180 #[test]
6181 fn contributor_insights_tracks_access() {
6182 let svc = make_service();
6183 create_test_table(&svc);
6184
6185 let req = make_request(
6187 "UpdateContributorInsights",
6188 json!({
6189 "TableName": "test-table",
6190 "ContributorInsightsAction": "ENABLE"
6191 }),
6192 );
6193 svc.update_contributor_insights(&req).unwrap();
6194
6195 for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
6197 let req = make_request(
6198 "PutItem",
6199 json!({
6200 "TableName": "test-table",
6201 "Item": {
6202 "pk": { "S": key },
6203 "data": { "S": "value" }
6204 }
6205 }),
6206 );
6207 svc.put_item(&req).unwrap();
6208 }
6209
6210 for _ in 0..3 {
6212 let req = make_request(
6213 "GetItem",
6214 json!({
6215 "TableName": "test-table",
6216 "Key": { "pk": { "S": "alpha" } }
6217 }),
6218 );
6219 svc.get_item(&req).unwrap();
6220 }
6221
6222 let req = make_request(
6224 "DescribeContributorInsights",
6225 json!({ "TableName": "test-table" }),
6226 );
6227 let resp = svc.describe_contributor_insights(&req).unwrap();
6228 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6229 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
6230
6231 let contributors = body["TopContributors"].as_array().unwrap();
6232 assert!(
6233 !contributors.is_empty(),
6234 "TopContributors should not be empty"
6235 );
6236
6237 let top = &contributors[0];
6240 assert!(top["Count"].as_u64().unwrap() > 0);
6241
6242 let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
6244 assert!(!rules.is_empty());
6245 }
6246
6247 #[test]
6248 fn contributor_insights_not_tracked_when_disabled() {
6249 let svc = make_service();
6250 create_test_table(&svc);
6251
6252 let req = make_request(
6254 "PutItem",
6255 json!({
6256 "TableName": "test-table",
6257 "Item": {
6258 "pk": { "S": "key1" },
6259 "data": { "S": "value" }
6260 }
6261 }),
6262 );
6263 svc.put_item(&req).unwrap();
6264
6265 let req = make_request(
6267 "DescribeContributorInsights",
6268 json!({ "TableName": "test-table" }),
6269 );
6270 let resp = svc.describe_contributor_insights(&req).unwrap();
6271 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6272 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
6273
6274 let contributors = body["TopContributors"].as_array().unwrap();
6275 assert!(contributors.is_empty());
6276 }
6277
6278 #[test]
6279 fn contributor_insights_disabled_table_no_counters_after_scan() {
6280 let svc = make_service();
6281 create_test_table(&svc);
6282
6283 for key in &["alpha", "beta"] {
6285 let req = make_request(
6286 "PutItem",
6287 json!({
6288 "TableName": "test-table",
6289 "Item": { "pk": { "S": key } }
6290 }),
6291 );
6292 svc.put_item(&req).unwrap();
6293 }
6294
6295 let req = make_request(
6297 "UpdateContributorInsights",
6298 json!({
6299 "TableName": "test-table",
6300 "ContributorInsightsAction": "ENABLE"
6301 }),
6302 );
6303 svc.update_contributor_insights(&req).unwrap();
6304
6305 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6307 svc.scan(&req).unwrap();
6308
6309 let req = make_request(
6311 "DescribeContributorInsights",
6312 json!({ "TableName": "test-table" }),
6313 );
6314 let resp = svc.describe_contributor_insights(&req).unwrap();
6315 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6316 let contributors = body["TopContributors"].as_array().unwrap();
6317 assert!(
6318 !contributors.is_empty(),
6319 "counters should be non-empty while enabled"
6320 );
6321
6322 let req = make_request(
6324 "UpdateContributorInsights",
6325 json!({
6326 "TableName": "test-table",
6327 "ContributorInsightsAction": "DISABLE"
6328 }),
6329 );
6330 svc.update_contributor_insights(&req).unwrap();
6331
6332 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6334 svc.scan(&req).unwrap();
6335
6336 let req = make_request(
6338 "DescribeContributorInsights",
6339 json!({ "TableName": "test-table" }),
6340 );
6341 let resp = svc.describe_contributor_insights(&req).unwrap();
6342 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6343 let contributors = body["TopContributors"].as_array().unwrap();
6344 assert!(
6345 contributors.is_empty(),
6346 "counters should be empty after disabling insights"
6347 );
6348 }
6349
6350 #[test]
6351 fn scan_pagination_with_limit() {
6352 let svc = make_service();
6353 create_test_table(&svc);
6354
6355 for i in 0..5 {
6357 let req = make_request(
6358 "PutItem",
6359 json!({
6360 "TableName": "test-table",
6361 "Item": {
6362 "pk": { "S": format!("item{i}") },
6363 "data": { "S": format!("value{i}") }
6364 }
6365 }),
6366 );
6367 svc.put_item(&req).unwrap();
6368 }
6369
6370 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
6372 let resp = svc.scan(&req).unwrap();
6373 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6374 assert_eq!(body["Count"], 2);
6375 assert!(
6376 body["LastEvaluatedKey"].is_object(),
6377 "should have LastEvaluatedKey when limit < total items"
6378 );
6379 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6380
6381 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6383 let mut lek = body["LastEvaluatedKey"].clone();
6384
6385 while lek.is_object() {
6386 let req = make_request(
6387 "Scan",
6388 json!({
6389 "TableName": "test-table",
6390 "Limit": 2,
6391 "ExclusiveStartKey": lek
6392 }),
6393 );
6394 let resp = svc.scan(&req).unwrap();
6395 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6396 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6397 lek = body["LastEvaluatedKey"].clone();
6398 }
6399
6400 assert_eq!(
6401 all_items.len(),
6402 5,
6403 "should retrieve all 5 items via pagination"
6404 );
6405 }
6406
6407 #[test]
6408 fn scan_no_pagination_when_all_fit() {
6409 let svc = make_service();
6410 create_test_table(&svc);
6411
6412 for i in 0..3 {
6413 let req = make_request(
6414 "PutItem",
6415 json!({
6416 "TableName": "test-table",
6417 "Item": {
6418 "pk": { "S": format!("item{i}") }
6419 }
6420 }),
6421 );
6422 svc.put_item(&req).unwrap();
6423 }
6424
6425 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
6427 let resp = svc.scan(&req).unwrap();
6428 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6429 assert_eq!(body["Count"], 3);
6430 assert!(
6431 body["LastEvaluatedKey"].is_null(),
6432 "should not have LastEvaluatedKey when all items fit"
6433 );
6434
6435 let req = make_request("Scan", json!({ "TableName": "test-table" }));
6437 let resp = svc.scan(&req).unwrap();
6438 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6439 assert_eq!(body["Count"], 3);
6440 assert!(body["LastEvaluatedKey"].is_null());
6441 }
6442
6443 fn create_composite_table(svc: &DynamoDbService) {
6444 let req = make_request(
6445 "CreateTable",
6446 json!({
6447 "TableName": "composite-table",
6448 "KeySchema": [
6449 { "AttributeName": "pk", "KeyType": "HASH" },
6450 { "AttributeName": "sk", "KeyType": "RANGE" }
6451 ],
6452 "AttributeDefinitions": [
6453 { "AttributeName": "pk", "AttributeType": "S" },
6454 { "AttributeName": "sk", "AttributeType": "S" }
6455 ],
6456 "BillingMode": "PAY_PER_REQUEST"
6457 }),
6458 );
6459 svc.create_table(&req).unwrap();
6460 }
6461
6462 #[test]
6463 fn query_pagination_with_composite_key() {
6464 let svc = make_service();
6465 create_composite_table(&svc);
6466
6467 for i in 0..5 {
6469 let req = make_request(
6470 "PutItem",
6471 json!({
6472 "TableName": "composite-table",
6473 "Item": {
6474 "pk": { "S": "user1" },
6475 "sk": { "S": format!("item{i:03}") },
6476 "data": { "S": format!("value{i}") }
6477 }
6478 }),
6479 );
6480 svc.put_item(&req).unwrap();
6481 }
6482
6483 let req = make_request(
6485 "Query",
6486 json!({
6487 "TableName": "composite-table",
6488 "KeyConditionExpression": "pk = :pk",
6489 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6490 "Limit": 2
6491 }),
6492 );
6493 let resp = svc.query(&req).unwrap();
6494 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6495 assert_eq!(body["Count"], 2);
6496 assert!(body["LastEvaluatedKey"].is_object());
6497 assert!(body["LastEvaluatedKey"]["pk"].is_object());
6498 assert!(body["LastEvaluatedKey"]["sk"].is_object());
6499
6500 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
6502 let mut lek = body["LastEvaluatedKey"].clone();
6503
6504 while lek.is_object() {
6505 let req = make_request(
6506 "Query",
6507 json!({
6508 "TableName": "composite-table",
6509 "KeyConditionExpression": "pk = :pk",
6510 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6511 "Limit": 2,
6512 "ExclusiveStartKey": lek
6513 }),
6514 );
6515 let resp = svc.query(&req).unwrap();
6516 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6517 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
6518 lek = body["LastEvaluatedKey"].clone();
6519 }
6520
6521 assert_eq!(
6522 all_items.len(),
6523 5,
6524 "should retrieve all 5 items via pagination"
6525 );
6526
6527 let sks: Vec<String> = all_items
6529 .iter()
6530 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
6531 .collect();
6532 let mut sorted = sks.clone();
6533 sorted.sort();
6534 assert_eq!(sks, sorted, "items should be sorted by sort key");
6535 }
6536
6537 #[test]
6538 fn query_no_pagination_when_all_fit() {
6539 let svc = make_service();
6540 create_composite_table(&svc);
6541
6542 for i in 0..2 {
6543 let req = make_request(
6544 "PutItem",
6545 json!({
6546 "TableName": "composite-table",
6547 "Item": {
6548 "pk": { "S": "user1" },
6549 "sk": { "S": format!("item{i}") }
6550 }
6551 }),
6552 );
6553 svc.put_item(&req).unwrap();
6554 }
6555
6556 let req = make_request(
6557 "Query",
6558 json!({
6559 "TableName": "composite-table",
6560 "KeyConditionExpression": "pk = :pk",
6561 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
6562 "Limit": 10
6563 }),
6564 );
6565 let resp = svc.query(&req).unwrap();
6566 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6567 assert_eq!(body["Count"], 2);
6568 assert!(
6569 body["LastEvaluatedKey"].is_null(),
6570 "should not have LastEvaluatedKey when all items fit"
6571 );
6572 }
6573
6574 fn create_gsi_table(svc: &DynamoDbService) {
6575 let req = make_request(
6576 "CreateTable",
6577 json!({
6578 "TableName": "gsi-table",
6579 "KeySchema": [
6580 { "AttributeName": "pk", "KeyType": "HASH" }
6581 ],
6582 "AttributeDefinitions": [
6583 { "AttributeName": "pk", "AttributeType": "S" },
6584 { "AttributeName": "gsi_pk", "AttributeType": "S" },
6585 { "AttributeName": "gsi_sk", "AttributeType": "S" }
6586 ],
6587 "BillingMode": "PAY_PER_REQUEST",
6588 "GlobalSecondaryIndexes": [
6589 {
6590 "IndexName": "gsi-index",
6591 "KeySchema": [
6592 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
6593 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
6594 ],
6595 "Projection": { "ProjectionType": "ALL" }
6596 }
6597 ]
6598 }),
6599 );
6600 svc.create_table(&req).unwrap();
6601 }
6602
6603 #[test]
6604 fn gsi_query_last_evaluated_key_includes_table_pk() {
6605 let svc = make_service();
6606 create_gsi_table(&svc);
6607
6608 for i in 0..3 {
6610 let req = make_request(
6611 "PutItem",
6612 json!({
6613 "TableName": "gsi-table",
6614 "Item": {
6615 "pk": { "S": format!("item{i}") },
6616 "gsi_pk": { "S": "shared" },
6617 "gsi_sk": { "S": "sort" }
6618 }
6619 }),
6620 );
6621 svc.put_item(&req).unwrap();
6622 }
6623
6624 let req = make_request(
6626 "Query",
6627 json!({
6628 "TableName": "gsi-table",
6629 "IndexName": "gsi-index",
6630 "KeyConditionExpression": "gsi_pk = :v",
6631 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6632 "Limit": 1
6633 }),
6634 );
6635 let resp = svc.query(&req).unwrap();
6636 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6637 assert_eq!(body["Count"], 1);
6638 let lek = &body["LastEvaluatedKey"];
6639 assert!(lek.is_object(), "should have LastEvaluatedKey");
6640 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
6642 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
6643 assert!(
6645 lek["pk"].is_object(),
6646 "LEK must contain table PK for GSI queries"
6647 );
6648 }
6649
6650 #[test]
6651 fn gsi_query_pagination_returns_all_items() {
6652 let svc = make_service();
6653 create_gsi_table(&svc);
6654
6655 for i in 0..4 {
6657 let req = make_request(
6658 "PutItem",
6659 json!({
6660 "TableName": "gsi-table",
6661 "Item": {
6662 "pk": { "S": format!("item{i:03}") },
6663 "gsi_pk": { "S": "shared" },
6664 "gsi_sk": { "S": "sort" }
6665 }
6666 }),
6667 );
6668 svc.put_item(&req).unwrap();
6669 }
6670
6671 let mut all_pks = Vec::new();
6673 let mut lek: Option<Value> = None;
6674
6675 loop {
6676 let mut query = json!({
6677 "TableName": "gsi-table",
6678 "IndexName": "gsi-index",
6679 "KeyConditionExpression": "gsi_pk = :v",
6680 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
6681 "Limit": 2
6682 });
6683 if let Some(ref start_key) = lek {
6684 query["ExclusiveStartKey"] = start_key.clone();
6685 }
6686
6687 let req = make_request("Query", query);
6688 let resp = svc.query(&req).unwrap();
6689 let body: Value = serde_json::from_slice(&resp.body).unwrap();
6690
6691 for item in body["Items"].as_array().unwrap() {
6692 let pk = item["pk"]["S"].as_str().unwrap().to_string();
6693 all_pks.push(pk);
6694 }
6695
6696 if body["LastEvaluatedKey"].is_object() {
6697 lek = Some(body["LastEvaluatedKey"].clone());
6698 } else {
6699 break;
6700 }
6701 }
6702
6703 all_pks.sort();
6704 assert_eq!(
6705 all_pks,
6706 vec!["item000", "item001", "item002", "item003"],
6707 "pagination should return all items without duplicates"
6708 );
6709 }
6710
6711 fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
6712 pairs
6713 .iter()
6714 .map(|(k, v)| (k.to_string(), json!({"S": v})))
6715 .collect()
6716 }
6717
6718 fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
6719 pairs
6720 .iter()
6721 .map(|(k, v)| (k.to_string(), v.to_string()))
6722 .collect()
6723 }
6724
6725 fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
6726 pairs
6727 .iter()
6728 .map(|(k, v)| (k.to_string(), json!({"S": v})))
6729 .collect()
6730 }
6731
6732 #[test]
6733 fn test_evaluate_condition_bare_not_equal() {
6734 let item = cond_item(&[("state", "active")]);
6735 let names = cond_names(&[("#s", "state")]);
6736 let values = cond_values(&[(":c", "complete")]);
6737
6738 assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
6739
6740 let item2 = cond_item(&[("state", "complete")]);
6741 assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
6742 }
6743
6744 #[test]
6745 fn test_evaluate_condition_parenthesized_not_equal() {
6746 let item = cond_item(&[("state", "active")]);
6747 let names = cond_names(&[("#s", "state")]);
6748 let values = cond_values(&[(":c", "complete")]);
6749
6750 assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
6751 }
6752
6753 #[test]
6754 fn test_evaluate_condition_parenthesized_equal_mismatch() {
6755 let item = cond_item(&[("state", "active")]);
6756 let names = cond_names(&[("#s", "state")]);
6757 let values = cond_values(&[(":c", "complete")]);
6758
6759 assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
6760 }
6761
6762 #[test]
6763 fn test_evaluate_condition_compound_and() {
6764 let item = cond_item(&[("state", "active")]);
6765 let names = cond_names(&[("#s", "state")]);
6766 let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
6767
6768 assert!(
6770 evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
6771 );
6772 }
6773
6774 #[test]
6775 fn test_evaluate_condition_compound_and_mismatch() {
6776 let item = cond_item(&[("state", "inactive")]);
6777 let names = cond_names(&[("#s", "state")]);
6778 let values = cond_values(&[(":a", "active"), (":b", "active")]);
6779
6780 assert!(
6782 evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
6783 );
6784 }
6785
6786 #[test]
6787 fn test_evaluate_condition_compound_or() {
6788 let item = cond_item(&[("state", "running")]);
6789 let names = cond_names(&[("#s", "state")]);
6790 let values = cond_values(&[(":a", "active"), (":b", "idle")]);
6791
6792 assert!(
6794 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
6795 );
6796
6797 let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
6799 assert!(
6800 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
6801 );
6802 }
6803
6804 #[test]
6805 fn test_evaluate_condition_not_operator() {
6806 let item = cond_item(&[("state", "active")]);
6807 let names = cond_names(&[("#s", "state")]);
6808 let values = cond_values(&[(":c", "complete")]);
6809
6810 assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
6812
6813 assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
6815
6816 assert!(
6818 evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
6819 );
6820
6821 assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
6823 }
6824
6825 #[test]
6826 fn test_evaluate_condition_begins_with() {
6827 let item = cond_item(&[("name", "fakecloud-dynamodb")]);
6830 let names = cond_names(&[("#n", "name")]);
6831 let values = cond_values(&[(":p", "fakecloud")]);
6832
6833 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
6834
6835 let values2 = cond_values(&[(":p", "realcloud")]);
6836 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
6837 }
6838
6839 #[test]
6840 fn test_evaluate_condition_contains() {
6841 let item = cond_item(&[("tags", "alpha,beta,gamma")]);
6842 let names = cond_names(&[("#t", "tags")]);
6843 let values = cond_values(&[(":v", "beta")]);
6844
6845 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
6846
6847 let values2 = cond_values(&[(":v", "delta")]);
6848 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
6849 }
6850
6851 #[test]
6852 fn test_evaluate_condition_no_existing_item() {
6853 let names = cond_names(&[("#s", "state")]);
6856 let values = cond_values(&[(":v", "active")]);
6857
6858 assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
6859 assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
6860 assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
6862 assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
6864 }
6865
6866 #[test]
6867 fn test_evaluate_filter_not_operator() {
6868 let item = cond_item(&[("status", "pending")]);
6869 let names = cond_names(&[("#s", "status")]);
6870 let values = cond_values(&[(":v", "pending")]);
6871
6872 assert!(!evaluate_filter_expression(
6873 "NOT (#s = :v)",
6874 &item,
6875 &names,
6876 &values
6877 ));
6878 assert!(evaluate_filter_expression(
6879 "NOT (#s <> :v)",
6880 &item,
6881 &names,
6882 &values
6883 ));
6884 }
6885}