Skip to main content

fakecloud_dynamodb/service/
mod.rs

1mod batch;
2mod global_tables;
3mod items;
4mod queries;
5mod streams;
6mod tables;
7
8use std::collections::HashMap;
9use std::sync::Arc;
10
11use async_trait::async_trait;
12use base64::Engine;
13use http::StatusCode;
14use serde_json::{json, Value};
15
16use fakecloud_core::delivery::DeliveryBus;
17use fakecloud_core::service::{AwsRequest, AwsResponse, AwsService, AwsServiceError};
18
19use fakecloud_persistence::S3Store;
20use fakecloud_s3::state::SharedS3State;
21
22use crate::state::{
23    attribute_type_and_value, AttributeDefinition, AttributeValue, DynamoTable,
24    GlobalSecondaryIndex, KeySchemaElement, KinesisDestination, LocalSecondaryIndex, Projection,
25    ProvisionedThroughput, SharedDynamoDbState,
26};
27
28/// Minimal subset of a ``DynamoTable`` that Kinesis streaming delivery needs.
29///
30/// A table can carry megabytes of items; cloning the whole table just to
31/// release the write lock and deliver one change record is extremely wasteful.
32/// Extracting only the fields the delivery path actually reads (destinations,
33/// arn, name) keeps the clone small.
34pub(super) struct KinesisDeliveryTarget {
35    pub destinations: Vec<KinesisDestination>,
36    pub arn: String,
37    pub name: String,
38}
39
40pub struct DynamoDbService {
41    state: SharedDynamoDbState,
42    pub(crate) s3_state: Option<SharedS3State>,
43    pub(crate) s3_store: Option<Arc<dyn S3Store>>,
44    delivery: Option<Arc<DeliveryBus>>,
45}
46
47impl DynamoDbService {
48    pub fn new(state: SharedDynamoDbState) -> Self {
49        Self {
50            state,
51            s3_state: None,
52            s3_store: None,
53            delivery: None,
54        }
55    }
56
57    pub fn with_s3(mut self, s3_state: SharedS3State) -> Self {
58        self.s3_state = Some(s3_state);
59        self
60    }
61
62    pub fn with_s3_store(mut self, store: Arc<dyn S3Store>) -> Self {
63        self.s3_store = Some(store);
64        self
65    }
66
67    pub fn with_delivery(mut self, delivery: Arc<DeliveryBus>) -> Self {
68        self.delivery = Some(delivery);
69        self
70    }
71
72    fn kinesis_target(table: &DynamoTable) -> Option<KinesisDeliveryTarget> {
73        if table
74            .kinesis_destinations
75            .iter()
76            .any(|d| d.destination_status == "ACTIVE")
77        {
78            Some(KinesisDeliveryTarget {
79                destinations: table.kinesis_destinations.clone(),
80                arn: table.arn.clone(),
81                name: table.name.clone(),
82            })
83        } else {
84            None
85        }
86    }
87
88    /// Deliver a change record to all active Kinesis streaming destinations for a table.
89    pub(super) fn deliver_to_kinesis_destinations(
90        &self,
91        target: &KinesisDeliveryTarget,
92        event_name: &str,
93        keys: &HashMap<String, AttributeValue>,
94        old_image: Option<&HashMap<String, AttributeValue>>,
95        new_image: Option<&HashMap<String, AttributeValue>>,
96    ) {
97        let delivery = match &self.delivery {
98            Some(d) => d,
99            None => return,
100        };
101
102        let active_destinations: Vec<_> = target
103            .destinations
104            .iter()
105            .filter(|d| d.destination_status == "ACTIVE")
106            .collect();
107
108        if active_destinations.is_empty() {
109            return;
110        }
111
112        let mut record = json!({
113            "eventID": uuid::Uuid::new_v4().to_string(),
114            "eventName": event_name,
115            "eventVersion": "1.1",
116            "eventSource": "aws:dynamodb",
117            "awsRegion": target.arn.split(':').nth(3).unwrap_or("us-east-1"),
118            "dynamodb": {
119                "Keys": keys,
120                "SequenceNumber": chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0).to_string(),
121                "SizeBytes": serde_json::to_string(keys).map(|s| s.len()).unwrap_or(0),
122                "StreamViewType": "NEW_AND_OLD_IMAGES",
123            },
124            "eventSourceARN": &target.arn,
125            "tableName": &target.name,
126        });
127
128        if let Some(old) = old_image {
129            record["dynamodb"]["OldImage"] = json!(old);
130        }
131        if let Some(new) = new_image {
132            record["dynamodb"]["NewImage"] = json!(new);
133        }
134
135        let record_str = serde_json::to_string(&record).unwrap_or_default();
136        let encoded = base64::engine::general_purpose::STANDARD.encode(&record_str);
137        let partition_key = serde_json::to_string(keys).unwrap_or_default();
138
139        for dest in active_destinations {
140            delivery.send_to_kinesis(&dest.stream_arn, &encoded, &partition_key);
141        }
142    }
143
144    fn parse_body(req: &AwsRequest) -> Result<Value, AwsServiceError> {
145        serde_json::from_slice(&req.body).map_err(|e| {
146            AwsServiceError::aws_error(
147                StatusCode::BAD_REQUEST,
148                "SerializationException",
149                format!("Invalid JSON: {e}"),
150            )
151        })
152    }
153
154    fn ok_json(body: Value) -> Result<AwsResponse, AwsServiceError> {
155        Ok(AwsResponse::ok_json(body))
156    }
157}
158
159#[async_trait]
160impl AwsService for DynamoDbService {
161    fn service_name(&self) -> &str {
162        "dynamodb"
163    }
164
165    async fn handle(&self, req: AwsRequest) -> Result<AwsResponse, AwsServiceError> {
166        match req.action.as_str() {
167            "CreateTable" => self.create_table(&req),
168            "DeleteTable" => self.delete_table(&req),
169            "DescribeTable" => self.describe_table(&req),
170            "ListTables" => self.list_tables(&req),
171            "UpdateTable" => self.update_table(&req),
172            "PutItem" => self.put_item(&req),
173            "GetItem" => self.get_item(&req),
174            "DeleteItem" => self.delete_item(&req),
175            "UpdateItem" => self.update_item(&req),
176            "Query" => self.query(&req),
177            "Scan" => self.scan(&req),
178            "BatchGetItem" => self.batch_get_item(&req),
179            "BatchWriteItem" => self.batch_write_item(&req),
180            "TagResource" => self.tag_resource(&req),
181            "UntagResource" => self.untag_resource(&req),
182            "ListTagsOfResource" => self.list_tags_of_resource(&req),
183            "TransactGetItems" => self.transact_get_items(&req),
184            "TransactWriteItems" => self.transact_write_items(&req),
185            "ExecuteStatement" => self.execute_statement(&req),
186            "BatchExecuteStatement" => self.batch_execute_statement(&req),
187            "ExecuteTransaction" => self.execute_transaction(&req),
188            "UpdateTimeToLive" => self.update_time_to_live(&req),
189            "DescribeTimeToLive" => self.describe_time_to_live(&req),
190            "PutResourcePolicy" => self.put_resource_policy(&req),
191            "GetResourcePolicy" => self.get_resource_policy(&req),
192            "DeleteResourcePolicy" => self.delete_resource_policy(&req),
193            // Stubs
194            "DescribeEndpoints" => self.describe_endpoints(&req),
195            "DescribeLimits" => self.describe_limits(&req),
196            // Backups
197            "CreateBackup" => self.create_backup(&req),
198            "DeleteBackup" => self.delete_backup(&req),
199            "DescribeBackup" => self.describe_backup(&req),
200            "ListBackups" => self.list_backups(&req),
201            "RestoreTableFromBackup" => self.restore_table_from_backup(&req),
202            "RestoreTableToPointInTime" => self.restore_table_to_point_in_time(&req),
203            "UpdateContinuousBackups" => self.update_continuous_backups(&req),
204            "DescribeContinuousBackups" => self.describe_continuous_backups(&req),
205            // Global tables
206            "CreateGlobalTable" => self.create_global_table(&req),
207            "DescribeGlobalTable" => self.describe_global_table(&req),
208            "DescribeGlobalTableSettings" => self.describe_global_table_settings(&req),
209            "ListGlobalTables" => self.list_global_tables(&req),
210            "UpdateGlobalTable" => self.update_global_table(&req),
211            "UpdateGlobalTableSettings" => self.update_global_table_settings(&req),
212            "DescribeTableReplicaAutoScaling" => self.describe_table_replica_auto_scaling(&req),
213            "UpdateTableReplicaAutoScaling" => self.update_table_replica_auto_scaling(&req),
214            // Kinesis streaming
215            "EnableKinesisStreamingDestination" => self.enable_kinesis_streaming_destination(&req),
216            "DisableKinesisStreamingDestination" => {
217                self.disable_kinesis_streaming_destination(&req)
218            }
219            "DescribeKinesisStreamingDestination" => {
220                self.describe_kinesis_streaming_destination(&req)
221            }
222            "UpdateKinesisStreamingDestination" => self.update_kinesis_streaming_destination(&req),
223            // Contributor insights
224            "DescribeContributorInsights" => self.describe_contributor_insights(&req),
225            "UpdateContributorInsights" => self.update_contributor_insights(&req),
226            "ListContributorInsights" => self.list_contributor_insights(&req),
227            // Import/Export
228            "ExportTableToPointInTime" => self.export_table_to_point_in_time(&req),
229            "DescribeExport" => self.describe_export(&req),
230            "ListExports" => self.list_exports(&req),
231            "ImportTable" => self.import_table(&req),
232            "DescribeImport" => self.describe_import(&req),
233            "ListImports" => self.list_imports(&req),
234            _ => Err(AwsServiceError::action_not_implemented(
235                "dynamodb",
236                &req.action,
237            )),
238        }
239    }
240
241    fn supported_actions(&self) -> &[&str] {
242        &[
243            "CreateTable",
244            "DeleteTable",
245            "DescribeTable",
246            "ListTables",
247            "UpdateTable",
248            "PutItem",
249            "GetItem",
250            "DeleteItem",
251            "UpdateItem",
252            "Query",
253            "Scan",
254            "BatchGetItem",
255            "BatchWriteItem",
256            "TagResource",
257            "UntagResource",
258            "ListTagsOfResource",
259            "TransactGetItems",
260            "TransactWriteItems",
261            "ExecuteStatement",
262            "BatchExecuteStatement",
263            "ExecuteTransaction",
264            "UpdateTimeToLive",
265            "DescribeTimeToLive",
266            "PutResourcePolicy",
267            "GetResourcePolicy",
268            "DeleteResourcePolicy",
269            "DescribeEndpoints",
270            "DescribeLimits",
271            "CreateBackup",
272            "DeleteBackup",
273            "DescribeBackup",
274            "ListBackups",
275            "RestoreTableFromBackup",
276            "RestoreTableToPointInTime",
277            "UpdateContinuousBackups",
278            "DescribeContinuousBackups",
279            "CreateGlobalTable",
280            "DescribeGlobalTable",
281            "DescribeGlobalTableSettings",
282            "ListGlobalTables",
283            "UpdateGlobalTable",
284            "UpdateGlobalTableSettings",
285            "DescribeTableReplicaAutoScaling",
286            "UpdateTableReplicaAutoScaling",
287            "EnableKinesisStreamingDestination",
288            "DisableKinesisStreamingDestination",
289            "DescribeKinesisStreamingDestination",
290            "UpdateKinesisStreamingDestination",
291            "DescribeContributorInsights",
292            "UpdateContributorInsights",
293            "ListContributorInsights",
294            "ExportTableToPointInTime",
295            "DescribeExport",
296            "ListExports",
297            "ImportTable",
298            "DescribeImport",
299            "ListImports",
300        ]
301    }
302}
303// ── Helper functions ────────────────────────────────────────────────────
304
305fn require_str<'a>(body: &'a Value, field: &str) -> Result<&'a str, AwsServiceError> {
306    body[field].as_str().ok_or_else(|| {
307        AwsServiceError::aws_error(
308            StatusCode::BAD_REQUEST,
309            "ValidationException",
310            format!("{field} is required"),
311        )
312    })
313}
314
315fn require_object(
316    body: &Value,
317    field: &str,
318) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
319    let obj = body[field].as_object().ok_or_else(|| {
320        AwsServiceError::aws_error(
321            StatusCode::BAD_REQUEST,
322            "ValidationException",
323            format!("{field} is required"),
324        )
325    })?;
326    Ok(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
327}
328
329fn get_table<'a>(
330    tables: &'a HashMap<String, DynamoTable>,
331    name: &str,
332) -> Result<&'a DynamoTable, AwsServiceError> {
333    tables.get(name).ok_or_else(|| {
334        AwsServiceError::aws_error(
335            StatusCode::BAD_REQUEST,
336            "ResourceNotFoundException",
337            format!("Requested resource not found: Table: {name} not found"),
338        )
339    })
340}
341
342fn get_table_mut<'a>(
343    tables: &'a mut HashMap<String, DynamoTable>,
344    name: &str,
345) -> Result<&'a mut DynamoTable, AwsServiceError> {
346    tables.get_mut(name).ok_or_else(|| {
347        AwsServiceError::aws_error(
348            StatusCode::BAD_REQUEST,
349            "ResourceNotFoundException",
350            format!("Requested resource not found: Table: {name} not found"),
351        )
352    })
353}
354
355fn find_table_by_arn<'a>(
356    tables: &'a HashMap<String, DynamoTable>,
357    arn: &str,
358) -> Result<&'a DynamoTable, AwsServiceError> {
359    tables.values().find(|t| t.arn == arn).ok_or_else(|| {
360        AwsServiceError::aws_error(
361            StatusCode::BAD_REQUEST,
362            "ResourceNotFoundException",
363            format!("Requested resource not found: {arn}"),
364        )
365    })
366}
367
368fn find_table_by_arn_mut<'a>(
369    tables: &'a mut HashMap<String, DynamoTable>,
370    arn: &str,
371) -> Result<&'a mut DynamoTable, AwsServiceError> {
372    tables.values_mut().find(|t| t.arn == arn).ok_or_else(|| {
373        AwsServiceError::aws_error(
374            StatusCode::BAD_REQUEST,
375            "ResourceNotFoundException",
376            format!("Requested resource not found: {arn}"),
377        )
378    })
379}
380
381fn parse_key_schema(val: &Value) -> Result<Vec<KeySchemaElement>, AwsServiceError> {
382    let arr = val.as_array().ok_or_else(|| {
383        AwsServiceError::aws_error(
384            StatusCode::BAD_REQUEST,
385            "ValidationException",
386            "KeySchema is required",
387        )
388    })?;
389    Ok(arr
390        .iter()
391        .map(|elem| KeySchemaElement {
392            attribute_name: elem["AttributeName"]
393                .as_str()
394                .unwrap_or_default()
395                .to_string(),
396            key_type: elem["KeyType"].as_str().unwrap_or("HASH").to_string(),
397        })
398        .collect())
399}
400
401fn parse_attribute_definitions(val: &Value) -> Result<Vec<AttributeDefinition>, AwsServiceError> {
402    let arr = val.as_array().ok_or_else(|| {
403        AwsServiceError::aws_error(
404            StatusCode::BAD_REQUEST,
405            "ValidationException",
406            "AttributeDefinitions is required",
407        )
408    })?;
409    Ok(arr
410        .iter()
411        .map(|elem| AttributeDefinition {
412            attribute_name: elem["AttributeName"]
413                .as_str()
414                .unwrap_or_default()
415                .to_string(),
416            attribute_type: elem["AttributeType"].as_str().unwrap_or("S").to_string(),
417        })
418        .collect())
419}
420
421fn parse_provisioned_throughput(val: &Value) -> Result<ProvisionedThroughput, AwsServiceError> {
422    Ok(ProvisionedThroughput {
423        read_capacity_units: val["ReadCapacityUnits"].as_i64().unwrap_or(5),
424        write_capacity_units: val["WriteCapacityUnits"].as_i64().unwrap_or(5),
425    })
426}
427
428fn parse_gsi(val: &Value) -> Vec<GlobalSecondaryIndex> {
429    let Some(arr) = val.as_array() else {
430        return Vec::new();
431    };
432    arr.iter()
433        .filter_map(|g| {
434            Some(GlobalSecondaryIndex {
435                index_name: g["IndexName"].as_str()?.to_string(),
436                key_schema: parse_key_schema(&g["KeySchema"]).ok()?,
437                projection: parse_projection(&g["Projection"]),
438                provisioned_throughput: parse_provisioned_throughput(&g["ProvisionedThroughput"])
439                    .ok(),
440            })
441        })
442        .collect()
443}
444
445fn parse_lsi(val: &Value) -> Vec<LocalSecondaryIndex> {
446    let Some(arr) = val.as_array() else {
447        return Vec::new();
448    };
449    arr.iter()
450        .filter_map(|l| {
451            Some(LocalSecondaryIndex {
452                index_name: l["IndexName"].as_str()?.to_string(),
453                key_schema: parse_key_schema(&l["KeySchema"]).ok()?,
454                projection: parse_projection(&l["Projection"]),
455            })
456        })
457        .collect()
458}
459
460fn parse_projection(val: &Value) -> Projection {
461    Projection {
462        projection_type: val["ProjectionType"].as_str().unwrap_or("ALL").to_string(),
463        non_key_attributes: val["NonKeyAttributes"]
464            .as_array()
465            .map(|arr| {
466                arr.iter()
467                    .filter_map(|v| v.as_str().map(|s| s.to_string()))
468                    .collect()
469            })
470            .unwrap_or_default(),
471    }
472}
473
474fn parse_tags(val: &Value) -> HashMap<String, String> {
475    let mut tags = HashMap::new();
476    if let Some(arr) = val.as_array() {
477        for tag in arr {
478            if let (Some(k), Some(v)) = (tag["Key"].as_str(), tag["Value"].as_str()) {
479                tags.insert(k.to_string(), v.to_string());
480            }
481        }
482    }
483    tags
484}
485
486fn parse_expression_attribute_names(body: &Value) -> HashMap<String, String> {
487    let mut names = HashMap::new();
488    if let Some(obj) = body["ExpressionAttributeNames"].as_object() {
489        for (k, v) in obj {
490            if let Some(s) = v.as_str() {
491                names.insert(k.clone(), s.to_string());
492            }
493        }
494    }
495    names
496}
497
498fn parse_expression_attribute_values(body: &Value) -> HashMap<String, Value> {
499    let mut values = HashMap::new();
500    if let Some(obj) = body["ExpressionAttributeValues"].as_object() {
501        for (k, v) in obj {
502            values.insert(k.clone(), v.clone());
503        }
504    }
505    values
506}
507
508fn resolve_attr_name(name: &str, expr_attr_names: &HashMap<String, String>) -> String {
509    if name.starts_with('#') {
510        expr_attr_names
511            .get(name)
512            .cloned()
513            .unwrap_or_else(|| name.to_string())
514    } else {
515        name.to_string()
516    }
517}
518
519fn extract_key(
520    table: &DynamoTable,
521    item: &HashMap<String, AttributeValue>,
522) -> HashMap<String, AttributeValue> {
523    let mut key = HashMap::new();
524    let hash_key = table.hash_key_name();
525    if let Some(v) = item.get(hash_key) {
526        key.insert(hash_key.to_string(), v.clone());
527    }
528    if let Some(range_key) = table.range_key_name() {
529        if let Some(v) = item.get(range_key) {
530            key.insert(range_key.to_string(), v.clone());
531        }
532    }
533    key
534}
535
536/// Parse a JSON object into a key map (used for ExclusiveStartKey).
537fn parse_key_map(value: &Value) -> Option<HashMap<String, AttributeValue>> {
538    let obj = value.as_object()?;
539    if obj.is_empty() {
540        return None;
541    }
542    Some(obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
543}
544
545/// Check whether an item's key attributes match the given key map.
546fn item_matches_key(
547    item: &HashMap<String, AttributeValue>,
548    key: &HashMap<String, AttributeValue>,
549    hash_key_name: &str,
550    range_key_name: Option<&str>,
551) -> bool {
552    let hash_match = match (item.get(hash_key_name), key.get(hash_key_name)) {
553        (Some(a), Some(b)) => a == b,
554        _ => false,
555    };
556    if !hash_match {
557        return false;
558    }
559    match range_key_name {
560        Some(rk) => match (item.get(rk), key.get(rk)) {
561            (Some(a), Some(b)) => a == b,
562            (None, None) => true,
563            _ => false,
564        },
565        None => true,
566    }
567}
568
569/// Extract the primary key from an item given explicit key attribute names.
570fn extract_key_for_schema(
571    item: &HashMap<String, AttributeValue>,
572    hash_key_name: &str,
573    range_key_name: Option<&str>,
574) -> HashMap<String, AttributeValue> {
575    let mut key = HashMap::new();
576    if let Some(v) = item.get(hash_key_name) {
577        key.insert(hash_key_name.to_string(), v.clone());
578    }
579    if let Some(rk) = range_key_name {
580        if let Some(v) = item.get(rk) {
581            key.insert(rk.to_string(), v.clone());
582        }
583    }
584    key
585}
586
587fn validate_key_in_item(
588    table: &DynamoTable,
589    item: &HashMap<String, AttributeValue>,
590) -> Result<(), AwsServiceError> {
591    let hash_key = table.hash_key_name();
592    if !item.contains_key(hash_key) {
593        return Err(AwsServiceError::aws_error(
594            StatusCode::BAD_REQUEST,
595            "ValidationException",
596            format!("Missing the key {hash_key} in the item"),
597        ));
598    }
599    if let Some(range_key) = table.range_key_name() {
600        if !item.contains_key(range_key) {
601            return Err(AwsServiceError::aws_error(
602                StatusCode::BAD_REQUEST,
603                "ValidationException",
604                format!("Missing the key {range_key} in the item"),
605            ));
606        }
607    }
608    Ok(())
609}
610
611fn validate_key_attributes_in_key(
612    table: &DynamoTable,
613    key: &HashMap<String, AttributeValue>,
614) -> Result<(), AwsServiceError> {
615    let hash_key = table.hash_key_name();
616    if !key.contains_key(hash_key) {
617        return Err(AwsServiceError::aws_error(
618            StatusCode::BAD_REQUEST,
619            "ValidationException",
620            format!("Missing the key {hash_key} in the item"),
621        ));
622    }
623    Ok(())
624}
625
626fn project_item(
627    item: &HashMap<String, AttributeValue>,
628    body: &Value,
629) -> HashMap<String, AttributeValue> {
630    let projection = body["ProjectionExpression"].as_str();
631    match projection {
632        Some(proj) if !proj.is_empty() => {
633            let expr_attr_names = parse_expression_attribute_names(body);
634            let attrs: Vec<String> = proj
635                .split(',')
636                .map(|s| resolve_projection_path(s.trim(), &expr_attr_names))
637                .collect();
638            let mut result = HashMap::new();
639            for attr in &attrs {
640                if let Some(v) = resolve_nested_path(item, attr) {
641                    insert_nested_value(&mut result, attr, v);
642                }
643            }
644            result
645        }
646        _ => item.clone(),
647    }
648}
649
650/// Resolve expression attribute names within each segment of a projection path.
651/// For example, "people[0].#n" with {"#n": "name"} => "people[0].name".
652fn resolve_projection_path(path: &str, expr_attr_names: &HashMap<String, String>) -> String {
653    // Split on dots, resolve each part, rejoin
654    let mut result = String::new();
655    for (i, segment) in path.split('.').enumerate() {
656        if i > 0 {
657            result.push('.');
658        }
659        // A segment might be like "#n" or "people[0]" or "#attr[0]"
660        if let Some(bracket_pos) = segment.find('[') {
661            let key_part = &segment[..bracket_pos];
662            let index_part = &segment[bracket_pos..];
663            result.push_str(&resolve_attr_name(key_part, expr_attr_names));
664            result.push_str(index_part);
665        } else {
666            result.push_str(&resolve_attr_name(segment, expr_attr_names));
667        }
668    }
669    result
670}
671
672/// Resolve a potentially nested path like "a.b.c" or "a[0].b" from an item.
673fn resolve_nested_path(item: &HashMap<String, AttributeValue>, path: &str) -> Option<Value> {
674    let segments = parse_path_segments(path);
675    if segments.is_empty() {
676        return None;
677    }
678
679    let first = &segments[0];
680    let top_key = match first {
681        PathSegment::Key(k) => k.as_str(),
682        _ => return None,
683    };
684
685    let mut current = item.get(top_key)?.clone();
686
687    for segment in &segments[1..] {
688        match segment {
689            PathSegment::Key(k) => {
690                // Navigate into a Map: {"M": {"key": ...}}
691                current = current.get("M")?.get(k)?.clone();
692            }
693            PathSegment::Index(idx) => {
694                // Navigate into a List: {"L": [...]}
695                current = current.get("L")?.get(*idx)?.clone();
696            }
697        }
698    }
699
700    Some(current)
701}
702
703#[derive(Debug)]
704enum PathSegment {
705    Key(String),
706    Index(usize),
707}
708
709/// Parse a path like "a.b[0].c" into segments: [Key("a"), Key("b"), Index(0), Key("c")]
710fn parse_path_segments(path: &str) -> Vec<PathSegment> {
711    let mut segments = Vec::new();
712    let mut current = String::new();
713
714    let chars: Vec<char> = path.chars().collect();
715    let mut i = 0;
716    while i < chars.len() {
717        match chars[i] {
718            '.' => {
719                if !current.is_empty() {
720                    segments.push(PathSegment::Key(current.clone()));
721                    current.clear();
722                }
723            }
724            '[' => {
725                if !current.is_empty() {
726                    segments.push(PathSegment::Key(current.clone()));
727                    current.clear();
728                }
729                i += 1;
730                let mut num = String::new();
731                while i < chars.len() && chars[i] != ']' {
732                    num.push(chars[i]);
733                    i += 1;
734                }
735                if let Ok(idx) = num.parse::<usize>() {
736                    segments.push(PathSegment::Index(idx));
737                }
738                // skip ']'
739            }
740            c => {
741                current.push(c);
742            }
743        }
744        i += 1;
745    }
746    if !current.is_empty() {
747        segments.push(PathSegment::Key(current));
748    }
749    segments
750}
751
752/// Insert a value at a nested path in the result HashMap.
753/// For a path like "a.b", we set result["a"] = {"M": {"b": value}}.
754fn insert_nested_value(result: &mut HashMap<String, AttributeValue>, path: &str, value: Value) {
755    // Simple case: no nesting
756    if !path.contains('.') && !path.contains('[') {
757        result.insert(path.to_string(), value);
758        return;
759    }
760
761    let segments = parse_path_segments(path);
762    if segments.is_empty() {
763        return;
764    }
765
766    let top_key = match &segments[0] {
767        PathSegment::Key(k) => k.clone(),
768        _ => return,
769    };
770
771    if segments.len() == 1 {
772        result.insert(top_key, value);
773        return;
774    }
775
776    // For nested paths, wrap the value back into the nested structure
777    let wrapped = wrap_value_in_path(&segments[1..], value);
778    // Merge into existing value if present
779    let existing = result.remove(&top_key);
780    let merged = match existing {
781        Some(existing) => merge_attribute_values(existing, wrapped),
782        None => wrapped,
783    };
784    result.insert(top_key, merged);
785}
786
787/// Wrap a value in the nested path structure.
788fn wrap_value_in_path(segments: &[PathSegment], value: Value) -> Value {
789    if segments.is_empty() {
790        return value;
791    }
792    let inner = wrap_value_in_path(&segments[1..], value);
793    match &segments[0] {
794        PathSegment::Key(k) => {
795            json!({"M": {k.clone(): inner}})
796        }
797        PathSegment::Index(idx) => {
798            let mut arr = vec![Value::Null; idx + 1];
799            arr[*idx] = inner;
800            json!({"L": arr})
801        }
802    }
803}
804
805/// Merge two attribute values (for overlapping projections).
806fn merge_attribute_values(a: Value, b: Value) -> Value {
807    if let (Some(a_map), Some(b_map)) = (
808        a.get("M").and_then(|v| v.as_object()),
809        b.get("M").and_then(|v| v.as_object()),
810    ) {
811        let mut merged = a_map.clone();
812        for (k, v) in b_map {
813            if let Some(existing) = merged.get(k) {
814                merged.insert(
815                    k.clone(),
816                    merge_attribute_values(existing.clone(), v.clone()),
817                );
818            } else {
819                merged.insert(k.clone(), v.clone());
820            }
821        }
822        json!({"M": merged})
823    } else {
824        b
825    }
826}
827
828fn evaluate_condition(
829    condition: &str,
830    existing: Option<&HashMap<String, AttributeValue>>,
831    expr_attr_names: &HashMap<String, String>,
832    expr_attr_values: &HashMap<String, Value>,
833) -> Result<(), AwsServiceError> {
834    // ConditionExpression and FilterExpression share the same DynamoDB grammar,
835    // so we delegate to evaluate_filter_expression. An empty map models "item
836    // doesn't exist" correctly: attribute_exists → false, attribute_not_exists
837    // → true, comparisons against missing attributes → None vs Some(val).
838    let empty = HashMap::new();
839    let item = existing.unwrap_or(&empty);
840    if evaluate_filter_expression(condition, item, expr_attr_names, expr_attr_values) {
841        Ok(())
842    } else {
843        Err(AwsServiceError::aws_error(
844            StatusCode::BAD_REQUEST,
845            "ConditionalCheckFailedException",
846            "The conditional request failed",
847        ))
848    }
849}
850
851fn extract_function_arg<'a>(expr: &'a str, func_name: &str) -> Option<&'a str> {
852    // aws-sdk-go v2's expression builder emits function calls with a space
853    // between the name and the opening paren (`attribute_exists (#0)`),
854    // while hand-written expressions usually don't — accept both.
855    let with_paren = format!("{func_name}(");
856    let with_space = format!("{func_name} (");
857    let rest = expr
858        .strip_prefix(&with_paren)
859        .or_else(|| expr.strip_prefix(&with_space))?;
860    let inner = rest.strip_suffix(')')?;
861    Some(inner.trim())
862}
863
864fn evaluate_key_condition(
865    expr: &str,
866    item: &HashMap<String, AttributeValue>,
867    hash_key_name: &str,
868    _range_key_name: Option<&str>,
869    expr_attr_names: &HashMap<String, String>,
870    expr_attr_values: &HashMap<String, Value>,
871) -> bool {
872    let parts: Vec<&str> = split_on_and(expr);
873    for part in &parts {
874        let part = part.trim();
875        if !evaluate_single_key_condition(
876            part,
877            item,
878            hash_key_name,
879            expr_attr_names,
880            expr_attr_values,
881        ) {
882            return false;
883        }
884    }
885    true
886}
887
888/// Split a DynamoDB condition expression on a top-level keyword (``" AND "``,
889/// ``" OR "``), case-insensitively. Parenthesised groups are skipped so only
890/// unparenthesised occurrences of the keyword act as separators.
891fn split_on_top_level_keyword<'a>(expr: &'a str, keyword: &str) -> Vec<&'a str> {
892    let mut parts = Vec::new();
893    let mut start = 0;
894    let len = expr.len();
895    let mut i = 0;
896    let mut depth = 0;
897    while i < len {
898        let ch = expr.as_bytes()[i];
899        if ch == b'(' {
900            depth += 1;
901        } else if ch == b')' {
902            if depth > 0 {
903                depth -= 1;
904            }
905        } else if depth == 0
906            && i + keyword.len() <= len
907            && expr.is_char_boundary(i)
908            && expr.is_char_boundary(i + keyword.len())
909            && expr[i..i + keyword.len()].eq_ignore_ascii_case(keyword)
910        {
911            parts.push(&expr[start..i]);
912            start = i + keyword.len();
913            i = start;
914            continue;
915        }
916        i += 1;
917    }
918    parts.push(&expr[start..]);
919    parts
920}
921
922fn split_on_and(expr: &str) -> Vec<&str> {
923    split_on_top_level_keyword(expr, " AND ")
924}
925
926fn split_on_or(expr: &str) -> Vec<&str> {
927    split_on_top_level_keyword(expr, " OR ")
928}
929
930fn evaluate_single_key_condition(
931    part: &str,
932    item: &HashMap<String, AttributeValue>,
933    _hash_key_name: &str,
934    expr_attr_names: &HashMap<String, String>,
935    expr_attr_values: &HashMap<String, Value>,
936) -> bool {
937    let part = part.trim();
938
939    if let Some(rest) = part
940        .strip_prefix("begins_with(")
941        .or_else(|| part.strip_prefix("begins_with ("))
942    {
943        return key_cond_begins_with(rest, item, expr_attr_names, expr_attr_values);
944    }
945
946    if let Some(between_pos) = part.to_ascii_uppercase().find("BETWEEN") {
947        return key_cond_between(part, between_pos, item, expr_attr_names, expr_attr_values);
948    }
949
950    key_cond_simple_comparison(part, item, expr_attr_names, expr_attr_values)
951}
952
953/// `begins_with(attr, :val)` — KeyCondition variant: supports only
954/// S-typed attributes (mirrors AWS's behavior of returning false for
955/// type mismatches). The filter-expression evaluator has its own
956/// `eval_begins_with` because it operates on filter-grammar inputs.
957fn key_cond_begins_with(
958    rest: &str,
959    item: &HashMap<String, AttributeValue>,
960    expr_attr_names: &HashMap<String, String>,
961    expr_attr_values: &HashMap<String, Value>,
962) -> bool {
963    let Some(inner) = rest.strip_suffix(')') else {
964        return false;
965    };
966    let mut split = inner.splitn(2, ',');
967    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
968        return false;
969    };
970    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
971    let expected = expr_attr_values.get(val_ref.trim());
972    let actual = item.get(&attr_name);
973    match (actual, expected) {
974        (Some(a), Some(e)) => {
975            let a_str = a.get("S").and_then(|v| v.as_str());
976            let e_str = e.get("S").and_then(|v| v.as_str());
977            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
978        }
979        _ => false,
980    }
981}
982
983/// `attr BETWEEN :lo AND :hi` — inclusive range comparison via the
984/// shared `compare_attribute_values` ordering.
985fn key_cond_between(
986    part: &str,
987    between_pos: usize,
988    item: &HashMap<String, AttributeValue>,
989    expr_attr_names: &HashMap<String, String>,
990    expr_attr_values: &HashMap<String, Value>,
991) -> bool {
992    let attr_part = part[..between_pos].trim();
993    let attr_name = resolve_attr_name(attr_part, expr_attr_names);
994    let range_part = &part[between_pos + 7..];
995    let Some(and_pos) = range_part.to_ascii_uppercase().find(" AND ") else {
996        return false;
997    };
998    let lo_ref = range_part[..and_pos].trim();
999    let hi_ref = range_part[and_pos + 5..].trim();
1000    let lo = expr_attr_values.get(lo_ref);
1001    let hi = expr_attr_values.get(hi_ref);
1002    let actual = item.get(&attr_name);
1003    match (actual, lo, hi) {
1004        (Some(a), Some(l), Some(h)) => {
1005            compare_attribute_values(Some(a), Some(l)) != std::cmp::Ordering::Less
1006                && compare_attribute_values(Some(a), Some(h)) != std::cmp::Ordering::Greater
1007        }
1008        _ => false,
1009    }
1010}
1011
1012/// `attr <op> :val` — six operators (`=`, `<>`, `<`, `>`, `<=`, `>=`).
1013/// Multi-character operators come first in the search list so that `<=`
1014/// is not mistakenly matched as `<`.
1015fn key_cond_simple_comparison(
1016    part: &str,
1017    item: &HashMap<String, AttributeValue>,
1018    expr_attr_names: &HashMap<String, String>,
1019    expr_attr_values: &HashMap<String, Value>,
1020) -> bool {
1021    for op in &["<=", ">=", "<>", "=", "<", ">"] {
1022        let Some(pos) = part.find(op) else {
1023            continue;
1024        };
1025        let left = part[..pos].trim();
1026        let right = part[pos + op.len()..].trim();
1027        let attr_name = resolve_attr_name(left, expr_attr_names);
1028        let expected = expr_attr_values.get(right);
1029        let actual = item.get(&attr_name);
1030
1031        return match *op {
1032            "=" => actual == expected,
1033            "<>" => actual != expected,
1034            "<" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Less,
1035            ">" => compare_attribute_values(actual, expected) == std::cmp::Ordering::Greater,
1036            "<=" => {
1037                let cmp = compare_attribute_values(actual, expected);
1038                cmp == std::cmp::Ordering::Less || cmp == std::cmp::Ordering::Equal
1039            }
1040            ">=" => {
1041                let cmp = compare_attribute_values(actual, expected);
1042                cmp == std::cmp::Ordering::Greater || cmp == std::cmp::Ordering::Equal
1043            }
1044            _ => false,
1045        };
1046    }
1047    false
1048}
1049
1050/// Returns the "size" of a DynamoDB attribute value per AWS docs:
1051/// S → character count, N → always 0 (AWS returns size of internal representation, we approximate),
1052/// B → byte count, SS/NS/BS → element count, L → element count, M → element count,
1053/// BOOL/NULL → 1.
1054fn attribute_size(val: &Value) -> Option<usize> {
1055    if let Some(s) = val.get("S").and_then(|v| v.as_str()) {
1056        return Some(s.len());
1057    }
1058    if let Some(b) = val.get("B").and_then(|v| v.as_str()) {
1059        // B is base64-encoded — return decoded byte count
1060        let decoded_len = base64::engine::general_purpose::STANDARD
1061            .decode(b)
1062            .map(|v| v.len())
1063            .unwrap_or(b.len());
1064        return Some(decoded_len);
1065    }
1066    if let Some(arr) = val.get("SS").and_then(|v| v.as_array()) {
1067        return Some(arr.len());
1068    }
1069    if let Some(arr) = val.get("NS").and_then(|v| v.as_array()) {
1070        return Some(arr.len());
1071    }
1072    if let Some(arr) = val.get("BS").and_then(|v| v.as_array()) {
1073        return Some(arr.len());
1074    }
1075    if let Some(arr) = val.get("L").and_then(|v| v.as_array()) {
1076        return Some(arr.len());
1077    }
1078    if let Some(obj) = val.get("M").and_then(|v| v.as_object()) {
1079        return Some(obj.len());
1080    }
1081    if val.get("N").is_some() {
1082        // AWS returns numeric representation size; approximate with string length
1083        return val.get("N").and_then(|v| v.as_str()).map(|s| s.len());
1084    }
1085    if val.get("BOOL").is_some() || val.get("NULL").is_some() {
1086        return Some(1);
1087    }
1088    None
1089}
1090
1091/// Evaluate a `size(path) op :val` comparison expression.
1092fn evaluate_size_comparison(
1093    part: &str,
1094    item: &HashMap<String, AttributeValue>,
1095    expr_attr_names: &HashMap<String, String>,
1096    expr_attr_values: &HashMap<String, Value>,
1097) -> Option<bool> {
1098    // Find the closing paren of size(...)
1099    let open = part.find('(')?;
1100    let close = part[open..].find(')')? + open;
1101    let path = part[open + 1..close].trim();
1102    let remainder = part[close + 1..].trim();
1103
1104    // Parse operator and value ref
1105    let (op, val_ref) = if let Some(rest) = remainder.strip_prefix("<=") {
1106        ("<=", rest.trim())
1107    } else if let Some(rest) = remainder.strip_prefix(">=") {
1108        (">=", rest.trim())
1109    } else if let Some(rest) = remainder.strip_prefix("<>") {
1110        ("<>", rest.trim())
1111    } else if let Some(rest) = remainder.strip_prefix('<') {
1112        ("<", rest.trim())
1113    } else if let Some(rest) = remainder.strip_prefix('>') {
1114        (">", rest.trim())
1115    } else if let Some(rest) = remainder.strip_prefix('=') {
1116        ("=", rest.trim())
1117    } else {
1118        return None;
1119    };
1120
1121    let attr_name = resolve_attr_name(path, expr_attr_names);
1122    let actual = item.get(&attr_name)?;
1123    let size = attribute_size(actual)? as f64;
1124
1125    let expected = extract_number(&expr_attr_values.get(val_ref).cloned())?;
1126
1127    Some(match op {
1128        "=" => (size - expected).abs() < f64::EPSILON,
1129        "<>" => (size - expected).abs() >= f64::EPSILON,
1130        "<" => size < expected,
1131        ">" => size > expected,
1132        "<=" => size <= expected,
1133        ">=" => size >= expected,
1134        _ => false,
1135    })
1136}
1137
1138fn compare_attribute_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
1139    match (a, b) {
1140        (None, None) => std::cmp::Ordering::Equal,
1141        (None, Some(_)) => std::cmp::Ordering::Less,
1142        (Some(_), None) => std::cmp::Ordering::Greater,
1143        (Some(a), Some(b)) => {
1144            let a_type = attribute_type_and_value(a);
1145            let b_type = attribute_type_and_value(b);
1146            match (a_type, b_type) {
1147                (Some(("S", a_val)), Some(("S", b_val))) => {
1148                    let a_str = a_val.as_str().unwrap_or("");
1149                    let b_str = b_val.as_str().unwrap_or("");
1150                    a_str.cmp(b_str)
1151                }
1152                (Some(("N", a_val)), Some(("N", b_val))) => {
1153                    let a_num: f64 = a_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1154                    let b_num: f64 = b_val.as_str().and_then(|s| s.parse().ok()).unwrap_or(0.0);
1155                    a_num
1156                        .partial_cmp(&b_num)
1157                        .unwrap_or(std::cmp::Ordering::Equal)
1158                }
1159                (Some(("B", a_val)), Some(("B", b_val))) => {
1160                    let a_str = a_val.as_str().unwrap_or("");
1161                    let b_str = b_val.as_str().unwrap_or("");
1162                    a_str.cmp(b_str)
1163                }
1164                _ => std::cmp::Ordering::Equal,
1165            }
1166        }
1167    }
1168}
1169
1170fn evaluate_filter_expression(
1171    expr: &str,
1172    item: &HashMap<String, AttributeValue>,
1173    expr_attr_names: &HashMap<String, String>,
1174    expr_attr_values: &HashMap<String, Value>,
1175) -> bool {
1176    let trimmed = expr.trim();
1177
1178    // Split on OR first (lower precedence), respecting parentheses
1179    let or_parts = split_on_or(trimmed);
1180    if or_parts.len() > 1 {
1181        return or_parts.iter().any(|part| {
1182            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1183        });
1184    }
1185
1186    // Then split on AND (higher precedence), respecting parentheses
1187    let and_parts = split_on_and(trimmed);
1188    if and_parts.len() > 1 {
1189        return and_parts.iter().all(|part| {
1190            evaluate_filter_expression(part.trim(), item, expr_attr_names, expr_attr_values)
1191        });
1192    }
1193
1194    // Strip outer parentheses if present
1195    let stripped = strip_outer_parens(trimmed);
1196    if stripped != trimmed {
1197        return evaluate_filter_expression(stripped, item, expr_attr_names, expr_attr_values);
1198    }
1199
1200    // Handle NOT prefix (case-insensitive)
1201    if trimmed.len() > 4 && trimmed[..4].eq_ignore_ascii_case("NOT ") {
1202        return !evaluate_filter_expression(&trimmed[4..], item, expr_attr_names, expr_attr_values);
1203    }
1204
1205    evaluate_single_filter_condition(trimmed, item, expr_attr_names, expr_attr_values)
1206}
1207
1208/// Strip matching outer parentheses from an expression.
1209fn strip_outer_parens(expr: &str) -> &str {
1210    let trimmed = expr.trim();
1211    if !trimmed.starts_with('(') || !trimmed.ends_with(')') {
1212        return trimmed;
1213    }
1214    // Verify the outer parens actually match each other
1215    let inner = &trimmed[1..trimmed.len() - 1];
1216    let mut depth = 0;
1217    for ch in inner.bytes() {
1218        match ch {
1219            b'(' => depth += 1,
1220            b')' => {
1221                if depth == 0 {
1222                    return trimmed; // closing paren matches something inside, not the outer one
1223                }
1224                depth -= 1;
1225            }
1226            _ => {}
1227        }
1228    }
1229    if depth == 0 {
1230        inner
1231    } else {
1232        trimmed
1233    }
1234}
1235
1236fn evaluate_single_filter_condition(
1237    part: &str,
1238    item: &HashMap<String, AttributeValue>,
1239    expr_attr_names: &HashMap<String, String>,
1240    expr_attr_values: &HashMap<String, Value>,
1241) -> bool {
1242    if let Some(inner) = extract_function_arg(part, "attribute_exists") {
1243        let attr = resolve_attr_name(inner, expr_attr_names);
1244        return item.contains_key(&attr);
1245    }
1246
1247    if let Some(inner) = extract_function_arg(part, "attribute_not_exists") {
1248        let attr = resolve_attr_name(inner, expr_attr_names);
1249        return !item.contains_key(&attr);
1250    }
1251
1252    if let Some(rest) = part
1253        .strip_prefix("begins_with(")
1254        .or_else(|| part.strip_prefix("begins_with ("))
1255    {
1256        return eval_begins_with(rest, item, expr_attr_names, expr_attr_values);
1257    }
1258
1259    if let Some(rest) = part
1260        .strip_prefix("contains(")
1261        .or_else(|| part.strip_prefix("contains ("))
1262    {
1263        return eval_contains(rest, item, expr_attr_names, expr_attr_values);
1264    }
1265
1266    if part.starts_with("size(") || part.starts_with("size (") {
1267        if let Some(result) =
1268            evaluate_size_comparison(part, item, expr_attr_names, expr_attr_values)
1269        {
1270            return result;
1271        }
1272    }
1273
1274    if let Some(rest) = part
1275        .strip_prefix("attribute_type(")
1276        .or_else(|| part.strip_prefix("attribute_type ("))
1277    {
1278        return eval_attribute_type(rest, item, expr_attr_names, expr_attr_values);
1279    }
1280
1281    if let Some((attr_ref, value_refs)) = parse_in_expression(part) {
1282        let attr_name = resolve_attr_name(attr_ref, expr_attr_names);
1283        let actual = item.get(&attr_name);
1284        return evaluate_in_match(actual, &value_refs, expr_attr_values);
1285    }
1286
1287    evaluate_single_key_condition(part, item, "", expr_attr_names, expr_attr_values)
1288}
1289
1290/// `begins_with(path, :val)` — only S (string) operands. Returns false on
1291/// any parse failure or type mismatch (this is the same shape DynamoDB
1292/// returns: a malformed predicate is silently false rather than an error).
1293fn eval_begins_with(
1294    rest: &str,
1295    item: &HashMap<String, AttributeValue>,
1296    expr_attr_names: &HashMap<String, String>,
1297    expr_attr_values: &HashMap<String, Value>,
1298) -> bool {
1299    let Some(inner) = rest.strip_suffix(')') else {
1300        return false;
1301    };
1302    let mut split = inner.splitn(2, ',');
1303    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1304        return false;
1305    };
1306    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1307    let expected = expr_attr_values.get(val_ref.trim());
1308    let actual = item.get(&attr_name);
1309    match (actual, expected) {
1310        (Some(a), Some(e)) => {
1311            let a_str = a.get("S").and_then(|v| v.as_str());
1312            let e_str = e.get("S").and_then(|v| v.as_str());
1313            matches!((a_str, e_str), (Some(a), Some(e)) if a.starts_with(e))
1314        }
1315        _ => false,
1316    }
1317}
1318
1319/// `contains(path, :val)` — substring check on S, set membership on
1320/// SS/NS/BS, and element membership on L. Other type pairings return
1321/// false.
1322fn eval_contains(
1323    rest: &str,
1324    item: &HashMap<String, AttributeValue>,
1325    expr_attr_names: &HashMap<String, String>,
1326    expr_attr_values: &HashMap<String, Value>,
1327) -> bool {
1328    let Some(inner) = rest.strip_suffix(')') else {
1329        return false;
1330    };
1331    let mut split = inner.splitn(2, ',');
1332    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1333        return false;
1334    };
1335    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1336    let expected = expr_attr_values.get(val_ref.trim());
1337    let actual = item.get(&attr_name);
1338    let (Some(a), Some(e)) = (actual, expected) else {
1339        return false;
1340    };
1341
1342    if let (Some(a_s), Some(e_s)) = (
1343        a.get("S").and_then(|v| v.as_str()),
1344        e.get("S").and_then(|v| v.as_str()),
1345    ) {
1346        return a_s.contains(e_s);
1347    }
1348    if let Some(set) = a.get("SS").and_then(|v| v.as_array()) {
1349        if let Some(val) = e.get("S") {
1350            return set.contains(val);
1351        }
1352    }
1353    if let Some(set) = a.get("NS").and_then(|v| v.as_array()) {
1354        if let Some(val) = e.get("N") {
1355            return set.contains(val);
1356        }
1357    }
1358    if let Some(set) = a.get("BS").and_then(|v| v.as_array()) {
1359        if let Some(val) = e.get("B") {
1360            return set.contains(val);
1361        }
1362    }
1363    if let Some(list) = a.get("L").and_then(|v| v.as_array()) {
1364        return list.contains(e);
1365    }
1366    false
1367}
1368
1369/// `attribute_type(path, :type)` — checks whether the attribute at `path`
1370/// is stored under the wire type identified by `:type` (one of the
1371/// DynamoDB type letters S/N/B/BOOL/NULL/SS/NS/BS/L/M).
1372fn eval_attribute_type(
1373    rest: &str,
1374    item: &HashMap<String, AttributeValue>,
1375    expr_attr_names: &HashMap<String, String>,
1376    expr_attr_values: &HashMap<String, Value>,
1377) -> bool {
1378    let Some(inner) = rest.strip_suffix(')') else {
1379        return false;
1380    };
1381    let mut split = inner.splitn(2, ',');
1382    let (Some(attr_ref), Some(val_ref)) = (split.next(), split.next()) else {
1383        return false;
1384    };
1385    let attr_name = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1386    let expected_type = expr_attr_values
1387        .get(val_ref.trim())
1388        .and_then(|v| v.get("S"))
1389        .and_then(|v| v.as_str());
1390    let actual = item.get(&attr_name);
1391    let (Some(val), Some(t)) = (actual, expected_type) else {
1392        return false;
1393    };
1394    match t {
1395        "S" => val.get("S").is_some(),
1396        "N" => val.get("N").is_some(),
1397        "B" => val.get("B").is_some(),
1398        "BOOL" => val.get("BOOL").is_some(),
1399        "NULL" => val.get("NULL").is_some(),
1400        "SS" => val.get("SS").is_some(),
1401        "NS" => val.get("NS").is_some(),
1402        "BS" => val.get("BS").is_some(),
1403        "L" => val.get("L").is_some(),
1404        "M" => val.get("M").is_some(),
1405        _ => false,
1406    }
1407}
1408
1409/// Parse an `attr IN (:v1, :v2, ...)` expression. Mirrors the DynamoDB
1410/// ConditionExpression / FilterExpression grammar where IN takes a single
1411/// operand on the left and 1–100 comma-separated value refs inside parens
1412/// on the right. Case-insensitive; tolerates missing spaces after commas
1413/// (aws-sdk-go's `expression` builder emits ", " but hand-built expressions
1414/// often use `strings.Join(..., ",")`). Returns None for non-IN inputs so
1415/// callers can fall through to their other grammar branches.
1416fn parse_in_expression(expr: &str) -> Option<(&str, Vec<&str>)> {
1417    let upper = expr.to_ascii_uppercase();
1418    let in_pos = upper.find(" IN ")?;
1419    let attr_ref = expr[..in_pos].trim();
1420    if attr_ref.is_empty() {
1421        return None;
1422    }
1423    let rest = expr[in_pos + 4..].trim_start();
1424    let inner = rest.strip_prefix('(')?.strip_suffix(')')?;
1425    let values: Vec<&str> = inner
1426        .split(',')
1427        .map(|s| s.trim())
1428        .filter(|s| !s.is_empty())
1429        .collect();
1430    if values.is_empty() {
1431        return None;
1432    }
1433    Some((attr_ref, values))
1434}
1435
1436/// Return true iff `actual` equals any of the `value_refs` resolved through
1437/// `expr_attr_values`. A missing attribute never matches (mirrors AWS, which
1438/// evaluates `IN` against undefined attributes as false).
1439fn evaluate_in_match(
1440    actual: Option<&AttributeValue>,
1441    value_refs: &[&str],
1442    expr_attr_values: &HashMap<String, Value>,
1443) -> bool {
1444    value_refs.iter().any(|v_ref| {
1445        let expected = expr_attr_values.get(*v_ref);
1446        matches!((actual, expected), (Some(a), Some(e)) if a == e)
1447    })
1448}
1449
1450/// One of the four DynamoDB ``UpdateExpression`` action keywords.
1451#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1452enum UpdateAction {
1453    Set,
1454    Remove,
1455    Add,
1456    Delete,
1457}
1458
1459impl UpdateAction {
1460    /// All four keywords as written on the wire — these double as the search
1461    /// terms for ``parse_update_clauses``.
1462    const KEYWORDS: &'static [(&'static str, UpdateAction)] = &[
1463        ("SET", UpdateAction::Set),
1464        ("REMOVE", UpdateAction::Remove),
1465        ("ADD", UpdateAction::Add),
1466        ("DELETE", UpdateAction::Delete),
1467    ];
1468
1469    fn keyword(self) -> &'static str {
1470        match self {
1471            UpdateAction::Set => "SET",
1472            UpdateAction::Remove => "REMOVE",
1473            UpdateAction::Add => "ADD",
1474            UpdateAction::Delete => "DELETE",
1475        }
1476    }
1477}
1478
1479fn apply_update_expression(
1480    item: &mut HashMap<String, AttributeValue>,
1481    expr: &str,
1482    expr_attr_names: &HashMap<String, String>,
1483    expr_attr_values: &HashMap<String, Value>,
1484) -> Result<(), AwsServiceError> {
1485    let clauses = parse_update_clauses(expr);
1486    if clauses.is_empty() && !expr.trim().is_empty() {
1487        return Err(AwsServiceError::aws_error(
1488            StatusCode::BAD_REQUEST,
1489            "ValidationException",
1490            "Invalid UpdateExpression: Syntax error; token: \"<expression>\"",
1491        ));
1492    }
1493    for (action, assignments) in &clauses {
1494        match action {
1495            UpdateAction::Set => {
1496                for assignment in assignments {
1497                    apply_set_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1498                }
1499            }
1500            UpdateAction::Remove => {
1501                for attr_ref in assignments {
1502                    let attr = resolve_attr_name(attr_ref.trim(), expr_attr_names);
1503                    item.remove(&attr);
1504                }
1505            }
1506            UpdateAction::Add => {
1507                for assignment in assignments {
1508                    apply_add_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1509                }
1510            }
1511            UpdateAction::Delete => {
1512                for assignment in assignments {
1513                    apply_delete_assignment(item, assignment, expr_attr_names, expr_attr_values)?;
1514                }
1515            }
1516        }
1517    }
1518    Ok(())
1519}
1520
1521fn parse_update_clauses(expr: &str) -> Vec<(UpdateAction, Vec<String>)> {
1522    let mut clauses: Vec<(UpdateAction, Vec<String>)> = Vec::new();
1523    let upper = expr.to_ascii_uppercase();
1524    let mut positions: Vec<(usize, UpdateAction)> = Vec::new();
1525
1526    for &(kw, action) in UpdateAction::KEYWORDS {
1527        let mut search_from = 0;
1528        while let Some(pos) = upper[search_from..].find(kw) {
1529            let abs_pos = search_from + pos;
1530            let before_ok = abs_pos == 0 || !expr.as_bytes()[abs_pos - 1].is_ascii_alphanumeric();
1531            let after_pos = abs_pos + kw.len();
1532            let after_ok =
1533                after_pos >= expr.len() || !expr.as_bytes()[after_pos].is_ascii_alphanumeric();
1534            if before_ok && after_ok {
1535                positions.push((abs_pos, action));
1536            }
1537            search_from = abs_pos + kw.len();
1538        }
1539    }
1540
1541    positions.sort_by_key(|(pos, _)| *pos);
1542
1543    for (i, &(pos, action)) in positions.iter().enumerate() {
1544        let start = pos + action.keyword().len();
1545        let end = if i + 1 < positions.len() {
1546            positions[i + 1].0
1547        } else {
1548            expr.len()
1549        };
1550        let content = expr[start..end].trim();
1551        let assignments: Vec<String> = content.split(',').map(|s| s.trim().to_string()).collect();
1552        clauses.push((action, assignments));
1553    }
1554
1555    clauses
1556}
1557
1558fn apply_set_assignment(
1559    item: &mut HashMap<String, AttributeValue>,
1560    assignment: &str,
1561    expr_attr_names: &HashMap<String, String>,
1562    expr_attr_values: &HashMap<String, Value>,
1563) -> Result<(), AwsServiceError> {
1564    let Some((left, right)) = assignment.split_once('=') else {
1565        return Ok(());
1566    };
1567
1568    let left_trimmed = left.trim();
1569    // Split off a trailing `[N]` list-index suffix so we can resolve the
1570    // attribute name ref on its own. Without this, `resolve_attr_name` sees
1571    // "#items[0]" as a whole and misses the `#items` → `items` mapping.
1572    let (attr_ref, list_index) = match parse_list_index_suffix(left_trimmed) {
1573        Some((name, idx)) => (name, Some(idx)),
1574        None => (left_trimmed, None),
1575    };
1576    let attr = resolve_attr_name(attr_ref, expr_attr_names);
1577    let right = right.trim();
1578
1579    if let Some(rest) = right
1580        .strip_prefix("if_not_exists(")
1581        .or_else(|| right.strip_prefix("if_not_exists ("))
1582    {
1583        apply_set_if_not_exists(item, &attr, rest, expr_attr_names, expr_attr_values);
1584        return Ok(());
1585    }
1586
1587    if let Some(rest) = right
1588        .strip_prefix("list_append(")
1589        .or_else(|| right.strip_prefix("list_append ("))
1590    {
1591        apply_set_list_append(item, &attr, rest, expr_attr_names, expr_attr_values);
1592        return Ok(());
1593    }
1594
1595    if let Some((arith_left, arith_right, is_add)) = parse_arithmetic(right) {
1596        return apply_set_arithmetic(
1597            item,
1598            &attr,
1599            arith_left,
1600            arith_right,
1601            is_add,
1602            expr_attr_names,
1603            expr_attr_values,
1604        );
1605    }
1606
1607    let val = resolve_value(right, item, expr_attr_names, expr_attr_values);
1608    if let Some(v) = val {
1609        match list_index {
1610            Some(idx) => assign_list_index(item, &attr, idx, v)?,
1611            None => {
1612                item.insert(attr, v);
1613            }
1614        }
1615    }
1616
1617    Ok(())
1618}
1619
1620/// SET ... = if_not_exists(other_attr, :val) — write `:val` into `attr`
1621/// only when `other_attr` is missing from the item. The lookup uses
1622/// `other_attr`, not the SET target, which is what makes it useful as a
1623/// 'create-or-keep' primitive.
1624fn apply_set_if_not_exists(
1625    item: &mut HashMap<String, AttributeValue>,
1626    attr: &str,
1627    rest: &str,
1628    expr_attr_names: &HashMap<String, String>,
1629    expr_attr_values: &HashMap<String, Value>,
1630) {
1631    let Some(inner) = rest.strip_suffix(')') else {
1632        return;
1633    };
1634    let mut split = inner.splitn(2, ',');
1635    let (Some(check_attr), Some(default_ref)) = (split.next(), split.next()) else {
1636        return;
1637    };
1638    let check_name = resolve_attr_name(check_attr.trim(), expr_attr_names);
1639    if item.contains_key(&check_name) {
1640        return;
1641    }
1642    if let Some(val) = expr_attr_values.get(default_ref.trim()) {
1643        item.insert(attr.to_string(), val.clone());
1644    }
1645}
1646
1647/// SET ... = list_append(a, b) — concatenate the L arrays of two list
1648/// operands. Either operand may be missing or non-list, in which case
1649/// it contributes nothing.
1650fn apply_set_list_append(
1651    item: &mut HashMap<String, AttributeValue>,
1652    attr: &str,
1653    rest: &str,
1654    expr_attr_names: &HashMap<String, String>,
1655    expr_attr_values: &HashMap<String, Value>,
1656) {
1657    let Some(inner) = rest.strip_suffix(')') else {
1658        return;
1659    };
1660    let mut split = inner.splitn(2, ',');
1661    let (Some(a_ref), Some(b_ref)) = (split.next(), split.next()) else {
1662        return;
1663    };
1664    let a_val = resolve_value(a_ref.trim(), item, expr_attr_names, expr_attr_values);
1665    let b_val = resolve_value(b_ref.trim(), item, expr_attr_names, expr_attr_values);
1666
1667    let mut merged = Vec::new();
1668    if let Some(Value::Object(obj)) = &a_val {
1669        if let Some(Value::Array(arr)) = obj.get("L") {
1670            merged.extend(arr.clone());
1671        }
1672    }
1673    if let Some(Value::Object(obj)) = &b_val {
1674        if let Some(Value::Array(arr)) = obj.get("L") {
1675            merged.extend(arr.clone());
1676        }
1677    }
1678
1679    item.insert(attr.to_string(), json!({"L": merged}));
1680}
1681
1682/// SET ... = `<arith_left> +/- <arith_right>` — both operands must
1683/// resolve to N values (or the LHS may be missing, in which case it's
1684/// treated as 0). Anything else is rejected with the same
1685/// `ValidationException` AWS returns.
1686fn apply_set_arithmetic(
1687    item: &mut HashMap<String, AttributeValue>,
1688    attr: &str,
1689    arith_left: &str,
1690    arith_right: &str,
1691    is_add: bool,
1692    expr_attr_names: &HashMap<String, String>,
1693    expr_attr_values: &HashMap<String, Value>,
1694) -> Result<(), AwsServiceError> {
1695    let left_val = resolve_value(arith_left.trim(), item, expr_attr_names, expr_attr_values);
1696    let right_val = resolve_value(arith_right.trim(), item, expr_attr_names, expr_attr_values);
1697
1698    let left_num = match extract_number(&left_val) {
1699        Some(n) => n,
1700        None if left_val.is_some() => {
1701            return Err(AwsServiceError::aws_error(
1702                StatusCode::BAD_REQUEST,
1703                "ValidationException",
1704                "An operand in the update expression has an incorrect data type",
1705            ));
1706        }
1707        None => 0.0,
1708    };
1709    let right_num = extract_number(&right_val).ok_or_else(|| {
1710        AwsServiceError::aws_error(
1711            StatusCode::BAD_REQUEST,
1712            "ValidationException",
1713            "An operand in the update expression has an incorrect data type",
1714        )
1715    })?;
1716
1717    let result = if is_add {
1718        left_num + right_num
1719    } else {
1720        left_num - right_num
1721    };
1722
1723    let num_str = if result == result.trunc() {
1724        format!("{}", result as i64)
1725    } else {
1726        format!("{result}")
1727    };
1728
1729    item.insert(attr.to_string(), json!({"N": num_str}));
1730    Ok(())
1731}
1732
1733/// Parse a trailing `[N]` list-index suffix off the LHS of a SET assignment.
1734/// Returns the bare attribute reference and the index, or None when the LHS
1735/// is a plain attribute (or a path shape we don't yet support).
1736fn parse_list_index_suffix(path: &str) -> Option<(&str, usize)> {
1737    let path = path.trim();
1738    if !path.ends_with(']') {
1739        return None;
1740    }
1741    let open = path.rfind('[')?;
1742    // Require no further `.` / `[` / `]` inside the bracketed portion and no
1743    // further path segments after — we only handle the single-index case
1744    // `name[N]`, not nested shapes like `a.b[0].c`.
1745    let idx_str = &path[open + 1..path.len() - 1];
1746    let idx: usize = idx_str.parse().ok()?;
1747    let name = &path[..open];
1748    if name.is_empty() || name.contains('[') || name.contains(']') || name.contains('.') {
1749        return None;
1750    }
1751    Some((name, idx))
1752}
1753
1754/// Assign a value to a specific index of a `L`-typed attribute. If `idx` is
1755/// within the current list, replaces that slot; if it's at the end, appends.
1756/// AWS rejects writes beyond `len`, so we return a `ValidationException` for
1757/// out-of-range indices and non-list attributes.
1758fn assign_list_index(
1759    item: &mut HashMap<String, AttributeValue>,
1760    attr: &str,
1761    idx: usize,
1762    value: Value,
1763) -> Result<(), AwsServiceError> {
1764    let Some(existing) = item.get_mut(attr) else {
1765        return Err(AwsServiceError::aws_error(
1766            StatusCode::BAD_REQUEST,
1767            "ValidationException",
1768            "The document path provided in the update expression is invalid for update",
1769        ));
1770    };
1771    let Some(list) = existing.get_mut("L").and_then(|l| l.as_array_mut()) else {
1772        return Err(AwsServiceError::aws_error(
1773            StatusCode::BAD_REQUEST,
1774            "ValidationException",
1775            "The document path provided in the update expression is invalid for update",
1776        ));
1777    };
1778    if idx < list.len() {
1779        list[idx] = value;
1780    } else if idx == list.len() {
1781        list.push(value);
1782    } else {
1783        return Err(AwsServiceError::aws_error(
1784            StatusCode::BAD_REQUEST,
1785            "ValidationException",
1786            "The document path provided in the update expression is invalid for update",
1787        ));
1788    }
1789    Ok(())
1790}
1791
1792fn resolve_value(
1793    reference: &str,
1794    item: &HashMap<String, AttributeValue>,
1795    expr_attr_names: &HashMap<String, String>,
1796    expr_attr_values: &HashMap<String, Value>,
1797) -> Option<Value> {
1798    let reference = reference.trim();
1799    if reference.starts_with(':') {
1800        expr_attr_values.get(reference).cloned()
1801    } else {
1802        let attr_name = resolve_attr_name(reference, expr_attr_names);
1803        item.get(&attr_name).cloned()
1804    }
1805}
1806
1807fn extract_number(val: &Option<Value>) -> Option<f64> {
1808    val.as_ref()
1809        .and_then(|v| v.get("N"))
1810        .and_then(|n| n.as_str())
1811        .and_then(|s| s.parse().ok())
1812}
1813
1814fn parse_arithmetic(expr: &str) -> Option<(&str, &str, bool)> {
1815    let mut depth = 0;
1816    for (i, c) in expr.char_indices() {
1817        match c {
1818            '(' => depth += 1,
1819            ')' => depth -= 1,
1820            '+' if depth == 0 && i > 0 => {
1821                return Some((&expr[..i], &expr[i + 1..], true));
1822            }
1823            '-' if depth == 0 && i > 0 => {
1824                return Some((&expr[..i], &expr[i + 1..], false));
1825            }
1826            _ => {}
1827        }
1828    }
1829    None
1830}
1831
1832fn apply_add_assignment(
1833    item: &mut HashMap<String, AttributeValue>,
1834    assignment: &str,
1835    expr_attr_names: &HashMap<String, String>,
1836    expr_attr_values: &HashMap<String, Value>,
1837) -> Result<(), AwsServiceError> {
1838    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1839    if parts.len() != 2 {
1840        return Ok(());
1841    }
1842
1843    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1844    let val_ref = parts[1].trim();
1845    let add_val = expr_attr_values.get(val_ref);
1846
1847    if let Some(add_val) = add_val {
1848        if let Some(existing) = item.get(&attr) {
1849            if let (Some(existing_num), Some(add_num)) = (
1850                extract_number(&Some(existing.clone())),
1851                extract_number(&Some(add_val.clone())),
1852            ) {
1853                let result = existing_num + add_num;
1854                let num_str = if result == result.trunc() {
1855                    format!("{}", result as i64)
1856                } else {
1857                    format!("{result}")
1858                };
1859                item.insert(attr, json!({"N": num_str}));
1860            } else if let Some(existing_set) = existing.get("SS").and_then(|v| v.as_array()) {
1861                if let Some(add_set) = add_val.get("SS").and_then(|v| v.as_array()) {
1862                    let mut merged: Vec<Value> = existing_set.clone();
1863                    for v in add_set {
1864                        if !merged.contains(v) {
1865                            merged.push(v.clone());
1866                        }
1867                    }
1868                    item.insert(attr, json!({"SS": merged}));
1869                }
1870            } else if let Some(existing_set) = existing.get("NS").and_then(|v| v.as_array()) {
1871                if let Some(add_set) = add_val.get("NS").and_then(|v| v.as_array()) {
1872                    let mut merged: Vec<Value> = existing_set.clone();
1873                    for v in add_set {
1874                        if !merged.contains(v) {
1875                            merged.push(v.clone());
1876                        }
1877                    }
1878                    item.insert(attr, json!({"NS": merged}));
1879                }
1880            } else if let Some(existing_set) = existing.get("BS").and_then(|v| v.as_array()) {
1881                if let Some(add_set) = add_val.get("BS").and_then(|v| v.as_array()) {
1882                    let mut merged: Vec<Value> = existing_set.clone();
1883                    for v in add_set {
1884                        if !merged.contains(v) {
1885                            merged.push(v.clone());
1886                        }
1887                    }
1888                    item.insert(attr, json!({"BS": merged}));
1889                }
1890            }
1891        } else {
1892            item.insert(attr, add_val.clone());
1893        }
1894    }
1895
1896    Ok(())
1897}
1898
1899fn apply_delete_assignment(
1900    item: &mut HashMap<String, AttributeValue>,
1901    assignment: &str,
1902    expr_attr_names: &HashMap<String, String>,
1903    expr_attr_values: &HashMap<String, Value>,
1904) -> Result<(), AwsServiceError> {
1905    let parts: Vec<&str> = assignment.splitn(2, ' ').collect();
1906    if parts.len() != 2 {
1907        return Ok(());
1908    }
1909
1910    let attr = resolve_attr_name(parts[0].trim(), expr_attr_names);
1911    let val_ref = parts[1].trim();
1912    let del_val = expr_attr_values.get(val_ref);
1913
1914    if let (Some(existing), Some(del_val)) = (item.get(&attr).cloned(), del_val) {
1915        if let (Some(existing_set), Some(del_set)) = (
1916            existing.get("SS").and_then(|v| v.as_array()),
1917            del_val.get("SS").and_then(|v| v.as_array()),
1918        ) {
1919            let filtered: Vec<Value> = existing_set
1920                .iter()
1921                .filter(|v| !del_set.contains(v))
1922                .cloned()
1923                .collect();
1924            if filtered.is_empty() {
1925                item.remove(&attr);
1926            } else {
1927                item.insert(attr, json!({"SS": filtered}));
1928            }
1929        } else if let (Some(existing_set), Some(del_set)) = (
1930            existing.get("NS").and_then(|v| v.as_array()),
1931            del_val.get("NS").and_then(|v| v.as_array()),
1932        ) {
1933            let filtered: Vec<Value> = existing_set
1934                .iter()
1935                .filter(|v| !del_set.contains(v))
1936                .cloned()
1937                .collect();
1938            if filtered.is_empty() {
1939                item.remove(&attr);
1940            } else {
1941                item.insert(attr, json!({"NS": filtered}));
1942            }
1943        } else if let (Some(existing_set), Some(del_set)) = (
1944            existing.get("BS").and_then(|v| v.as_array()),
1945            del_val.get("BS").and_then(|v| v.as_array()),
1946        ) {
1947            let filtered: Vec<Value> = existing_set
1948                .iter()
1949                .filter(|v| !del_set.contains(v))
1950                .cloned()
1951                .collect();
1952            if filtered.is_empty() {
1953                item.remove(&attr);
1954            } else {
1955                item.insert(attr, json!({"BS": filtered}));
1956            }
1957        }
1958    }
1959
1960    Ok(())
1961}
1962
1963pub(super) struct TableDescriptionInput<'a> {
1964    pub arn: &'a str,
1965    pub table_id: &'a str,
1966    pub key_schema: &'a [KeySchemaElement],
1967    pub attribute_definitions: &'a [AttributeDefinition],
1968    pub provisioned_throughput: &'a ProvisionedThroughput,
1969    pub gsi: &'a [GlobalSecondaryIndex],
1970    pub lsi: &'a [LocalSecondaryIndex],
1971    pub billing_mode: &'a str,
1972    pub created_at: chrono::DateTime<chrono::Utc>,
1973    pub item_count: i64,
1974    pub size_bytes: i64,
1975    pub status: &'a str,
1976}
1977
1978fn build_table_description_json(input: &TableDescriptionInput<'_>) -> Value {
1979    let TableDescriptionInput {
1980        arn,
1981        table_id,
1982        key_schema,
1983        attribute_definitions,
1984        provisioned_throughput,
1985        gsi,
1986        lsi,
1987        billing_mode,
1988        created_at,
1989        item_count,
1990        size_bytes,
1991        status,
1992    } = *input;
1993    let table_name = arn.rsplit('/').next().unwrap_or("");
1994    let creation_timestamp =
1995        created_at.timestamp() as f64 + created_at.timestamp_subsec_millis() as f64 / 1000.0;
1996
1997    let ks: Vec<Value> = key_schema
1998        .iter()
1999        .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2000        .collect();
2001
2002    let ad: Vec<Value> = attribute_definitions
2003        .iter()
2004        .map(|a| json!({"AttributeName": a.attribute_name, "AttributeType": a.attribute_type}))
2005        .collect();
2006
2007    let mut desc = json!({
2008        "TableName": table_name,
2009        "TableArn": arn,
2010        "TableId": table_id,
2011        "TableStatus": status,
2012        "KeySchema": ks,
2013        "AttributeDefinitions": ad,
2014        "CreationDateTime": creation_timestamp,
2015        "ItemCount": item_count,
2016        "TableSizeBytes": size_bytes,
2017        "BillingModeSummary": { "BillingMode": billing_mode },
2018    });
2019
2020    if billing_mode != "PAY_PER_REQUEST" {
2021        desc["ProvisionedThroughput"] = json!({
2022            "ReadCapacityUnits": provisioned_throughput.read_capacity_units,
2023            "WriteCapacityUnits": provisioned_throughput.write_capacity_units,
2024            "NumberOfDecreasesToday": 0,
2025        });
2026    } else {
2027        desc["ProvisionedThroughput"] = json!({
2028            "ReadCapacityUnits": 0,
2029            "WriteCapacityUnits": 0,
2030            "NumberOfDecreasesToday": 0,
2031        });
2032    }
2033
2034    // Terraform's AWS provider now waits on WarmThroughput after CreateTable.
2035    // Real AWS returns an ACTIVE warm throughput object for active tables,
2036    // including PAY_PER_REQUEST tables. Returning null keeps the provider in a
2037    // perpetual "still creating" loop.
2038    if status == "ACTIVE" {
2039        desc["WarmThroughput"] = json!({
2040            "ReadUnitsPerSecond": 0,
2041            "WriteUnitsPerSecond": 0,
2042            "Status": "ACTIVE",
2043        });
2044    }
2045
2046    if !gsi.is_empty() {
2047        let gsi_json: Vec<Value> = gsi
2048            .iter()
2049            .map(|g| {
2050                let gks: Vec<Value> = g
2051                    .key_schema
2052                    .iter()
2053                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2054                    .collect();
2055                let mut idx = json!({
2056                    "IndexName": g.index_name,
2057                    "KeySchema": gks,
2058                    "Projection": { "ProjectionType": g.projection.projection_type },
2059                    "IndexStatus": "ACTIVE",
2060                    "IndexArn": format!("{arn}/index/{}", g.index_name),
2061                    "ItemCount": 0,
2062                    "IndexSizeBytes": 0,
2063                });
2064                if !g.projection.non_key_attributes.is_empty() {
2065                    idx["Projection"]["NonKeyAttributes"] = json!(g.projection.non_key_attributes);
2066                }
2067                if let Some(ref pt) = g.provisioned_throughput {
2068                    idx["ProvisionedThroughput"] = json!({
2069                        "ReadCapacityUnits": pt.read_capacity_units,
2070                        "WriteCapacityUnits": pt.write_capacity_units,
2071                        "NumberOfDecreasesToday": 0,
2072                    });
2073                }
2074                idx
2075            })
2076            .collect();
2077        desc["GlobalSecondaryIndexes"] = json!(gsi_json);
2078    }
2079
2080    if !lsi.is_empty() {
2081        let lsi_json: Vec<Value> = lsi
2082            .iter()
2083            .map(|l| {
2084                let lks: Vec<Value> = l
2085                    .key_schema
2086                    .iter()
2087                    .map(|k| json!({"AttributeName": k.attribute_name, "KeyType": k.key_type}))
2088                    .collect();
2089                let mut idx = json!({
2090                    "IndexName": l.index_name,
2091                    "KeySchema": lks,
2092                    "Projection": { "ProjectionType": l.projection.projection_type },
2093                    "IndexArn": format!("{arn}/index/{}", l.index_name),
2094                    "ItemCount": 0,
2095                    "IndexSizeBytes": 0,
2096                });
2097                if !l.projection.non_key_attributes.is_empty() {
2098                    idx["Projection"]["NonKeyAttributes"] = json!(l.projection.non_key_attributes);
2099                }
2100                idx
2101            })
2102            .collect();
2103        desc["LocalSecondaryIndexes"] = json!(lsi_json);
2104    }
2105
2106    desc
2107}
2108
2109fn build_table_description(table: &DynamoTable) -> Value {
2110    let mut desc = build_table_description_json(&TableDescriptionInput {
2111        arn: &table.arn,
2112        table_id: &table.table_id,
2113        key_schema: &table.key_schema,
2114        attribute_definitions: &table.attribute_definitions,
2115        provisioned_throughput: &table.provisioned_throughput,
2116        gsi: &table.gsi,
2117        lsi: &table.lsi,
2118        billing_mode: &table.billing_mode,
2119        created_at: table.created_at,
2120        item_count: table.item_count,
2121        size_bytes: table.size_bytes,
2122        status: &table.status,
2123    });
2124
2125    // Add stream specification if streams are enabled
2126    if table.stream_enabled {
2127        if let Some(ref stream_arn) = table.stream_arn {
2128            desc["LatestStreamArn"] = json!(stream_arn);
2129            desc["LatestStreamLabel"] = json!(stream_arn.rsplit('/').next().unwrap_or(""));
2130        }
2131        if let Some(ref view_type) = table.stream_view_type {
2132            desc["StreamSpecification"] = json!({
2133                "StreamEnabled": true,
2134                "StreamViewType": view_type,
2135            });
2136        }
2137    }
2138
2139    // Add SSE description
2140    if let Some(ref sse_type) = table.sse_type {
2141        let mut sse_desc = json!({
2142            "Status": "ENABLED",
2143            "SSEType": sse_type,
2144        });
2145        if let Some(ref key_arn) = table.sse_kms_key_arn {
2146            sse_desc["KMSMasterKeyArn"] = json!(key_arn);
2147        }
2148        desc["SSEDescription"] = sse_desc;
2149    } else {
2150        // Default: AWS owned key encryption (always enabled in real AWS)
2151        desc["SSEDescription"] = json!({
2152            "Status": "ENABLED",
2153            "SSEType": "AES256",
2154        });
2155    }
2156
2157    desc
2158}
2159
2160fn execute_partiql_statement(
2161    state: &SharedDynamoDbState,
2162    statement: &str,
2163    parameters: &[Value],
2164) -> Result<AwsResponse, AwsServiceError> {
2165    let trimmed = statement.trim();
2166    let upper = trimmed.to_ascii_uppercase();
2167
2168    if upper.starts_with("SELECT") {
2169        execute_partiql_select(state, trimmed, parameters)
2170    } else if upper.starts_with("INSERT") {
2171        execute_partiql_insert(state, trimmed, parameters)
2172    } else if upper.starts_with("UPDATE") {
2173        execute_partiql_update(state, trimmed, parameters)
2174    } else if upper.starts_with("DELETE") {
2175        execute_partiql_delete(state, trimmed, parameters)
2176    } else {
2177        Err(AwsServiceError::aws_error(
2178            StatusCode::BAD_REQUEST,
2179            "ValidationException",
2180            format!("Unsupported PartiQL statement: {trimmed}"),
2181        ))
2182    }
2183}
2184
2185/// Parse a simple `SELECT * FROM tablename WHERE pk = 'value'` or with parameters.
2186fn execute_partiql_select(
2187    state: &SharedDynamoDbState,
2188    statement: &str,
2189    parameters: &[Value],
2190) -> Result<AwsResponse, AwsServiceError> {
2191    // Pattern: SELECT * FROM "tablename" [WHERE col = 'val' | WHERE col = ?]
2192    let upper = statement.to_ascii_uppercase();
2193    let from_pos = upper.find("FROM").ok_or_else(|| {
2194        AwsServiceError::aws_error(
2195            StatusCode::BAD_REQUEST,
2196            "ValidationException",
2197            "Invalid SELECT statement: missing FROM",
2198        )
2199    })?;
2200
2201    let after_from = statement[from_pos + 4..].trim();
2202    let (table_name, rest) = parse_partiql_table_name(after_from);
2203
2204    let state = state.read();
2205    let table = get_table(&state.tables, &table_name)?;
2206
2207    let rest_upper = rest.trim().to_ascii_uppercase();
2208    if rest_upper.starts_with("WHERE") {
2209        let where_clause = rest.trim()[5..].trim();
2210        let matched = evaluate_partiql_where(table, where_clause, parameters)?;
2211        let items: Vec<Value> = matched.iter().map(|item| json!(item)).collect();
2212        DynamoDbService::ok_json(json!({ "Items": items }))
2213    } else {
2214        // No WHERE, return all items
2215        let items: Vec<Value> = table.items.iter().map(|item| json!(item)).collect();
2216        DynamoDbService::ok_json(json!({ "Items": items }))
2217    }
2218}
2219
2220fn execute_partiql_insert(
2221    state: &SharedDynamoDbState,
2222    statement: &str,
2223    parameters: &[Value],
2224) -> Result<AwsResponse, AwsServiceError> {
2225    // Pattern: INSERT INTO "tablename" VALUE {'pk': 'val', 'attr': 'val'}
2226    // or with parameters: INSERT INTO "tablename" VALUE {'pk': ?, 'attr': ?}
2227    let upper = statement.to_ascii_uppercase();
2228    let into_pos = upper.find("INTO").ok_or_else(|| {
2229        AwsServiceError::aws_error(
2230            StatusCode::BAD_REQUEST,
2231            "ValidationException",
2232            "Invalid INSERT statement: missing INTO",
2233        )
2234    })?;
2235
2236    let after_into = statement[into_pos + 4..].trim();
2237    let (table_name, rest) = parse_partiql_table_name(after_into);
2238
2239    let rest_upper = rest.trim().to_ascii_uppercase();
2240    let value_pos = rest_upper.find("VALUE").ok_or_else(|| {
2241        AwsServiceError::aws_error(
2242            StatusCode::BAD_REQUEST,
2243            "ValidationException",
2244            "Invalid INSERT statement: missing VALUE",
2245        )
2246    })?;
2247
2248    let value_str = rest.trim()[value_pos + 5..].trim();
2249    let item = parse_partiql_value_object(value_str, parameters)?;
2250
2251    let mut state = state.write();
2252    let table = get_table_mut(&mut state.tables, &table_name)?;
2253    let key = extract_key(table, &item);
2254    if table.find_item_index(&key).is_some() {
2255        // DynamoDB PartiQL INSERT fails if item exists
2256        return Err(AwsServiceError::aws_error(
2257            StatusCode::BAD_REQUEST,
2258            "DuplicateItemException",
2259            "Duplicate primary key exists in table",
2260        ));
2261    } else {
2262        table.items.push(item);
2263    }
2264    table.recalculate_stats();
2265
2266    DynamoDbService::ok_json(json!({}))
2267}
2268
2269fn execute_partiql_update(
2270    state: &SharedDynamoDbState,
2271    statement: &str,
2272    parameters: &[Value],
2273) -> Result<AwsResponse, AwsServiceError> {
2274    // Pattern: UPDATE "tablename" SET attr='val' WHERE pk='val'
2275    // or: UPDATE "tablename" SET attr=? WHERE pk=?
2276    let after_update = statement[6..].trim(); // skip "UPDATE"
2277    let (table_name, rest) = parse_partiql_table_name(after_update);
2278
2279    let rest_upper = rest.trim().to_ascii_uppercase();
2280    let set_pos = rest_upper.find("SET").ok_or_else(|| {
2281        AwsServiceError::aws_error(
2282            StatusCode::BAD_REQUEST,
2283            "ValidationException",
2284            "Invalid UPDATE statement: missing SET",
2285        )
2286    })?;
2287
2288    let after_set = rest.trim()[set_pos + 3..].trim();
2289
2290    // Split on WHERE
2291    let where_pos = after_set.to_ascii_uppercase().find("WHERE");
2292    let (set_clause, where_clause) = if let Some(wp) = where_pos {
2293        (&after_set[..wp], after_set[wp + 5..].trim())
2294    } else {
2295        (after_set, "")
2296    };
2297
2298    let mut state = state.write();
2299    let table = get_table_mut(&mut state.tables, &table_name)?;
2300
2301    let matched_indices = if !where_clause.is_empty() {
2302        find_partiql_where_indices(table, where_clause, parameters)?
2303    } else {
2304        (0..table.items.len()).collect()
2305    };
2306
2307    // Parse SET assignments: attr=value, attr2=value2
2308    let param_offset = count_params_in_str(where_clause);
2309    let assignments: Vec<&str> = set_clause.split(',').collect();
2310    for idx in &matched_indices {
2311        let mut local_offset = param_offset;
2312        for assignment in &assignments {
2313            let assignment = assignment.trim();
2314            if let Some((attr, val_str)) = assignment.split_once('=') {
2315                let attr = attr.trim().trim_matches('"');
2316                let val_str = val_str.trim();
2317                let value = parse_partiql_literal(val_str, parameters, &mut local_offset);
2318                if let Some(v) = value {
2319                    table.items[*idx].insert(attr.to_string(), v);
2320                }
2321            }
2322        }
2323    }
2324    table.recalculate_stats();
2325
2326    DynamoDbService::ok_json(json!({}))
2327}
2328
2329fn execute_partiql_delete(
2330    state: &SharedDynamoDbState,
2331    statement: &str,
2332    parameters: &[Value],
2333) -> Result<AwsResponse, AwsServiceError> {
2334    // Pattern: DELETE FROM "tablename" WHERE pk='val'
2335    let upper = statement.to_ascii_uppercase();
2336    let from_pos = upper.find("FROM").ok_or_else(|| {
2337        AwsServiceError::aws_error(
2338            StatusCode::BAD_REQUEST,
2339            "ValidationException",
2340            "Invalid DELETE statement: missing FROM",
2341        )
2342    })?;
2343
2344    let after_from = statement[from_pos + 4..].trim();
2345    let (table_name, rest) = parse_partiql_table_name(after_from);
2346
2347    let rest_upper = rest.trim().to_ascii_uppercase();
2348    if !rest_upper.starts_with("WHERE") {
2349        return Err(AwsServiceError::aws_error(
2350            StatusCode::BAD_REQUEST,
2351            "ValidationException",
2352            "DELETE requires a WHERE clause",
2353        ));
2354    }
2355    let where_clause = rest.trim()[5..].trim();
2356
2357    let mut state = state.write();
2358    let table = get_table_mut(&mut state.tables, &table_name)?;
2359
2360    let mut indices = find_partiql_where_indices(table, where_clause, parameters)?;
2361    // Remove from highest index first to avoid invalidating lower indices
2362    indices.sort_unstable();
2363    indices.reverse();
2364    for idx in indices {
2365        table.items.remove(idx);
2366    }
2367    table.recalculate_stats();
2368
2369    DynamoDbService::ok_json(json!({}))
2370}
2371
2372/// Parse a table name that may be quoted with double quotes.
2373/// Returns (table_name, rest_of_string).
2374fn parse_partiql_table_name(s: &str) -> (String, &str) {
2375    let s = s.trim();
2376    if let Some(stripped) = s.strip_prefix('"') {
2377        // Quoted name
2378        if let Some(end) = stripped.find('"') {
2379            let name = &stripped[..end];
2380            let rest = &stripped[end + 1..];
2381            (name.to_string(), rest)
2382        } else {
2383            let end = s.find(' ').unwrap_or(s.len());
2384            (s[..end].trim_matches('"').to_string(), &s[end..])
2385        }
2386    } else {
2387        let end = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
2388        (s[..end].to_string(), &s[end..])
2389    }
2390}
2391
2392/// Evaluate a simple WHERE clause: `col = 'value'` or `col = ?`
2393/// Returns matching items.
2394fn evaluate_partiql_where<'a>(
2395    table: &'a DynamoTable,
2396    where_clause: &str,
2397    parameters: &[Value],
2398) -> Result<Vec<&'a HashMap<String, AttributeValue>>, AwsServiceError> {
2399    let indices = find_partiql_where_indices(table, where_clause, parameters)?;
2400    Ok(indices.iter().map(|i| &table.items[*i]).collect())
2401}
2402
2403fn find_partiql_where_indices(
2404    table: &DynamoTable,
2405    where_clause: &str,
2406    parameters: &[Value],
2407) -> Result<Vec<usize>, AwsServiceError> {
2408    let conditions = split_partiql_and_clauses(where_clause);
2409    let parsed_conditions = parse_partiql_equality_conditions(&conditions, parameters);
2410
2411    let mut indices = Vec::new();
2412    for (i, item) in table.items.iter().enumerate() {
2413        let all_match = parsed_conditions
2414            .iter()
2415            .all(|(attr, expected)| item.get(attr) == Some(expected));
2416        if all_match {
2417            indices.push(i);
2418        }
2419    }
2420
2421    Ok(indices)
2422}
2423
2424/// Split a PartiQL WHERE clause on case-insensitive ` AND ` boundaries.
2425fn split_partiql_and_clauses(where_clause: &str) -> Vec<&str> {
2426    let upper = where_clause.to_uppercase();
2427    if !upper.contains(" AND ") {
2428        return vec![where_clause.trim()];
2429    }
2430    let mut parts = Vec::new();
2431    let mut last = 0;
2432    for (i, _) in upper.match_indices(" AND ") {
2433        parts.push(where_clause[last..i].trim());
2434        last = i + 5;
2435    }
2436    parts.push(where_clause[last..].trim());
2437    parts
2438}
2439
2440/// Parse each `col = literal` (or `col = ?`) condition into an
2441/// `(attribute_name, expected_AttributeValue)` pair. Conditions that
2442/// don't parse as equality, or whose RHS literal can't be resolved, are
2443/// silently dropped — that mirrors the prior inline behavior.
2444fn parse_partiql_equality_conditions(
2445    conditions: &[&str],
2446    parameters: &[Value],
2447) -> Vec<(String, Value)> {
2448    let mut param_idx = 0usize;
2449    let mut parsed = Vec::new();
2450    for cond in conditions {
2451        let cond = cond.trim();
2452        if let Some((left, right)) = cond.split_once('=') {
2453            let attr = left.trim().trim_matches('"').to_string();
2454            let val_str = right.trim();
2455            if let Some(value) = parse_partiql_literal(val_str, parameters, &mut param_idx) {
2456                parsed.push((attr, value));
2457            }
2458        }
2459    }
2460    parsed
2461}
2462
2463/// Parse a PartiQL literal value. Supports:
2464/// - 'string' -> {"S": "string"}
2465/// - 123 -> {"N": "123"}
2466/// - ? -> parameter from list
2467fn parse_partiql_literal(s: &str, parameters: &[Value], param_idx: &mut usize) -> Option<Value> {
2468    let s = s.trim();
2469    if s == "?" {
2470        let idx = *param_idx;
2471        *param_idx += 1;
2472        parameters.get(idx).cloned()
2473    } else if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
2474        let inner = &s[1..s.len() - 1];
2475        Some(json!({"S": inner}))
2476    } else if let Ok(n) = s.parse::<f64>() {
2477        let num_str = if n == n.trunc() {
2478            format!("{}", n as i64)
2479        } else {
2480            format!("{n}")
2481        };
2482        Some(json!({"N": num_str}))
2483    } else {
2484        None
2485    }
2486}
2487
2488/// Parse a PartiQL VALUE object like `{'pk': 'val1', 'attr': 'val2'}` or with ? params.
2489fn parse_partiql_value_object(
2490    s: &str,
2491    parameters: &[Value],
2492) -> Result<HashMap<String, AttributeValue>, AwsServiceError> {
2493    let s = s.trim();
2494    let inner = s
2495        .strip_prefix('{')
2496        .and_then(|s| s.strip_suffix('}'))
2497        .ok_or_else(|| {
2498            AwsServiceError::aws_error(
2499                StatusCode::BAD_REQUEST,
2500                "ValidationException",
2501                "Invalid VALUE: expected object literal",
2502            )
2503        })?;
2504
2505    let mut item = HashMap::new();
2506    let mut param_idx = 0usize;
2507
2508    // Simple comma-separated key:value parsing
2509    for pair in split_partiql_pairs(inner) {
2510        let pair = pair.trim();
2511        if pair.is_empty() {
2512            continue;
2513        }
2514        if let Some((key_part, val_part)) = pair.split_once(':') {
2515            let key = key_part
2516                .trim()
2517                .trim_matches('\'')
2518                .trim_matches('"')
2519                .to_string();
2520            if let Some(val) = parse_partiql_literal(val_part.trim(), parameters, &mut param_idx) {
2521                item.insert(key, val);
2522            }
2523        }
2524    }
2525
2526    Ok(item)
2527}
2528
2529/// Split PartiQL object pairs on commas, respecting nested braces and quotes.
2530fn split_partiql_pairs(s: &str) -> Vec<&str> {
2531    let mut parts = Vec::new();
2532    let mut start = 0;
2533    let mut depth = 0;
2534    let mut in_quote = false;
2535
2536    for (i, c) in s.char_indices() {
2537        match c {
2538            '\'' if !in_quote => in_quote = true,
2539            '\'' if in_quote => in_quote = false,
2540            '{' if !in_quote => depth += 1,
2541            '}' if !in_quote => depth -= 1,
2542            ',' if !in_quote && depth == 0 => {
2543                parts.push(&s[start..i]);
2544                start = i + 1;
2545            }
2546            _ => {}
2547        }
2548    }
2549    parts.push(&s[start..]);
2550    parts
2551}
2552
2553/// Count ? parameters in a string.
2554fn count_params_in_str(s: &str) -> usize {
2555    s.chars().filter(|c| *c == '?').count()
2556}
2557
2558#[cfg(test)]
2559mod tests {
2560    use super::*;
2561    use serde_json::json;
2562
2563    #[test]
2564    fn test_parse_update_clauses_set() {
2565        let clauses = parse_update_clauses("SET #a = :val1, #b = :val2");
2566        assert_eq!(clauses.len(), 1);
2567        assert_eq!(clauses[0].0, UpdateAction::Set);
2568        assert_eq!(clauses[0].1.len(), 2);
2569    }
2570
2571    #[test]
2572    fn test_parse_update_clauses_set_and_remove() {
2573        let clauses = parse_update_clauses("SET #a = :val1 REMOVE #b");
2574        assert_eq!(clauses.len(), 2);
2575        assert_eq!(clauses[0].0, UpdateAction::Set);
2576        assert_eq!(clauses[1].0, UpdateAction::Remove);
2577    }
2578
2579    #[test]
2580    fn test_evaluate_key_condition_simple() {
2581        let mut item = HashMap::new();
2582        item.insert("pk".to_string(), json!({"S": "user1"}));
2583        item.insert("sk".to_string(), json!({"S": "order1"}));
2584
2585        let mut expr_values = HashMap::new();
2586        expr_values.insert(":pk".to_string(), json!({"S": "user1"}));
2587
2588        assert!(evaluate_key_condition(
2589            "pk = :pk",
2590            &item,
2591            "pk",
2592            Some("sk"),
2593            &HashMap::new(),
2594            &expr_values,
2595        ));
2596    }
2597
2598    #[test]
2599    fn test_compare_attribute_values_numbers() {
2600        let a = json!({"N": "10"});
2601        let b = json!({"N": "20"});
2602        assert_eq!(
2603            compare_attribute_values(Some(&a), Some(&b)),
2604            std::cmp::Ordering::Less
2605        );
2606    }
2607
2608    #[test]
2609    fn test_compare_attribute_values_strings() {
2610        let a = json!({"S": "apple"});
2611        let b = json!({"S": "banana"});
2612        assert_eq!(
2613            compare_attribute_values(Some(&a), Some(&b)),
2614            std::cmp::Ordering::Less
2615        );
2616    }
2617
2618    #[test]
2619    fn test_split_on_and() {
2620        let parts = split_on_and("pk = :pk AND sk > :sk");
2621        assert_eq!(parts.len(), 2);
2622        assert_eq!(parts[0].trim(), "pk = :pk");
2623        assert_eq!(parts[1].trim(), "sk > :sk");
2624    }
2625
2626    #[test]
2627    fn test_split_on_and_respects_parentheses() {
2628        // Before fix: split_on_and would split inside the parens
2629        let parts = split_on_and("(a = :a AND b = :b) OR c = :c");
2630        // Should NOT split on the AND inside parentheses
2631        assert_eq!(parts.len(), 1);
2632        assert_eq!(parts[0].trim(), "(a = :a AND b = :b) OR c = :c");
2633    }
2634
2635    #[test]
2636    fn test_evaluate_filter_expression_parenthesized_and_with_or() {
2637        // (a AND b) OR c — should match when c is true but a is false
2638        let mut item = HashMap::new();
2639        item.insert("x".to_string(), json!({"S": "no"}));
2640        item.insert("y".to_string(), json!({"S": "no"}));
2641        item.insert("z".to_string(), json!({"S": "yes"}));
2642
2643        let mut expr_values = HashMap::new();
2644        expr_values.insert(":yes".to_string(), json!({"S": "yes"}));
2645
2646        // x=yes AND y=yes => false, but z=yes => true => overall true
2647        let result = evaluate_filter_expression(
2648            "(x = :yes AND y = :yes) OR z = :yes",
2649            &item,
2650            &HashMap::new(),
2651            &expr_values,
2652        );
2653        assert!(result, "should match because z = :yes is true");
2654
2655        // x=yes AND y=yes => false, z=yes => false => overall false
2656        let mut item2 = HashMap::new();
2657        item2.insert("x".to_string(), json!({"S": "no"}));
2658        item2.insert("y".to_string(), json!({"S": "no"}));
2659        item2.insert("z".to_string(), json!({"S": "no"}));
2660
2661        let result2 = evaluate_filter_expression(
2662            "(x = :yes AND y = :yes) OR z = :yes",
2663            &item2,
2664            &HashMap::new(),
2665            &expr_values,
2666        );
2667        assert!(!result2, "should not match because nothing is true");
2668    }
2669
2670    #[test]
2671    fn test_project_item_nested_path() {
2672        // Item with a list attribute containing maps
2673        let mut item = HashMap::new();
2674        item.insert("pk".to_string(), json!({"S": "key1"}));
2675        item.insert(
2676            "data".to_string(),
2677            json!({"L": [{"M": {"name": {"S": "Alice"}, "age": {"N": "30"}}}, {"M": {"name": {"S": "Bob"}}}]}),
2678        );
2679
2680        let body = json!({
2681            "ProjectionExpression": "data[0].name"
2682        });
2683
2684        let projected = project_item(&item, &body);
2685        // Should contain data[0].name = "Alice", not the entire data[0] element
2686        let name = projected
2687            .get("data")
2688            .and_then(|v| v.get("L"))
2689            .and_then(|v| v.get(0))
2690            .and_then(|v| v.get("M"))
2691            .and_then(|v| v.get("name"))
2692            .and_then(|v| v.get("S"))
2693            .and_then(|v| v.as_str());
2694        assert_eq!(name, Some("Alice"));
2695
2696        // Should NOT contain the "age" field
2697        let age = projected
2698            .get("data")
2699            .and_then(|v| v.get("L"))
2700            .and_then(|v| v.get(0))
2701            .and_then(|v| v.get("M"))
2702            .and_then(|v| v.get("age"));
2703        assert!(age.is_none(), "age should not be present in projection");
2704    }
2705
2706    #[test]
2707    fn test_resolve_nested_path_map() {
2708        let mut item = HashMap::new();
2709        item.insert(
2710            "info".to_string(),
2711            json!({"M": {"address": {"M": {"city": {"S": "NYC"}}}}}),
2712        );
2713
2714        let result = resolve_nested_path(&item, "info.address.city");
2715        assert_eq!(result, Some(json!({"S": "NYC"})));
2716    }
2717
2718    #[test]
2719    fn test_resolve_nested_path_list_then_map() {
2720        let mut item = HashMap::new();
2721        item.insert(
2722            "items".to_string(),
2723            json!({"L": [{"M": {"sku": {"S": "ABC"}}}]}),
2724        );
2725
2726        let result = resolve_nested_path(&item, "items[0].sku");
2727        assert_eq!(result, Some(json!({"S": "ABC"})));
2728    }
2729
2730    // -- Integration-style tests using DynamoDbService --
2731
2732    use crate::state::SharedDynamoDbState;
2733    use parking_lot::RwLock;
2734    use std::sync::Arc;
2735
2736    fn make_service() -> DynamoDbService {
2737        let state: SharedDynamoDbState = Arc::new(RwLock::new(crate::state::DynamoDbState::new(
2738            "123456789012",
2739            "us-east-1",
2740        )));
2741        DynamoDbService::new(state)
2742    }
2743
2744    fn make_request(action: &str, body: Value) -> AwsRequest {
2745        AwsRequest {
2746            service: "dynamodb".to_string(),
2747            action: action.to_string(),
2748            region: "us-east-1".to_string(),
2749            account_id: "123456789012".to_string(),
2750            request_id: "test-id".to_string(),
2751            headers: http::HeaderMap::new(),
2752            query_params: HashMap::new(),
2753            body: serde_json::to_vec(&body).unwrap().into(),
2754            path_segments: vec![],
2755            raw_path: "/".to_string(),
2756            raw_query: String::new(),
2757            method: http::Method::POST,
2758            is_query_protocol: false,
2759            access_key_id: None,
2760        }
2761    }
2762
2763    fn create_test_table(svc: &DynamoDbService) {
2764        let req = make_request(
2765            "CreateTable",
2766            json!({
2767                "TableName": "test-table",
2768                "KeySchema": [
2769                    { "AttributeName": "pk", "KeyType": "HASH" }
2770                ],
2771                "AttributeDefinitions": [
2772                    { "AttributeName": "pk", "AttributeType": "S" }
2773                ],
2774                "BillingMode": "PAY_PER_REQUEST"
2775            }),
2776        );
2777        svc.create_table(&req).unwrap();
2778    }
2779
2780    #[test]
2781    fn describe_table_returns_stable_table_id_and_active_warm_throughput() {
2782        let svc = make_service();
2783        let req = make_request(
2784            "CreateTable",
2785            json!({
2786                "TableName": "warm-throughput-table",
2787                "KeySchema": [
2788                    { "AttributeName": "pk", "KeyType": "HASH" }
2789                ],
2790                "AttributeDefinitions": [
2791                    { "AttributeName": "pk", "AttributeType": "S" }
2792                ],
2793                "BillingMode": "PAY_PER_REQUEST"
2794            }),
2795        );
2796        let create_resp = svc.create_table(&req).unwrap();
2797        let create_body: Value = serde_json::from_slice(create_resp.body.expect_bytes()).unwrap();
2798        let create_table = &create_body["TableDescription"];
2799
2800        assert_eq!(create_table["TableStatus"], "ACTIVE");
2801        assert_eq!(create_table["WarmThroughput"]["Status"], "ACTIVE");
2802        let table_id = create_table["TableId"].as_str().unwrap().to_string();
2803        assert!(!table_id.is_empty());
2804
2805        let describe_req = make_request(
2806            "DescribeTable",
2807            json!({ "TableName": "warm-throughput-table" }),
2808        );
2809        let describe_resp = svc.describe_table(&describe_req).unwrap();
2810        let describe_body: Value =
2811            serde_json::from_slice(describe_resp.body.expect_bytes()).unwrap();
2812        let described_table = &describe_body["Table"];
2813
2814        assert_eq!(described_table["TableStatus"], "ACTIVE");
2815        assert_eq!(described_table["WarmThroughput"]["Status"], "ACTIVE");
2816        assert_eq!(described_table["TableId"], table_id);
2817
2818        let describe_resp_again = svc.describe_table(&describe_req).unwrap();
2819        let describe_body_again: Value =
2820            serde_json::from_slice(describe_resp_again.body.expect_bytes()).unwrap();
2821        assert_eq!(describe_body_again["Table"]["TableId"], table_id);
2822    }
2823
2824    #[test]
2825    fn delete_item_return_values_all_old() {
2826        let svc = make_service();
2827        create_test_table(&svc);
2828
2829        // Put an item
2830        let req = make_request(
2831            "PutItem",
2832            json!({
2833                "TableName": "test-table",
2834                "Item": {
2835                    "pk": { "S": "key1" },
2836                    "name": { "S": "Alice" },
2837                    "age": { "N": "30" }
2838                }
2839            }),
2840        );
2841        svc.put_item(&req).unwrap();
2842
2843        // Delete with ReturnValues=ALL_OLD
2844        let req = make_request(
2845            "DeleteItem",
2846            json!({
2847                "TableName": "test-table",
2848                "Key": { "pk": { "S": "key1" } },
2849                "ReturnValues": "ALL_OLD"
2850            }),
2851        );
2852        let resp = svc.delete_item(&req).unwrap();
2853        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2854
2855        // Verify the old item is returned
2856        let attrs = &body["Attributes"];
2857        assert_eq!(attrs["pk"]["S"].as_str().unwrap(), "key1");
2858        assert_eq!(attrs["name"]["S"].as_str().unwrap(), "Alice");
2859        assert_eq!(attrs["age"]["N"].as_str().unwrap(), "30");
2860
2861        // Verify the item is actually deleted
2862        let req = make_request(
2863            "GetItem",
2864            json!({
2865                "TableName": "test-table",
2866                "Key": { "pk": { "S": "key1" } }
2867            }),
2868        );
2869        let resp = svc.get_item(&req).unwrap();
2870        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2871        assert!(body.get("Item").is_none(), "item should be deleted");
2872    }
2873
2874    #[test]
2875    fn transact_get_items_returns_existing_and_missing() {
2876        let svc = make_service();
2877        create_test_table(&svc);
2878
2879        // Put one item
2880        let req = make_request(
2881            "PutItem",
2882            json!({
2883                "TableName": "test-table",
2884                "Item": {
2885                    "pk": { "S": "exists" },
2886                    "val": { "S": "hello" }
2887                }
2888            }),
2889        );
2890        svc.put_item(&req).unwrap();
2891
2892        let req = make_request(
2893            "TransactGetItems",
2894            json!({
2895                "TransactItems": [
2896                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "exists" } } } },
2897                    { "Get": { "TableName": "test-table", "Key": { "pk": { "S": "missing" } } } }
2898                ]
2899            }),
2900        );
2901        let resp = svc.transact_get_items(&req).unwrap();
2902        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2903        let responses = body["Responses"].as_array().unwrap();
2904        assert_eq!(responses.len(), 2);
2905        assert_eq!(responses[0]["Item"]["pk"]["S"].as_str().unwrap(), "exists");
2906        assert!(responses[1].get("Item").is_none());
2907    }
2908
2909    #[test]
2910    fn transact_write_items_put_and_delete() {
2911        let svc = make_service();
2912        create_test_table(&svc);
2913
2914        // Put initial item
2915        let req = make_request(
2916            "PutItem",
2917            json!({
2918                "TableName": "test-table",
2919                "Item": {
2920                    "pk": { "S": "to-delete" },
2921                    "val": { "S": "bye" }
2922                }
2923            }),
2924        );
2925        svc.put_item(&req).unwrap();
2926
2927        // TransactWrite: put new + delete existing
2928        let req = make_request(
2929            "TransactWriteItems",
2930            json!({
2931                "TransactItems": [
2932                    {
2933                        "Put": {
2934                            "TableName": "test-table",
2935                            "Item": {
2936                                "pk": { "S": "new-item" },
2937                                "val": { "S": "hi" }
2938                            }
2939                        }
2940                    },
2941                    {
2942                        "Delete": {
2943                            "TableName": "test-table",
2944                            "Key": { "pk": { "S": "to-delete" } }
2945                        }
2946                    }
2947                ]
2948            }),
2949        );
2950        let resp = svc.transact_write_items(&req).unwrap();
2951        assert_eq!(resp.status, StatusCode::OK);
2952
2953        // Verify new item exists
2954        let req = make_request(
2955            "GetItem",
2956            json!({
2957                "TableName": "test-table",
2958                "Key": { "pk": { "S": "new-item" } }
2959            }),
2960        );
2961        let resp = svc.get_item(&req).unwrap();
2962        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2963        assert_eq!(body["Item"]["val"]["S"].as_str().unwrap(), "hi");
2964
2965        // Verify deleted item is gone
2966        let req = make_request(
2967            "GetItem",
2968            json!({
2969                "TableName": "test-table",
2970                "Key": { "pk": { "S": "to-delete" } }
2971            }),
2972        );
2973        let resp = svc.get_item(&req).unwrap();
2974        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
2975        assert!(body.get("Item").is_none());
2976    }
2977
2978    #[test]
2979    fn transact_write_items_condition_check_failure() {
2980        let svc = make_service();
2981        create_test_table(&svc);
2982
2983        // TransactWrite with a ConditionCheck that fails (item doesn't exist)
2984        let req = make_request(
2985            "TransactWriteItems",
2986            json!({
2987                "TransactItems": [
2988                    {
2989                        "ConditionCheck": {
2990                            "TableName": "test-table",
2991                            "Key": { "pk": { "S": "nonexistent" } },
2992                            "ConditionExpression": "attribute_exists(pk)"
2993                        }
2994                    }
2995                ]
2996            }),
2997        );
2998        let resp = svc.transact_write_items(&req).unwrap();
2999        // Should be a 400 error response
3000        assert_eq!(resp.status, StatusCode::BAD_REQUEST);
3001        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3002        assert_eq!(
3003            body["__type"].as_str().unwrap(),
3004            "TransactionCanceledException"
3005        );
3006        assert!(body["CancellationReasons"].as_array().is_some());
3007    }
3008
3009    #[test]
3010    fn update_and_describe_time_to_live() {
3011        let svc = make_service();
3012        create_test_table(&svc);
3013
3014        // Enable TTL
3015        let req = make_request(
3016            "UpdateTimeToLive",
3017            json!({
3018                "TableName": "test-table",
3019                "TimeToLiveSpecification": {
3020                    "AttributeName": "ttl",
3021                    "Enabled": true
3022                }
3023            }),
3024        );
3025        let resp = svc.update_time_to_live(&req).unwrap();
3026        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3027        assert_eq!(
3028            body["TimeToLiveSpecification"]["AttributeName"]
3029                .as_str()
3030                .unwrap(),
3031            "ttl"
3032        );
3033        assert!(body["TimeToLiveSpecification"]["Enabled"]
3034            .as_bool()
3035            .unwrap());
3036
3037        // Describe TTL
3038        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3039        let resp = svc.describe_time_to_live(&req).unwrap();
3040        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3041        assert_eq!(
3042            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3043                .as_str()
3044                .unwrap(),
3045            "ENABLED"
3046        );
3047        assert_eq!(
3048            body["TimeToLiveDescription"]["AttributeName"]
3049                .as_str()
3050                .unwrap(),
3051            "ttl"
3052        );
3053
3054        // Disable TTL
3055        let req = make_request(
3056            "UpdateTimeToLive",
3057            json!({
3058                "TableName": "test-table",
3059                "TimeToLiveSpecification": {
3060                    "AttributeName": "ttl",
3061                    "Enabled": false
3062                }
3063            }),
3064        );
3065        svc.update_time_to_live(&req).unwrap();
3066
3067        let req = make_request("DescribeTimeToLive", json!({ "TableName": "test-table" }));
3068        let resp = svc.describe_time_to_live(&req).unwrap();
3069        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3070        assert_eq!(
3071            body["TimeToLiveDescription"]["TimeToLiveStatus"]
3072                .as_str()
3073                .unwrap(),
3074            "DISABLED"
3075        );
3076    }
3077
3078    #[test]
3079    fn resource_policy_lifecycle() {
3080        let svc = make_service();
3081        create_test_table(&svc);
3082
3083        let table_arn = {
3084            let state = svc.state.read();
3085            state.tables.get("test-table").unwrap().arn.clone()
3086        };
3087
3088        // Put policy
3089        let policy_doc = r#"{"Version":"2012-10-17","Statement":[]}"#;
3090        let req = make_request(
3091            "PutResourcePolicy",
3092            json!({
3093                "ResourceArn": table_arn,
3094                "Policy": policy_doc
3095            }),
3096        );
3097        let resp = svc.put_resource_policy(&req).unwrap();
3098        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3099        assert!(body["RevisionId"].as_str().is_some());
3100
3101        // Get policy
3102        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3103        let resp = svc.get_resource_policy(&req).unwrap();
3104        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3105        assert_eq!(body["Policy"].as_str().unwrap(), policy_doc);
3106
3107        // Delete policy
3108        let req = make_request("DeleteResourcePolicy", json!({ "ResourceArn": table_arn }));
3109        svc.delete_resource_policy(&req).unwrap();
3110
3111        // Get should return null now
3112        let req = make_request("GetResourcePolicy", json!({ "ResourceArn": table_arn }));
3113        let resp = svc.get_resource_policy(&req).unwrap();
3114        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3115        assert!(body["Policy"].is_null());
3116    }
3117
3118    #[test]
3119    fn describe_endpoints() {
3120        let svc = make_service();
3121        let req = make_request("DescribeEndpoints", json!({}));
3122        let resp = svc.describe_endpoints(&req).unwrap();
3123        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3124        assert_eq!(body["Endpoints"][0]["CachePeriodInMinutes"], 1440);
3125    }
3126
3127    #[test]
3128    fn describe_limits() {
3129        let svc = make_service();
3130        let req = make_request("DescribeLimits", json!({}));
3131        let resp = svc.describe_limits(&req).unwrap();
3132        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3133        assert_eq!(body["TableMaxReadCapacityUnits"], 40000);
3134    }
3135
3136    #[test]
3137    fn backup_lifecycle() {
3138        let svc = make_service();
3139        create_test_table(&svc);
3140
3141        // Create backup
3142        let req = make_request(
3143            "CreateBackup",
3144            json!({ "TableName": "test-table", "BackupName": "my-backup" }),
3145        );
3146        let resp = svc.create_backup(&req).unwrap();
3147        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3148        let backup_arn = body["BackupDetails"]["BackupArn"]
3149            .as_str()
3150            .unwrap()
3151            .to_string();
3152        assert_eq!(body["BackupDetails"]["BackupStatus"], "AVAILABLE");
3153
3154        // Describe backup
3155        let req = make_request("DescribeBackup", json!({ "BackupArn": backup_arn }));
3156        let resp = svc.describe_backup(&req).unwrap();
3157        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3158        assert_eq!(
3159            body["BackupDescription"]["BackupDetails"]["BackupName"],
3160            "my-backup"
3161        );
3162
3163        // List backups
3164        let req = make_request("ListBackups", json!({}));
3165        let resp = svc.list_backups(&req).unwrap();
3166        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3167        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 1);
3168
3169        // Restore from backup
3170        let req = make_request(
3171            "RestoreTableFromBackup",
3172            json!({ "BackupArn": backup_arn, "TargetTableName": "restored-table" }),
3173        );
3174        svc.restore_table_from_backup(&req).unwrap();
3175
3176        // Verify restored table exists
3177        let req = make_request("DescribeTable", json!({ "TableName": "restored-table" }));
3178        let resp = svc.describe_table(&req).unwrap();
3179        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3180        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3181
3182        // Delete backup
3183        let req = make_request("DeleteBackup", json!({ "BackupArn": backup_arn }));
3184        svc.delete_backup(&req).unwrap();
3185
3186        // List should be empty
3187        let req = make_request("ListBackups", json!({}));
3188        let resp = svc.list_backups(&req).unwrap();
3189        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3190        assert_eq!(body["BackupSummaries"].as_array().unwrap().len(), 0);
3191    }
3192
3193    #[test]
3194    fn continuous_backups() {
3195        let svc = make_service();
3196        create_test_table(&svc);
3197
3198        // Initially disabled
3199        let req = make_request(
3200            "DescribeContinuousBackups",
3201            json!({ "TableName": "test-table" }),
3202        );
3203        let resp = svc.describe_continuous_backups(&req).unwrap();
3204        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3205        assert_eq!(
3206            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3207                ["PointInTimeRecoveryStatus"],
3208            "DISABLED"
3209        );
3210
3211        // Enable
3212        let req = make_request(
3213            "UpdateContinuousBackups",
3214            json!({
3215                "TableName": "test-table",
3216                "PointInTimeRecoverySpecification": {
3217                    "PointInTimeRecoveryEnabled": true
3218                }
3219            }),
3220        );
3221        svc.update_continuous_backups(&req).unwrap();
3222
3223        // Verify
3224        let req = make_request(
3225            "DescribeContinuousBackups",
3226            json!({ "TableName": "test-table" }),
3227        );
3228        let resp = svc.describe_continuous_backups(&req).unwrap();
3229        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3230        assert_eq!(
3231            body["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
3232                ["PointInTimeRecoveryStatus"],
3233            "ENABLED"
3234        );
3235    }
3236
3237    #[test]
3238    fn restore_table_to_point_in_time() {
3239        let svc = make_service();
3240        create_test_table(&svc);
3241
3242        let req = make_request(
3243            "RestoreTableToPointInTime",
3244            json!({
3245                "SourceTableName": "test-table",
3246                "TargetTableName": "pitr-restored"
3247            }),
3248        );
3249        svc.restore_table_to_point_in_time(&req).unwrap();
3250
3251        let req = make_request("DescribeTable", json!({ "TableName": "pitr-restored" }));
3252        let resp = svc.describe_table(&req).unwrap();
3253        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3254        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3255    }
3256
3257    #[test]
3258    fn global_table_lifecycle() {
3259        let svc = make_service();
3260
3261        // Create global table
3262        let req = make_request(
3263            "CreateGlobalTable",
3264            json!({
3265                "GlobalTableName": "my-global",
3266                "ReplicationGroup": [
3267                    { "RegionName": "us-east-1" },
3268                    { "RegionName": "eu-west-1" }
3269                ]
3270            }),
3271        );
3272        let resp = svc.create_global_table(&req).unwrap();
3273        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3274        assert_eq!(
3275            body["GlobalTableDescription"]["GlobalTableStatus"],
3276            "ACTIVE"
3277        );
3278
3279        // Describe
3280        let req = make_request(
3281            "DescribeGlobalTable",
3282            json!({ "GlobalTableName": "my-global" }),
3283        );
3284        let resp = svc.describe_global_table(&req).unwrap();
3285        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3286        assert_eq!(
3287            body["GlobalTableDescription"]["ReplicationGroup"]
3288                .as_array()
3289                .unwrap()
3290                .len(),
3291            2
3292        );
3293
3294        // List
3295        let req = make_request("ListGlobalTables", json!({}));
3296        let resp = svc.list_global_tables(&req).unwrap();
3297        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3298        assert_eq!(body["GlobalTables"].as_array().unwrap().len(), 1);
3299
3300        // Update - add a region
3301        let req = make_request(
3302            "UpdateGlobalTable",
3303            json!({
3304                "GlobalTableName": "my-global",
3305                "ReplicaUpdates": [
3306                    { "Create": { "RegionName": "ap-southeast-1" } }
3307                ]
3308            }),
3309        );
3310        let resp = svc.update_global_table(&req).unwrap();
3311        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3312        assert_eq!(
3313            body["GlobalTableDescription"]["ReplicationGroup"]
3314                .as_array()
3315                .unwrap()
3316                .len(),
3317            3
3318        );
3319
3320        // Describe settings
3321        let req = make_request(
3322            "DescribeGlobalTableSettings",
3323            json!({ "GlobalTableName": "my-global" }),
3324        );
3325        let resp = svc.describe_global_table_settings(&req).unwrap();
3326        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3327        assert_eq!(body["ReplicaSettings"].as_array().unwrap().len(), 3);
3328
3329        // Update settings (no-op, just verify no error)
3330        let req = make_request(
3331            "UpdateGlobalTableSettings",
3332            json!({ "GlobalTableName": "my-global" }),
3333        );
3334        svc.update_global_table_settings(&req).unwrap();
3335    }
3336
3337    #[test]
3338    fn table_replica_auto_scaling() {
3339        let svc = make_service();
3340        create_test_table(&svc);
3341
3342        let req = make_request(
3343            "DescribeTableReplicaAutoScaling",
3344            json!({ "TableName": "test-table" }),
3345        );
3346        let resp = svc.describe_table_replica_auto_scaling(&req).unwrap();
3347        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3348        assert_eq!(
3349            body["TableAutoScalingDescription"]["TableName"],
3350            "test-table"
3351        );
3352
3353        let req = make_request(
3354            "UpdateTableReplicaAutoScaling",
3355            json!({ "TableName": "test-table" }),
3356        );
3357        svc.update_table_replica_auto_scaling(&req).unwrap();
3358    }
3359
3360    #[test]
3361    fn kinesis_streaming_lifecycle() {
3362        let svc = make_service();
3363        create_test_table(&svc);
3364
3365        // Enable
3366        let req = make_request(
3367            "EnableKinesisStreamingDestination",
3368            json!({
3369                "TableName": "test-table",
3370                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3371            }),
3372        );
3373        let resp = svc.enable_kinesis_streaming_destination(&req).unwrap();
3374        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3375        assert_eq!(body["DestinationStatus"], "ACTIVE");
3376
3377        // Describe
3378        let req = make_request(
3379            "DescribeKinesisStreamingDestination",
3380            json!({ "TableName": "test-table" }),
3381        );
3382        let resp = svc.describe_kinesis_streaming_destination(&req).unwrap();
3383        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3384        assert_eq!(
3385            body["KinesisDataStreamDestinations"]
3386                .as_array()
3387                .unwrap()
3388                .len(),
3389            1
3390        );
3391
3392        // Update
3393        let req = make_request(
3394            "UpdateKinesisStreamingDestination",
3395            json!({
3396                "TableName": "test-table",
3397                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
3398                "UpdateKinesisStreamingConfiguration": {
3399                    "ApproximateCreationDateTimePrecision": "MICROSECOND"
3400                }
3401            }),
3402        );
3403        svc.update_kinesis_streaming_destination(&req).unwrap();
3404
3405        // Disable
3406        let req = make_request(
3407            "DisableKinesisStreamingDestination",
3408            json!({
3409                "TableName": "test-table",
3410                "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream"
3411            }),
3412        );
3413        let resp = svc.disable_kinesis_streaming_destination(&req).unwrap();
3414        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3415        assert_eq!(body["DestinationStatus"], "DISABLED");
3416    }
3417
3418    #[test]
3419    fn contributor_insights_lifecycle() {
3420        let svc = make_service();
3421        create_test_table(&svc);
3422
3423        // Initially disabled
3424        let req = make_request(
3425            "DescribeContributorInsights",
3426            json!({ "TableName": "test-table" }),
3427        );
3428        let resp = svc.describe_contributor_insights(&req).unwrap();
3429        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3430        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3431
3432        // Enable
3433        let req = make_request(
3434            "UpdateContributorInsights",
3435            json!({
3436                "TableName": "test-table",
3437                "ContributorInsightsAction": "ENABLE"
3438            }),
3439        );
3440        let resp = svc.update_contributor_insights(&req).unwrap();
3441        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3442        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3443
3444        // List
3445        let req = make_request("ListContributorInsights", json!({}));
3446        let resp = svc.list_contributor_insights(&req).unwrap();
3447        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3448        assert_eq!(
3449            body["ContributorInsightsSummaries"]
3450                .as_array()
3451                .unwrap()
3452                .len(),
3453            1
3454        );
3455    }
3456
3457    #[test]
3458    fn export_lifecycle() {
3459        let svc = make_service();
3460        create_test_table(&svc);
3461
3462        let table_arn = "arn:aws:dynamodb:us-east-1:123456789012:table/test-table".to_string();
3463
3464        // Export
3465        let req = make_request(
3466            "ExportTableToPointInTime",
3467            json!({
3468                "TableArn": table_arn,
3469                "S3Bucket": "my-bucket"
3470            }),
3471        );
3472        let resp = svc.export_table_to_point_in_time(&req).unwrap();
3473        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3474        let export_arn = body["ExportDescription"]["ExportArn"]
3475            .as_str()
3476            .unwrap()
3477            .to_string();
3478        assert_eq!(body["ExportDescription"]["ExportStatus"], "COMPLETED");
3479
3480        // Describe
3481        let req = make_request("DescribeExport", json!({ "ExportArn": export_arn }));
3482        let resp = svc.describe_export(&req).unwrap();
3483        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3484        assert_eq!(body["ExportDescription"]["S3Bucket"], "my-bucket");
3485
3486        // List
3487        let req = make_request("ListExports", json!({}));
3488        let resp = svc.list_exports(&req).unwrap();
3489        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3490        assert_eq!(body["ExportSummaries"].as_array().unwrap().len(), 1);
3491    }
3492
3493    #[test]
3494    fn import_lifecycle() {
3495        let svc = make_service();
3496
3497        let req = make_request(
3498            "ImportTable",
3499            json!({
3500                "InputFormat": "DYNAMODB_JSON",
3501                "S3BucketSource": { "S3Bucket": "import-bucket" },
3502                "TableCreationParameters": {
3503                    "TableName": "imported-table",
3504                    "KeySchema": [{ "AttributeName": "pk", "KeyType": "HASH" }],
3505                    "AttributeDefinitions": [{ "AttributeName": "pk", "AttributeType": "S" }]
3506                }
3507            }),
3508        );
3509        let resp = svc.import_table(&req).unwrap();
3510        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3511        let import_arn = body["ImportTableDescription"]["ImportArn"]
3512            .as_str()
3513            .unwrap()
3514            .to_string();
3515        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3516
3517        // Describe import
3518        let req = make_request("DescribeImport", json!({ "ImportArn": import_arn }));
3519        let resp = svc.describe_import(&req).unwrap();
3520        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3521        assert_eq!(body["ImportTableDescription"]["ImportStatus"], "COMPLETED");
3522
3523        // List imports
3524        let req = make_request("ListImports", json!({}));
3525        let resp = svc.list_imports(&req).unwrap();
3526        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3527        assert_eq!(body["ImportSummaryList"].as_array().unwrap().len(), 1);
3528
3529        // Verify the table was created
3530        let req = make_request("DescribeTable", json!({ "TableName": "imported-table" }));
3531        let resp = svc.describe_table(&req).unwrap();
3532        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3533        assert_eq!(body["Table"]["TableStatus"], "ACTIVE");
3534    }
3535
3536    #[test]
3537    fn backup_restore_preserves_items() {
3538        let svc = make_service();
3539        create_test_table(&svc);
3540
3541        // Put 3 items
3542        for i in 1..=3 {
3543            let req = make_request(
3544                "PutItem",
3545                json!({
3546                    "TableName": "test-table",
3547                    "Item": {
3548                        "pk": { "S": format!("key{i}") },
3549                        "data": { "S": format!("value{i}") }
3550                    }
3551                }),
3552            );
3553            svc.put_item(&req).unwrap();
3554        }
3555
3556        // Create backup
3557        let req = make_request(
3558            "CreateBackup",
3559            json!({
3560                "TableName": "test-table",
3561                "BackupName": "my-backup"
3562            }),
3563        );
3564        let resp = svc.create_backup(&req).unwrap();
3565        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3566        let backup_arn = body["BackupDetails"]["BackupArn"]
3567            .as_str()
3568            .unwrap()
3569            .to_string();
3570
3571        // Delete all items from the original table
3572        for i in 1..=3 {
3573            let req = make_request(
3574                "DeleteItem",
3575                json!({
3576                    "TableName": "test-table",
3577                    "Key": { "pk": { "S": format!("key{i}") } }
3578                }),
3579            );
3580            svc.delete_item(&req).unwrap();
3581        }
3582
3583        // Verify original table is empty
3584        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3585        let resp = svc.scan(&req).unwrap();
3586        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3587        assert_eq!(body["Count"], 0);
3588
3589        // Restore from backup
3590        let req = make_request(
3591            "RestoreTableFromBackup",
3592            json!({
3593                "BackupArn": backup_arn,
3594                "TargetTableName": "restored-table"
3595            }),
3596        );
3597        svc.restore_table_from_backup(&req).unwrap();
3598
3599        // Scan restored table — should have 3 items
3600        let req = make_request("Scan", json!({ "TableName": "restored-table" }));
3601        let resp = svc.scan(&req).unwrap();
3602        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3603        assert_eq!(body["Count"], 3);
3604        assert_eq!(body["Items"].as_array().unwrap().len(), 3);
3605    }
3606
3607    #[test]
3608    fn global_table_replicates_writes() {
3609        let svc = make_service();
3610        create_test_table(&svc);
3611
3612        // Create global table with replicas
3613        let req = make_request(
3614            "CreateGlobalTable",
3615            json!({
3616                "GlobalTableName": "test-table",
3617                "ReplicationGroup": [
3618                    { "RegionName": "us-east-1" },
3619                    { "RegionName": "eu-west-1" }
3620                ]
3621            }),
3622        );
3623        let resp = svc.create_global_table(&req).unwrap();
3624        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3625        assert_eq!(
3626            body["GlobalTableDescription"]["GlobalTableStatus"],
3627            "ACTIVE"
3628        );
3629
3630        // Put an item
3631        let req = make_request(
3632            "PutItem",
3633            json!({
3634                "TableName": "test-table",
3635                "Item": {
3636                    "pk": { "S": "replicated-key" },
3637                    "data": { "S": "replicated-value" }
3638                }
3639            }),
3640        );
3641        svc.put_item(&req).unwrap();
3642
3643        // Verify the item is readable (since all replicas share the same table)
3644        let req = make_request(
3645            "GetItem",
3646            json!({
3647                "TableName": "test-table",
3648                "Key": { "pk": { "S": "replicated-key" } }
3649            }),
3650        );
3651        let resp = svc.get_item(&req).unwrap();
3652        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3653        assert_eq!(body["Item"]["pk"]["S"], "replicated-key");
3654        assert_eq!(body["Item"]["data"]["S"], "replicated-value");
3655    }
3656
3657    #[test]
3658    fn contributor_insights_tracks_access() {
3659        let svc = make_service();
3660        create_test_table(&svc);
3661
3662        // Enable contributor insights
3663        let req = make_request(
3664            "UpdateContributorInsights",
3665            json!({
3666                "TableName": "test-table",
3667                "ContributorInsightsAction": "ENABLE"
3668            }),
3669        );
3670        svc.update_contributor_insights(&req).unwrap();
3671
3672        // Put items with different partition keys
3673        for key in &["alpha", "beta", "alpha", "alpha", "beta"] {
3674            let req = make_request(
3675                "PutItem",
3676                json!({
3677                    "TableName": "test-table",
3678                    "Item": {
3679                        "pk": { "S": key },
3680                        "data": { "S": "value" }
3681                    }
3682                }),
3683            );
3684            svc.put_item(&req).unwrap();
3685        }
3686
3687        // Get items (to also track read access)
3688        for _ in 0..3 {
3689            let req = make_request(
3690                "GetItem",
3691                json!({
3692                    "TableName": "test-table",
3693                    "Key": { "pk": { "S": "alpha" } }
3694                }),
3695            );
3696            svc.get_item(&req).unwrap();
3697        }
3698
3699        // Describe contributor insights — should show top contributors
3700        let req = make_request(
3701            "DescribeContributorInsights",
3702            json!({ "TableName": "test-table" }),
3703        );
3704        let resp = svc.describe_contributor_insights(&req).unwrap();
3705        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3706        assert_eq!(body["ContributorInsightsStatus"], "ENABLED");
3707
3708        let contributors = body["TopContributors"].as_array().unwrap();
3709        assert!(
3710            !contributors.is_empty(),
3711            "TopContributors should not be empty"
3712        );
3713
3714        // alpha was accessed 3 (put) + 3 (get) = 6 times, beta 2 times
3715        // alpha should be the top contributor
3716        let top = &contributors[0];
3717        assert!(top["Count"].as_u64().unwrap() > 0);
3718
3719        // Verify the rule list is populated
3720        let rules = body["ContributorInsightsRuleList"].as_array().unwrap();
3721        assert!(!rules.is_empty());
3722    }
3723
3724    #[test]
3725    fn contributor_insights_not_tracked_when_disabled() {
3726        let svc = make_service();
3727        create_test_table(&svc);
3728
3729        // Put items without enabling insights
3730        let req = make_request(
3731            "PutItem",
3732            json!({
3733                "TableName": "test-table",
3734                "Item": {
3735                    "pk": { "S": "key1" },
3736                    "data": { "S": "value" }
3737                }
3738            }),
3739        );
3740        svc.put_item(&req).unwrap();
3741
3742        // Describe — should show empty contributors
3743        let req = make_request(
3744            "DescribeContributorInsights",
3745            json!({ "TableName": "test-table" }),
3746        );
3747        let resp = svc.describe_contributor_insights(&req).unwrap();
3748        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3749        assert_eq!(body["ContributorInsightsStatus"], "DISABLED");
3750
3751        let contributors = body["TopContributors"].as_array().unwrap();
3752        assert!(contributors.is_empty());
3753    }
3754
3755    #[test]
3756    fn contributor_insights_disabled_table_no_counters_after_scan() {
3757        let svc = make_service();
3758        create_test_table(&svc);
3759
3760        // Put items
3761        for key in &["alpha", "beta"] {
3762            let req = make_request(
3763                "PutItem",
3764                json!({
3765                    "TableName": "test-table",
3766                    "Item": { "pk": { "S": key } }
3767                }),
3768            );
3769            svc.put_item(&req).unwrap();
3770        }
3771
3772        // Enable insights, then scan, then disable, then check counters are cleared
3773        let req = make_request(
3774            "UpdateContributorInsights",
3775            json!({
3776                "TableName": "test-table",
3777                "ContributorInsightsAction": "ENABLE"
3778            }),
3779        );
3780        svc.update_contributor_insights(&req).unwrap();
3781
3782        // Scan to trigger counter collection
3783        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3784        svc.scan(&req).unwrap();
3785
3786        // Verify counters were collected
3787        let req = make_request(
3788            "DescribeContributorInsights",
3789            json!({ "TableName": "test-table" }),
3790        );
3791        let resp = svc.describe_contributor_insights(&req).unwrap();
3792        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3793        let contributors = body["TopContributors"].as_array().unwrap();
3794        assert!(
3795            !contributors.is_empty(),
3796            "counters should be non-empty while enabled"
3797        );
3798
3799        // Disable insights (this clears counters)
3800        let req = make_request(
3801            "UpdateContributorInsights",
3802            json!({
3803                "TableName": "test-table",
3804                "ContributorInsightsAction": "DISABLE"
3805            }),
3806        );
3807        svc.update_contributor_insights(&req).unwrap();
3808
3809        // Scan again -- should NOT accumulate counters since insights is disabled
3810        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3811        svc.scan(&req).unwrap();
3812
3813        // Verify counters are still empty
3814        let req = make_request(
3815            "DescribeContributorInsights",
3816            json!({ "TableName": "test-table" }),
3817        );
3818        let resp = svc.describe_contributor_insights(&req).unwrap();
3819        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3820        let contributors = body["TopContributors"].as_array().unwrap();
3821        assert!(
3822            contributors.is_empty(),
3823            "counters should be empty after disabling insights"
3824        );
3825    }
3826
3827    #[test]
3828    fn scan_pagination_with_limit() {
3829        let svc = make_service();
3830        create_test_table(&svc);
3831
3832        // Insert 5 items
3833        for i in 0..5 {
3834            let req = make_request(
3835                "PutItem",
3836                json!({
3837                    "TableName": "test-table",
3838                    "Item": {
3839                        "pk": { "S": format!("item{i}") },
3840                        "data": { "S": format!("value{i}") }
3841                    }
3842                }),
3843            );
3844            svc.put_item(&req).unwrap();
3845        }
3846
3847        // Scan with limit=2
3848        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 2 }));
3849        let resp = svc.scan(&req).unwrap();
3850        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3851        assert_eq!(body["Count"], 2);
3852        assert!(
3853            body["LastEvaluatedKey"].is_object(),
3854            "should have LastEvaluatedKey when limit < total items"
3855        );
3856        assert!(body["LastEvaluatedKey"]["pk"].is_object());
3857
3858        // Page through all items
3859        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
3860        let mut lek = body["LastEvaluatedKey"].clone();
3861
3862        while lek.is_object() {
3863            let req = make_request(
3864                "Scan",
3865                json!({
3866                    "TableName": "test-table",
3867                    "Limit": 2,
3868                    "ExclusiveStartKey": lek
3869                }),
3870            );
3871            let resp = svc.scan(&req).unwrap();
3872            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3873            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
3874            lek = body["LastEvaluatedKey"].clone();
3875        }
3876
3877        assert_eq!(
3878            all_items.len(),
3879            5,
3880            "should retrieve all 5 items via pagination"
3881        );
3882    }
3883
3884    #[test]
3885    fn scan_no_pagination_when_all_fit() {
3886        let svc = make_service();
3887        create_test_table(&svc);
3888
3889        for i in 0..3 {
3890            let req = make_request(
3891                "PutItem",
3892                json!({
3893                    "TableName": "test-table",
3894                    "Item": {
3895                        "pk": { "S": format!("item{i}") }
3896                    }
3897                }),
3898            );
3899            svc.put_item(&req).unwrap();
3900        }
3901
3902        // Scan with limit > item count
3903        let req = make_request("Scan", json!({ "TableName": "test-table", "Limit": 10 }));
3904        let resp = svc.scan(&req).unwrap();
3905        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3906        assert_eq!(body["Count"], 3);
3907        assert!(
3908            body["LastEvaluatedKey"].is_null(),
3909            "should not have LastEvaluatedKey when all items fit"
3910        );
3911
3912        // Scan without limit
3913        let req = make_request("Scan", json!({ "TableName": "test-table" }));
3914        let resp = svc.scan(&req).unwrap();
3915        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3916        assert_eq!(body["Count"], 3);
3917        assert!(body["LastEvaluatedKey"].is_null());
3918    }
3919
3920    fn create_composite_table(svc: &DynamoDbService) {
3921        let req = make_request(
3922            "CreateTable",
3923            json!({
3924                "TableName": "composite-table",
3925                "KeySchema": [
3926                    { "AttributeName": "pk", "KeyType": "HASH" },
3927                    { "AttributeName": "sk", "KeyType": "RANGE" }
3928                ],
3929                "AttributeDefinitions": [
3930                    { "AttributeName": "pk", "AttributeType": "S" },
3931                    { "AttributeName": "sk", "AttributeType": "S" }
3932                ],
3933                "BillingMode": "PAY_PER_REQUEST"
3934            }),
3935        );
3936        svc.create_table(&req).unwrap();
3937    }
3938
3939    #[test]
3940    fn query_pagination_with_composite_key() {
3941        let svc = make_service();
3942        create_composite_table(&svc);
3943
3944        // Insert 5 items under the same partition key
3945        for i in 0..5 {
3946            let req = make_request(
3947                "PutItem",
3948                json!({
3949                    "TableName": "composite-table",
3950                    "Item": {
3951                        "pk": { "S": "user1" },
3952                        "sk": { "S": format!("item{i:03}") },
3953                        "data": { "S": format!("value{i}") }
3954                    }
3955                }),
3956            );
3957            svc.put_item(&req).unwrap();
3958        }
3959
3960        // Query with limit=2
3961        let req = make_request(
3962            "Query",
3963            json!({
3964                "TableName": "composite-table",
3965                "KeyConditionExpression": "pk = :pk",
3966                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
3967                "Limit": 2
3968            }),
3969        );
3970        let resp = svc.query(&req).unwrap();
3971        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3972        assert_eq!(body["Count"], 2);
3973        assert!(body["LastEvaluatedKey"].is_object());
3974        assert!(body["LastEvaluatedKey"]["pk"].is_object());
3975        assert!(body["LastEvaluatedKey"]["sk"].is_object());
3976
3977        // Page through all items
3978        let mut all_items: Vec<Value> = body["Items"].as_array().unwrap().clone();
3979        let mut lek = body["LastEvaluatedKey"].clone();
3980
3981        while lek.is_object() {
3982            let req = make_request(
3983                "Query",
3984                json!({
3985                    "TableName": "composite-table",
3986                    "KeyConditionExpression": "pk = :pk",
3987                    "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
3988                    "Limit": 2,
3989                    "ExclusiveStartKey": lek
3990                }),
3991            );
3992            let resp = svc.query(&req).unwrap();
3993            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
3994            all_items.extend(body["Items"].as_array().unwrap().iter().cloned());
3995            lek = body["LastEvaluatedKey"].clone();
3996        }
3997
3998        assert_eq!(
3999            all_items.len(),
4000            5,
4001            "should retrieve all 5 items via pagination"
4002        );
4003
4004        // Verify items came back sorted by sort key
4005        let sks: Vec<String> = all_items
4006            .iter()
4007            .map(|item| item["sk"]["S"].as_str().unwrap().to_string())
4008            .collect();
4009        let mut sorted = sks.clone();
4010        sorted.sort();
4011        assert_eq!(sks, sorted, "items should be sorted by sort key");
4012    }
4013
4014    #[test]
4015    fn query_no_pagination_when_all_fit() {
4016        let svc = make_service();
4017        create_composite_table(&svc);
4018
4019        for i in 0..2 {
4020            let req = make_request(
4021                "PutItem",
4022                json!({
4023                    "TableName": "composite-table",
4024                    "Item": {
4025                        "pk": { "S": "user1" },
4026                        "sk": { "S": format!("item{i}") }
4027                    }
4028                }),
4029            );
4030            svc.put_item(&req).unwrap();
4031        }
4032
4033        let req = make_request(
4034            "Query",
4035            json!({
4036                "TableName": "composite-table",
4037                "KeyConditionExpression": "pk = :pk",
4038                "ExpressionAttributeValues": { ":pk": { "S": "user1" } },
4039                "Limit": 10
4040            }),
4041        );
4042        let resp = svc.query(&req).unwrap();
4043        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4044        assert_eq!(body["Count"], 2);
4045        assert!(
4046            body["LastEvaluatedKey"].is_null(),
4047            "should not have LastEvaluatedKey when all items fit"
4048        );
4049    }
4050
4051    fn create_gsi_table(svc: &DynamoDbService) {
4052        let req = make_request(
4053            "CreateTable",
4054            json!({
4055                "TableName": "gsi-table",
4056                "KeySchema": [
4057                    { "AttributeName": "pk", "KeyType": "HASH" }
4058                ],
4059                "AttributeDefinitions": [
4060                    { "AttributeName": "pk", "AttributeType": "S" },
4061                    { "AttributeName": "gsi_pk", "AttributeType": "S" },
4062                    { "AttributeName": "gsi_sk", "AttributeType": "S" }
4063                ],
4064                "BillingMode": "PAY_PER_REQUEST",
4065                "GlobalSecondaryIndexes": [
4066                    {
4067                        "IndexName": "gsi-index",
4068                        "KeySchema": [
4069                            { "AttributeName": "gsi_pk", "KeyType": "HASH" },
4070                            { "AttributeName": "gsi_sk", "KeyType": "RANGE" }
4071                        ],
4072                        "Projection": { "ProjectionType": "ALL" }
4073                    }
4074                ]
4075            }),
4076        );
4077        svc.create_table(&req).unwrap();
4078    }
4079
4080    #[test]
4081    fn gsi_query_last_evaluated_key_includes_table_pk() {
4082        let svc = make_service();
4083        create_gsi_table(&svc);
4084
4085        // Insert 3 items with the SAME GSI key but different table PKs
4086        for i in 0..3 {
4087            let req = make_request(
4088                "PutItem",
4089                json!({
4090                    "TableName": "gsi-table",
4091                    "Item": {
4092                        "pk": { "S": format!("item{i}") },
4093                        "gsi_pk": { "S": "shared" },
4094                        "gsi_sk": { "S": "sort" }
4095                    }
4096                }),
4097            );
4098            svc.put_item(&req).unwrap();
4099        }
4100
4101        // Query GSI with Limit=1 to trigger pagination
4102        let req = make_request(
4103            "Query",
4104            json!({
4105                "TableName": "gsi-table",
4106                "IndexName": "gsi-index",
4107                "KeyConditionExpression": "gsi_pk = :v",
4108                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4109                "Limit": 1
4110            }),
4111        );
4112        let resp = svc.query(&req).unwrap();
4113        let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4114        assert_eq!(body["Count"], 1);
4115        let lek = &body["LastEvaluatedKey"];
4116        assert!(lek.is_object(), "should have LastEvaluatedKey");
4117        // Must contain the index keys
4118        assert!(lek["gsi_pk"].is_object(), "LEK must contain gsi_pk");
4119        assert!(lek["gsi_sk"].is_object(), "LEK must contain gsi_sk");
4120        // Must also contain the table PK
4121        assert!(
4122            lek["pk"].is_object(),
4123            "LEK must contain table PK for GSI queries"
4124        );
4125    }
4126
4127    #[test]
4128    fn gsi_query_pagination_returns_all_items() {
4129        let svc = make_service();
4130        create_gsi_table(&svc);
4131
4132        // Insert 4 items with the SAME GSI key but different table PKs
4133        for i in 0..4 {
4134            let req = make_request(
4135                "PutItem",
4136                json!({
4137                    "TableName": "gsi-table",
4138                    "Item": {
4139                        "pk": { "S": format!("item{i:03}") },
4140                        "gsi_pk": { "S": "shared" },
4141                        "gsi_sk": { "S": "sort" }
4142                    }
4143                }),
4144            );
4145            svc.put_item(&req).unwrap();
4146        }
4147
4148        // Paginate through all items with Limit=2
4149        let mut all_pks = Vec::new();
4150        let mut lek: Option<Value> = None;
4151
4152        loop {
4153            let mut query = json!({
4154                "TableName": "gsi-table",
4155                "IndexName": "gsi-index",
4156                "KeyConditionExpression": "gsi_pk = :v",
4157                "ExpressionAttributeValues": { ":v": { "S": "shared" } },
4158                "Limit": 2
4159            });
4160            if let Some(ref start_key) = lek {
4161                query["ExclusiveStartKey"] = start_key.clone();
4162            }
4163
4164            let req = make_request("Query", query);
4165            let resp = svc.query(&req).unwrap();
4166            let body: Value = serde_json::from_slice(resp.body.expect_bytes()).unwrap();
4167
4168            for item in body["Items"].as_array().unwrap() {
4169                let pk = item["pk"]["S"].as_str().unwrap().to_string();
4170                all_pks.push(pk);
4171            }
4172
4173            if body["LastEvaluatedKey"].is_object() {
4174                lek = Some(body["LastEvaluatedKey"].clone());
4175            } else {
4176                break;
4177            }
4178        }
4179
4180        all_pks.sort();
4181        assert_eq!(
4182            all_pks,
4183            vec!["item000", "item001", "item002", "item003"],
4184            "pagination should return all items without duplicates"
4185        );
4186    }
4187
4188    fn cond_item(pairs: &[(&str, &str)]) -> HashMap<String, AttributeValue> {
4189        pairs
4190            .iter()
4191            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4192            .collect()
4193    }
4194
4195    fn cond_names(pairs: &[(&str, &str)]) -> HashMap<String, String> {
4196        pairs
4197            .iter()
4198            .map(|(k, v)| (k.to_string(), v.to_string()))
4199            .collect()
4200    }
4201
4202    fn cond_values(pairs: &[(&str, &str)]) -> HashMap<String, Value> {
4203        pairs
4204            .iter()
4205            .map(|(k, v)| (k.to_string(), json!({"S": v})))
4206            .collect()
4207    }
4208
4209    #[test]
4210    fn test_evaluate_condition_bare_not_equal() {
4211        let item = cond_item(&[("state", "active")]);
4212        let names = cond_names(&[("#s", "state")]);
4213        let values = cond_values(&[(":c", "complete")]);
4214
4215        assert!(evaluate_condition("#s <> :c", Some(&item), &names, &values).is_ok());
4216
4217        let item2 = cond_item(&[("state", "complete")]);
4218        assert!(evaluate_condition("#s <> :c", Some(&item2), &names, &values).is_err());
4219    }
4220
4221    #[test]
4222    fn test_evaluate_condition_parenthesized_not_equal() {
4223        let item = cond_item(&[("state", "active")]);
4224        let names = cond_names(&[("#s", "state")]);
4225        let values = cond_values(&[(":c", "complete")]);
4226
4227        assert!(evaluate_condition("(#s <> :c)", Some(&item), &names, &values).is_ok());
4228    }
4229
4230    #[test]
4231    fn test_evaluate_condition_parenthesized_equal_mismatch() {
4232        let item = cond_item(&[("state", "active")]);
4233        let names = cond_names(&[("#s", "state")]);
4234        let values = cond_values(&[(":c", "complete")]);
4235
4236        assert!(evaluate_condition("(#s = :c)", Some(&item), &names, &values).is_err());
4237    }
4238
4239    #[test]
4240    fn test_evaluate_condition_compound_and() {
4241        let item = cond_item(&[("state", "active")]);
4242        let names = cond_names(&[("#s", "state")]);
4243        let values = cond_values(&[(":c", "complete"), (":f", "failed")]);
4244
4245        // active <> complete AND active <> failed => true
4246        assert!(
4247            evaluate_condition("(#s <> :c) AND (#s <> :f)", Some(&item), &names, &values).is_ok()
4248        );
4249    }
4250
4251    #[test]
4252    fn test_evaluate_condition_compound_and_mismatch() {
4253        let item = cond_item(&[("state", "inactive")]);
4254        let names = cond_names(&[("#s", "state")]);
4255        let values = cond_values(&[(":a", "active"), (":b", "active")]);
4256
4257        // inactive = active AND inactive = active => false
4258        assert!(
4259            evaluate_condition("(#s = :a) AND (#s = :b)", Some(&item), &names, &values).is_err()
4260        );
4261    }
4262
4263    #[test]
4264    fn test_evaluate_condition_compound_or() {
4265        let item = cond_item(&[("state", "running")]);
4266        let names = cond_names(&[("#s", "state")]);
4267        let values = cond_values(&[(":a", "active"), (":b", "idle")]);
4268
4269        // running = active OR running = idle => false
4270        assert!(
4271            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values).is_err()
4272        );
4273
4274        // running = active OR running = running => true
4275        let values2 = cond_values(&[(":a", "active"), (":b", "running")]);
4276        assert!(
4277            evaluate_condition("(#s = :a) OR (#s = :b)", Some(&item), &names, &values2).is_ok()
4278        );
4279    }
4280
4281    #[test]
4282    fn test_evaluate_condition_not_operator() {
4283        let item = cond_item(&[("state", "active")]);
4284        let names = cond_names(&[("#s", "state")]);
4285        let values = cond_values(&[(":c", "complete")]);
4286
4287        // NOT (active = complete) => NOT false => true
4288        assert!(evaluate_condition("NOT (#s = :c)", Some(&item), &names, &values).is_ok());
4289
4290        // NOT (active <> complete) => NOT true => false
4291        assert!(evaluate_condition("NOT (#s <> :c)", Some(&item), &names, &values).is_err());
4292
4293        // NOT attribute_exists(#s) on existing item => NOT true => false
4294        assert!(
4295            evaluate_condition("NOT attribute_exists(#s)", Some(&item), &names, &values).is_err()
4296        );
4297
4298        // NOT attribute_exists(#s) on missing item => NOT false => true
4299        assert!(evaluate_condition("NOT attribute_exists(#s)", None, &names, &values).is_ok());
4300    }
4301
4302    #[test]
4303    fn test_evaluate_condition_begins_with() {
4304        // After unification, conditions support begins_with via
4305        // evaluate_single_filter_condition (previously only filters had it).
4306        let item = cond_item(&[("name", "fakecloud-dynamodb")]);
4307        let names = cond_names(&[("#n", "name")]);
4308        let values = cond_values(&[(":p", "fakecloud")]);
4309
4310        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values).is_ok());
4311
4312        let values2 = cond_values(&[(":p", "realcloud")]);
4313        assert!(evaluate_condition("begins_with(#n, :p)", Some(&item), &names, &values2).is_err());
4314    }
4315
4316    #[test]
4317    fn test_evaluate_condition_contains() {
4318        let item = cond_item(&[("tags", "alpha,beta,gamma")]);
4319        let names = cond_names(&[("#t", "tags")]);
4320        let values = cond_values(&[(":v", "beta")]);
4321
4322        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values).is_ok());
4323
4324        let values2 = cond_values(&[(":v", "delta")]);
4325        assert!(evaluate_condition("contains(#t, :v)", Some(&item), &names, &values2).is_err());
4326    }
4327
4328    #[test]
4329    fn test_evaluate_condition_no_existing_item() {
4330        // When no item exists (PutItem with condition), attribute_not_exists
4331        // should succeed and attribute_exists should fail.
4332        let names = cond_names(&[("#s", "state")]);
4333        let values = cond_values(&[(":v", "active")]);
4334
4335        assert!(evaluate_condition("attribute_not_exists(#s)", None, &names, &values).is_ok());
4336        assert!(evaluate_condition("attribute_exists(#s)", None, &names, &values).is_err());
4337        // Comparison against missing item: None != Some(val) => true for <>
4338        assert!(evaluate_condition("#s <> :v", None, &names, &values).is_ok());
4339        // None == Some(val) => false for =
4340        assert!(evaluate_condition("#s = :v", None, &names, &values).is_err());
4341    }
4342
4343    #[test]
4344    fn test_evaluate_filter_not_operator() {
4345        let item = cond_item(&[("status", "pending")]);
4346        let names = cond_names(&[("#s", "status")]);
4347        let values = cond_values(&[(":v", "pending")]);
4348
4349        assert!(!evaluate_filter_expression(
4350            "NOT (#s = :v)",
4351            &item,
4352            &names,
4353            &values
4354        ));
4355        assert!(evaluate_filter_expression(
4356            "NOT (#s <> :v)",
4357            &item,
4358            &names,
4359            &values
4360        ));
4361    }
4362
4363    #[test]
4364    fn test_evaluate_filter_expression_in_match() {
4365        // aws-sdk-go v2's expression.Name("state").In(Value("active"), Value("pending"))
4366        // emits "#0 IN (:0, :1)". Before fix: neither evaluate_single_filter_condition
4367        // nor evaluate_single_key_condition handled IN, so the filter leaf fell through
4368        // to the simple-comparison loop, hit no operators, and returned `true` — meaning
4369        // every item matched every IN filter regardless of value.
4370        let item = cond_item(&[("state", "active")]);
4371        let names = cond_names(&[("#s", "state")]);
4372        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4373
4374        assert!(
4375            evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4376            "state=active should match IN (active, pending)"
4377        );
4378    }
4379
4380    #[test]
4381    fn test_evaluate_filter_expression_in_no_match() {
4382        let item = cond_item(&[("state", "complete")]);
4383        let names = cond_names(&[("#s", "state")]);
4384        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4385
4386        assert!(
4387            !evaluate_filter_expression("#s IN (:a, :p)", &item, &names, &values),
4388            "state=complete should not match IN (active, pending)"
4389        );
4390    }
4391
4392    #[test]
4393    fn test_evaluate_filter_expression_in_no_spaces() {
4394        // orderbot emits the raw form
4395        //     "#status IN (" + strings.Join(keys, ",") + ")"
4396        // which produces "IN (:v0,:v1,:v2)" — no spaces after commas. Must parse.
4397        let item = cond_item(&[("status", "shipped")]);
4398        let names = cond_names(&[("#s", "status")]);
4399        let values = cond_values(&[(":a", "pending"), (":b", "shipped"), (":c", "delivered")]);
4400
4401        assert!(
4402            evaluate_filter_expression("#s IN (:a,:b,:c)", &item, &names, &values),
4403            "no-space IN list should still parse"
4404        );
4405    }
4406
4407    #[test]
4408    fn test_evaluate_filter_expression_in_missing_attribute() {
4409        // A missing attribute must not match any IN list — the silent-true
4410        // fallthrough would wrongly accept these items.
4411        let item: HashMap<String, AttributeValue> = HashMap::new();
4412        let names = cond_names(&[("#s", "state")]);
4413        let values = cond_values(&[(":a", "active")]);
4414
4415        assert!(
4416            !evaluate_filter_expression("#s IN (:a)", &item, &names, &values),
4417            "missing attribute should not match any IN list"
4418        );
4419    }
4420
4421    #[test]
4422    fn test_evaluate_filter_expression_compound_in_and_eq() {
4423        // Shape emitted by `Name("state").In(...).And(Name("priority").Equal(...))`:
4424        //     "(#0 IN (:0, :1)) AND (#1 = :2)"
4425        // split_on_and handles the outer parens, but the IN leaf had the
4426        // silent-true fallthrough, so any item with priority=high would match
4427        // regardless of state.
4428        let item = cond_item(&[("state", "active"), ("priority", "high")]);
4429        let names = cond_names(&[("#s", "state"), ("#p", "priority")]);
4430        let values = cond_values(&[(":a", "active"), (":pe", "pending"), (":h", "high")]);
4431
4432        assert!(
4433            evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item, &names, &values,),
4434            "(active IN (active, pending)) AND (high = high) should match"
4435        );
4436
4437        let item2 = cond_item(&[("state", "complete"), ("priority", "high")]);
4438        assert!(
4439            !evaluate_filter_expression("(#s IN (:a, :pe)) AND (#p = :h)", &item2, &names, &values,),
4440            "(complete IN (active, pending)) AND (high = high) should not match"
4441        );
4442    }
4443
4444    #[test]
4445    fn test_evaluate_condition_attribute_exists_with_space() {
4446        // aws-sdk-go v2's expression.NewBuilder emits function calls with a
4447        // space between the name and the opening paren:
4448        //     "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))"
4449        // Before fix: extract_function_arg used strip_prefix("attribute_exists(")
4450        // with no space, so these fell through the filter leaf entirely and
4451        // hit evaluate_single_key_condition's silent-true fallthrough —
4452        // every conditional write was silently accepted.
4453        let item = cond_item(&[("store_id", "s-1")]);
4454        let names = cond_names(&[("#0", "store_id"), ("#1", "active_viewer_tab_id")]);
4455        let values = cond_values(&[(":0", "tab-A")]);
4456
4457        // On an existing item without active_viewer_tab_id: exists(store_id)
4458        // is true, not_exists(active_viewer_tab_id) is true → OK.
4459        assert!(
4460            evaluate_condition(
4461                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4462                Some(&item),
4463                &names,
4464                &values,
4465            )
4466            .is_ok(),
4467            "claim-lease compound on free item should succeed"
4468        );
4469
4470        // On a missing item: exists(store_id) is false → whole AND false → Err.
4471        assert!(
4472            evaluate_condition(
4473                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4474                None,
4475                &names,
4476                &values,
4477            )
4478            .is_err(),
4479            "claim-lease compound on missing item must fail attribute_exists branch"
4480        );
4481
4482        // On an item already held by tab-B: exists ✓, not_exists ✗, #1 = :0 ✗
4483        // → (✓) AND ((✗) OR (✗)) → false → Err.
4484        let held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-B")]);
4485        assert!(
4486            evaluate_condition(
4487                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4488                Some(&held),
4489                &names,
4490                &values,
4491            )
4492            .is_err(),
4493            "claim-lease compound on item held by another tab must fail"
4494        );
4495
4496        // Same tab re-claiming: exists ✓, not_exists ✗, #1 = :0 ✓
4497        // → (✓) AND ((✗) OR (✓)) → true → Ok.
4498        let self_held = cond_item(&[("store_id", "s-1"), ("active_viewer_tab_id", "tab-A")]);
4499        assert!(
4500            evaluate_condition(
4501                "(attribute_exists (#0)) AND ((attribute_not_exists (#1)) OR (#1 = :0))",
4502                Some(&self_held),
4503                &names,
4504                &values,
4505            )
4506            .is_ok(),
4507            "same-tab re-claim must succeed"
4508        );
4509    }
4510
4511    #[test]
4512    fn test_evaluate_condition_in_match() {
4513        // evaluate_condition delegates to evaluate_filter_expression, so this
4514        // also proves the ConditionExpression path. Before fix: silently Ok.
4515        let item = cond_item(&[("state", "active")]);
4516        let names = cond_names(&[("#s", "state")]);
4517        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4518
4519        assert!(
4520            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_ok(),
4521            "IN should succeed when actual value is in the list"
4522        );
4523    }
4524
4525    #[test]
4526    fn test_evaluate_condition_in_no_match() {
4527        // Before fix: evaluate_condition silently returned Ok(()) for IN — any
4528        // conditional write was accepted regardless of actual state, the
4529        // opposite of what the caller asked for.
4530        let item = cond_item(&[("state", "complete")]);
4531        let names = cond_names(&[("#s", "state")]);
4532        let values = cond_values(&[(":a", "active"), (":p", "pending")]);
4533
4534        assert!(
4535            evaluate_condition("#s IN (:a, :p)", Some(&item), &names, &values).is_err(),
4536            "IN should fail when actual value is not in the list"
4537        );
4538    }
4539
4540    #[test]
4541    fn test_apply_update_set_list_index_replaces_existing() {
4542        // Shape emitted by orderbot's order-item update retry loop:
4543        //     UpdateExpression: fmt.Sprintf("SET #items[%d] = :item", index)
4544        // Before fix: apply_set_assignment called resolve_attr_name on the
4545        // whole "#items[0]" token, which misses the name map, and then
4546        // item.insert("#items[0]", :item), producing a top-level key
4547        // literally named "#items[0]" rather than mutating the list.
4548        let mut item = HashMap::new();
4549        item.insert(
4550            "items".to_string(),
4551            json!({"L": [
4552                {"M": {"sku": {"S": "OLD-A"}}},
4553                {"M": {"sku": {"S": "OLD-B"}}},
4554            ]}),
4555        );
4556
4557        let names = cond_names(&[("#items", "items")]);
4558        let mut values = HashMap::new();
4559        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "NEW-A"}}}));
4560
4561        apply_update_expression(&mut item, "SET #items[0] = :item", &names, &values).unwrap();
4562
4563        let items_list = item
4564            .get("items")
4565            .and_then(|v| v.get("L"))
4566            .and_then(|v| v.as_array())
4567            .expect("items should still be a list");
4568        assert_eq!(items_list.len(), 2, "list length should be unchanged");
4569        let sku0 = items_list[0]
4570            .get("M")
4571            .and_then(|m| m.get("sku"))
4572            .and_then(|s| s.get("S"))
4573            .and_then(|s| s.as_str());
4574        assert_eq!(sku0, Some("NEW-A"), "index 0 should be replaced");
4575        let sku1 = items_list[1]
4576            .get("M")
4577            .and_then(|m| m.get("sku"))
4578            .and_then(|s| s.get("S"))
4579            .and_then(|s| s.as_str());
4580        assert_eq!(sku1, Some("OLD-B"), "index 1 should be untouched");
4581
4582        assert!(!item.contains_key("items[0]"));
4583        assert!(!item.contains_key("#items[0]"));
4584    }
4585
4586    #[test]
4587    fn test_apply_update_set_list_index_second_slot() {
4588        let mut item = HashMap::new();
4589        item.insert(
4590            "items".to_string(),
4591            json!({"L": [
4592                {"M": {"sku": {"S": "A"}}},
4593                {"M": {"sku": {"S": "B"}}},
4594                {"M": {"sku": {"S": "C"}}},
4595            ]}),
4596        );
4597
4598        let names = cond_names(&[("#items", "items")]);
4599        let mut values = HashMap::new();
4600        values.insert(":item".to_string(), json!({"M": {"sku": {"S": "B-PRIME"}}}));
4601
4602        apply_update_expression(&mut item, "SET #items[1] = :item", &names, &values).unwrap();
4603
4604        let items_list = item
4605            .get("items")
4606            .and_then(|v| v.get("L"))
4607            .and_then(|v| v.as_array())
4608            .unwrap();
4609        let skus: Vec<&str> = items_list
4610            .iter()
4611            .map(|v| {
4612                v.get("M")
4613                    .and_then(|m| m.get("sku"))
4614                    .and_then(|s| s.get("S"))
4615                    .and_then(|s| s.as_str())
4616                    .unwrap()
4617            })
4618            .collect();
4619        assert_eq!(skus, vec!["A", "B-PRIME", "C"]);
4620    }
4621
4622    #[test]
4623    fn test_apply_update_set_list_index_without_name_ref() {
4624        // Same fix must also work when the LHS is a literal attribute name,
4625        // not an expression attribute name ref.
4626        let mut item = HashMap::new();
4627        item.insert(
4628            "tags".to_string(),
4629            json!({"L": [{"S": "red"}, {"S": "blue"}]}),
4630        );
4631
4632        let names: HashMap<String, String> = HashMap::new();
4633        let mut values = HashMap::new();
4634        values.insert(":t".to_string(), json!({"S": "green"}));
4635
4636        apply_update_expression(&mut item, "SET tags[1] = :t", &names, &values).unwrap();
4637
4638        let tags = item
4639            .get("tags")
4640            .and_then(|v| v.get("L"))
4641            .and_then(|v| v.as_array())
4642            .unwrap();
4643        assert_eq!(tags[0].get("S").and_then(|s| s.as_str()), Some("red"));
4644        assert_eq!(tags[1].get("S").and_then(|s| s.as_str()), Some("green"));
4645    }
4646
4647    #[test]
4648    fn test_unrecognized_expression_returns_false() {
4649        // evaluate_single_key_condition must fail-closed: an expression shape
4650        // it doesn't recognize should return false (reject), not true (accept).
4651        let item = cond_item(&[("x", "1")]);
4652        let names: HashMap<String, String> = HashMap::new();
4653        let values: HashMap<String, Value> = HashMap::new();
4654
4655        assert!(
4656            !evaluate_single_key_condition("GARBAGE NONSENSE", &item, "", &names, &values),
4657            "unrecognized expression must return false"
4658        );
4659    }
4660
4661    #[test]
4662    fn test_set_list_index_out_of_range_returns_error() {
4663        // SET list[N] where N > len must return a ValidationException,
4664        // not silently no-op.
4665        let mut item = HashMap::new();
4666        item.insert("items".to_string(), json!({"L": [{"S": "a"}, {"S": "b"}]}));
4667
4668        let names: HashMap<String, String> = HashMap::new();
4669        let mut values = HashMap::new();
4670        values.insert(":v".to_string(), json!({"S": "z"}));
4671
4672        let result = apply_update_expression(&mut item, "SET items[5] = :v", &names, &values);
4673        assert!(
4674            result.is_err(),
4675            "out-of-range list index must return an error"
4676        );
4677
4678        // List should be unchanged
4679        let list = item
4680            .get("items")
4681            .and_then(|v| v.get("L"))
4682            .and_then(|v| v.as_array())
4683            .unwrap();
4684        assert_eq!(list.len(), 2);
4685    }
4686
4687    #[test]
4688    fn test_set_list_index_on_non_list_returns_error() {
4689        // SET attr[0] = :v where attr is a string (not a list) must return
4690        // a ValidationException.
4691        let mut item = HashMap::new();
4692        item.insert("name".to_string(), json!({"S": "hello"}));
4693
4694        let names: HashMap<String, String> = HashMap::new();
4695        let mut values = HashMap::new();
4696        values.insert(":v".to_string(), json!({"S": "z"}));
4697
4698        let result = apply_update_expression(&mut item, "SET name[0] = :v", &names, &values);
4699        assert!(
4700            result.is_err(),
4701            "list index on non-list attribute must return an error"
4702        );
4703    }
4704
4705    #[test]
4706    fn test_unrecognized_update_action_returns_error() {
4707        let mut item = HashMap::new();
4708        item.insert("name".to_string(), json!({"S": "hello"}));
4709
4710        let names: HashMap<String, String> = HashMap::new();
4711        let mut values = HashMap::new();
4712        values.insert(":bar".to_string(), json!({"S": "baz"}));
4713
4714        let result = apply_update_expression(&mut item, "INVALID foo = :bar", &names, &values);
4715        assert!(
4716            result.is_err(),
4717            "unrecognized UpdateExpression action must return an error"
4718        );
4719        let err_msg = format!("{}", result.unwrap_err());
4720        assert!(
4721            err_msg.contains("Invalid UpdateExpression") || err_msg.contains("Syntax error"),
4722            "error should mention Invalid UpdateExpression, got: {err_msg}"
4723        );
4724    }
4725
4726    // ── size() function tests ──────────────────────────────────────────
4727
4728    #[test]
4729    fn test_size_string() {
4730        let mut item = HashMap::new();
4731        item.insert("name".to_string(), json!({"S": "hello"}));
4732        let names = HashMap::new();
4733        let mut values = HashMap::new();
4734        values.insert(":limit".to_string(), json!({"N": "5"}));
4735
4736        assert!(evaluate_single_filter_condition(
4737            "size(name) = :limit",
4738            &item,
4739            &names,
4740            &values,
4741        ));
4742        values.insert(":limit".to_string(), json!({"N": "4"}));
4743        assert!(evaluate_single_filter_condition(
4744            "size(name) > :limit",
4745            &item,
4746            &names,
4747            &values,
4748        ));
4749    }
4750
4751    #[test]
4752    fn test_size_list() {
4753        let mut item = HashMap::new();
4754        item.insert(
4755            "items".to_string(),
4756            json!({"L": [{"S": "a"}, {"S": "b"}, {"S": "c"}]}),
4757        );
4758        let names = HashMap::new();
4759        let mut values = HashMap::new();
4760        values.insert(":limit".to_string(), json!({"N": "3"}));
4761
4762        assert!(evaluate_single_filter_condition(
4763            "size(items) = :limit",
4764            &item,
4765            &names,
4766            &values,
4767        ));
4768    }
4769
4770    #[test]
4771    fn test_size_map() {
4772        let mut item = HashMap::new();
4773        item.insert(
4774            "data".to_string(),
4775            json!({"M": {"a": {"S": "1"}, "b": {"S": "2"}}}),
4776        );
4777        let names = HashMap::new();
4778        let mut values = HashMap::new();
4779        values.insert(":limit".to_string(), json!({"N": "2"}));
4780
4781        assert!(evaluate_single_filter_condition(
4782            "size(data) = :limit",
4783            &item,
4784            &names,
4785            &values,
4786        ));
4787    }
4788
4789    #[test]
4790    fn test_size_set() {
4791        let mut item = HashMap::new();
4792        item.insert("tags".to_string(), json!({"SS": ["a", "b", "c", "d"]}));
4793        let names = HashMap::new();
4794        let mut values = HashMap::new();
4795        values.insert(":limit".to_string(), json!({"N": "3"}));
4796
4797        assert!(evaluate_single_filter_condition(
4798            "size(tags) > :limit",
4799            &item,
4800            &names,
4801            &values,
4802        ));
4803    }
4804
4805    // ── attribute_type() function tests ────────────────────────────────
4806
4807    #[test]
4808    fn test_attribute_type_string() {
4809        let mut item = HashMap::new();
4810        item.insert("name".to_string(), json!({"S": "hello"}));
4811        let names = HashMap::new();
4812        let mut values = HashMap::new();
4813        values.insert(":t".to_string(), json!({"S": "S"}));
4814
4815        assert!(evaluate_single_filter_condition(
4816            "attribute_type(name, :t)",
4817            &item,
4818            &names,
4819            &values,
4820        ));
4821
4822        values.insert(":t".to_string(), json!({"S": "N"}));
4823        assert!(!evaluate_single_filter_condition(
4824            "attribute_type(name, :t)",
4825            &item,
4826            &names,
4827            &values,
4828        ));
4829    }
4830
4831    #[test]
4832    fn test_attribute_type_number() {
4833        let mut item = HashMap::new();
4834        item.insert("age".to_string(), json!({"N": "42"}));
4835        let names = HashMap::new();
4836        let mut values = HashMap::new();
4837        values.insert(":t".to_string(), json!({"S": "N"}));
4838
4839        assert!(evaluate_single_filter_condition(
4840            "attribute_type(age, :t)",
4841            &item,
4842            &names,
4843            &values,
4844        ));
4845    }
4846
4847    #[test]
4848    fn test_attribute_type_list() {
4849        let mut item = HashMap::new();
4850        item.insert("items".to_string(), json!({"L": [{"S": "a"}]}));
4851        let names = HashMap::new();
4852        let mut values = HashMap::new();
4853        values.insert(":t".to_string(), json!({"S": "L"}));
4854
4855        assert!(evaluate_single_filter_condition(
4856            "attribute_type(items, :t)",
4857            &item,
4858            &names,
4859            &values,
4860        ));
4861    }
4862
4863    #[test]
4864    fn test_attribute_type_map() {
4865        let mut item = HashMap::new();
4866        item.insert("data".to_string(), json!({"M": {"key": {"S": "val"}}}));
4867        let names = HashMap::new();
4868        let mut values = HashMap::new();
4869        values.insert(":t".to_string(), json!({"S": "M"}));
4870
4871        assert!(evaluate_single_filter_condition(
4872            "attribute_type(data, :t)",
4873            &item,
4874            &names,
4875            &values,
4876        ));
4877    }
4878
4879    #[test]
4880    fn test_attribute_type_bool() {
4881        let mut item = HashMap::new();
4882        item.insert("active".to_string(), json!({"BOOL": true}));
4883        let names = HashMap::new();
4884        let mut values = HashMap::new();
4885        values.insert(":t".to_string(), json!({"S": "BOOL"}));
4886
4887        assert!(evaluate_single_filter_condition(
4888            "attribute_type(active, :t)",
4889            &item,
4890            &names,
4891            &values,
4892        ));
4893    }
4894
4895    // ── begins_with rejects non-string types ───────────────────────────
4896
4897    #[test]
4898    fn test_begins_with_rejects_number_type() {
4899        let mut item = HashMap::new();
4900        item.insert("code".to_string(), json!({"N": "12345"}));
4901        let names = HashMap::new();
4902        let mut values = HashMap::new();
4903        values.insert(":prefix".to_string(), json!({"S": "123"}));
4904
4905        assert!(
4906            !evaluate_single_filter_condition("begins_with(code, :prefix)", &item, &names, &values,),
4907            "begins_with must return false for N-type attributes"
4908        );
4909    }
4910
4911    #[test]
4912    fn test_begins_with_works_on_string_type() {
4913        let mut item = HashMap::new();
4914        item.insert("code".to_string(), json!({"S": "abc123"}));
4915        let names = HashMap::new();
4916        let mut values = HashMap::new();
4917        values.insert(":prefix".to_string(), json!({"S": "abc"}));
4918
4919        assert!(evaluate_single_filter_condition(
4920            "begins_with(code, :prefix)",
4921            &item,
4922            &names,
4923            &values,
4924        ));
4925    }
4926
4927    // ── contains on sets ───────────────────────────────────────────────
4928
4929    #[test]
4930    fn test_contains_string_set() {
4931        let mut item = HashMap::new();
4932        item.insert("tags".to_string(), json!({"SS": ["red", "blue", "green"]}));
4933        let names = HashMap::new();
4934        let mut values = HashMap::new();
4935        values.insert(":val".to_string(), json!({"S": "blue"}));
4936
4937        assert!(evaluate_single_filter_condition(
4938            "contains(tags, :val)",
4939            &item,
4940            &names,
4941            &values,
4942        ));
4943
4944        values.insert(":val".to_string(), json!({"S": "yellow"}));
4945        assert!(!evaluate_single_filter_condition(
4946            "contains(tags, :val)",
4947            &item,
4948            &names,
4949            &values,
4950        ));
4951    }
4952
4953    #[test]
4954    fn test_contains_number_set() {
4955        let mut item = HashMap::new();
4956        item.insert("scores".to_string(), json!({"NS": ["1", "2", "3"]}));
4957        let names = HashMap::new();
4958        let mut values = HashMap::new();
4959        values.insert(":val".to_string(), json!({"N": "2"}));
4960
4961        assert!(evaluate_single_filter_condition(
4962            "contains(scores, :val)",
4963            &item,
4964            &names,
4965            &values,
4966        ));
4967    }
4968
4969    // ── SET arithmetic type validation ─────────────────────────────────
4970
4971    #[test]
4972    fn test_set_arithmetic_rejects_string_operand() {
4973        let mut item = HashMap::new();
4974        item.insert("name".to_string(), json!({"S": "hello"}));
4975        let names = HashMap::new();
4976        let mut values = HashMap::new();
4977        values.insert(":val".to_string(), json!({"N": "1"}));
4978
4979        let result = apply_update_expression(&mut item, "SET name = name + :val", &names, &values);
4980        assert!(
4981            result.is_err(),
4982            "arithmetic on S-type attribute must return a ValidationException"
4983        );
4984    }
4985
4986    #[test]
4987    fn test_set_arithmetic_rejects_string_value() {
4988        let mut item = HashMap::new();
4989        item.insert("count".to_string(), json!({"N": "5"}));
4990        let names = HashMap::new();
4991        let mut values = HashMap::new();
4992        values.insert(":val".to_string(), json!({"S": "notanumber"}));
4993
4994        let result =
4995            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
4996        assert!(
4997            result.is_err(),
4998            "arithmetic with S-type value must return a ValidationException"
4999        );
5000    }
5001
5002    #[test]
5003    fn test_set_arithmetic_valid_numbers() {
5004        let mut item = HashMap::new();
5005        item.insert("count".to_string(), json!({"N": "10"}));
5006        let names = HashMap::new();
5007        let mut values = HashMap::new();
5008        values.insert(":val".to_string(), json!({"N": "3"}));
5009
5010        let result =
5011            apply_update_expression(&mut item, "SET count = count + :val", &names, &values);
5012        assert!(result.is_ok());
5013        assert_eq!(item["count"], json!({"N": "13"}));
5014    }
5015
5016    // ── Binary Set (BS) support in ADD/DELETE ──────────────────────────
5017
5018    #[test]
5019    fn test_add_binary_set() {
5020        let mut item = HashMap::new();
5021        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg=="]}));
5022        let names = HashMap::new();
5023        let mut values = HashMap::new();
5024        values.insert(":val".to_string(), json!({"BS": ["Yw==", "YQ=="]}));
5025
5026        let result = apply_update_expression(&mut item, "ADD data :val", &names, &values);
5027        assert!(result.is_ok());
5028        let bs = item["data"]["BS"].as_array().unwrap();
5029        assert_eq!(bs.len(), 3, "should merge sets without duplicates");
5030        assert!(bs.contains(&json!("YQ==")));
5031        assert!(bs.contains(&json!("Yg==")));
5032        assert!(bs.contains(&json!("Yw==")));
5033    }
5034
5035    #[test]
5036    fn test_delete_binary_set() {
5037        let mut item = HashMap::new();
5038        item.insert("data".to_string(), json!({"BS": ["YQ==", "Yg==", "Yw=="]}));
5039        let names = HashMap::new();
5040        let mut values = HashMap::new();
5041        values.insert(":val".to_string(), json!({"BS": ["Yg=="]}));
5042
5043        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5044        assert!(result.is_ok());
5045        let bs = item["data"]["BS"].as_array().unwrap();
5046        assert_eq!(bs.len(), 2);
5047        assert!(!bs.contains(&json!("Yg==")));
5048    }
5049
5050    #[test]
5051    fn test_delete_binary_set_removes_attr_when_empty() {
5052        let mut item = HashMap::new();
5053        item.insert("data".to_string(), json!({"BS": ["YQ=="]}));
5054        let names = HashMap::new();
5055        let mut values = HashMap::new();
5056        values.insert(":val".to_string(), json!({"BS": ["YQ=="]}));
5057
5058        let result = apply_update_expression(&mut item, "DELETE data :val", &names, &values);
5059        assert!(result.is_ok());
5060        assert!(
5061            !item.contains_key("data"),
5062            "attribute should be removed when set becomes empty"
5063        );
5064    }
5065}