1mod batch;
2mod global_tables;
3mod items;
4mod queries;
5mod streams;
6mod tables;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use base64::Engine;
13use http::StatusCode;
14use serde_json::{json, Value};
15
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
18
19use fakecloud_persistence::S3Store;
20use fakecloud_s3::state::SharedS3State;
21
22use crate::state::{
23 attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoTable,
24 GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
25 ProvisionedThroughput, SharedDynamoDbState,
26};
27
28pub(super) struct KinesisDeliveryTarget {
35 pub destinations: Vec<KinesisDestination>,
36 pub arn: String,
37 pub name: String,
38}
39
40pub struct DynamoDbService {
41 state: SharedDynamoDbState,
42 pub(crate) s3_state: Option<SharedS3State>,
43 pub(crate) s3_store: Option<Arc<dyn S3Store>>,
44 delivery: Option<Arc<DeliveryBus>>,
45}
46
47impl DynamoDbService {
48 pub fn new(state: SharedDynamoDbState) -> Self {
49 Self {
50 state,
51 s3_state: None,
52 s3_store: None,
53 delivery: None,
54 }
55 }
56
57 pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
58 self.s3_state = Some(s3_state);
59 self
60 }
61
62 pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
63 self.s3_store = Some(store);
64 self
65 }
66
67 pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
68 self.delivery = Some(delivery);
69 self
70 }
71
72 fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
73 if table
74 .kinesis_destinations
75 .iter()
76 .any(|d| d.destination_status == "ACTIVE")
77 {
78 Some(KinesisDeliveryTarget {
79 destinations: table.kinesis_destinations.clone(),
80 arn: table.arn.clone(),
81 name: table.name.clone(),
82 })
83 } else {
84 None
85 }
86 }
87
88 pub(super) fn deliver_to_kinesis_destinations(
90 &self,
91 target: &KinesisDeliveryTarget,
92 event_name: &str,
93 keys: &HashMap<String, AttributeValue>,
94 old_image: Option<&HashMap<String, AttributeValue>>,
95 new_image: Option<&HashMap<String, AttributeValue>>,
96 ) {
97 let delivery = match &self.delivery {
98 Some(d) => d,
99 None => return,
100 };
101
102 let active_destinations: Vec<_> = target
103 .destinations
104 .iter()
105 .filter(|d| d.destination_status == "ACTIVE")
106 .collect();
107
108 if active_destinations.is_empty() {
109 return;
110 }
111
112 let mut record = json!({
113 "eventID": uuid::Uuid::new_v4().to_string(),
114 "eventName": event_name,
115 "eventVersion": "1.1",
116 "eventSource": "aws:dynamodb",
117 "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
118 "dynamodb": {
119 "Keys": keys,
120 "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
121 "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
122 "StreamViewType": "NEW_AND_OLD_IMAGES",
123 },
124 "eventSourceARN": &target.arn,
125 "tableName": &target.name,
126 });
127
128 if let Some(old) = old_image {
129 record["dynamodb"]["OldImage"] = json!(old);
130 }
131 if let Some(new) = new_image {
132 record["dynamodb"]["NewImage"] = json!(new);
133 }
134
135 let record_str = serde_json::to_string(&record).unwrap_or_default();
136 let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
137 let partition_key = serde_json::to_string(keys).unwrap_or_default();
138
139 for dest in active_destinations {
140 delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
141 }
142 }
143
144 fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
145 serde_json::from_slice(&req.body).map_err(|e| {
146 AwsServiceError::aws_error(
147 StatusCode::BAD_REQUEST,
148 "SerializationException",
149 format!("Invalid JSON: {e}"),
150 )
151 })
152 }
153
154 fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
155 Ok(AwsResponse::ok_json(body))
156 }
157}
158
159#[async_trait]
160impl AwsService for DynamoDbService {
161 fn service_name(&self) -> &str {
162 "dynamodb"
163 }
164
165 async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
166 match req.action.as_str() {
167 "CreateTable" => self.create_table(&req),
168 "DeleteTable" => self.delete_table(&req),
169 "DescribeTable" => self.describe_table(&req),
170 "ListTables" => self.list_tables(&req),
171 "UpdateTable" => self.update_table(&req),
172 "PutItem" => self.put_item(&req),
173 "GetItem" => self.get_item(&req),
174 "DeleteItem" => self.delete_item(&req),
175 "UpdateItem" => self.update_item(&req),
176 "Query" => self.query(&req),
177 "Scan" => self.scan(&req),
178 "BatchGetItem" => self.batch_get_item(&req),
179 "BatchWriteItem" => self.batch_write_item(&req),
180 "TagResource" => self.tag_resource(&req),
181 "UntagResource" => self.untag_resource(&req),
182 "ListTagsOfResource" => self.list_tags_of_resource(&req),
183 "TransactGetItems" => self.transact_get_items(&req),
184 "TransactWriteItems" => self.transact_write_items(&req),
185 "ExecuteStatement" => self.execute_statement(&req),
186 "BatchExecuteStatement" => self.batch_execute_statement(&req),
187 "ExecuteTransaction" => self.execute_transaction(&req),
188 "UpdateTimeToLive" => self.update_time_to_live(&req),
189 "DescribeTimeToLive" => self.describe_time_to_live(&req),
190 "PutResourcePolicy" => self.put_resource_policy(&req),
191 "GetResourcePolicy" => self.get_resource_policy(&req),
192 "DeleteResourcePolicy" => self.delete_resource_policy(&req),
193 "DescribeEndpoints" => self.describe_endpoints(&req),
195 "DescribeLimits" => self.describe_limits(&req),
196 "CreateBackup" => self.create_backup(&req),
198 "DeleteBackup" => self.delete_backup(&req),
199 "DescribeBackup" => self.describe_backup(&req),
200 "ListBackups" => self.list_backups(&req),
201 "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
202 "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
203 "UpdateContinuousBackups" => self.update_continuous_backups(&req),
204 "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
205 "CreateGlobalTable" => self.create_global_table(&req),
207 "DescribeGlobalTable" => self.describe_global_table(&req),
208 "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
209 "ListGlobalTables" => self.list_global_tables(&req),
210 "UpdateGlobalTable" => self.update_global_table(&req),
211 "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
212 "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
213 "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
214 "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
216 "DisableKinesisStreamingDestination" => {
217 self.disable_kinesis_streaming_destination(&req)
218 }
219 "DescribeKinesisStreamingDestination" => {
220 self.describe_kinesis_streaming_destination(&req)
221 }
222 "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
223 "DescribeContributorInsights" => self.describe_contributor_insights(&req),
225 "UpdateContributorInsights" => self.update_contributor_insights(&req),
226 "ListContributorInsights" => self.list_contributor_insights(&req),
227 "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
229 "DescribeExport" => self.describe_export(&req),
230 "ListExports" => self.list_exports(&req),
231 "ImportTable" => self.import_table(&req),
232 "DescribeImport" => self.describe_import(&req),
233 "ListImports" => self.list_imports(&req),
234 _ => Err(AwsServiceError::action_not_implemented(
235 "dynamodb",
236 &req.action,
237 )),
238 }
239 }
240
241 fn supported_actions(&self) -> &[&str] {
242 &[
243 "CreateTable",
244 "DeleteTable",
245 "DescribeTable",
246 "ListTables",
247 "UpdateTable",
248 "PutItem",
249 "GetItem",
250 "DeleteItem",
251 "UpdateItem",
252 "Query",
253 "Scan",
254 "BatchGetItem",
255 "BatchWriteItem",
256 "TagResource",
257 "UntagResource",
258 "ListTagsOfResource",
259 "TransactGetItems",
260 "TransactWriteItems",
261 "ExecuteStatement",
262 "BatchExecuteStatement",
263 "ExecuteTransaction",
264 "UpdateTimeToLive",
265 "DescribeTimeToLive",
266 "PutResourcePolicy",
267 "GetResourcePolicy",
268 "DeleteResourcePolicy",
269 "DescribeEndpoints",
270 "DescribeLimits",
271 "CreateBackup",
272 "DeleteBackup",
273 "DescribeBackup",
274 "ListBackups",
275 "RestoreTableFromBackup",
276 "RestoreTableToPointInTime",
277 "UpdateContinuousBackups",
278 "DescribeContinuousBackups",
279 "CreateGlobalTable",
280 "DescribeGlobalTable",
281 "DescribeGlobalTableSettings",
282 "ListGlobalTables",
283 "UpdateGlobalTable",
284 "UpdateGlobalTableSettings",
285 "DescribeTableReplicaAutoScaling",
286 "UpdateTableReplicaAutoScaling",
287 "EnableKinesisStreamingDestination",
288 "DisableKinesisStreamingDestination",
289 "DescribeKinesisStreamingDestination",
290 "UpdateKinesisStreamingDestination",
291 "DescribeContributorInsights",
292 "UpdateContributorInsights",
293 "ListContributorInsights",
294 "ExportTableToPointInTime",
295 "DescribeExport",
296 "ListExports",
297 "ImportTable",
298 "DescribeImport",
299 "ListImports",
300 ]
301 }
302}
303fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
306 body[field].as_str().ok_or_else(|| {
307 AwsServiceError::aws_error(
308 StatusCode::BAD_REQUEST,
309 "ValidationException",
310 format!("{field} is required"),
311 )
312 })
313}
314
315fn require_object(
316 body: &Value,
317 field: &str,
318) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
319 let obj = body[field].as_object().ok_or_else(|| {
320 AwsServiceError::aws_error(
321 StatusCode::BAD_REQUEST,
322 "ValidationException",
323 format!("{field} is required"),
324 )
325 })?;
326 Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
327}
328
329fn get_table<'a>(
330 tables: &'a HashMap<String, DynamoTable>,
331 name: &str,
332) -> Result<&'a DynamoTable, AwsServiceError> {
333 tables.get(name).ok_or_else(|| {
334 AwsServiceError::aws_error(
335 StatusCode::BAD_REQUEST,
336 "ResourceNotFoundException",
337 format!("Requested resource not found: Table: {name} not found"),
338 )
339 })
340}
341
342fn get_table_mut<'a>(
343 tables: &'a mut HashMap<String, DynamoTable>,
344 name: &str,
345) -> Result<&'a mut DynamoTable, AwsServiceError> {
346 tables.get_mut(name).ok_or_else(|| {
347 AwsServiceError::aws_error(
348 StatusCode::BAD_REQUEST,
349 "ResourceNotFoundException",
350 format!("Requested resource not found: Table: {name} not found"),
351 )
352 })
353}
354
355fn find_table_by_arn<'a>(
356 tables: &'a HashMap<String, DynamoTable>,
357 arn: &str,
358) -> Result<&'a DynamoTable, AwsServiceError> {
359 tables.values().find(|t| t.arn == arn).ok_or_else(|| {
360 AwsServiceError::aws_error(
361 StatusCode::BAD_REQUEST,
362 "ResourceNotFoundException",
363 format!("Requested resource not found: {arn}"),
364 )
365 })
366}
367
368fn find_table_by_arn_mut<'a>(
369 tables: &'a mut HashMap<String, DynamoTable>,
370 arn: &str,
371) -> Result<&'a mut DynamoTable, AwsServiceError> {
372 tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
373 AwsServiceError::aws_error(
374 StatusCode::BAD_REQUEST,
375 "ResourceNotFoundException",
376 format!("Requested resource not found: {arn}"),
377 )
378 })
379}
380
381fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
382 let arr = val.as_array().ok_or_else(|| {
383 AwsServiceError::aws_error(
384 StatusCode::BAD_REQUEST,
385 "ValidationException",
386 "KeySchema is required",
387 )
388 })?;
389 Ok(arr
390 .iter()
391 .map(|elem| KeySchemaElement {
392 attribute_name: elem["AttributeName"]
393 .as_str()
394 .unwrap_or_default()
395 .to_string(),
396 key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
397 })
398 .collect())
399}
400
401fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
402 let arr = val.as_array().ok_or_else(|| {
403 AwsServiceError::aws_error(
404 StatusCode::BAD_REQUEST,
405 "ValidationException",
406 "AttributeDefinitions is required",
407 )
408 })?;
409 Ok(arr
410 .iter()
411 .map(|elem| AttributeDefinition {
412 attribute_name: elem["AttributeName"]
413 .as_str()
414 .unwrap_or_default()
415 .to_string(),
416 attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
417 })
418 .collect())
419}
420
421fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
422 Ok(ProvisionedThroughput {
423 read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
424 write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
425 })
426}
427
428fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
429 let Some(arr) = val.as_array() else {
430 return Vec::new();
431 };
432 arr.iter()
433 .filter_map(|g| {
434 Some(GlobalSecondaryIndex {
435 index_name: g["IndexName"].as_str()?.to_string(),
436 key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
437 projection: parse_projection(&g["Projection"]),
438 provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
439 .ok(),
440 })
441 })
442 .collect()
443}
444
445fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
446 let Some(arr) = val.as_array() else {
447 return Vec::new();
448 };
449 arr.iter()
450 .filter_map(|l| {
451 Some(LocalSecondaryIndex {
452 index_name: l["IndexName"].as_str()?.to_string(),
453 key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
454 projection: parse_projection(&l["Projection"]),
455 })
456 })
457 .collect()
458}
459
460fn parse_projection(val: &Value) -> Projection {
461 Projection {
462 projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
463 non_key_attributes: val["NonKeyAttributes"]
464 .as_array()
465 .map(|arr| {
466 arr.iter()
467 .filter_map(|v| v.as_str().map(|s| s.to_string()))
468 .collect()
469 })
470 .unwrap_or_default(),
471 }
472}
473
474fn parse_tags(val: &Value) -> HashMap<String, String> {
475 let mut tags = HashMap::new();
476 if let Some(arr) = val.as_array() {
477 for tag in arr {
478 if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
479 tags.insert(k.to_string(), v.to_string());
480 }
481 }
482 }
483 tags
484}
485
486fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
487 let mut names = HashMap::new();
488 if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
489 for (k, v) in obj {
490 if let Some(s) = v.as_str() {
491 names.insert(k.clone(), s.to_string());
492 }
493 }
494 }
495 names
496}
497
498fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
499 let mut values = HashMap::new();
500 if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
501 for (k, v) in obj {
502 values.insert(k.clone(), v.clone());
503 }
504 }
505 values
506}
507
508fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
509 if name.starts_with('#') {
510 expr_attr_names
511 .get(name)
512 .cloned()
513 .unwrap_or_else(|| name.to_string())
514 } else {
515 name.to_string()
516 }
517}
518
519fn extract_key(
520 table: &DynamoTable,
521 item: &HashMap<String, AttributeValue>,
522) -> HashMap<String, AttributeValue> {
523 let mut key = HashMap::new();
524 let hash_key = table.hash_key_name();
525 if let Some(v) = item.get(hash_key) {
526 key.insert(hash_key.to_string(), v.clone());
527 }
528 if let Some(range_key) = table.range_key_name() {
529 if let Some(v) = item.get(range_key) {
530 key.insert(range_key.to_string(), v.clone());
531 }
532 }
533 key
534}
535
536fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
538 let obj = value.as_object()?;
539 if obj.is_empty() {
540 return None;
541 }
542 Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
543}
544
545fn item_matches_key(
547 item: &HashMap<String, AttributeValue>,
548 key: &HashMap<String, AttributeValue>,
549 hash_key_name: &str,
550 range_key_name: Option<&str>,
551) -> bool {
552 let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
553 (Some(a), Some(b)) => a == b,
554 _ => false,
555 };
556 if !hash_match {
557 return false;
558 }
559 match range_key_name {
560 Some(rk) => match (item.get(rk), key.get(rk)) {
561 (Some(a), Some(b)) => a == b,
562 (None, None) => true,
563 _ => false,
564 },
565 None => true,
566 }
567}
568
569fn extract_key_for_schema(
571 item: &HashMap<String, AttributeValue>,
572 hash_key_name: &str,
573 range_key_name: Option<&str>,
574) -> HashMap<String, AttributeValue> {
575 let mut key = HashMap::new();
576 if let Some(v) = item.get(hash_key_name) {
577 key.insert(hash_key_name.to_string(), v.clone());
578 }
579 if let Some(rk) = range_key_name {
580 if let Some(v) = item.get(rk) {
581 key.insert(rk.to_string(), v.clone());
582 }
583 }
584 key
585}
586
587fn validate_key_in_item(
588 table: &DynamoTable,
589 item: &HashMap<String, AttributeValue>,
590) -> Result<(), AwsServiceError> {
591 let hash_key = table.hash_key_name();
592 if !item.contains_key(hash_key) {
593 return Err(AwsServiceError::aws_error(
594 StatusCode::BAD_REQUEST,
595 "ValidationException",
596 format!("Missing the key {hash_key} in the item"),
597 ));
598 }
599 if let Some(range_key) = table.range_key_name() {
600 if !item.contains_key(range_key) {
601 return Err(AwsServiceError::aws_error(
602 StatusCode::BAD_REQUEST,
603 "ValidationException",
604 format!("Missing the key {range_key} in the item"),
605 ));
606 }
607 }
608 Ok(())
609}
610
611fn validate_key_attributes_in_key(
612 table: &DynamoTable,
613 key: &HashMap<String, AttributeValue>,
614) -> Result<(), AwsServiceError> {
615 let hash_key = table.hash_key_name();
616 if !key.contains_key(hash_key) {
617 return Err(AwsServiceError::aws_error(
618 StatusCode::BAD_REQUEST,
619 "ValidationException",
620 format!("Missing the key {hash_key} in the item"),
621 ));
622 }
623 Ok(())
624}
625
626fn project_item(
627 item: &HashMap<String, AttributeValue>,
628 body: &Value,
629) -> HashMap<String, AttributeValue> {
630 let projection = body["ProjectionExpression"].as_str();
631 match projection {
632 Some(proj) if !proj.is_empty() => {
633 let expr_attr_names = parse_expression_attribute_names(body);
634 let attrs: Vec<String> = proj
635 .split(',')
636 .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
637 .collect();
638 let mut result = HashMap::new();
639 for attr in &attrs {
640 if let Some(v) = resolve_nested_path(item, attr) {
641 insert_nested_value(&mut result, attr, v);
642 }
643 }
644 result
645 }
646 _ => item.clone(),
647 }
648}
649
650fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
653 let mut result = String::new();
655 for (i, segment) in path.split('.').enumerate() {
656 if i > 0 {
657 result.push('.');
658 }
659 if let Some(bracket_pos) = segment.find('[') {
661 let key_part = &segment[..bracket_pos];
662 let index_part = &segment[bracket_pos..];
663 result.push_str(&resolve_attr_name(key_part, expr_attr_names));
664 result.push_str(index_part);
665 } else {
666 result.push_str(&resolve_attr_name(segment, expr_attr_names));
667 }
668 }
669 result
670}
671
672fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
674 let segments = parse_path_segments(path);
675 if segments.is_empty() {
676 return None;
677 }
678
679 let first = &segments[0];
680 let top_key = match first {
681 PathSegment::Key(k) => k.as_str(),
682 _ => return None,
683 };
684
685 let mut current = item.get(top_key)?.clone();
686
687 for segment in &segments[1..] {
688 match segment {
689 PathSegment::Key(k) => {
690 current = current.get("M")?.get(k)?.clone();
692 }
693 PathSegment::Index(idx) => {
694 current = current.get("L")?.get(*idx)?.clone();
696 }
697 }
698 }
699
700 Some(current)
701}
702
703#[derive(Debug)]
704enum PathSegment {
705 Key(String),
706 Index(usize),
707}
708
709fn parse_path_segments(path: &str) -> Vec<PathSegment> {
711 let mut segments = Vec::new();
712 let mut current = String::new();
713
714 let chars: Vec<char> = path.chars().collect();
715 let mut i = 0;
716 while i < chars.len() {
717 match chars[i] {
718 '.' => {
719 if !current.is_empty() {
720 segments.push(PathSegment::Key(current.clone()));
721 current.clear();
722 }
723 }
724 '[' => {
725 if !current.is_empty() {
726 segments.push(PathSegment::Key(current.clone()));
727 current.clear();
728 }
729 i += 1;
730 let mut num = String::new();
731 while i < chars.len() && chars[i] != ']' {
732 num.push(chars[i]);
733 i += 1;
734 }
735 if let Ok(idx) = num.parse::<usize>() {
736 segments.push(PathSegment::Index(idx));
737 }
738 }
740 c => {
741 current.push(c);
742 }
743 }
744 i += 1;
745 }
746 if !current.is_empty() {
747 segments.push(PathSegment::Key(current));
748 }
749 segments
750}
751
752fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
755 if !path.contains('.') && !path.contains('[') {
757 result.insert(path.to_string(), value);
758 return;
759 }
760
761 let segments = parse_path_segments(path);
762 if segments.is_empty() {
763 return;
764 }
765
766 let top_key = match &segments[0] {
767 PathSegment::Key(k) => k.clone(),
768 _ => return,
769 };
770
771 if segments.len() == 1 {
772 result.insert(top_key, value);
773 return;
774 }
775
776 let wrapped = wrap_value_in_path(&segments[1..], value);
778 let existing = result.remove(&top_key);
780 let merged = match existing {
781 Some(existing) => merge_attribute_values(existing, wrapped),
782 None => wrapped,
783 };
784 result.insert(top_key, merged);
785}
786
787fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
789 if segments.is_empty() {
790 return value;
791 }
792 let inner = wrap_value_in_path(&segments[1..], value);
793 match &segments[0] {
794 PathSegment::Key(k) => {
795 json!({"M": {k.clone(): inner}})
796 }
797 PathSegment::Index(idx) => {
798 let mut arr = vec![Value::Null; idx + 1];
799 arr[*idx] = inner;
800 json!({"L": arr})
801 }
802 }
803}
804
805fn merge_attribute_values(a: Value, b: Value) -> Value {
807 if let (Some(a_map), Some(b_map)) = (
808 a.get("M").and_then(|v| v.as_object()),
809 b.get("M").and_then(|v| v.as_object()),
810 ) {
811 let mut merged = a_map.clone();
812 for (k, v) in b_map {
813 if let Some(existing) = merged.get(k) {
814 merged.insert(
815 k.clone(),
816 merge_attribute_values(existing.clone(), v.clone()),
817 );
818 } else {
819 merged.insert(k.clone(), v.clone());
820 }
821 }
822 json!({"M": merged})
823 } else {
824 b
825 }
826}
827
828fn evaluate_condition(
829 condition: &str,
830 existing: Option<&HashMap<String, AttributeValue>>,
831 expr_attr_names: &HashMap<String, String>,
832 expr_attr_values: &HashMap<String, Value>,
833) -> Result<(), AwsServiceError> {
834 let empty = HashMap::new();
839 let item = existing.unwrap_or(&empty);
840 if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
841 Ok(())
842 } else {
843 Err(AwsServiceError::aws_error(
844 StatusCode::BAD_REQUEST,
845 "ConditionalCheckFailedException",
846 "The conditional request failed",
847 ))
848 }
849}
850
851fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
852 let with_paren = format!("{func_name}(");
856 let with_space = format!("{func_name} (");
857 let rest = expr
858 .strip_prefix(&with_paren)
859 .or_else(|| expr.strip_prefix(&with_space))?;
860 let inner = rest.strip_suffix(')')?;
861 Some(inner.trim())
862}
863
864fn evaluate_key_condition(
865 expr: &str,
866 item: &HashMap<String, AttributeValue>,
867 hash_key_name: &str,
868 _range_key_name: Option<&str>,
869 expr_attr_names: &HashMap<String, String>,
870 expr_attr_values: &HashMap<String, Value>,
871) -> bool {
872 let parts: Vec<&str> = split_on_and(expr);
873 for part in &parts {
874 let part = part.trim();
875 if !evaluate_single_key_condition(
876 part,
877 item,
878 hash_key_name,
879 expr_attr_names,
880 expr_attr_values,
881 ) {
882 return false;
883 }
884 }
885 true
886}
887
888fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
892 let mut parts = Vec::new();
893 let mut start = 0;
894 let len = expr.len();
895 let mut i = 0;
896 let mut depth = 0;
897 while i < len {
898 let ch = expr.as_bytes()[i];
899 if ch == b'(' {
900 depth += 1;
901 } else if ch == b')' {
902 if depth > 0 {
903 depth -= 1;
904 }
905 } else if depth == 0
906 && i + keyword.len() <= len
907 && expr.is_char_boundary(i)
908 && expr.is_char_boundary(i + keyword.len())
909 && expr[i..i + keyword.len()].eq_ignore_ascii_case(keyword)
910 {
911 parts.push(&expr[start..i]);
912 start = i + keyword.len();
913 i = start;
914 continue;
915 }
916 i += 1;
917 }
918 parts.push(&expr[start..]);
919 parts
920}
921
922fn split_on_and(expr: &str) -> Vec<&str> {
923 split_on_top_level_keyword(expr, " AND ")
924}
925
926fn split_on_or(expr: &str) -> Vec<&str> {
927 split_on_top_level_keyword(expr, " OR ")
928}
929
930fn evaluate_single_key_condition(
931 part: &str,
932 item: &HashMap<String, AttributeValue>,
933 _hash_key_name: &str,
934 expr_attr_names: &HashMap<String, String>,
935 expr_attr_values: &HashMap<String, Value>,
936) -> bool {
937 let part = part.trim();
938
939 if let Some(rest) = part
940 .strip_prefix("begins_with(")
941 .or_else(|| part.strip_prefix("begins_with ("))
942 {
943 return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
944 }
945
946 if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
947 return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
948 }
949
950 key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
951}
952
953fn key_cond_begins_with(
958 rest: &str,
959 item: &HashMap<String, AttributeValue>,
960 expr_attr_names: &HashMap<String, String>,
961 expr_attr_values: &HashMap<String, Value>,
962) -> bool {
963 let Some(inner) = rest.strip_suffix(')') else {
964 return false;
965 };
966 let mut split = inner.splitn(2, ',');
967 let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
968 return false;
969 };
970 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
971 let expected = expr_attr_values.get(val_ref.trim());
972 let actual = item.get(&attr_name);
973 match (actual, expected) {
974 (Some(a), Some(e)) => {
975 let a_str = a.get("S").and_then(|v| v.as_str());
976 let e_str = e.get("S").and_then(|v| v.as_str());
977 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
978 }
979 _ => false,
980 }
981}
982
983fn key_cond_between(
986 part: &str,
987 between_pos: usize,
988 item: &HashMap<String, AttributeValue>,
989 expr_attr_names: &HashMap<String, String>,
990 expr_attr_values: &HashMap<String, Value>,
991) -> bool {
992 let attr_part = part[..between_pos].trim();
993 let attr_name = resolve_attr_name(attr_part, expr_attr_names);
994 let range_part = &part[between_pos + 7..];
995 let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
996 return false;
997 };
998 let lo_ref = range_part[..and_pos].trim();
999 let hi_ref = range_part[and_pos + 5..].trim();
1000 let lo = expr_attr_values.get(lo_ref);
1001 let hi = expr_attr_values.get(hi_ref);
1002 let actual = item.get(&attr_name);
1003 match (actual, lo, hi) {
1004 (Some(a), Some(l), Some(h)) => {
1005 compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1006 && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1007 }
1008 _ => false,
1009 }
1010}
1011
1012fn key_cond_simple_comparison(
1016 part: &str,
1017 item: &HashMap<String, AttributeValue>,
1018 expr_attr_names: &HashMap<String, String>,
1019 expr_attr_values: &HashMap<String, Value>,
1020) -> bool {
1021 for op in &["<=", ">=", "<>", "=", "<", ">"] {
1022 let Some(pos) = part.find(op) else {
1023 continue;
1024 };
1025 let left = part[..pos].trim();
1026 let right = part[pos + op.len()..].trim();
1027 let attr_name = resolve_attr_name(left, expr_attr_names);
1028 let expected = expr_attr_values.get(right);
1029 let actual = item.get(&attr_name);
1030
1031 return match *op {
1032 "=" => actual == expected,
1033 "<>" => actual != expected,
1034 "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1035 ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1036 "<=" => {
1037 let cmp = compare_attribute_values(actual, expected);
1038 cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1039 }
1040 ">=" => {
1041 let cmp = compare_attribute_values(actual, expected);
1042 cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1043 }
1044 _ => false,
1045 };
1046 }
1047 false
1048}
1049
1050fn attribute_size(val: &Value) -> Option<usize> {
1055 if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1056 return Some(s.len());
1057 }
1058 if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1059 let decoded_len = base64::engine::general_purpose::STANDARD
1061 .decode(b)
1062 .map(|v| v.len())
1063 .unwrap_or(b.len());
1064 return Some(decoded_len);
1065 }
1066 if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1067 return Some(arr.len());
1068 }
1069 if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1070 return Some(arr.len());
1071 }
1072 if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1073 return Some(arr.len());
1074 }
1075 if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1076 return Some(arr.len());
1077 }
1078 if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1079 return Some(obj.len());
1080 }
1081 if val.get("N").is_some() {
1082 return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
1084 }
1085 if val.get("BOOL").is_some() || val.get("NULL").is_some() {
1086 return Some(1);
1087 }
1088 None
1089}
1090
1091fn evaluate_size_comparison(
1093 part: &str,
1094 item: &HashMap<String, AttributeValue>,
1095 expr_attr_names: &HashMap<String, String>,
1096 expr_attr_values: &HashMap<String, Value>,
1097) -> Option<bool> {
1098 let open = part.find('(')?;
1100 let close = part[open..].find(')')? + open;
1101 let path = part[open + 1..close].trim();
1102 let remainder = part[close + 1..].trim();
1103
1104 let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1106 ("<=", rest.trim())
1107 } else if let Some(rest) = remainder.strip_prefix(">=") {
1108 (">=", rest.trim())
1109 } else if let Some(rest) = remainder.strip_prefix("<>") {
1110 ("<>", rest.trim())
1111 } else if let Some(rest) = remainder.strip_prefix('<') {
1112 ("<", rest.trim())
1113 } else if let Some(rest) = remainder.strip_prefix('>') {
1114 (">", rest.trim())
1115 } else if let Some(rest) = remainder.strip_prefix('=') {
1116 ("=", rest.trim())
1117 } else {
1118 return None;
1119 };
1120
1121 let attr_name = resolve_attr_name(path, expr_attr_names);
1122 let actual = item.get(&attr_name)?;
1123 let size = attribute_size(actual)? as f64;
1124
1125 let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1126
1127 Some(match op {
1128 "=" => (size - expected).abs() < f64::EPSILON,
1129 "<>" => (size - expected).abs() >= f64::EPSILON,
1130 "<" => size < expected,
1131 ">" => size > expected,
1132 "<=" => size <= expected,
1133 ">=" => size >= expected,
1134 _ => false,
1135 })
1136}
1137
1138fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1139 match (a, b) {
1140 (None, None) => std::cmp::Ordering::Equal,
1141 (None, Some(_)) => std::cmp::Ordering::Less,
1142 (Some(_), None) => std::cmp::Ordering::Greater,
1143 (Some(a), Some(b)) => {
1144 let a_type = attribute_type_and_value(a);
1145 let b_type = attribute_type_and_value(b);
1146 match (a_type, b_type) {
1147 (Some(("S", a_val)), Some(("S", b_val))) => {
1148 let a_str = a_val.as_str().unwrap_or("");
1149 let b_str = b_val.as_str().unwrap_or("");
1150 a_str.cmp(b_str)
1151 }
1152 (Some(("N", a_val)), Some(("N", b_val))) => {
1153 let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1154 let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1155 a_num
1156 .partial_cmp(&b_num)
1157 .unwrap_or(std::cmp::Ordering::Equal)
1158 }
1159 (Some(("B", a_val)), Some(("B", b_val))) => {
1160 let a_str = a_val.as_str().unwrap_or("");
1161 let b_str = b_val.as_str().unwrap_or("");
1162 a_str.cmp(b_str)
1163 }
1164 _ => std::cmp::Ordering::Equal,
1165 }
1166 }
1167 }
1168}
1169
1170fn evaluate_filter_expression(
1171 expr: &str,
1172 item: &HashMap<String, AttributeValue>,
1173 expr_attr_names: &HashMap<String, String>,
1174 expr_attr_values: &HashMap<String, Value>,
1175) -> bool {
1176 let trimmed = expr.trim();
1177
1178 let or_parts = split_on_or(trimmed);
1180 if or_parts.len() > 1 {
1181 return or_parts.iter().any(|part| {
1182 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1183 });
1184 }
1185
1186 let and_parts = split_on_and(trimmed);
1188 if and_parts.len() > 1 {
1189 return and_parts.iter().all(|part| {
1190 evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1191 });
1192 }
1193
1194 let stripped = strip_outer_parens(trimmed);
1196 if stripped != trimmed {
1197 return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1198 }
1199
1200 if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1202 return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1203 }
1204
1205 evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1206}
1207
1208fn strip_outer_parens(expr: &str) -> &str {
1210 let trimmed = expr.trim();
1211 if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
1212 return trimmed;
1213 }
1214 let inner = &trimmed[1..trimmed.len() - 1];
1216 let mut depth = 0;
1217 for ch in inner.bytes() {
1218 match ch {
1219 b'(' => depth += 1,
1220 b')' => {
1221 if depth == 0 {
1222 return trimmed; }
1224 depth -= 1;
1225 }
1226 _ => {}
1227 }
1228 }
1229 if depth == 0 {
1230 inner
1231 } else {
1232 trimmed
1233 }
1234}
1235
1236fn evaluate_single_filter_condition(
1237 part: &str,
1238 item: &HashMap<String, AttributeValue>,
1239 expr_attr_names: &HashMap<String, String>,
1240 expr_attr_values: &HashMap<String, Value>,
1241) -> bool {
1242 if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1243 let attr = resolve_attr_name(inner, expr_attr_names);
1244 return item.contains_key(&attr);
1245 }
1246
1247 if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1248 let attr = resolve_attr_name(inner, expr_attr_names);
1249 return !item.contains_key(&attr);
1250 }
1251
1252 if let Some(rest) = part
1253 .strip_prefix("begins_with(")
1254 .or_else(|| part.strip_prefix("begins_with ("))
1255 {
1256 return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1257 }
1258
1259 if let Some(rest) = part
1260 .strip_prefix("contains(")
1261 .or_else(|| part.strip_prefix("contains ("))
1262 {
1263 return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1264 }
1265
1266 if part.starts_with("size(") || part.starts_with("size (") {
1267 if let Some(result) =
1268 evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1269 {
1270 return result;
1271 }
1272 }
1273
1274 if let Some(rest) = part
1275 .strip_prefix("attribute_type(")
1276 .or_else(|| part.strip_prefix("attribute_type ("))
1277 {
1278 return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1279 }
1280
1281 if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1282 let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1283 let actual = item.get(&attr_name);
1284 return evaluate_in_match(actual, &value_refs, expr_attr_values);
1285 }
1286
1287 evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
1288}
1289
1290fn eval_begins_with(
1294 rest: &str,
1295 item: &HashMap<String, AttributeValue>,
1296 expr_attr_names: &HashMap<String, String>,
1297 expr_attr_values: &HashMap<String, Value>,
1298) -> bool {
1299 let Some(inner) = rest.strip_suffix(')') else {
1300 return false;
1301 };
1302 let mut split = inner.splitn(2, ',');
1303 let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1304 return false;
1305 };
1306 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1307 let expected = expr_attr_values.get(val_ref.trim());
1308 let actual = item.get(&attr_name);
1309 match (actual, expected) {
1310 (Some(a), Some(e)) => {
1311 let a_str = a.get("S").and_then(|v| v.as_str());
1312 let e_str = e.get("S").and_then(|v| v.as_str());
1313 matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1314 }
1315 _ => false,
1316 }
1317}
1318
1319fn eval_contains(
1323 rest: &str,
1324 item: &HashMap<String, AttributeValue>,
1325 expr_attr_names: &HashMap<String, String>,
1326 expr_attr_values: &HashMap<String, Value>,
1327) -> bool {
1328 let Some(inner) = rest.strip_suffix(')') else {
1329 return false;
1330 };
1331 let mut split = inner.splitn(2, ',');
1332 let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1333 return false;
1334 };
1335 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1336 let expected = expr_attr_values.get(val_ref.trim());
1337 let actual = item.get(&attr_name);
1338 let (Some(a), Some(e)) = (actual, expected) else {
1339 return false;
1340 };
1341
1342 if let (Some(a_s), Some(e_s)) = (
1343 a.get("S").and_then(|v| v.as_str()),
1344 e.get("S").and_then(|v| v.as_str()),
1345 ) {
1346 return a_s.contains(e_s);
1347 }
1348 if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1349 if let Some(val) = e.get("S") {
1350 return set.contains(val);
1351 }
1352 }
1353 if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1354 if let Some(val) = e.get("N") {
1355 return set.contains(val);
1356 }
1357 }
1358 if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1359 if let Some(val) = e.get("B") {
1360 return set.contains(val);
1361 }
1362 }
1363 if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1364 return list.contains(e);
1365 }
1366 false
1367}
1368
1369fn eval_attribute_type(
1373 rest: &str,
1374 item: &HashMap<String, AttributeValue>,
1375 expr_attr_names: &HashMap<String, String>,
1376 expr_attr_values: &HashMap<String, Value>,
1377) -> bool {
1378 let Some(inner) = rest.strip_suffix(')') else {
1379 return false;
1380 };
1381 let mut split = inner.splitn(2, ',');
1382 let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1383 return false;
1384 };
1385 let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1386 let expected_type = expr_attr_values
1387 .get(val_ref.trim())
1388 .and_then(|v| v.get("S"))
1389 .and_then(|v| v.as_str());
1390 let actual = item.get(&attr_name);
1391 let (Some(val), Some(t)) = (actual, expected_type) else {
1392 return false;
1393 };
1394 match t {
1395 "S" => val.get("S").is_some(),
1396 "N" => val.get("N").is_some(),
1397 "B" => val.get("B").is_some(),
1398 "BOOL" => val.get("BOOL").is_some(),
1399 "NULL" => val.get("NULL").is_some(),
1400 "SS" => val.get("SS").is_some(),
1401 "NS" => val.get("NS").is_some(),
1402 "BS" => val.get("BS").is_some(),
1403 "L" => val.get("L").is_some(),
1404 "M" => val.get("M").is_some(),
1405 _ => false,
1406 }
1407}
1408
1409fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
1417 let upper = expr.to_ascii_uppercase();
1418 let in_pos = upper.find(" IN ")?;
1419 let attr_ref = expr[..in_pos].trim();
1420 if attr_ref.is_empty() {
1421 return None;
1422 }
1423 let rest = expr[in_pos + 4..].trim_start();
1424 let inner = rest.strip_prefix('(')?.strip_suffix(')')?;
1425 let values: Vec<&str> = inner
1426 .split(',')
1427 .map(|s| s.trim())
1428 .filter(|s| !s.is_empty())
1429 .collect();
1430 if values.is_empty() {
1431 return None;
1432 }
1433 Some((attr_ref, values))
1434}
1435
1436fn evaluate_in_match(
1440 actual: Option<&AttributeValue>,
1441 value_refs: &[&str],
1442 expr_attr_values: &HashMap<String, Value>,
1443) -> bool {
1444 value_refs.iter().any(|v_ref| {
1445 let expected = expr_attr_values.get(*v_ref);
1446 matches!((actual, expected), (Some(a), Some(e)) if a == e)
1447 })
1448}
1449
1450#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1452enum UpdateAction {
1453 Set,
1454 Remove,
1455 Add,
1456 Delete,
1457}
1458
1459impl UpdateAction {
1460 const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
1463 ("SET", UpdateAction::Set),
1464 ("REMOVE", UpdateAction::Remove),
1465 ("ADD", UpdateAction::Add),
1466 ("DELETE", UpdateAction::Delete),
1467 ];
1468
1469 fn keyword(self) -> &'static str {
1470 match self {
1471 UpdateAction::Set => "SET",
1472 UpdateAction::Remove => "REMOVE",
1473 UpdateAction::Add => "ADD",
1474 UpdateAction::Delete => "DELETE",
1475 }
1476 }
1477}
1478
1479fn apply_update_expression(
1480 item: &mut HashMap<String, AttributeValue>,
1481 expr: &str,
1482 expr_attr_names: &HashMap<String, String>,
1483 expr_attr_values: &HashMap<String, Value>,
1484) -> Result<(), AwsServiceError> {
1485 let clauses = parse_update_clauses(expr);
1486 if clauses.is_empty() && !expr.trim().is_empty() {
1487 return Err(AwsServiceError::aws_error(
1488 StatusCode::BAD_REQUEST,
1489 "ValidationException",
1490 "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1491 ));
1492 }
1493 for (action, assignments) in &clauses {
1494 match action {
1495 UpdateAction::Set => {
1496 for assignment in assignments {
1497 apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1498 }
1499 }
1500 UpdateAction::Remove => {
1501 for attr_ref in assignments {
1502 let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1503 item.remove(&attr);
1504 }
1505 }
1506 UpdateAction::Add => {
1507 for assignment in assignments {
1508 apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1509 }
1510 }
1511 UpdateAction::Delete => {
1512 for assignment in assignments {
1513 apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1514 }
1515 }
1516 }
1517 }
1518 Ok(())
1519}
1520
1521fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1522 let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1523 let upper = expr.to_ascii_uppercase();
1524 let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1525
1526 for &(kw, action) in UpdateAction::KEYWORDS {
1527 let mut search_from = 0;
1528 while let Some(pos) = upper[search_from..].find(kw) {
1529 let abs_pos = search_from + pos;
1530 let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1531 let after_pos = abs_pos + kw.len();
1532 let after_ok =
1533 after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1534 if before_ok && after_ok {
1535 positions.push((abs_pos, action));
1536 }
1537 search_from = abs_pos + kw.len();
1538 }
1539 }
1540
1541 positions.sort_by_key(|(pos, _)| *pos);
1542
1543 for (i, &(pos, action)) in positions.iter().enumerate() {
1544 let start = pos + action.keyword().len();
1545 let end = if i + 1 < positions.len() {
1546 positions[i + 1].0
1547 } else {
1548 expr.len()
1549 };
1550 let content = expr[start..end].trim();
1551 let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
1552 clauses.push((action, assignments));
1553 }
1554
1555 clauses
1556}
1557
1558fn apply_set_assignment(
1559 item: &mut HashMap<String, AttributeValue>,
1560 assignment: &str,
1561 expr_attr_names: &HashMap<String, String>,
1562 expr_attr_values: &HashMap<String, Value>,
1563) -> Result<(), AwsServiceError> {
1564 let Some((left, right)) = assignment.split_once('=') else {
1565 return Ok(());
1566 };
1567
1568 let left_trimmed = left.trim();
1569 let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1573 Some((name, idx)) => (name, Some(idx)),
1574 None => (left_trimmed, None),
1575 };
1576 let attr = resolve_attr_name(attr_ref, expr_attr_names);
1577 let right = right.trim();
1578
1579 if let Some(rest) = right
1580 .strip_prefix("if_not_exists(")
1581 .or_else(|| right.strip_prefix("if_not_exists ("))
1582 {
1583 apply_set_if_not_exists(item, &attr, rest, expr_attr_names, expr_attr_values);
1584 return Ok(());
1585 }
1586
1587 if let Some(rest) = right
1588 .strip_prefix("list_append(")
1589 .or_else(|| right.strip_prefix("list_append ("))
1590 {
1591 apply_set_list_append(item, &attr, rest, expr_attr_names, expr_attr_values);
1592 return Ok(());
1593 }
1594
1595 if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1596 return apply_set_arithmetic(
1597 item,
1598 &attr,
1599 arith_left,
1600 arith_right,
1601 is_add,
1602 expr_attr_names,
1603 expr_attr_values,
1604 );
1605 }
1606
1607 let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
1608 if let Some(v) = val {
1609 match list_index {
1610 Some(idx) => assign_list_index(item, &attr, idx, v)?,
1611 None => {
1612 item.insert(attr, v);
1613 }
1614 }
1615 }
1616
1617 Ok(())
1618}
1619
1620fn apply_set_if_not_exists(
1625 item: &mut HashMap<String, AttributeValue>,
1626 attr: &str,
1627 rest: &str,
1628 expr_attr_names: &HashMap<String, String>,
1629 expr_attr_values: &HashMap<String, Value>,
1630) {
1631 let Some(inner) = rest.strip_suffix(')') else {
1632 return;
1633 };
1634 let mut split = inner.splitn(2, ',');
1635 let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) else {
1636 return;
1637 };
1638 let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
1639 if item.contains_key(&check_name) {
1640 return;
1641 }
1642 if let Some(val) = expr_attr_values.get(default_ref.trim()) {
1643 item.insert(attr.to_string(), val.clone());
1644 }
1645}
1646
1647fn apply_set_list_append(
1651 item: &mut HashMap<String, AttributeValue>,
1652 attr: &str,
1653 rest: &str,
1654 expr_attr_names: &HashMap<String, String>,
1655 expr_attr_values: &HashMap<String, Value>,
1656) {
1657 let Some(inner) = rest.strip_suffix(')') else {
1658 return;
1659 };
1660 let mut split = inner.splitn(2, ',');
1661 let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) else {
1662 return;
1663 };
1664 let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1665 let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1666
1667 let mut merged = Vec::new();
1668 if let Some(Value::Object(obj)) = &a_val {
1669 if let Some(Value::Array(arr)) = obj.get("L") {
1670 merged.extend(arr.clone());
1671 }
1672 }
1673 if let Some(Value::Object(obj)) = &b_val {
1674 if let Some(Value::Array(arr)) = obj.get("L") {
1675 merged.extend(arr.clone());
1676 }
1677 }
1678
1679 item.insert(attr.to_string(), json!({"L": merged}));
1680}
1681
1682fn apply_set_arithmetic(
1687 item: &mut HashMap<String, AttributeValue>,
1688 attr: &str,
1689 arith_left: &str,
1690 arith_right: &str,
1691 is_add: bool,
1692 expr_attr_names: &HashMap<String, String>,
1693 expr_attr_values: &HashMap<String, Value>,
1694) -> Result<(), AwsServiceError> {
1695 let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1696 let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1697
1698 let left_num = match extract_number(&left_val) {
1699 Some(n) => n,
1700 None if left_val.is_some() => {
1701 return Err(AwsServiceError::aws_error(
1702 StatusCode::BAD_REQUEST,
1703 "ValidationException",
1704 "An operand in the update expression has an incorrect data type",
1705 ));
1706 }
1707 None => 0.0,
1708 };
1709 let right_num = extract_number(&right_val).ok_or_else(|| {
1710 AwsServiceError::aws_error(
1711 StatusCode::BAD_REQUEST,
1712 "ValidationException",
1713 "An operand in the update expression has an incorrect data type",
1714 )
1715 })?;
1716
1717 let result = if is_add {
1718 left_num + right_num
1719 } else {
1720 left_num - right_num
1721 };
1722
1723 let num_str = if result == result.trunc() {
1724 format!("{}", result as i64)
1725 } else {
1726 format!("{result}")
1727 };
1728
1729 item.insert(attr.to_string(), json!({"N": num_str}));
1730 Ok(())
1731}
1732
1733fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
1737 let path = path.trim();
1738 if !path.ends_with(']') {
1739 return None;
1740 }
1741 let open = path.rfind('[')?;
1742 let idx_str = &path[open + 1..path.len() - 1];
1746 let idx: usize = idx_str.parse().ok()?;
1747 let name = &path[..open];
1748 if name.is_empty() || name.contains('[') || name.contains(']') || name.contains('.') {
1749 return None;
1750 }
1751 Some((name, idx))
1752}
1753
1754fn assign_list_index(
1759 item: &mut HashMap<String, AttributeValue>,
1760 attr: &str,
1761 idx: usize,
1762 value: Value,
1763) -> Result<(), AwsServiceError> {
1764 let Some(existing) = item.get_mut(attr) else {
1765 return Err(AwsServiceError::aws_error(
1766 StatusCode::BAD_REQUEST,
1767 "ValidationException",
1768 "The document path provided in the update expression is invalid for update",
1769 ));
1770 };
1771 let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
1772 return Err(AwsServiceError::aws_error(
1773 StatusCode::BAD_REQUEST,
1774 "ValidationException",
1775 "The document path provided in the update expression is invalid for update",
1776 ));
1777 };
1778 if idx < list.len() {
1779 list[idx] = value;
1780 } else if idx == list.len() {
1781 list.push(value);
1782 } else {
1783 return Err(AwsServiceError::aws_error(
1784 StatusCode::BAD_REQUEST,
1785 "ValidationException",
1786 "The document path provided in the update expression is invalid for update",
1787 ));
1788 }
1789 Ok(())
1790}
1791
1792fn resolve_value(
1793 reference: &str,
1794 item: &HashMap<String, AttributeValue>,
1795 expr_attr_names: &HashMap<String, String>,
1796 expr_attr_values: &HashMap<String, Value>,
1797) -> Option<Value> {
1798 let reference = reference.trim();
1799 if reference.starts_with(':') {
1800 expr_attr_values.get(reference).cloned()
1801 } else {
1802 let attr_name = resolve_attr_name(reference, expr_attr_names);
1803 item.get(&attr_name).cloned()
1804 }
1805}
1806
1807fn extract_number(val: &Option<Value>) -> Option<f64> {
1808 val.as_ref()
1809 .and_then(|v| v.get("N"))
1810 .and_then(|n| n.as_str())
1811 .and_then(|s| s.parse().ok())
1812}
1813
1814fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
1815 let mut depth = 0;
1816 for (i, c) in expr.char_indices() {
1817 match c {
1818 '(' => depth += 1,
1819 ')' => depth -= 1,
1820 '+' if depth == 0 && i > 0 => {
1821 return Some((&expr[..i], &expr[i + 1..], true));
1822 }
1823 '-' if depth == 0 && i > 0 => {
1824 return Some((&expr[..i], &expr[i + 1..], false));
1825 }
1826 _ => {}
1827 }
1828 }
1829 None
1830}
1831
1832fn apply_add_assignment(
1833 item: &mut HashMap<String, AttributeValue>,
1834 assignment: &str,
1835 expr_attr_names: &HashMap<String, String>,
1836 expr_attr_values: &HashMap<String, Value>,
1837) -> Result<(), AwsServiceError> {
1838 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1839 if parts.len() != 2 {
1840 return Ok(());
1841 }
1842
1843 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1844 let val_ref = parts[1].trim();
1845 let add_val = expr_attr_values.get(val_ref);
1846
1847 if let Some(add_val) = add_val {
1848 if let Some(existing) = item.get(&attr) {
1849 if let (Some(existing_num), Some(add_num)) = (
1850 extract_number(&Some(existing.clone())),
1851 extract_number(&Some(add_val.clone())),
1852 ) {
1853 let result = existing_num + add_num;
1854 let num_str = if result == result.trunc() {
1855 format!("{}", result as i64)
1856 } else {
1857 format!("{result}")
1858 };
1859 item.insert(attr, json!({"N": num_str}));
1860 } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
1861 if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
1862 let mut merged: Vec<Value> = existing_set.clone();
1863 for v in add_set {
1864 if !merged.contains(v) {
1865 merged.push(v.clone());
1866 }
1867 }
1868 item.insert(attr, json!({"SS": merged}));
1869 }
1870 } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
1871 if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
1872 let mut merged: Vec<Value> = existing_set.clone();
1873 for v in add_set {
1874 if !merged.contains(v) {
1875 merged.push(v.clone());
1876 }
1877 }
1878 item.insert(attr, json!({"NS": merged}));
1879 }
1880 } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
1881 if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
1882 let mut merged: Vec<Value> = existing_set.clone();
1883 for v in add_set {
1884 if !merged.contains(v) {
1885 merged.push(v.clone());
1886 }
1887 }
1888 item.insert(attr, json!({"BS": merged}));
1889 }
1890 }
1891 } else {
1892 item.insert(attr, add_val.clone());
1893 }
1894 }
1895
1896 Ok(())
1897}
1898
1899fn apply_delete_assignment(
1900 item: &mut HashMap<String, AttributeValue>,
1901 assignment: &str,
1902 expr_attr_names: &HashMap<String, String>,
1903 expr_attr_values: &HashMap<String, Value>,
1904) -> Result<(), AwsServiceError> {
1905 let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1906 if parts.len() != 2 {
1907 return Ok(());
1908 }
1909
1910 let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1911 let val_ref = parts[1].trim();
1912 let del_val = expr_attr_values.get(val_ref);
1913
1914 if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
1915 if let (Some(existing_set), Some(del_set)) = (
1916 existing.get("SS").and_then(|v| v.as_array()),
1917 del_val.get("SS").and_then(|v| v.as_array()),
1918 ) {
1919 let filtered: Vec<Value> = existing_set
1920 .iter()
1921 .filter(|v| !del_set.contains(v))
1922 .cloned()
1923 .collect();
1924 if filtered.is_empty() {
1925 item.remove(&attr);
1926 } else {
1927 item.insert(attr, json!({"SS": filtered}));
1928 }
1929 } else if let (Some(existing_set), Some(del_set)) = (
1930 existing.get("NS").and_then(|v| v.as_array()),
1931 del_val.get("NS").and_then(|v| v.as_array()),
1932 ) {
1933 let filtered: Vec<Value> = existing_set
1934 .iter()
1935 .filter(|v| !del_set.contains(v))
1936 .cloned()
1937 .collect();
1938 if filtered.is_empty() {
1939 item.remove(&attr);
1940 } else {
1941 item.insert(attr, json!({"NS": filtered}));
1942 }
1943 } else if let (Some(existing_set), Some(del_set)) = (
1944 existing.get("BS").and_then(|v| v.as_array()),
1945 del_val.get("BS").and_then(|v| v.as_array()),
1946 ) {
1947 let filtered: Vec<Value> = existing_set
1948 .iter()
1949 .filter(|v| !del_set.contains(v))
1950 .cloned()
1951 .collect();
1952 if filtered.is_empty() {
1953 item.remove(&attr);
1954 } else {
1955 item.insert(attr, json!({"BS": filtered}));
1956 }
1957 }
1958 }
1959
1960 Ok(())
1961}
1962
1963pub(super) struct TableDescriptionInput<'a> {
1964 pub arn: &'a str,
1965 pub table_id: &'a str,
1966 pub key_schema: &'a [KeySchemaElement],
1967 pub attribute_definitions: &'a [AttributeDefinition],
1968 pub provisioned_throughput: &'a ProvisionedThroughput,
1969 pub gsi: &'a [GlobalSecondaryIndex],
1970 pub lsi: &'a [LocalSecondaryIndex],
1971 pub billing_mode: &'a str,
1972 pub created_at: chrono::DateTime<chrono::Utc>,
1973 pub item_count: i64,
1974 pub size_bytes: i64,
1975 pub status: &'a str,
1976}
1977
1978fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
1979 let TableDescriptionInput {
1980 arn,
1981 table_id,
1982 key_schema,
1983 attribute_definitions,
1984 provisioned_throughput,
1985 gsi,
1986 lsi,
1987 billing_mode,
1988 created_at,
1989 item_count,
1990 size_bytes,
1991 status,
1992 } = *input;
1993 let table_name = arn.rsplit('/').next().unwrap_or("");
1994 let creation_timestamp =
1995 created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
1996
1997 let ks: Vec<Value> = key_schema
1998 .iter()
1999 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2000 .collect();
2001
2002 let ad: Vec<Value> = attribute_definitions
2003 .iter()
2004 .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2005 .collect();
2006
2007 let mut desc = json!({
2008 "TableName": table_name,
2009 "TableArn": arn,
2010 "TableId": table_id,
2011 "TableStatus": status,
2012 "KeySchema": ks,
2013 "AttributeDefinitions": ad,
2014 "CreationDateTime": creation_timestamp,
2015 "ItemCount": item_count,
2016 "TableSizeBytes": size_bytes,
2017 "BillingModeSummary": { "BillingMode": billing_mode },
2018 });
2019
2020 if billing_mode != "PAY_PER_REQUEST" {
2021 desc["ProvisionedThroughput"] = json!({
2022 "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2023 "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2024 "NumberOfDecreasesToday": 0,
2025 });
2026 } else {
2027 desc["ProvisionedThroughput"] = json!({
2028 "ReadCapacityUnits": 0,
2029 "WriteCapacityUnits": 0,
2030 "NumberOfDecreasesToday": 0,
2031 });
2032 }
2033
2034 if status == "ACTIVE" {
2039 desc["WarmThroughput"] = json!({
2040 "ReadUnitsPerSecond": 0,
2041 "WriteUnitsPerSecond": 0,
2042 "Status": "ACTIVE",
2043 });
2044 }
2045
2046 if !gsi.is_empty() {
2047 let gsi_json: Vec<Value> = gsi
2048 .iter()
2049 .map(|g| {
2050 let gks: Vec<Value> = g
2051 .key_schema
2052 .iter()
2053 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2054 .collect();
2055 let mut idx = json!({
2056 "IndexName": g.index_name,
2057 "KeySchema": gks,
2058 "Projection": { "ProjectionType": g.projection.projection_type },
2059 "IndexStatus": "ACTIVE",
2060 "IndexArn": format!("{arn}/index/{}", g.index_name),
2061 "ItemCount": 0,
2062 "IndexSizeBytes": 0,
2063 });
2064 if !g.projection.non_key_attributes.is_empty() {
2065 idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2066 }
2067 if let Some(ref pt) = g.provisioned_throughput {
2068 idx["ProvisionedThroughput"] = json!({
2069 "ReadCapacityUnits": pt.read_capacity_units,
2070 "WriteCapacityUnits": pt.write_capacity_units,
2071 "NumberOfDecreasesToday": 0,
2072 });
2073 }
2074 idx
2075 })
2076 .collect();
2077 desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2078 }
2079
2080 if !lsi.is_empty() {
2081 let lsi_json: Vec<Value> = lsi
2082 .iter()
2083 .map(|l| {
2084 let lks: Vec<Value> = l
2085 .key_schema
2086 .iter()
2087 .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2088 .collect();
2089 let mut idx = json!({
2090 "IndexName": l.index_name,
2091 "KeySchema": lks,
2092 "Projection": { "ProjectionType": l.projection.projection_type },
2093 "IndexArn": format!("{arn}/index/{}", l.index_name),
2094 "ItemCount": 0,
2095 "IndexSizeBytes": 0,
2096 });
2097 if !l.projection.non_key_attributes.is_empty() {
2098 idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2099 }
2100 idx
2101 })
2102 .collect();
2103 desc["LocalSecondaryIndexes"] = json!(lsi_json);
2104 }
2105
2106 desc
2107}
2108
2109fn build_table_description(table: &DynamoTable) -> Value {
2110 let mut desc = build_table_description_json(&TableDescriptionInput {
2111 arn: &table.arn,
2112 table_id: &table.table_id,
2113 key_schema: &table.key_schema,
2114 attribute_definitions: &table.attribute_definitions,
2115 provisioned_throughput: &table.provisioned_throughput,
2116 gsi: &table.gsi,
2117 lsi: &table.lsi,
2118 billing_mode: &table.billing_mode,
2119 created_at: table.created_at,
2120 item_count: table.item_count,
2121 size_bytes: table.size_bytes,
2122 status: &table.status,
2123 });
2124
2125 if table.stream_enabled {
2127 if let Some(ref stream_arn) = table.stream_arn {
2128 desc["LatestStreamArn"] = json!(stream_arn);
2129 desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2130 }
2131 if let Some(ref view_type) = table.stream_view_type {
2132 desc["StreamSpecification"] = json!({
2133 "StreamEnabled": true,
2134 "StreamViewType": view_type,
2135 });
2136 }
2137 }
2138
2139 if let Some(ref sse_type) = table.sse_type {
2141 let mut sse_desc = json!({
2142 "Status": "ENABLED",
2143 "SSEType": sse_type,
2144 });
2145 if let Some(ref key_arn) = table.sse_kms_key_arn {
2146 sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2147 }
2148 desc["SSEDescription"] = sse_desc;
2149 } else {
2150 desc["SSEDescription"] = json!({
2152 "Status": "ENABLED",
2153 "SSEType": "AES256",
2154 });
2155 }
2156
2157 desc
2158}
2159
2160fn execute_partiql_statement(
2161 state: &SharedDynamoDbState,
2162 statement: &str,
2163 parameters: &[Value],
2164) -> Result<AwsResponse, AwsServiceError> {
2165 let trimmed = statement.trim();
2166 let upper = trimmed.to_ascii_uppercase();
2167
2168 if upper.starts_with("SELECT") {
2169 execute_partiql_select(state, trimmed, parameters)
2170 } else if upper.starts_with("INSERT") {
2171 execute_partiql_insert(state, trimmed, parameters)
2172 } else if upper.starts_with("UPDATE") {
2173 execute_partiql_update(state, trimmed, parameters)
2174 } else if upper.starts_with("DELETE") {
2175 execute_partiql_delete(state, trimmed, parameters)
2176 } else {
2177 Err(AwsServiceError::aws_error(
2178 StatusCode::BAD_REQUEST,
2179 "ValidationException",
2180 format!("Unsupported PartiQL statement: {trimmed}"),
2181 ))
2182 }
2183}
2184
2185fn execute_partiql_select(
2187 state: &SharedDynamoDbState,
2188 statement: &str,
2189 parameters: &[Value],
2190) -> Result<AwsResponse, AwsServiceError> {
2191 let upper = statement.to_ascii_uppercase();
2193 let from_pos = upper.find("FROM").ok_or_else(|| {
2194 AwsServiceError::aws_error(
2195 StatusCode::BAD_REQUEST,
2196 "ValidationException",
2197 "Invalid SELECT statement: missing FROM",
2198 )
2199 })?;
2200
2201 let after_from = statement[from_pos + 4..].trim();
2202 let (table_name, rest) = parse_partiql_table_name(after_from);
2203
2204 let state = state.read();
2205 let table = get_table(&state.tables, &table_name)?;
2206
2207 let rest_upper = rest.trim().to_ascii_uppercase();
2208 if rest_upper.starts_with("WHERE") {
2209 let where_clause = rest.trim()[5..].trim();
2210 let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2211 let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2212 DynamoDbService::ok_json(json!({ "Items": items }))
2213 } else {
2214 let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2216 DynamoDbService::ok_json(json!({ "Items": items }))
2217 }
2218}
2219
2220fn execute_partiql_insert(
2221 state: &SharedDynamoDbState,
2222 statement: &str,
2223 parameters: &[Value],
2224) -> Result<AwsResponse, AwsServiceError> {
2225 let upper = statement.to_ascii_uppercase();
2228 let into_pos = upper.find("INTO").ok_or_else(|| {
2229 AwsServiceError::aws_error(
2230 StatusCode::BAD_REQUEST,
2231 "ValidationException",
2232 "Invalid INSERT statement: missing INTO",
2233 )
2234 })?;
2235
2236 let after_into = statement[into_pos + 4..].trim();
2237 let (table_name, rest) = parse_partiql_table_name(after_into);
2238
2239 let rest_upper = rest.trim().to_ascii_uppercase();
2240 let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2241 AwsServiceError::aws_error(
2242 StatusCode::BAD_REQUEST,
2243 "ValidationException",
2244 "Invalid INSERT statement: missing VALUE",
2245 )
2246 })?;
2247
2248 let value_str = rest.trim()[value_pos + 5..].trim();
2249 let item = parse_partiql_value_object(value_str, parameters)?;
2250
2251 let mut state = state.write();
2252 let table = get_table_mut(&mut state.tables, &table_name)?;
2253 let key = extract_key(table, &item);
2254 if table.find_item_index(&key).is_some() {
2255 return Err(AwsServiceError::aws_error(
2257 StatusCode::BAD_REQUEST,
2258 "DuplicateItemException",
2259 "Duplicate primary key exists in table",
2260 ));
2261 } else {
2262 table.items.push(item);
2263 }
2264 table.recalculate_stats();
2265
2266 DynamoDbService::ok_json(json!({}))
2267}
2268
2269fn execute_partiql_update(
2270 state: &SharedDynamoDbState,
2271 statement: &str,
2272 parameters: &[Value],
2273) -> Result<AwsResponse, AwsServiceError> {
2274 let after_update = statement[6..].trim(); let (table_name, rest) = parse_partiql_table_name(after_update);
2278
2279 let rest_upper = rest.trim().to_ascii_uppercase();
2280 let set_pos = rest_upper.find("SET").ok_or_else(|| {
2281 AwsServiceError::aws_error(
2282 StatusCode::BAD_REQUEST,
2283 "ValidationException",
2284 "Invalid UPDATE statement: missing SET",
2285 )
2286 })?;
2287
2288 let after_set = rest.trim()[set_pos + 3..].trim();
2289
2290 let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2292 let (set_clause, where_clause) = if let Some(wp) = where_pos {
2293 (&after_set[..wp], after_set[wp + 5..].trim())
2294 } else {
2295 (after_set, "")
2296 };
2297
2298 let mut state = state.write();
2299 let table = get_table_mut(&mut state.tables, &table_name)?;
2300
2301 let matched_indices = if !where_clause.is_empty() {
2302 find_partiql_where_indices(table, where_clause, parameters)?
2303 } else {
2304 (0..table.items.len()).collect()
2305 };
2306
2307 let param_offset = count_params_in_str(where_clause);
2309 let assignments: Vec<&str> = set_clause.split(',').collect();
2310 for idx in &matched_indices {
2311 let mut local_offset = param_offset;
2312 for assignment in &assignments {
2313 let assignment = assignment.trim();
2314 if let Some((attr, val_str)) = assignment.split_once('=') {
2315 let attr = attr.trim().trim_matches('"');
2316 let val_str = val_str.trim();
2317 let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2318 if let Some(v) = value {
2319 table.items[*idx].insert(attr.to_string(), v);
2320 }
2321 }
2322 }
2323 }
2324 table.recalculate_stats();
2325
2326 DynamoDbService::ok_json(json!({}))
2327}
2328
2329fn execute_partiql_delete(
2330 state: &SharedDynamoDbState,
2331 statement: &str,
2332 parameters: &[Value],
2333) -> Result<AwsResponse, AwsServiceError> {
2334 let upper = statement.to_ascii_uppercase();
2336 let from_pos = upper.find("FROM").ok_or_else(|| {
2337 AwsServiceError::aws_error(
2338 StatusCode::BAD_REQUEST,
2339 "ValidationException",
2340 "Invalid DELETE statement: missing FROM",
2341 )
2342 })?;
2343
2344 let after_from = statement[from_pos + 4..].trim();
2345 let (table_name, rest) = parse_partiql_table_name(after_from);
2346
2347 let rest_upper = rest.trim().to_ascii_uppercase();
2348 if !rest_upper.starts_with("WHERE") {
2349 return Err(AwsServiceError::aws_error(
2350 StatusCode::BAD_REQUEST,
2351 "ValidationException",
2352 "DELETE requires a WHERE clause",
2353 ));
2354 }
2355 let where_clause = rest.trim()[5..].trim();
2356
2357 let mut state = state.write();
2358 let table = get_table_mut(&mut state.tables, &table_name)?;
2359
2360 let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2361 indices.sort_unstable();
2363 indices.reverse();
2364 for idx in indices {
2365 table.items.remove(idx);
2366 }
2367 table.recalculate_stats();
2368
2369 DynamoDbService::ok_json(json!({}))
2370}
2371
2372fn parse_partiql_table_name(s: &str) -> (String, &str) {
2375 let s = s.trim();
2376 if let Some(stripped) = s.strip_prefix('"') {
2377 if let Some(end) = stripped.find('"') {
2379 let name = &stripped[..end];
2380 let rest = &stripped[end + 1..];
2381 (name.to_string(), rest)
2382 } else {
2383 let end = s.find(' ').unwrap_or(s.len());
2384 (s[..end].trim_matches('"').to_string(), &s[end..])
2385 }
2386 } else {
2387 let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
2388 (s[..end].to_string(), &s[end..])
2389 }
2390}
2391
2392fn evaluate_partiql_where<'a>(
2395 table: &'a DynamoTable,
2396 where_clause: &str,
2397 parameters: &[Value],
2398) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2399 let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2400 Ok(indices.iter().map(|i| &table.items[*i]).collect())
2401}
2402
2403fn find_partiql_where_indices(
2404 table: &DynamoTable,
2405 where_clause: &str,
2406 parameters: &[Value],
2407) -> Result<Vec<usize>, AwsServiceError> {
2408 let conditions = split_partiql_and_clauses(where_clause);
2409 let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2410
2411 let mut indices = Vec::new();
2412 for (i, item) in table.items.iter().enumerate() {
2413 let all_match = parsed_conditions
2414 .iter()
2415 .all(|(attr, expected)| item.get(attr) == Some(expected));
2416 if all_match {
2417 indices.push(i);
2418 }
2419 }
2420
2421 Ok(indices)
2422}
2423
2424fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
2426 let upper = where_clause.to_uppercase();
2427 if !upper.contains(" AND ") {
2428 return vec![where_clause.trim()];
2429 }
2430 let mut parts = Vec::new();
2431 let mut last = 0;
2432 for (i, _) in upper.match_indices(" AND ") {
2433 parts.push(where_clause[last..i].trim());
2434 last = i + 5;
2435 }
2436 parts.push(where_clause[last..].trim());
2437 parts
2438}
2439
2440fn parse_partiql_equality_conditions(
2445 conditions: &[&str],
2446 parameters: &[Value],
2447) -> Vec<(String, Value)> {
2448 let mut param_idx = 0usize;
2449 let mut parsed = Vec::new();
2450 for cond in conditions {
2451 let cond = cond.trim();
2452 if let Some((left, right)) = cond.split_once('=') {
2453 let attr = left.trim().trim_matches('"').to_string();
2454 let val_str = right.trim();
2455 if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2456 parsed.push((attr, value));
2457 }
2458 }
2459 }
2460 parsed
2461}
2462
2463fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2468 let s = s.trim();
2469 if s == "?" {
2470 let idx = *param_idx;
2471 *param_idx += 1;
2472 parameters.get(idx).cloned()
2473 } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2474 let inner = &s[1..s.len() - 1];
2475 Some(json!({"S": inner}))
2476 } else if let Ok(n) = s.parse::<f64>() {
2477 let num_str = if n == n.trunc() {
2478 format!("{}", n as i64)
2479 } else {
2480 format!("{n}")
2481 };
2482 Some(json!({"N": num_str}))
2483 } else {
2484 None
2485 }
2486}
2487
2488fn parse_partiql_value_object(
2490 s: &str,
2491 parameters: &[Value],
2492) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2493 let s = s.trim();
2494 let inner = s
2495 .strip_prefix('{')
2496 .and_then(|s| s.strip_suffix('}'))
2497 .ok_or_else(|| {
2498 AwsServiceError::aws_error(
2499 StatusCode::BAD_REQUEST,
2500 "ValidationException",
2501 "Invalid VALUE: expected object literal",
2502 )
2503 })?;
2504
2505 let mut item = HashMap::new();
2506 let mut param_idx = 0usize;
2507
2508 for pair in split_partiql_pairs(inner) {
2510 let pair = pair.trim();
2511 if pair.is_empty() {
2512 continue;
2513 }
2514 if let Some((key_part, val_part)) = pair.split_once(':') {
2515 let key = key_part
2516 .trim()
2517 .trim_matches('\'')
2518 .trim_matches('"')
2519 .to_string();
2520 if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2521 item.insert(key, val);
2522 }
2523 }
2524 }
2525
2526 Ok(item)
2527}
2528
2529fn split_partiql_pairs(s: &str) -> Vec<&str> {
2531 let mut parts = Vec::new();
2532 let mut start = 0;
2533 let mut depth = 0;
2534 let mut in_quote = false;
2535
2536 for (i, c) in s.char_indices() {
2537 match c {
2538 '\'' if !in_quote => in_quote = true,
2539 '\'' if in_quote => in_quote = false,
2540 '{' if !in_quote => depth += 1,
2541 '}' if !in_quote => depth -= 1,
2542 ',' if !in_quote && depth == 0 => {
2543 parts.push(&s[start..i]);
2544 start = i + 1;
2545 }
2546 _ => {}
2547 }
2548 }
2549 parts.push(&s[start..]);
2550 parts
2551}
2552
2553fn count_params_in_str(s: &str) -> usize {
2555 s.chars().filter(|c| *c == '?').count()
2556}
2557
2558#[cfg(test)]
2559mod tests {
2560 use super::*;
2561 use serde_json::json;
2562
2563 #[test]
2564 fn test_parse_update_clauses_set() {
2565 let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2566 assert_eq!(clauses.len(), 1);
2567 assert_eq!(clauses[0].0, UpdateAction::Set);
2568 assert_eq!(clauses[0].1.len(), 2);
2569 }
2570
2571 #[test]
2572 fn test_parse_update_clauses_set_and_remove() {
2573 let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2574 assert_eq!(clauses.len(), 2);
2575 assert_eq!(clauses[0].0, UpdateAction::Set);
2576 assert_eq!(clauses[1].0, UpdateAction::Remove);
2577 }
2578
2579 #[test]
2580 fn test_evaluate_key_condition_simple() {
2581 let mut item = HashMap::new();
2582 item.insert("pk".to_string(), json!({"S": "user1"}));
2583 item.insert("sk".to_string(), json!({"S": "order1"}));
2584
2585 let mut expr_values = HashMap::new();
2586 expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2587
2588 assert!(evaluate_key_condition(
2589 "pk = :pk",
2590 &item,
2591 "pk",
2592 Some("sk"),
2593 &HashMap::new(),
2594 &expr_values,
2595 ));
2596 }
2597
2598 #[test]
2599 fn test_compare_attribute_values_numbers() {
2600 let a = json!({"N": "10"});
2601 let b = json!({"N": "20"});
2602 assert_eq!(
2603 compare_attribute_values(Some(&a), Some(&b)),
2604 std::cmp::Ordering::Less
2605 );
2606 }
2607
2608 #[test]
2609 fn test_compare_attribute_values_strings() {
2610 let a = json!({"S": "apple"});
2611 let b = json!({"S": "banana"});
2612 assert_eq!(
2613 compare_attribute_values(Some(&a), Some(&b)),
2614 std::cmp::Ordering::Less
2615 );
2616 }
2617
2618 #[test]
2619 fn test_split_on_and() {
2620 let parts = split_on_and("pk = :pk AND sk > :sk");
2621 assert_eq!(parts.len(), 2);
2622 assert_eq!(parts[0].trim(), "pk = :pk");
2623 assert_eq!(parts[1].trim(), "sk > :sk");
2624 }
2625
2626 #[test]
2627 fn test_split_on_and_respects_parentheses() {
2628 let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
2630 assert_eq!(parts.len(), 1);
2632 assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
2633 }
2634
2635 #[test]
2636 fn test_evaluate_filter_expression_parenthesized_and_with_or() {
2637 let mut item = HashMap::new();
2639 item.insert("x".to_string(), json!({"S": "no"}));
2640 item.insert("y".to_string(), json!({"S": "no"}));
2641 item.insert("z".to_string(), json!({"S": "yes"}));
2642
2643 let mut expr_values = HashMap::new();
2644 expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
2645
2646 let result = evaluate_filter_expression(
2648 "(x = :yes AND y = :yes) OR z = :yes",
2649 &item,
2650 &HashMap::new(),
2651 &expr_values,
2652 );
2653 assert!(result, "should match because z = :yes is true");
2654
2655 let mut item2 = HashMap::new();
2657 item2.insert("x".to_string(), json!({"S": "no"}));
2658 item2.insert("y".to_string(), json!({"S": "no"}));
2659 item2.insert("z".to_string(), json!({"S": "no"}));
2660
2661 let result2 = evaluate_filter_expression(
2662 "(x = :yes AND y = :yes) OR z = :yes",
2663 &item2,
2664 &HashMap::new(),
2665 &expr_values,
2666 );
2667 assert!(!result2, "should not match because nothing is true");
2668 }
2669
2670 #[test]
2671 fn test_project_item_nested_path() {
2672 let mut item = HashMap::new();
2674 item.insert("pk".to_string(), json!({"S": "key1"}));
2675 item.insert(
2676 "data".to_string(),
2677 json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
2678 );
2679
2680 let body = json!({
2681 "ProjectionExpression": "data[0].name"
2682 });
2683
2684 let projected = project_item(&item, &body);
2685 let name = projected
2687 .get("data")
2688 .and_then(|v| v.get("L"))
2689 .and_then(|v| v.get(0))
2690 .and_then(|v| v.get("M"))
2691 .and_then(|v| v.get("name"))
2692 .and_then(|v| v.get("S"))
2693 .and_then(|v| v.as_str());
2694 assert_eq!(name, Some("Alice"));
2695
2696 let age = projected
2698 .get("data")
2699 .and_then(|v| v.get("L"))
2700 .and_then(|v| v.get(0))
2701 .and_then(|v| v.get("M"))
2702 .and_then(|v| v.get("age"));
2703 assert!(age.is_none(), "age should not be present in projection");
2704 }
2705
2706 #[test]
2707 fn test_resolve_nested_path_map() {
2708 let mut item = HashMap::new();
2709 item.insert(
2710 "info".to_string(),
2711 json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
2712 );
2713
2714 let result = resolve_nested_path(&item, "info.address.city");
2715 assert_eq!(result, Some(json!({"S": "NYC"})));
2716 }
2717
2718 #[test]
2719 fn test_resolve_nested_path_list_then_map() {
2720 let mut item = HashMap::new();
2721 item.insert(
2722 "items".to_string(),
2723 json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
2724 );
2725
2726 let result = resolve_nested_path(&item, "items[0].sku");
2727 assert_eq!(result, Some(json!({"S": "ABC"})));
2728 }
2729
2730 use crate::state::SharedDynamoDbState;
2733 use parking_lot::RwLock;
2734 use std::sync::Arc;
2735
2736 fn make_service() -> DynamoDbService {
2737 let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
2738 "123456789012",
2739 "us-east-1",
2740 )));
2741 DynamoDbService::new(state)
2742 }
2743
2744 fn make_request(action: &str, body: Value) -> AwsRequest {
2745 AwsRequest {
2746 service: "dynamodb".to_string(),
2747 action: action.to_string(),
2748 region: "us-east-1".to_string(),
2749 account_id: "123456789012".to_string(),
2750 request_id: "test-id".to_string(),
2751 headers: http::HeaderMap::new(),
2752 query_params: HashMap::new(),
2753 body: serde_json::to_vec(&body).unwrap().into(),
2754 path_segments: vec![],
2755 raw_path: "/".to_string(),
2756 raw_query: String::new(),
2757 method: http::Method::POST,
2758 is_query_protocol: false,
2759 access_key_id: None,
2760 }
2761 }
2762
2763 fn create_test_table(svc: &DynamoDbService) {
2764 let req = make_request(
2765 "CreateTable",
2766 json!({
2767 "TableName": "test-table",
2768 "KeySchema": [
2769 { "AttributeName": "pk", "KeyType": "HASH" }
2770 ],
2771 "AttributeDefinitions": [
2772 { "AttributeName": "pk", "AttributeType": "S" }
2773 ],
2774 "BillingMode": "PAY_PER_REQUEST"
2775 }),
2776 );
2777 svc.create_table(&req).unwrap();
2778 }
2779
2780 #[test]
2781 fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
2782 let svc = make_service();
2783 let req = make_request(
2784 "CreateTable",
2785 json!({
2786 "TableName": "warm-throughput-table",
2787 "KeySchema": [
2788 { "AttributeName": "pk", "KeyType": "HASH" }
2789 ],
2790 "AttributeDefinitions": [
2791 { "AttributeName": "pk", "AttributeType": "S" }
2792 ],
2793 "BillingMode": "PAY_PER_REQUEST"
2794 }),
2795 );
2796 let create_resp = svc.create_table(&req).unwrap();
2797 let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
2798 let create_table = &create_body["TableDescription"];
2799
2800 assert_eq!(create_table["TableStatus"], "ACTIVE");
2801 assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
2802 let table_id = create_table["TableId"].as_str().unwrap().to_string();
2803 assert!(!table_id.is_empty());
2804
2805 let describe_req = make_request(
2806 "DescribeTable",
2807 json!({ "TableName": "warm-throughput-table" }),
2808 );
2809 let describe_resp = svc.describe_table(&describe_req).unwrap();
2810 let describe_body: Value =
2811 serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
2812 let described_table = &describe_body["Table"];
2813
2814 assert_eq!(described_table["TableStatus"], "ACTIVE");
2815 assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
2816 assert_eq!(described_table["TableId"], table_id);
2817
2818 let describe_resp_again = svc.describe_table(&describe_req).unwrap();
2819 let describe_body_again: Value =
2820 serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
2821 assert_eq!(describe_body_again["Table"]["TableId"], table_id);
2822 }
2823
2824 #[test]
2825 fn delete_item_return_values_all_old() {
2826 let svc = make_service();
2827 create_test_table(&svc);
2828
2829 let req = make_request(
2831 "PutItem",
2832 json!({
2833 "TableName": "test-table",
2834 "Item": {
2835 "pk": { "S": "key1" },
2836 "name": { "S": "Alice" },
2837 "age": { "N": "30" }
2838 }
2839 }),
2840 );
2841 svc.put_item(&req).unwrap();
2842
2843 let req = make_request(
2845 "DeleteItem",
2846 json!({
2847 "TableName": "test-table",
2848 "Key": { "pk": { "S": "key1" } },
2849 "ReturnValues": "ALL_OLD"
2850 }),
2851 );
2852 let resp = svc.delete_item(&req).unwrap();
2853 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2854
2855 let attrs = &body["Attributes"];
2857 assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
2858 assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
2859 assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
2860
2861 let req = make_request(
2863 "GetItem",
2864 json!({
2865 "TableName": "test-table",
2866 "Key": { "pk": { "S": "key1" } }
2867 }),
2868 );
2869 let resp = svc.get_item(&req).unwrap();
2870 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2871 assert!(body.get("Item").is_none(), "item should be deleted");
2872 }
2873
2874 #[test]
2875 fn transact_get_items_returns_existing_and_missing() {
2876 let svc = make_service();
2877 create_test_table(&svc);
2878
2879 let req = make_request(
2881 "PutItem",
2882 json!({
2883 "TableName": "test-table",
2884 "Item": {
2885 "pk": { "S": "exists" },
2886 "val": { "S": "hello" }
2887 }
2888 }),
2889 );
2890 svc.put_item(&req).unwrap();
2891
2892 let req = make_request(
2893 "TransactGetItems",
2894 json!({
2895 "TransactItems": [
2896 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
2897 { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
2898 ]
2899 }),
2900 );
2901 let resp = svc.transact_get_items(&req).unwrap();
2902 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2903 let responses = body["Responses"].as_array().unwrap();
2904 assert_eq!(responses.len(), 2);
2905 assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
2906 assert!(responses[1].get("Item").is_none());
2907 }
2908
2909 #[test]
2910 fn transact_write_items_put_and_delete() {
2911 let svc = make_service();
2912 create_test_table(&svc);
2913
2914 let req = make_request(
2916 "PutItem",
2917 json!({
2918 "TableName": "test-table",
2919 "Item": {
2920 "pk": { "S": "to-delete" },
2921 "val": { "S": "bye" }
2922 }
2923 }),
2924 );
2925 svc.put_item(&req).unwrap();
2926
2927 let req = make_request(
2929 "TransactWriteItems",
2930 json!({
2931 "TransactItems": [
2932 {
2933 "Put": {
2934 "TableName": "test-table",
2935 "Item": {
2936 "pk": { "S": "new-item" },
2937 "val": { "S": "hi" }
2938 }
2939 }
2940 },
2941 {
2942 "Delete": {
2943 "TableName": "test-table",
2944 "Key": { "pk": { "S": "to-delete" } }
2945 }
2946 }
2947 ]
2948 }),
2949 );
2950 let resp = svc.transact_write_items(&req).unwrap();
2951 assert_eq!(resp.status, StatusCode::OK);
2952
2953 let req = make_request(
2955 "GetItem",
2956 json!({
2957 "TableName": "test-table",
2958 "Key": { "pk": { "S": "new-item" } }
2959 }),
2960 );
2961 let resp = svc.get_item(&req).unwrap();
2962 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2963 assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
2964
2965 let req = make_request(
2967 "GetItem",
2968 json!({
2969 "TableName": "test-table",
2970 "Key": { "pk": { "S": "to-delete" } }
2971 }),
2972 );
2973 let resp = svc.get_item(&req).unwrap();
2974 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2975 assert!(body.get("Item").is_none());
2976 }
2977
2978 #[test]
2979 fn transact_write_items_condition_check_failure() {
2980 let svc = make_service();
2981 create_test_table(&svc);
2982
2983 let req = make_request(
2985 "TransactWriteItems",
2986 json!({
2987 "TransactItems": [
2988 {
2989 "ConditionCheck": {
2990 "TableName": "test-table",
2991 "Key": { "pk": { "S": "nonexistent" } },
2992 "ConditionExpression": "attribute_exists(pk)"
2993 }
2994 }
2995 ]
2996 }),
2997 );
2998 let resp = svc.transact_write_items(&req).unwrap();
2999 assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3001 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3002 assert_eq!(
3003 body["__type"].as_str().unwrap(),
3004 "TransactionCanceledException"
3005 );
3006 assert!(body["CancellationReasons"].as_array().is_some());
3007 }
3008
3009 #[test]
3010 fn update_and_describe_time_to_live() {
3011 let svc = make_service();
3012 create_test_table(&svc);
3013
3014 let req = make_request(
3016 "UpdateTimeToLive",
3017 json!({
3018 "TableName": "test-table",
3019 "TimeToLiveSpecification": {
3020 "AttributeName": "ttl",
3021 "Enabled": true
3022 }
3023 }),
3024 );
3025 let resp = svc.update_time_to_live(&req).unwrap();
3026 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3027 assert_eq!(
3028 body["TimeToLiveSpecification"]["AttributeName"]
3029 .as_str()
3030 .unwrap(),
3031 "ttl"
3032 );
3033 assert!(body["TimeToLiveSpecification"]["Enabled"]
3034 .as_bool()
3035 .unwrap());
3036
3037 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3039 let resp = svc.describe_time_to_live(&req).unwrap();
3040 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3041 assert_eq!(
3042 body["TimeToLiveDescription"]["TimeToLiveStatus"]
3043 .as_str()
3044 .unwrap(),
3045 "ENABLED"
3046 );
3047 assert_eq!(
3048 body["TimeToLiveDescription"]["AttributeName"]
3049 .as_str()
3050 .unwrap(),
3051 "ttl"
3052 );
3053
3054 let req = make_request(
3056 "UpdateTimeToLive",
3057 json!({
3058 "TableName": "test-table",
3059 "TimeToLiveSpecification": {
3060 "AttributeName": "ttl",
3061 "Enabled": false
3062 }
3063 }),
3064 );
3065 svc.update_time_to_live(&req).unwrap();
3066
3067 let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3068 let resp = svc.describe_time_to_live(&req).unwrap();
3069 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3070 assert_eq!(
3071 body["TimeToLiveDescription"]["TimeToLiveStatus"]
3072 .as_str()
3073 .unwrap(),
3074 "DISABLED"
3075 );
3076 }
3077
3078 #[test]
3079 fn resource_policy_lifecycle() {
3080 let svc = make_service();
3081 create_test_table(&svc);
3082
3083 let table_arn = {
3084 let state = svc.state.read();
3085 state.tables.get("test-table").unwrap().arn.clone()
3086 };
3087
3088 let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3090 let req = make_request(
3091 "PutResourcePolicy",
3092 json!({
3093 "ResourceArn": table_arn,
3094 "Policy": policy_doc
3095 }),
3096 );
3097 let resp = svc.put_resource_policy(&req).unwrap();
3098 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3099 assert!(body["RevisionId"].as_str().is_some());
3100
3101 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3103 let resp = svc.get_resource_policy(&req).unwrap();
3104 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3105 assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3106
3107 let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3109 svc.delete_resource_policy(&req).unwrap();
3110
3111 let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3113 let resp = svc.get_resource_policy(&req).unwrap();
3114 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3115 assert!(body["Policy"].is_null());
3116 }
3117
3118 #[test]
3119 fn describe_endpoints() {
3120 let svc = make_service();
3121 let req = make_request("DescribeEndpoints", json!({}));
3122 let resp = svc.describe_endpoints(&req).unwrap();
3123 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3124 assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3125 }
3126
3127 #[test]
3128 fn describe_limits() {
3129 let svc = make_service();
3130 let req = make_request("DescribeLimits", json!({}));
3131 let resp = svc.describe_limits(&req).unwrap();
3132 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3133 assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3134 }
3135
3136 #[test]
3137 fn backup_lifecycle() {
3138 let svc = make_service();
3139 create_test_table(&svc);
3140
3141 let req = make_request(
3143 "CreateBackup",
3144 json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3145 );
3146 let resp = svc.create_backup(&req).unwrap();
3147 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3148 let backup_arn = body["BackupDetails"]["BackupArn"]
3149 .as_str()
3150 .unwrap()
3151 .to_string();
3152 assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3153
3154 let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3156 let resp = svc.describe_backup(&req).unwrap();
3157 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3158 assert_eq!(
3159 body["BackupDescription"]["BackupDetails"]["BackupName"],
3160 "my-backup"
3161 );
3162
3163 let req = make_request("ListBackups", json!({}));
3165 let resp = svc.list_backups(&req).unwrap();
3166 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3167 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3168
3169 let req = make_request(
3171 "RestoreTableFromBackup",
3172 json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3173 );
3174 svc.restore_table_from_backup(&req).unwrap();
3175
3176 let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3178 let resp = svc.describe_table(&req).unwrap();
3179 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3180 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3181
3182 let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3184 svc.delete_backup(&req).unwrap();
3185
3186 let req = make_request("ListBackups", json!({}));
3188 let resp = svc.list_backups(&req).unwrap();
3189 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3190 assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3191 }
3192
3193 #[test]
3194 fn continuous_backups() {
3195 let svc = make_service();
3196 create_test_table(&svc);
3197
3198 let req = make_request(
3200 "DescribeContinuousBackups",
3201 json!({ "TableName": "test-table" }),
3202 );
3203 let resp = svc.describe_continuous_backups(&req).unwrap();
3204 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3205 assert_eq!(
3206 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3207 ["PointInTimeRecoveryStatus"],
3208 "DISABLED"
3209 );
3210
3211 let req = make_request(
3213 "UpdateContinuousBackups",
3214 json!({
3215 "TableName": "test-table",
3216 "PointInTimeRecoverySpecification": {
3217 "PointInTimeRecoveryEnabled": true
3218 }
3219 }),
3220 );
3221 svc.update_continuous_backups(&req).unwrap();
3222
3223 let req = make_request(
3225 "DescribeContinuousBackups",
3226 json!({ "TableName": "test-table" }),
3227 );
3228 let resp = svc.describe_continuous_backups(&req).unwrap();
3229 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3230 assert_eq!(
3231 body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3232 ["PointInTimeRecoveryStatus"],
3233 "ENABLED"
3234 );
3235 }
3236
3237 #[test]
3238 fn restore_table_to_point_in_time() {
3239 let svc = make_service();
3240 create_test_table(&svc);
3241
3242 let req = make_request(
3243 "RestoreTableToPointInTime",
3244 json!({
3245 "SourceTableName": "test-table",
3246 "TargetTableName": "pitr-restored"
3247 }),
3248 );
3249 svc.restore_table_to_point_in_time(&req).unwrap();
3250
3251 let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3252 let resp = svc.describe_table(&req).unwrap();
3253 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3254 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3255 }
3256
3257 #[test]
3258 fn global_table_lifecycle() {
3259 let svc = make_service();
3260
3261 let req = make_request(
3263 "CreateGlobalTable",
3264 json!({
3265 "GlobalTableName": "my-global",
3266 "ReplicationGroup": [
3267 { "RegionName": "us-east-1" },
3268 { "RegionName": "eu-west-1" }
3269 ]
3270 }),
3271 );
3272 let resp = svc.create_global_table(&req).unwrap();
3273 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3274 assert_eq!(
3275 body["GlobalTableDescription"]["GlobalTableStatus"],
3276 "ACTIVE"
3277 );
3278
3279 let req = make_request(
3281 "DescribeGlobalTable",
3282 json!({ "GlobalTableName": "my-global" }),
3283 );
3284 let resp = svc.describe_global_table(&req).unwrap();
3285 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3286 assert_eq!(
3287 body["GlobalTableDescription"]["ReplicationGroup"]
3288 .as_array()
3289 .unwrap()
3290 .len(),
3291 2
3292 );
3293
3294 let req = make_request("ListGlobalTables", json!({}));
3296 let resp = svc.list_global_tables(&req).unwrap();
3297 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3298 assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3299
3300 let req = make_request(
3302 "UpdateGlobalTable",
3303 json!({
3304 "GlobalTableName": "my-global",
3305 "ReplicaUpdates": [
3306 { "Create": { "RegionName": "ap-southeast-1" } }
3307 ]
3308 }),
3309 );
3310 let resp = svc.update_global_table(&req).unwrap();
3311 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3312 assert_eq!(
3313 body["GlobalTableDescription"]["ReplicationGroup"]
3314 .as_array()
3315 .unwrap()
3316 .len(),
3317 3
3318 );
3319
3320 let req = make_request(
3322 "DescribeGlobalTableSettings",
3323 json!({ "GlobalTableName": "my-global" }),
3324 );
3325 let resp = svc.describe_global_table_settings(&req).unwrap();
3326 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3327 assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3328
3329 let req = make_request(
3331 "UpdateGlobalTableSettings",
3332 json!({ "GlobalTableName": "my-global" }),
3333 );
3334 svc.update_global_table_settings(&req).unwrap();
3335 }
3336
3337 #[test]
3338 fn table_replica_auto_scaling() {
3339 let svc = make_service();
3340 create_test_table(&svc);
3341
3342 let req = make_request(
3343 "DescribeTableReplicaAutoScaling",
3344 json!({ "TableName": "test-table" }),
3345 );
3346 let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3347 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3348 assert_eq!(
3349 body["TableAutoScalingDescription"]["TableName"],
3350 "test-table"
3351 );
3352
3353 let req = make_request(
3354 "UpdateTableReplicaAutoScaling",
3355 json!({ "TableName": "test-table" }),
3356 );
3357 svc.update_table_replica_auto_scaling(&req).unwrap();
3358 }
3359
3360 #[test]
3361 fn kinesis_streaming_lifecycle() {
3362 let svc = make_service();
3363 create_test_table(&svc);
3364
3365 let req = make_request(
3367 "EnableKinesisStreamingDestination",
3368 json!({
3369 "TableName": "test-table",
3370 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3371 }),
3372 );
3373 let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3374 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3375 assert_eq!(body["DestinationStatus"], "ACTIVE");
3376
3377 let req = make_request(
3379 "DescribeKinesisStreamingDestination",
3380 json!({ "TableName": "test-table" }),
3381 );
3382 let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3383 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3384 assert_eq!(
3385 body["KinesisDataStreamDestinations"]
3386 .as_array()
3387 .unwrap()
3388 .len(),
3389 1
3390 );
3391
3392 let req = make_request(
3394 "UpdateKinesisStreamingDestination",
3395 json!({
3396 "TableName": "test-table",
3397 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3398 "UpdateKinesisStreamingConfiguration": {
3399 "ApproximateCreationDateTimePrecision": "MICROSECOND"
3400 }
3401 }),
3402 );
3403 svc.update_kinesis_streaming_destination(&req).unwrap();
3404
3405 let req = make_request(
3407 "DisableKinesisStreamingDestination",
3408 json!({
3409 "TableName": "test-table",
3410 "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3411 }),
3412 );
3413 let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3414 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3415 assert_eq!(body["DestinationStatus"], "DISABLED");
3416 }
3417
3418 #[test]
3419 fn contributor_insights_lifecycle() {
3420 let svc = make_service();
3421 create_test_table(&svc);
3422
3423 let req = make_request(
3425 "DescribeContributorInsights",
3426 json!({ "TableName": "test-table" }),
3427 );
3428 let resp = svc.describe_contributor_insights(&req).unwrap();
3429 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3430 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3431
3432 let req = make_request(
3434 "UpdateContributorInsights",
3435 json!({
3436 "TableName": "test-table",
3437 "ContributorInsightsAction": "ENABLE"
3438 }),
3439 );
3440 let resp = svc.update_contributor_insights(&req).unwrap();
3441 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3442 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3443
3444 let req = make_request("ListContributorInsights", json!({}));
3446 let resp = svc.list_contributor_insights(&req).unwrap();
3447 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3448 assert_eq!(
3449 body["ContributorInsightsSummaries"]
3450 .as_array()
3451 .unwrap()
3452 .len(),
3453 1
3454 );
3455 }
3456
3457 #[test]
3458 fn export_lifecycle() {
3459 let svc = make_service();
3460 create_test_table(&svc);
3461
3462 let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3463
3464 let req = make_request(
3466 "ExportTableToPointInTime",
3467 json!({
3468 "TableArn": table_arn,
3469 "S3Bucket": "my-bucket"
3470 }),
3471 );
3472 let resp = svc.export_table_to_point_in_time(&req).unwrap();
3473 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3474 let export_arn = body["ExportDescription"]["ExportArn"]
3475 .as_str()
3476 .unwrap()
3477 .to_string();
3478 assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3479
3480 let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3482 let resp = svc.describe_export(&req).unwrap();
3483 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3484 assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3485
3486 let req = make_request("ListExports", json!({}));
3488 let resp = svc.list_exports(&req).unwrap();
3489 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3490 assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3491 }
3492
3493 #[test]
3494 fn import_lifecycle() {
3495 let svc = make_service();
3496
3497 let req = make_request(
3498 "ImportTable",
3499 json!({
3500 "InputFormat": "DYNAMODB_JSON",
3501 "S3BucketSource": { "S3Bucket": "import-bucket" },
3502 "TableCreationParameters": {
3503 "TableName": "imported-table",
3504 "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3505 "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3506 }
3507 }),
3508 );
3509 let resp = svc.import_table(&req).unwrap();
3510 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3511 let import_arn = body["ImportTableDescription"]["ImportArn"]
3512 .as_str()
3513 .unwrap()
3514 .to_string();
3515 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3516
3517 let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3519 let resp = svc.describe_import(&req).unwrap();
3520 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3521 assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3522
3523 let req = make_request("ListImports", json!({}));
3525 let resp = svc.list_imports(&req).unwrap();
3526 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3527 assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3528
3529 let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3531 let resp = svc.describe_table(&req).unwrap();
3532 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3533 assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3534 }
3535
3536 #[test]
3537 fn backup_restore_preserves_items() {
3538 let svc = make_service();
3539 create_test_table(&svc);
3540
3541 for i in 1..=3 {
3543 let req = make_request(
3544 "PutItem",
3545 json!({
3546 "TableName": "test-table",
3547 "Item": {
3548 "pk": { "S": format!("key{i}") },
3549 "data": { "S": format!("value{i}") }
3550 }
3551 }),
3552 );
3553 svc.put_item(&req).unwrap();
3554 }
3555
3556 let req = make_request(
3558 "CreateBackup",
3559 json!({
3560 "TableName": "test-table",
3561 "BackupName": "my-backup"
3562 }),
3563 );
3564 let resp = svc.create_backup(&req).unwrap();
3565 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3566 let backup_arn = body["BackupDetails"]["BackupArn"]
3567 .as_str()
3568 .unwrap()
3569 .to_string();
3570
3571 for i in 1..=3 {
3573 let req = make_request(
3574 "DeleteItem",
3575 json!({
3576 "TableName": "test-table",
3577 "Key": { "pk": { "S": format!("key{i}") } }
3578 }),
3579 );
3580 svc.delete_item(&req).unwrap();
3581 }
3582
3583 let req = make_request("Scan", json!({ "TableName": "test-table" }));
3585 let resp = svc.scan(&req).unwrap();
3586 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3587 assert_eq!(body["Count"], 0);
3588
3589 let req = make_request(
3591 "RestoreTableFromBackup",
3592 json!({
3593 "BackupArn": backup_arn,
3594 "TargetTableName": "restored-table"
3595 }),
3596 );
3597 svc.restore_table_from_backup(&req).unwrap();
3598
3599 let req = make_request("Scan", json!({ "TableName": "restored-table" }));
3601 let resp = svc.scan(&req).unwrap();
3602 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3603 assert_eq!(body["Count"], 3);
3604 assert_eq!(body["Items"].as_array().unwrap().len(), 3);
3605 }
3606
3607 #[test]
3608 fn global_table_replicates_writes() {
3609 let svc = make_service();
3610 create_test_table(&svc);
3611
3612 let req = make_request(
3614 "CreateGlobalTable",
3615 json!({
3616 "GlobalTableName": "test-table",
3617 "ReplicationGroup": [
3618 { "RegionName": "us-east-1" },
3619 { "RegionName": "eu-west-1" }
3620 ]
3621 }),
3622 );
3623 let resp = svc.create_global_table(&req).unwrap();
3624 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3625 assert_eq!(
3626 body["GlobalTableDescription"]["GlobalTableStatus"],
3627 "ACTIVE"
3628 );
3629
3630 let req = make_request(
3632 "PutItem",
3633 json!({
3634 "TableName": "test-table",
3635 "Item": {
3636 "pk": { "S": "replicated-key" },
3637 "data": { "S": "replicated-value" }
3638 }
3639 }),
3640 );
3641 svc.put_item(&req).unwrap();
3642
3643 let req = make_request(
3645 "GetItem",
3646 json!({
3647 "TableName": "test-table",
3648 "Key": { "pk": { "S": "replicated-key" } }
3649 }),
3650 );
3651 let resp = svc.get_item(&req).unwrap();
3652 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3653 assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
3654 assert_eq!(body["Item"]["data"]["S"], "replicated-value");
3655 }
3656
3657 #[test]
3658 fn contributor_insights_tracks_access() {
3659 let svc = make_service();
3660 create_test_table(&svc);
3661
3662 let req = make_request(
3664 "UpdateContributorInsights",
3665 json!({
3666 "TableName": "test-table",
3667 "ContributorInsightsAction": "ENABLE"
3668 }),
3669 );
3670 svc.update_contributor_insights(&req).unwrap();
3671
3672 for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
3674 let req = make_request(
3675 "PutItem",
3676 json!({
3677 "TableName": "test-table",
3678 "Item": {
3679 "pk": { "S": key },
3680 "data": { "S": "value" }
3681 }
3682 }),
3683 );
3684 svc.put_item(&req).unwrap();
3685 }
3686
3687 for _ in 0..3 {
3689 let req = make_request(
3690 "GetItem",
3691 json!({
3692 "TableName": "test-table",
3693 "Key": { "pk": { "S": "alpha" } }
3694 }),
3695 );
3696 svc.get_item(&req).unwrap();
3697 }
3698
3699 let req = make_request(
3701 "DescribeContributorInsights",
3702 json!({ "TableName": "test-table" }),
3703 );
3704 let resp = svc.describe_contributor_insights(&req).unwrap();
3705 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3706 assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3707
3708 let contributors = body["TopContributors"].as_array().unwrap();
3709 assert!(
3710 !contributors.is_empty(),
3711 "TopContributors should not be empty"
3712 );
3713
3714 let top = &contributors[0];
3717 assert!(top["Count"].as_u64().unwrap() > 0);
3718
3719 let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
3721 assert!(!rules.is_empty());
3722 }
3723
3724 #[test]
3725 fn contributor_insights_not_tracked_when_disabled() {
3726 let svc = make_service();
3727 create_test_table(&svc);
3728
3729 let req = make_request(
3731 "PutItem",
3732 json!({
3733 "TableName": "test-table",
3734 "Item": {
3735 "pk": { "S": "key1" },
3736 "data": { "S": "value" }
3737 }
3738 }),
3739 );
3740 svc.put_item(&req).unwrap();
3741
3742 let req = make_request(
3744 "DescribeContributorInsights",
3745 json!({ "TableName": "test-table" }),
3746 );
3747 let resp = svc.describe_contributor_insights(&req).unwrap();
3748 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3749 assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3750
3751 let contributors = body["TopContributors"].as_array().unwrap();
3752 assert!(contributors.is_empty());
3753 }
3754
3755 #[test]
3756 fn contributor_insights_disabled_table_no_counters_after_scan() {
3757 let svc = make_service();
3758 create_test_table(&svc);
3759
3760 for key in &["alpha", "beta"] {
3762 let req = make_request(
3763 "PutItem",
3764 json!({
3765 "TableName": "test-table",
3766 "Item": { "pk": { "S": key } }
3767 }),
3768 );
3769 svc.put_item(&req).unwrap();
3770 }
3771
3772 let req = make_request(
3774 "UpdateContributorInsights",
3775 json!({
3776 "TableName": "test-table",
3777 "ContributorInsightsAction": "ENABLE"
3778 }),
3779 );
3780 svc.update_contributor_insights(&req).unwrap();
3781
3782 let req = make_request("Scan", json!({ "TableName": "test-table" }));
3784 svc.scan(&req).unwrap();
3785
3786 let req = make_request(
3788 "DescribeContributorInsights",
3789 json!({ "TableName": "test-table" }),
3790 );
3791 let resp = svc.describe_contributor_insights(&req).unwrap();
3792 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3793 let contributors = body["TopContributors"].as_array().unwrap();
3794 assert!(
3795 !contributors.is_empty(),
3796 "counters should be non-empty while enabled"
3797 );
3798
3799 let req = make_request(
3801 "UpdateContributorInsights",
3802 json!({
3803 "TableName": "test-table",
3804 "ContributorInsightsAction": "DISABLE"
3805 }),
3806 );
3807 svc.update_contributor_insights(&req).unwrap();
3808
3809 let req = make_request("Scan", json!({ "TableName": "test-table" }));
3811 svc.scan(&req).unwrap();
3812
3813 let req = make_request(
3815 "DescribeContributorInsights",
3816 json!({ "TableName": "test-table" }),
3817 );
3818 let resp = svc.describe_contributor_insights(&req).unwrap();
3819 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3820 let contributors = body["TopContributors"].as_array().unwrap();
3821 assert!(
3822 contributors.is_empty(),
3823 "counters should be empty after disabling insights"
3824 );
3825 }
3826
3827 #[test]
3828 fn scan_pagination_with_limit() {
3829 let svc = make_service();
3830 create_test_table(&svc);
3831
3832 for i in 0..5 {
3834 let req = make_request(
3835 "PutItem",
3836 json!({
3837 "TableName": "test-table",
3838 "Item": {
3839 "pk": { "S": format!("item{i}") },
3840 "data": { "S": format!("value{i}") }
3841 }
3842 }),
3843 );
3844 svc.put_item(&req).unwrap();
3845 }
3846
3847 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
3849 let resp = svc.scan(&req).unwrap();
3850 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3851 assert_eq!(body["Count"], 2);
3852 assert!(
3853 body["LastEvaluatedKey"].is_object(),
3854 "should have LastEvaluatedKey when limit < total items"
3855 );
3856 assert!(body["LastEvaluatedKey"]["pk"].is_object());
3857
3858 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
3860 let mut lek = body["LastEvaluatedKey"].clone();
3861
3862 while lek.is_object() {
3863 let req = make_request(
3864 "Scan",
3865 json!({
3866 "TableName": "test-table",
3867 "Limit": 2,
3868 "ExclusiveStartKey": lek
3869 }),
3870 );
3871 let resp = svc.scan(&req).unwrap();
3872 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3873 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
3874 lek = body["LastEvaluatedKey"].clone();
3875 }
3876
3877 assert_eq!(
3878 all_items.len(),
3879 5,
3880 "should retrieve all 5 items via pagination"
3881 );
3882 }
3883
3884 #[test]
3885 fn scan_no_pagination_when_all_fit() {
3886 let svc = make_service();
3887 create_test_table(&svc);
3888
3889 for i in 0..3 {
3890 let req = make_request(
3891 "PutItem",
3892 json!({
3893 "TableName": "test-table",
3894 "Item": {
3895 "pk": { "S": format!("item{i}") }
3896 }
3897 }),
3898 );
3899 svc.put_item(&req).unwrap();
3900 }
3901
3902 let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
3904 let resp = svc.scan(&req).unwrap();
3905 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3906 assert_eq!(body["Count"], 3);
3907 assert!(
3908 body["LastEvaluatedKey"].is_null(),
3909 "should not have LastEvaluatedKey when all items fit"
3910 );
3911
3912 let req = make_request("Scan", json!({ "TableName": "test-table" }));
3914 let resp = svc.scan(&req).unwrap();
3915 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3916 assert_eq!(body["Count"], 3);
3917 assert!(body["LastEvaluatedKey"].is_null());
3918 }
3919
3920 fn create_composite_table(svc: &DynamoDbService) {
3921 let req = make_request(
3922 "CreateTable",
3923 json!({
3924 "TableName": "composite-table",
3925 "KeySchema": [
3926 { "AttributeName": "pk", "KeyType": "HASH" },
3927 { "AttributeName": "sk", "KeyType": "RANGE" }
3928 ],
3929 "AttributeDefinitions": [
3930 { "AttributeName": "pk", "AttributeType": "S" },
3931 { "AttributeName": "sk", "AttributeType": "S" }
3932 ],
3933 "BillingMode": "PAY_PER_REQUEST"
3934 }),
3935 );
3936 svc.create_table(&req).unwrap();
3937 }
3938
3939 #[test]
3940 fn query_pagination_with_composite_key() {
3941 let svc = make_service();
3942 create_composite_table(&svc);
3943
3944 for i in 0..5 {
3946 let req = make_request(
3947 "PutItem",
3948 json!({
3949 "TableName": "composite-table",
3950 "Item": {
3951 "pk": { "S": "user1" },
3952 "sk": { "S": format!("item{i:03}") },
3953 "data": { "S": format!("value{i}") }
3954 }
3955 }),
3956 );
3957 svc.put_item(&req).unwrap();
3958 }
3959
3960 let req = make_request(
3962 "Query",
3963 json!({
3964 "TableName": "composite-table",
3965 "KeyConditionExpression": "pk = :pk",
3966 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
3967 "Limit": 2
3968 }),
3969 );
3970 let resp = svc.query(&req).unwrap();
3971 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3972 assert_eq!(body["Count"], 2);
3973 assert!(body["LastEvaluatedKey"].is_object());
3974 assert!(body["LastEvaluatedKey"]["pk"].is_object());
3975 assert!(body["LastEvaluatedKey"]["sk"].is_object());
3976
3977 let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
3979 let mut lek = body["LastEvaluatedKey"].clone();
3980
3981 while lek.is_object() {
3982 let req = make_request(
3983 "Query",
3984 json!({
3985 "TableName": "composite-table",
3986 "KeyConditionExpression": "pk = :pk",
3987 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
3988 "Limit": 2,
3989 "ExclusiveStartKey": lek
3990 }),
3991 );
3992 let resp = svc.query(&req).unwrap();
3993 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3994 all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
3995 lek = body["LastEvaluatedKey"].clone();
3996 }
3997
3998 assert_eq!(
3999 all_items.len(),
4000 5,
4001 "should retrieve all 5 items via pagination"
4002 );
4003
4004 let sks: Vec<String> = all_items
4006 .iter()
4007 .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
4008 .collect();
4009 let mut sorted = sks.clone();
4010 sorted.sort();
4011 assert_eq!(sks, sorted, "items should be sorted by sort key");
4012 }
4013
4014 #[test]
4015 fn query_no_pagination_when_all_fit() {
4016 let svc = make_service();
4017 create_composite_table(&svc);
4018
4019 for i in 0..2 {
4020 let req = make_request(
4021 "PutItem",
4022 json!({
4023 "TableName": "composite-table",
4024 "Item": {
4025 "pk": { "S": "user1" },
4026 "sk": { "S": format!("item{i}") }
4027 }
4028 }),
4029 );
4030 svc.put_item(&req).unwrap();
4031 }
4032
4033 let req = make_request(
4034 "Query",
4035 json!({
4036 "TableName": "composite-table",
4037 "KeyConditionExpression": "pk = :pk",
4038 "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4039 "Limit": 10
4040 }),
4041 );
4042 let resp = svc.query(&req).unwrap();
4043 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4044 assert_eq!(body["Count"], 2);
4045 assert!(
4046 body["LastEvaluatedKey"].is_null(),
4047 "should not have LastEvaluatedKey when all items fit"
4048 );
4049 }
4050
4051 fn create_gsi_table(svc: &DynamoDbService) {
4052 let req = make_request(
4053 "CreateTable",
4054 json!({
4055 "TableName": "gsi-table",
4056 "KeySchema": [
4057 { "AttributeName": "pk", "KeyType": "HASH" }
4058 ],
4059 "AttributeDefinitions": [
4060 { "AttributeName": "pk", "AttributeType": "S" },
4061 { "AttributeName": "gsi_pk", "AttributeType": "S" },
4062 { "AttributeName": "gsi_sk", "AttributeType": "S" }
4063 ],
4064 "BillingMode": "PAY_PER_REQUEST",
4065 "GlobalSecondaryIndexes": [
4066 {
4067 "IndexName": "gsi-index",
4068 "KeySchema": [
4069 { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4070 { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4071 ],
4072 "Projection": { "ProjectionType": "ALL" }
4073 }
4074 ]
4075 }),
4076 );
4077 svc.create_table(&req).unwrap();
4078 }
4079
4080 #[test]
4081 fn gsi_query_last_evaluated_key_includes_table_pk() {
4082 let svc = make_service();
4083 create_gsi_table(&svc);
4084
4085 for i in 0..3 {
4087 let req = make_request(
4088 "PutItem",
4089 json!({
4090 "TableName": "gsi-table",
4091 "Item": {
4092 "pk": { "S": format!("item{i}") },
4093 "gsi_pk": { "S": "shared" },
4094 "gsi_sk": { "S": "sort" }
4095 }
4096 }),
4097 );
4098 svc.put_item(&req).unwrap();
4099 }
4100
4101 let req = make_request(
4103 "Query",
4104 json!({
4105 "TableName": "gsi-table",
4106 "IndexName": "gsi-index",
4107 "KeyConditionExpression": "gsi_pk = :v",
4108 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4109 "Limit": 1
4110 }),
4111 );
4112 let resp = svc.query(&req).unwrap();
4113 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4114 assert_eq!(body["Count"], 1);
4115 let lek = &body["LastEvaluatedKey"];
4116 assert!(lek.is_object(), "should have LastEvaluatedKey");
4117 assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
4119 assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
4120 assert!(
4122 lek["pk"].is_object(),
4123 "LEK must contain table PK for GSI queries"
4124 );
4125 }
4126
4127 #[test]
4128 fn gsi_query_pagination_returns_all_items() {
4129 let svc = make_service();
4130 create_gsi_table(&svc);
4131
4132 for i in 0..4 {
4134 let req = make_request(
4135 "PutItem",
4136 json!({
4137 "TableName": "gsi-table",
4138 "Item": {
4139 "pk": { "S": format!("item{i:03}") },
4140 "gsi_pk": { "S": "shared" },
4141 "gsi_sk": { "S": "sort" }
4142 }
4143 }),
4144 );
4145 svc.put_item(&req).unwrap();
4146 }
4147
4148 let mut all_pks = Vec::new();
4150 let mut lek: Option<Value> = None;
4151
4152 loop {
4153 let mut query = json!({
4154 "TableName": "gsi-table",
4155 "IndexName": "gsi-index",
4156 "KeyConditionExpression": "gsi_pk = :v",
4157 "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4158 "Limit": 2
4159 });
4160 if let Some(ref start_key) = lek {
4161 query["ExclusiveStartKey"] = start_key.clone();
4162 }
4163
4164 let req = make_request("Query", query);
4165 let resp = svc.query(&req).unwrap();
4166 let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4167
4168 for item in body["Items"].as_array().unwrap() {
4169 let pk = item["pk"]["S"].as_str().unwrap().to_string();
4170 all_pks.push(pk);
4171 }
4172
4173 if body["LastEvaluatedKey"].is_object() {
4174 lek = Some(body["LastEvaluatedKey"].clone());
4175 } else {
4176 break;
4177 }
4178 }
4179
4180 all_pks.sort();
4181 assert_eq!(
4182 all_pks,
4183 vec!["item000", "item001", "item002", "item003"],
4184 "pagination should return all items without duplicates"
4185 );
4186 }
4187
4188 fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4189 pairs
4190 .iter()
4191 .map(|(k, v)| (k.to_string(), json!({"S": v})))
4192 .collect()
4193 }
4194
4195 fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
4196 pairs
4197 .iter()
4198 .map(|(k, v)| (k.to_string(), v.to_string()))
4199 .collect()
4200 }
4201
4202 fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4203 pairs
4204 .iter()
4205 .map(|(k, v)| (k.to_string(), json!({"S": v})))
4206 .collect()
4207 }
4208
4209 #[test]
4210 fn test_evaluate_condition_bare_not_equal() {
4211 let item = cond_item(&[("state", "active")]);
4212 let names = cond_names(&[("#s", "state")]);
4213 let values = cond_values(&[(":c", "complete")]);
4214
4215 assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4216
4217 let item2 = cond_item(&[("state", "complete")]);
4218 assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4219 }
4220
4221 #[test]
4222 fn test_evaluate_condition_parenthesized_not_equal() {
4223 let item = cond_item(&[("state", "active")]);
4224 let names = cond_names(&[("#s", "state")]);
4225 let values = cond_values(&[(":c", "complete")]);
4226
4227 assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4228 }
4229
4230 #[test]
4231 fn test_evaluate_condition_parenthesized_equal_mismatch() {
4232 let item = cond_item(&[("state", "active")]);
4233 let names = cond_names(&[("#s", "state")]);
4234 let values = cond_values(&[(":c", "complete")]);
4235
4236 assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4237 }
4238
4239 #[test]
4240 fn test_evaluate_condition_compound_and() {
4241 let item = cond_item(&[("state", "active")]);
4242 let names = cond_names(&[("#s", "state")]);
4243 let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4244
4245 assert!(
4247 evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4248 );
4249 }
4250
4251 #[test]
4252 fn test_evaluate_condition_compound_and_mismatch() {
4253 let item = cond_item(&[("state", "inactive")]);
4254 let names = cond_names(&[("#s", "state")]);
4255 let values = cond_values(&[(":a", "active"), (":b", "active")]);
4256
4257 assert!(
4259 evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4260 );
4261 }
4262
4263 #[test]
4264 fn test_evaluate_condition_compound_or() {
4265 let item = cond_item(&[("state", "running")]);
4266 let names = cond_names(&[("#s", "state")]);
4267 let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4268
4269 assert!(
4271 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4272 );
4273
4274 let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4276 assert!(
4277 evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4278 );
4279 }
4280
4281 #[test]
4282 fn test_evaluate_condition_not_operator() {
4283 let item = cond_item(&[("state", "active")]);
4284 let names = cond_names(&[("#s", "state")]);
4285 let values = cond_values(&[(":c", "complete")]);
4286
4287 assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4289
4290 assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4292
4293 assert!(
4295 evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4296 );
4297
4298 assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4300 }
4301
4302 #[test]
4303 fn test_evaluate_condition_begins_with() {
4304 let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4307 let names = cond_names(&[("#n", "name")]);
4308 let values = cond_values(&[(":p", "fakecloud")]);
4309
4310 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4311
4312 let values2 = cond_values(&[(":p", "realcloud")]);
4313 assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4314 }
4315
4316 #[test]
4317 fn test_evaluate_condition_contains() {
4318 let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4319 let names = cond_names(&[("#t", "tags")]);
4320 let values = cond_values(&[(":v", "beta")]);
4321
4322 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4323
4324 let values2 = cond_values(&[(":v", "delta")]);
4325 assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4326 }
4327
4328 #[test]
4329 fn test_evaluate_condition_no_existing_item() {
4330 let names = cond_names(&[("#s", "state")]);
4333 let values = cond_values(&[(":v", "active")]);
4334
4335 assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4336 assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4337 assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4339 assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4341 }
4342
4343 #[test]
4344 fn test_evaluate_filter_not_operator() {
4345 let item = cond_item(&[("status", "pending")]);
4346 let names = cond_names(&[("#s", "status")]);
4347 let values = cond_values(&[(":v", "pending")]);
4348
4349 assert!(!evaluate_filter_expression(
4350 "NOT (#s = :v)",
4351 &item,
4352 &names,
4353 &values
4354 ));
4355 assert!(evaluate_filter_expression(
4356 "NOT (#s <> :v)",
4357 &item,
4358 &names,
4359 &values
4360 ));
4361 }
4362
4363 #[test]
4364 fn test_evaluate_filter_expression_in_match() {
4365 let item = cond_item(&[("state", "active")]);
4371 let names = cond_names(&[("#s", "state")]);
4372 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4373
4374 assert!(
4375 evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4376 "state=active should match IN (active, pending)"
4377 );
4378 }
4379
4380 #[test]
4381 fn test_evaluate_filter_expression_in_no_match() {
4382 let item = cond_item(&[("state", "complete")]);
4383 let names = cond_names(&[("#s", "state")]);
4384 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4385
4386 assert!(
4387 !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4388 "state=complete should not match IN (active, pending)"
4389 );
4390 }
4391
4392 #[test]
4393 fn test_evaluate_filter_expression_in_no_spaces() {
4394 let item = cond_item(&[("status", "shipped")]);
4398 let names = cond_names(&[("#s", "status")]);
4399 let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4400
4401 assert!(
4402 evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4403 "no-space IN list should still parse"
4404 );
4405 }
4406
4407 #[test]
4408 fn test_evaluate_filter_expression_in_missing_attribute() {
4409 let item: HashMap<String, AttributeValue> = HashMap::new();
4412 let names = cond_names(&[("#s", "state")]);
4413 let values = cond_values(&[(":a", "active")]);
4414
4415 assert!(
4416 !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4417 "missing attribute should not match any IN list"
4418 );
4419 }
4420
4421 #[test]
4422 fn test_evaluate_filter_expression_compound_in_and_eq() {
4423 let item = cond_item(&[("state", "active"), ("priority", "high")]);
4429 let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4430 let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4431
4432 assert!(
4433 evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4434 "(active IN (active, pending)) AND (high = high) should match"
4435 );
4436
4437 let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4438 assert!(
4439 !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4440 "(complete IN (active, pending)) AND (high = high) should not match"
4441 );
4442 }
4443
4444 #[test]
4445 fn test_evaluate_condition_attribute_exists_with_space() {
4446 let item = cond_item(&[("store_id", "s-1")]);
4454 let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4455 let values = cond_values(&[(":0", "tab-A")]);
4456
4457 assert!(
4460 evaluate_condition(
4461 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4462 Some(&item),
4463 &names,
4464 &values,
4465 )
4466 .is_ok(),
4467 "claim-lease compound on free item should succeed"
4468 );
4469
4470 assert!(
4472 evaluate_condition(
4473 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4474 None,
4475 &names,
4476 &values,
4477 )
4478 .is_err(),
4479 "claim-lease compound on missing item must fail attribute_exists branch"
4480 );
4481
4482 let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4485 assert!(
4486 evaluate_condition(
4487 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4488 Some(&held),
4489 &names,
4490 &values,
4491 )
4492 .is_err(),
4493 "claim-lease compound on item held by another tab must fail"
4494 );
4495
4496 let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4499 assert!(
4500 evaluate_condition(
4501 "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4502 Some(&self_held),
4503 &names,
4504 &values,
4505 )
4506 .is_ok(),
4507 "same-tab re-claim must succeed"
4508 );
4509 }
4510
4511 #[test]
4512 fn test_evaluate_condition_in_match() {
4513 let item = cond_item(&[("state", "active")]);
4516 let names = cond_names(&[("#s", "state")]);
4517 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4518
4519 assert!(
4520 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4521 "IN should succeed when actual value is in the list"
4522 );
4523 }
4524
4525 #[test]
4526 fn test_evaluate_condition_in_no_match() {
4527 let item = cond_item(&[("state", "complete")]);
4531 let names = cond_names(&[("#s", "state")]);
4532 let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4533
4534 assert!(
4535 evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4536 "IN should fail when actual value is not in the list"
4537 );
4538 }
4539
4540 #[test]
4541 fn test_apply_update_set_list_index_replaces_existing() {
4542 let mut item = HashMap::new();
4549 item.insert(
4550 "items".to_string(),
4551 json!({"L": [
4552 {"M": {"sku": {"S": "OLD-A"}}},
4553 {"M": {"sku": {"S": "OLD-B"}}},
4554 ]}),
4555 );
4556
4557 let names = cond_names(&[("#items", "items")]);
4558 let mut values = HashMap::new();
4559 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
4560
4561 apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
4562
4563 let items_list = item
4564 .get("items")
4565 .and_then(|v| v.get("L"))
4566 .and_then(|v| v.as_array())
4567 .expect("items should still be a list");
4568 assert_eq!(items_list.len(), 2, "list length should be unchanged");
4569 let sku0 = items_list[0]
4570 .get("M")
4571 .and_then(|m| m.get("sku"))
4572 .and_then(|s| s.get("S"))
4573 .and_then(|s| s.as_str());
4574 assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
4575 let sku1 = items_list[1]
4576 .get("M")
4577 .and_then(|m| m.get("sku"))
4578 .and_then(|s| s.get("S"))
4579 .and_then(|s| s.as_str());
4580 assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
4581
4582 assert!(!item.contains_key("items[0]"));
4583 assert!(!item.contains_key("#items[0]"));
4584 }
4585
4586 #[test]
4587 fn test_apply_update_set_list_index_second_slot() {
4588 let mut item = HashMap::new();
4589 item.insert(
4590 "items".to_string(),
4591 json!({"L": [
4592 {"M": {"sku": {"S": "A"}}},
4593 {"M": {"sku": {"S": "B"}}},
4594 {"M": {"sku": {"S": "C"}}},
4595 ]}),
4596 );
4597
4598 let names = cond_names(&[("#items", "items")]);
4599 let mut values = HashMap::new();
4600 values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
4601
4602 apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
4603
4604 let items_list = item
4605 .get("items")
4606 .and_then(|v| v.get("L"))
4607 .and_then(|v| v.as_array())
4608 .unwrap();
4609 let skus: Vec<&str> = items_list
4610 .iter()
4611 .map(|v| {
4612 v.get("M")
4613 .and_then(|m| m.get("sku"))
4614 .and_then(|s| s.get("S"))
4615 .and_then(|s| s.as_str())
4616 .unwrap()
4617 })
4618 .collect();
4619 assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
4620 }
4621
4622 #[test]
4623 fn test_apply_update_set_list_index_without_name_ref() {
4624 let mut item = HashMap::new();
4627 item.insert(
4628 "tags".to_string(),
4629 json!({"L": [{"S": "red"}, {"S": "blue"}]}),
4630 );
4631
4632 let names: HashMap<String, String> = HashMap::new();
4633 let mut values = HashMap::new();
4634 values.insert(":t".to_string(), json!({"S": "green"}));
4635
4636 apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
4637
4638 let tags = item
4639 .get("tags")
4640 .and_then(|v| v.get("L"))
4641 .and_then(|v| v.as_array())
4642 .unwrap();
4643 assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
4644 assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
4645 }
4646
4647 #[test]
4648 fn test_unrecognized_expression_returns_false() {
4649 let item = cond_item(&[("x", "1")]);
4652 let names: HashMap<String, String> = HashMap::new();
4653 let values: HashMap<String, Value> = HashMap::new();
4654
4655 assert!(
4656 !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
4657 "unrecognized expression must return false"
4658 );
4659 }
4660
4661 #[test]
4662 fn test_set_list_index_out_of_range_returns_error() {
4663 let mut item = HashMap::new();
4666 item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
4667
4668 let names: HashMap<String, String> = HashMap::new();
4669 let mut values = HashMap::new();
4670 values.insert(":v".to_string(), json!({"S": "z"}));
4671
4672 let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
4673 assert!(
4674 result.is_err(),
4675 "out-of-range list index must return an error"
4676 );
4677
4678 let list = item
4680 .get("items")
4681 .and_then(|v| v.get("L"))
4682 .and_then(|v| v.as_array())
4683 .unwrap();
4684 assert_eq!(list.len(), 2);
4685 }
4686
4687 #[test]
4688 fn test_set_list_index_on_non_list_returns_error() {
4689 let mut item = HashMap::new();
4692 item.insert("name".to_string(), json!({"S": "hello"}));
4693
4694 let names: HashMap<String, String> = HashMap::new();
4695 let mut values = HashMap::new();
4696 values.insert(":v".to_string(), json!({"S": "z"}));
4697
4698 let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
4699 assert!(
4700 result.is_err(),
4701 "list index on non-list attribute must return an error"
4702 );
4703 }
4704
4705 #[test]
4706 fn test_unrecognized_update_action_returns_error() {
4707 let mut item = HashMap::new();
4708 item.insert("name".to_string(), json!({"S": "hello"}));
4709
4710 let names: HashMap<String, String> = HashMap::new();
4711 let mut values = HashMap::new();
4712 values.insert(":bar".to_string(), json!({"S": "baz"}));
4713
4714 let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
4715 assert!(
4716 result.is_err(),
4717 "unrecognized UpdateExpression action must return an error"
4718 );
4719 let err_msg = format!("{}", result.unwrap_err());
4720 assert!(
4721 err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
4722 "error should mention Invalid UpdateExpression, got: {err_msg}"
4723 );
4724 }
4725
4726 #[test]
4729 fn test_size_string() {
4730 let mut item = HashMap::new();
4731 item.insert("name".to_string(), json!({"S": "hello"}));
4732 let names = HashMap::new();
4733 let mut values = HashMap::new();
4734 values.insert(":limit".to_string(), json!({"N": "5"}));
4735
4736 assert!(evaluate_single_filter_condition(
4737 "size(name) = :limit",
4738 &item,
4739 &names,
4740 &values,
4741 ));
4742 values.insert(":limit".to_string(), json!({"N": "4"}));
4743 assert!(evaluate_single_filter_condition(
4744 "size(name) > :limit",
4745 &item,
4746 &names,
4747 &values,
4748 ));
4749 }
4750
4751 #[test]
4752 fn test_size_list() {
4753 let mut item = HashMap::new();
4754 item.insert(
4755 "items".to_string(),
4756 json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
4757 );
4758 let names = HashMap::new();
4759 let mut values = HashMap::new();
4760 values.insert(":limit".to_string(), json!({"N": "3"}));
4761
4762 assert!(evaluate_single_filter_condition(
4763 "size(items) = :limit",
4764 &item,
4765 &names,
4766 &values,
4767 ));
4768 }
4769
4770 #[test]
4771 fn test_size_map() {
4772 let mut item = HashMap::new();
4773 item.insert(
4774 "data".to_string(),
4775 json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
4776 );
4777 let names = HashMap::new();
4778 let mut values = HashMap::new();
4779 values.insert(":limit".to_string(), json!({"N": "2"}));
4780
4781 assert!(evaluate_single_filter_condition(
4782 "size(data) = :limit",
4783 &item,
4784 &names,
4785 &values,
4786 ));
4787 }
4788
4789 #[test]
4790 fn test_size_set() {
4791 let mut item = HashMap::new();
4792 item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
4793 let names = HashMap::new();
4794 let mut values = HashMap::new();
4795 values.insert(":limit".to_string(), json!({"N": "3"}));
4796
4797 assert!(evaluate_single_filter_condition(
4798 "size(tags) > :limit",
4799 &item,
4800 &names,
4801 &values,
4802 ));
4803 }
4804
4805 #[test]
4808 fn test_attribute_type_string() {
4809 let mut item = HashMap::new();
4810 item.insert("name".to_string(), json!({"S": "hello"}));
4811 let names = HashMap::new();
4812 let mut values = HashMap::new();
4813 values.insert(":t".to_string(), json!({"S": "S"}));
4814
4815 assert!(evaluate_single_filter_condition(
4816 "attribute_type(name, :t)",
4817 &item,
4818 &names,
4819 &values,
4820 ));
4821
4822 values.insert(":t".to_string(), json!({"S": "N"}));
4823 assert!(!evaluate_single_filter_condition(
4824 "attribute_type(name, :t)",
4825 &item,
4826 &names,
4827 &values,
4828 ));
4829 }
4830
4831 #[test]
4832 fn test_attribute_type_number() {
4833 let mut item = HashMap::new();
4834 item.insert("age".to_string(), json!({"N": "42"}));
4835 let names = HashMap::new();
4836 let mut values = HashMap::new();
4837 values.insert(":t".to_string(), json!({"S": "N"}));
4838
4839 assert!(evaluate_single_filter_condition(
4840 "attribute_type(age, :t)",
4841 &item,
4842 &names,
4843 &values,
4844 ));
4845 }
4846
4847 #[test]
4848 fn test_attribute_type_list() {
4849 let mut item = HashMap::new();
4850 item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
4851 let names = HashMap::new();
4852 let mut values = HashMap::new();
4853 values.insert(":t".to_string(), json!({"S": "L"}));
4854
4855 assert!(evaluate_single_filter_condition(
4856 "attribute_type(items, :t)",
4857 &item,
4858 &names,
4859 &values,
4860 ));
4861 }
4862
4863 #[test]
4864 fn test_attribute_type_map() {
4865 let mut item = HashMap::new();
4866 item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
4867 let names = HashMap::new();
4868 let mut values = HashMap::new();
4869 values.insert(":t".to_string(), json!({"S": "M"}));
4870
4871 assert!(evaluate_single_filter_condition(
4872 "attribute_type(data, :t)",
4873 &item,
4874 &names,
4875 &values,
4876 ));
4877 }
4878
4879 #[test]
4880 fn test_attribute_type_bool() {
4881 let mut item = HashMap::new();
4882 item.insert("active".to_string(), json!({"BOOL": true}));
4883 let names = HashMap::new();
4884 let mut values = HashMap::new();
4885 values.insert(":t".to_string(), json!({"S": "BOOL"}));
4886
4887 assert!(evaluate_single_filter_condition(
4888 "attribute_type(active, :t)",
4889 &item,
4890 &names,
4891 &values,
4892 ));
4893 }
4894
4895 #[test]
4898 fn test_begins_with_rejects_number_type() {
4899 let mut item = HashMap::new();
4900 item.insert("code".to_string(), json!({"N": "12345"}));
4901 let names = HashMap::new();
4902 let mut values = HashMap::new();
4903 values.insert(":prefix".to_string(), json!({"S": "123"}));
4904
4905 assert!(
4906 !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
4907 "begins_with must return false for N-type attributes"
4908 );
4909 }
4910
4911 #[test]
4912 fn test_begins_with_works_on_string_type() {
4913 let mut item = HashMap::new();
4914 item.insert("code".to_string(), json!({"S": "abc123"}));
4915 let names = HashMap::new();
4916 let mut values = HashMap::new();
4917 values.insert(":prefix".to_string(), json!({"S": "abc"}));
4918
4919 assert!(evaluate_single_filter_condition(
4920 "begins_with(code, :prefix)",
4921 &item,
4922 &names,
4923 &values,
4924 ));
4925 }
4926
4927 #[test]
4930 fn test_contains_string_set() {
4931 let mut item = HashMap::new();
4932 item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
4933 let names = HashMap::new();
4934 let mut values = HashMap::new();
4935 values.insert(":val".to_string(), json!({"S": "blue"}));
4936
4937 assert!(evaluate_single_filter_condition(
4938 "contains(tags, :val)",
4939 &item,
4940 &names,
4941 &values,
4942 ));
4943
4944 values.insert(":val".to_string(), json!({"S": "yellow"}));
4945 assert!(!evaluate_single_filter_condition(
4946 "contains(tags, :val)",
4947 &item,
4948 &names,
4949 &values,
4950 ));
4951 }
4952
4953 #[test]
4954 fn test_contains_number_set() {
4955 let mut item = HashMap::new();
4956 item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
4957 let names = HashMap::new();
4958 let mut values = HashMap::new();
4959 values.insert(":val".to_string(), json!({"N": "2"}));
4960
4961 assert!(evaluate_single_filter_condition(
4962 "contains(scores, :val)",
4963 &item,
4964 &names,
4965 &values,
4966 ));
4967 }
4968
4969 #[test]
4972 fn test_set_arithmetic_rejects_string_operand() {
4973 let mut item = HashMap::new();
4974 item.insert("name".to_string(), json!({"S": "hello"}));
4975 let names = HashMap::new();
4976 let mut values = HashMap::new();
4977 values.insert(":val".to_string(), json!({"N": "1"}));
4978
4979 let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
4980 assert!(
4981 result.is_err(),
4982 "arithmetic on S-type attribute must return a ValidationException"
4983 );
4984 }
4985
4986 #[test]
4987 fn test_set_arithmetic_rejects_string_value() {
4988 let mut item = HashMap::new();
4989 item.insert("count".to_string(), json!({"N": "5"}));
4990 let names = HashMap::new();
4991 let mut values = HashMap::new();
4992 values.insert(":val".to_string(), json!({"S": "notanumber"}));
4993
4994 let result =
4995 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
4996 assert!(
4997 result.is_err(),
4998 "arithmetic with S-type value must return a ValidationException"
4999 );
5000 }
5001
5002 #[test]
5003 fn test_set_arithmetic_valid_numbers() {
5004 let mut item = HashMap::new();
5005 item.insert("count".to_string(), json!({"N": "10"}));
5006 let names = HashMap::new();
5007 let mut values = HashMap::new();
5008 values.insert(":val".to_string(), json!({"N": "3"}));
5009
5010 let result =
5011 apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5012 assert!(result.is_ok());
5013 assert_eq!(item["count"], json!({"N": "13"}));
5014 }
5015
5016 #[test]
5019 fn test_add_binary_set() {
5020 let mut item = HashMap::new();
5021 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
5022 let names = HashMap::new();
5023 let mut values = HashMap::new();
5024 values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
5025
5026 let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
5027 assert!(result.is_ok());
5028 let bs = item["data"]["BS"].as_array().unwrap();
5029 assert_eq!(bs.len(), 3, "should merge sets without duplicates");
5030 assert!(bs.contains(&json!("YQ==")));
5031 assert!(bs.contains(&json!("Yg==")));
5032 assert!(bs.contains(&json!("Yw==")));
5033 }
5034
5035 #[test]
5036 fn test_delete_binary_set() {
5037 let mut item = HashMap::new();
5038 item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
5039 let names = HashMap::new();
5040 let mut values = HashMap::new();
5041 values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
5042
5043 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5044 assert!(result.is_ok());
5045 let bs = item["data"]["BS"].as_array().unwrap();
5046 assert_eq!(bs.len(), 2);
5047 assert!(!bs.contains(&json!("Yg==")));
5048 }
5049
5050 #[test]
5051 fn test_delete_binary_set_removes_attr_when_empty() {
5052 let mut item = HashMap::new();
5053 item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
5054 let names = HashMap::new();
5055 let mut values = HashMap::new();
5056 values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
5057
5058 let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5059 assert!(result.is_ok());
5060 assert!(
5061 !item.contains_key("data"),
5062 "attribute should be removed when set becomes empty"
5063 );
5064 }
5065}